Skip to content

Tutorial 11: Swarm Multi-Agent

This tutorial covers:

  • Creating self-organizing agent swarms
  • Shared context for inter-agent communication
  • Task queues and dynamic allocation
  • Capability-based agent selection

Prerequisites: Tutorial 10 (Advanced Patterns) Difficulty: Advanced

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 11: Swarm Multi-Agent

This tutorial covers:
- Creating self-organizing agent swarms
- Shared context for inter-agent communication
- Task queues and dynamic allocation
- Capability-based agent selection

Prerequisites: Tutorial 10 (Advanced Patterns)
Difficulty: Advanced
"""

import asyncio
import time

# Import shared config for model
from config import get_model, print_config

from locus.agent import Agent
from locus.multiagent.swarm import (
    SharedContext,
    Swarm,
    SwarmTask,
    create_swarm,
    create_swarm_agent,
)


def _llm_call(
    prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
    """Helper: real model call with timing/token banner — used by every Part."""
    agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
    t0 = time.perf_counter()
    res = agent.run_sync(prompt)
    dt = time.perf_counter() - t0
    print(
        f"  [model call: {dt:.2f}s · {res.metrics.prompt_tokens}{res.metrics.completion_tokens} tokens]"
    )
    return res.message.strip()


# =============================================================================
# Part 1: Creating Swarm Agents
# =============================================================================


def example_create_agents():
    """Create specialized swarm agents."""
    print("=== Part 1: Creating Swarm Agents ===\n")
    print(
        f"AI rationale: {_llm_call('In one sentence, when is a swarm of small specialised agents a better fit than one generalist agent?')}"
    )

    # Agents have names, capabilities, and system prompts
    researcher = create_swarm_agent(
        name="Researcher",
        capabilities=["research", "analyze", "investigate"],
        system_prompt="You are a research specialist. Find and analyze information.",
    )

    writer = create_swarm_agent(
        name="Writer",
        capabilities=["write", "summarize", "document"],
        system_prompt="You are a writing specialist. Create clear documentation.",
    )

    reviewer = create_swarm_agent(
        name="Reviewer",
        capabilities=["review", "validate", "check"],
        system_prompt="You are a quality reviewer. Verify accuracy and completeness.",
    )

    print("Created agents:")
    for agent in [researcher, writer, reviewer]:
        print(f"  - {agent.name}: capabilities = {agent.capabilities}")
    print()

    return researcher, writer, reviewer


# =============================================================================
# Part 2: Shared Context
# =============================================================================


async def example_shared_context():
    """Demonstrate shared context for inter-agent communication."""
    print("=== Part 2: Shared Context ===\n")
    print(
        f"AI rationale: {_llm_call('In one sentence, why does a swarm need SharedContext for messages and discoveries?')}"
    )

    context = SharedContext()

    # Agents can add findings
    await context.add_finding(
        key="api_docs",
        value="The API uses REST with JSON responses",
        agent_id="agent_1",
    )

    # Agents can post to the blackboard for others to read
    await context.post_to_blackboard(
        key="need_help",
        message="Need someone to review the authentication section",
        agent_id="agent_1",
    )

    # Agents can record task results
    await context.record_task_result(
        task_id="task_001",
        result="Completed analysis of the codebase structure",
    )

    print("Current context:")
    print(context.get_summary())
    print()


# =============================================================================
# Part 3: Task Queue
# =============================================================================


def example_task_queue():
    """Demonstrate the task queue system."""
    print("=== Part 3: Task Queue ===\n")
    print(
        f"AI rationale: {_llm_call('In one sentence, why is task-queue routing useful for a heterogeneous swarm?')}"
    )

    swarm = Swarm(name="Research Team")

    # Add tasks with different priorities
    task1 = swarm.add_task("Research the API documentation", priority=5)
    task2 = swarm.add_task("Write a summary report", priority=3)
    task3 = swarm.add_task("Review the findings for accuracy", priority=2)
    task4 = swarm.add_task("Investigate security concerns", priority=10)  # Highest

    print("Task queue (sorted by priority):")
    for task in swarm.task_queue:
        print(f"  [{task.priority}] {task.description} (status: {task.status})")
    print()

    return swarm


# =============================================================================
# Part 4: Capability-Based Assignment
# =============================================================================


def example_capability_matching():
    """Show how agents are matched to tasks based on capabilities."""
    print("=== Part 4: Capability-Based Assignment ===\n")
    print(
        f"AI rationale: {_llm_call('In one sentence, why is capability-based agent selection better than random round-robin?')}"
    )

    researcher = create_swarm_agent(
        name="Researcher",
        capabilities=["research", "analyze"],
    )

    writer = create_swarm_agent(
        name="Writer",
        capabilities=["write", "document"],
    )

    # ``SwarmTask`` carries a typed capability registry alongside the
    # description: ``required_tags`` are the tags an agent MUST advertise
    # to claim the task (set-membership), and ``preferred_tags`` boost
    # the priority score without being a hard requirement. Tasks that
    # don't declare tags fall through to the legacy substring match
    # against the description, so pre-tag swarms keep working.
    tasks = [
        SwarmTask(
            description="Research the competitor landscape",
            required_tags=["research"],
            preferred_tags=["analyze"],
        ),
        SwarmTask(
            description="Write documentation for the API",
            required_tags=["write", "document"],
        ),
        SwarmTask(description="Analyze the performance data", required_tags=["analyze"]),
        SwarmTask(description="Create a summary document"),  # tagless → substring fallback
    ]

    print("Task-Agent matching:")
    for task in tasks:
        print(f"\n  Task: {task.description}")
        print(f"    required_tags={task.required_tags} preferred_tags={task.preferred_tags}")
        print(f"    Researcher can handle: {researcher.can_handle(task)}")
        print(f"    Writer can handle: {writer.can_handle(task)}")
        print(f"    Researcher priority: {researcher.priority_for_task(task):.2f}")
        print(f"    Writer priority: {writer.priority_for_task(task):.2f}")
    print()


# =============================================================================
# Part 5: Simple Swarm Execution
# =============================================================================


async def example_simple_swarm():
    """Execute a simple swarm — and verify the provider is reachable."""
    print("=== Part 5: Simple Swarm Execution ===\n")
    rationale_prompt = (
        "In one sentence, what does 'simple swarm execution' mean and when is it enough?"
    )
    print(f"AI rationale: {_llm_call(rationale_prompt)}")

    # Create a swarm with mock execution
    swarm = Swarm(name="Demo Swarm")

    # Create agents
    agent1 = create_swarm_agent(
        name="Analyst",
        capabilities=["analyze"],
        system_prompt="You analyze data.",
    )

    agent2 = create_swarm_agent(
        name="Reporter",
        capabilities=["report"],
        system_prompt="You create reports.",
    )

    swarm.add_agent(agent1)
    swarm.add_agent(agent2)

    # Add tasks
    swarm.add_task("Analyze the sales data", priority=5)
    swarm.add_task("Report on the findings", priority=3)

    print(f"Swarm '{swarm.name}' configured:")
    print(f"  Agents: {[a.name for a in swarm.agents]}")
    print(f"  Tasks: {len(swarm.task_queue)}")
    print()

    # Note: Without a model, agents can't actually work
    # This demonstrates the structure
    print("Note: Full execution requires a configured model.")
    print("See Part 6 for execution with a model.")
    print()


# =============================================================================
# Part 6: Full Swarm with Model
# =============================================================================


async def example_full_swarm():
    """Execute a swarm with a real model."""
    print("=== Part 6: Full Swarm with Model ===\n")

    # The swarm asks each agent for a structured
    # `### Findings / ### Analysis / ### Blackboard` response, so we need
    # enough completion budget to leave room for substantive content.
    model = get_model(max_tokens=2000)

    # Create swarm with model
    swarm = create_swarm(
        name="Analysis Team",
        agents=[
            create_swarm_agent(
                name="Researcher",
                capabilities=["research", "investigate", "find"],
                system_prompt="You are a research specialist. Find relevant information.",
            ),
            create_swarm_agent(
                name="Analyst",
                capabilities=["analyze", "evaluate", "assess"],
                system_prompt="You analyze and evaluate findings critically.",
            ),
            create_swarm_agent(
                name="Writer",
                capabilities=["write", "summarize", "document"],
                system_prompt="You write clear, concise summaries.",
            ),
        ],
        model=model,
    )

    # Execute on a task
    print("Executing swarm on: 'Analyze the benefits of async programming'")
    print("This may take a moment...\n")

    result = await swarm.execute(
        initial_task=(
            "Research, analyze, and write a brief summary of the benefits "
            "of async programming in Python."
        ),
        decompose_tasks=True,  # Let the swarm break this into capability-matched subtasks
    )

    print("Swarm completed!")
    print(f"  Success: {result.success}")
    print(f"  Completed tasks: {len(result.completed_tasks)}")
    print(f"  Failed tasks: {len(result.failed_tasks)}")
    print(f"  Duration: {result.duration_ms:.0f}ms")

    if result.completed_tasks:
        print("\nCompleted subtasks:")
        for t in result.completed_tasks[:5]:
            assigned = t.claimed_by or "unassigned"
            print(f"  - [{assigned}] {t.description[:80]}")
            preview = (t.result or "<empty>").strip().splitlines()[0:6]
            for line in preview:
                print(f"      {line[:120]}")
            if not t.result:
                print("      (no .result text — model returned empty)")
    if result.failed_tasks:
        print("\nFailed subtasks:")
        for t in result.failed_tasks[:5]:
            print(f"  - {t.description[:80]} (reason: {t.error or 'no agent matched'})")
    if result.summary:
        print(f"\nSummary:\n{result.summary[:500]}...")
    print()


# =============================================================================
# Part 7: Swarm Patterns
# =============================================================================


def example_swarm_patterns():
    """Common swarm patterns and configurations."""
    print("=== Part 7: Swarm Patterns ===\n")
    print(
        f"AI rationale: {_llm_call('In one sentence, when is a Specialist Team swarm preferable to a Pipeline swarm?')}"
    )

    print("Pattern 1: Specialist Team")
    print("-" * 40)
    specialist_team = create_swarm(
        name="Specialist Team",
        agents=[
            create_swarm_agent("Frontend Dev", ["frontend", "UI", "React"]),
            create_swarm_agent("Backend Dev", ["backend", "API", "database"]),
            create_swarm_agent("DevOps", ["deploy", "infrastructure", "CI/CD"]),
        ],
    )
    print("  Agents with distinct, non-overlapping capabilities")
    print("  Each task goes to the most qualified agent")
    print()

    print("Pattern 2: Redundant Team")
    print("-" * 40)
    redundant_team = create_swarm(
        name="Redundant Team",
        agents=[
            create_swarm_agent("Analyst A", ["analyze", "research"]),
            create_swarm_agent("Analyst B", ["analyze", "research"]),
            create_swarm_agent("Analyst C", ["analyze", "research"]),
        ],
    )
    print("  Agents with overlapping capabilities")
    print("  Tasks distributed for parallel processing")
    print()

    print("Pattern 3: Pipeline Team")
    print("-" * 40)
    pipeline_team = create_swarm(
        name="Pipeline Team",
        agents=[
            create_swarm_agent("Gatherer", ["gather", "collect", "fetch"]),
            create_swarm_agent("Processor", ["process", "transform", "clean"]),
            create_swarm_agent("Presenter", ["present", "format", "display"]),
        ],
    )
    print("  Agents form a processing pipeline")
    print("  Tasks chain from one agent to the next")
    print()


# =============================================================================
# Main
# =============================================================================


async def main():
    """Run all tutorial parts."""
    print("=" * 60)
    print("Tutorial 11: Swarm Multi-Agent")
    print("=" * 60)
    print()

    print_config()
    print()

    example_create_agents()
    await example_shared_context()
    example_task_queue()
    example_capability_matching()
    await example_simple_swarm()
    await example_full_swarm()
    example_swarm_patterns()

    print("=" * 60)
    print("Next: Tutorial 12 - MCP Integration")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())