Swarm Multi-Agent¶
A swarm is a pool of agents pulling tasks from a shared queue. No supervisor decides who does what — each task finds the worker whose declared capabilities fit best, and the swarm runs them in parallel where it can.
This notebook covers:
create_swarm_agent— agents advertise free-form capability tags.SwarmTask—required_tagsare a hard filter;preferred_tagsboost score. Tagless tasks fall back to substring matching against the description.SharedContext— the blackboard agents use to leave findings, messages, and recorded results for each other.Swarm.execute(initial_task, decompose_tasks=True)— break a high-level brief into capability-matched subtasks and run them.- Three common shapes: specialist team, redundant team, pipeline.
Prerequisites¶
- Notebook 08 (Agent basics).
- Notebook 27 (Orchestrator pattern) if you want the supervised counterpoint to a swarm.
Run¶
The default provider is OCI Generative AI. With ~/.oci/config
present the swarm talks to a live OCI model; canonical picks are
openai.gpt-4.1 or meta.llama-3.3-70b-instruct. Set
LOCUS_MODEL_PROVIDER=mock for offline runs.
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 25: swarm — capability-matched workers claiming from a shared queue.
A swarm is a pool of agents that pull tasks from a shared queue based on
declared capabilities. No supervisor decides who does what — each task
finds the worker whose tags fit best, and the swarm runs them in parallel
where it can.
- Each agent advertises ``capabilities`` (free-form string tags).
- Tasks carry ``required_tags`` (hard filter) and ``preferred_tags``
(priority boost). Tagless tasks fall back to substring matching against
the description so older swarms keep working.
- A ``SharedContext`` (findings, blackboard messages, task results) lets
agents leave notes for each other without a central coordinator.
- ``Swarm.execute`` decomposes the initial task into capability-matched
subtasks and runs them.
Run it:
.venv/bin/python examples/notebook_30_swarm_multiagent.py
The default provider is OCI Generative AI. With ``~/.oci/config`` present
the swarm talks to a live OCI model (canonical pick:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). Set
``LOCUS_MODEL_PROVIDER=mock`` for offline runs.
Prerequisites:
- Notebook 08 (Agent basics).
- Notebook 27 (Orchestrator) if you want the supervised counterpoint.
"""
import asyncio
import time
from config import get_model, print_config
from locus.agent import Agent
from locus.multiagent.swarm import (
SharedContext,
Swarm,
SwarmTask,
create_swarm,
create_swarm_agent,
)
def _llm_call(
prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
"""One model call with a timing/token banner so each Part shows its work."""
agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
t0 = time.perf_counter()
res = agent.run_sync(prompt)
dt = time.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {res.metrics.prompt_tokens}→{res.metrics.completion_tokens} tokens]"
)
return res.message.strip()
# =============================================================================
# Part 1: declare the workers
# =============================================================================
def example_create_agents():
"""Create three specialist swarm agents."""
print("=== Part 1: Creating Swarm Agents ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when is a swarm of small specialised agents a better fit than one generalist agent?')}"
)
researcher = create_swarm_agent(
name="Researcher",
capabilities=["research", "analyze", "investigate"],
system_prompt="You are a research specialist. Find and analyze information.",
)
writer = create_swarm_agent(
name="Writer",
capabilities=["write", "summarize", "document"],
system_prompt="You are a writing specialist. Create clear documentation.",
)
reviewer = create_swarm_agent(
name="Reviewer",
capabilities=["review", "validate", "check"],
system_prompt="You are a quality reviewer. Verify accuracy and completeness.",
)
print("Created agents:")
for agent in [researcher, writer, reviewer]:
print(f" - {agent.name}: capabilities = {agent.capabilities}")
print()
return researcher, writer, reviewer
# =============================================================================
# Part 2: the blackboard
# =============================================================================
async def example_shared_context():
"""SharedContext — the blackboard agents use to leave notes for each other."""
print("=== Part 2: Shared Context ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why does a swarm need SharedContext for messages and discoveries?')}"
)
context = SharedContext()
await context.add_finding(
key="api_docs",
value="The API uses REST with JSON responses",
agent_id="agent_1",
)
await context.post_to_blackboard(
key="need_help",
message="Need someone to review the authentication section",
agent_id="agent_1",
)
await context.record_task_result(
task_id="task_001",
result="Completed analysis of the codebase structure",
)
print("Current context:")
print(context.get_summary())
print()
# =============================================================================
# Part 3: priority queue
# =============================================================================
def example_task_queue():
"""Tasks are pulled in priority order — highest first."""
print("=== Part 3: Task Queue ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why is task-queue routing useful for a heterogeneous swarm?')}"
)
swarm = Swarm(name="Research Team")
task1 = swarm.add_task("Research the API documentation", priority=5)
task2 = swarm.add_task("Write a summary report", priority=3)
task3 = swarm.add_task("Review the findings for accuracy", priority=2)
task4 = swarm.add_task("Investigate security concerns", priority=10)
print("Task queue (sorted by priority):")
for task in swarm.task_queue:
print(f" [{task.priority}] {task.description} (status: {task.status})")
print()
return swarm
# =============================================================================
# Part 4: capability matching
# =============================================================================
def example_capability_matching():
"""How tasks are scored against agent capabilities."""
print("=== Part 4: Capability-Based Assignment ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why is capability-based agent selection better than random round-robin?')}"
)
researcher = create_swarm_agent(
name="Researcher",
capabilities=["research", "analyze"],
)
writer = create_swarm_agent(
name="Writer",
capabilities=["write", "document"],
)
# required_tags are a hard filter (set membership). preferred_tags
# only boost the score. Tasks without any tags fall back to substring
# matching against the description — so older tagless swarms still
# route correctly.
tasks = [
SwarmTask(
description="Research the competitor landscape",
required_tags=["research"],
preferred_tags=["analyze"],
),
SwarmTask(
description="Write documentation for the API",
required_tags=["write", "document"],
),
SwarmTask(description="Analyze the performance data", required_tags=["analyze"]),
SwarmTask(description="Create a summary document"),
]
print("Task-Agent matching:")
for task in tasks:
print(f"\n Task: {task.description}")
print(f" required_tags={task.required_tags} preferred_tags={task.preferred_tags}")
print(f" Researcher can handle: {researcher.can_handle(task)}")
print(f" Writer can handle: {writer.can_handle(task)}")
print(f" Researcher priority: {researcher.priority_for_task(task):.2f}")
print(f" Writer priority: {writer.priority_for_task(task):.2f}")
print()
# =============================================================================
# Part 5: a swarm without a model — just the shape
# =============================================================================
async def example_simple_swarm():
"""Stand up a swarm without a model — useful when you only want the shape."""
print("=== Part 5: Simple Swarm Execution ===\n")
rationale_prompt = (
"In one sentence, what does 'simple swarm execution' mean and when is it enough?"
)
print(f"AI rationale: {_llm_call(rationale_prompt)}")
swarm = Swarm(name="Demo Swarm")
agent1 = create_swarm_agent(
name="Analyst",
capabilities=["analyze"],
system_prompt="You analyze data.",
)
agent2 = create_swarm_agent(
name="Reporter",
capabilities=["report"],
system_prompt="You create reports.",
)
swarm.add_agent(agent1)
swarm.add_agent(agent2)
swarm.add_task("Analyze the sales data", priority=5)
swarm.add_task("Report on the findings", priority=3)
print(f"Swarm '{swarm.name}' configured:")
print(f" Agents: {[a.name for a in swarm.agents]}")
print(f" Tasks: {len(swarm.task_queue)}")
print()
print("Note: full execution requires a configured model. See Part 6.")
print()
# =============================================================================
# Part 6: live run against a real model
# =============================================================================
async def example_full_swarm():
"""Run the swarm end-to-end against a real model."""
print("=== Part 6: Full Swarm with Model ===\n")
# Each agent emits a structured `### Findings / ### Analysis /
# ### Blackboard` block — leave enough completion budget for it.
model = get_model(max_tokens=2000)
swarm = create_swarm(
name="Analysis Team",
agents=[
create_swarm_agent(
name="Researcher",
capabilities=["research", "investigate", "find"],
system_prompt="You are a research specialist. Find relevant information.",
),
create_swarm_agent(
name="Analyst",
capabilities=["analyze", "evaluate", "assess"],
system_prompt="You analyze and evaluate findings critically.",
),
create_swarm_agent(
name="Writer",
capabilities=["write", "summarize", "document"],
system_prompt="You write clear, concise summaries.",
),
],
model=model,
)
print("Executing swarm on: 'Analyze the benefits of async programming'")
print("This may take a moment...\n")
result = await swarm.execute(
initial_task=(
"Research, analyze, and write a brief summary of the benefits "
"of async programming in Python."
),
decompose_tasks=True,
)
print("Swarm completed!")
print(f" Success: {result.success}")
print(f" Completed tasks: {len(result.completed_tasks)}")
print(f" Failed tasks: {len(result.failed_tasks)}")
print(f" Duration: {result.duration_ms:.0f}ms")
if result.completed_tasks:
print("\nCompleted subtasks:")
for t in result.completed_tasks[:5]:
assigned = t.claimed_by or "unassigned"
print(f" - [{assigned}] {t.description[:80]}")
preview = (t.result or "<empty>").strip().splitlines()[0:6]
for line in preview:
print(f" {line[:120]}")
if not t.result:
print(" (no .result text — model returned empty)")
if result.failed_tasks:
print("\nFailed subtasks:")
for t in result.failed_tasks[:5]:
print(f" - {t.description[:80]} (reason: {t.error or 'no agent matched'})")
if result.summary:
print(f"\nSummary:\n{result.summary[:500]}...")
print()
# =============================================================================
# Part 7: three common swarm shapes
# =============================================================================
def example_swarm_patterns():
"""Three common swarm shapes: specialist team, redundant team, pipeline."""
print("=== Part 7: Swarm Patterns ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when is a Specialist Team swarm preferable to a Pipeline swarm?')}"
)
print("Pattern 1: Specialist Team")
print("-" * 40)
specialist_team = create_swarm(
name="Specialist Team",
agents=[
create_swarm_agent("Frontend Dev", ["frontend", "UI", "React"]),
create_swarm_agent("Backend Dev", ["backend", "API", "database"]),
create_swarm_agent("DevOps", ["deploy", "infrastructure", "CI/CD"]),
],
)
print(" Distinct, non-overlapping capabilities; each task goes to its expert.")
print()
print("Pattern 2: Redundant Team")
print("-" * 40)
redundant_team = create_swarm(
name="Redundant Team",
agents=[
create_swarm_agent("Analyst A", ["analyze", "research"]),
create_swarm_agent("Analyst B", ["analyze", "research"]),
create_swarm_agent("Analyst C", ["analyze", "research"]),
],
)
print(" Overlapping capabilities; tasks fan out for parallel processing.")
print()
print("Pattern 3: Pipeline Team")
print("-" * 40)
pipeline_team = create_swarm(
name="Pipeline Team",
agents=[
create_swarm_agent("Gatherer", ["gather", "collect", "fetch"]),
create_swarm_agent("Processor", ["process", "transform", "clean"]),
create_swarm_agent("Presenter", ["present", "format", "display"]),
],
)
print(" Agents form a processing chain; output of one feeds the next.")
print()
# =============================================================================
# Main
# =============================================================================
async def main():
print("=" * 60)
print("Notebook 25: swarm — capability-matched workers, shared queue")
print("=" * 60)
print()
print_config()
print()
example_create_agents()
await example_shared_context()
example_task_queue()
example_capability_matching()
await example_simple_swarm()
await example_full_swarm()
example_swarm_patterns()
print("=" * 60)
print("Next: Notebook 26 — Agent Handoff")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())