Skip to content

Tutorial 17: Orchestrator Pattern

This tutorial demonstrates the orchestrator pattern for coordinating multiple specialist agents.

Topics covered:

  1. Creating specialists with domain focus
  2. Building an orchestrator
  3. Routing decisions
  4. Parallel specialist execution
  5. Correlating and summarizing findings

Run with: python examples/tutorial_17_orchestrator_pattern.py

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 17: Orchestrator Pattern

This tutorial demonstrates the orchestrator pattern for coordinating
multiple specialist agents.

Topics covered:
1. Creating specialists with domain focus
2. Building an orchestrator
3. Routing decisions
4. Parallel specialist execution
5. Correlating and summarizing findings

Run with:
    python examples/tutorial_17_orchestrator_pattern.py
"""

import asyncio
import time

from config import get_model, get_model_b, print_config

from locus.agent import Agent
from locus.multiagent import (
    Orchestrator,
    RoutingDecision,
    Specialist,
    create_code_analyst,
    create_log_analyst,
    create_metrics_analyst,
    create_orchestrator,
    create_trace_analyst,
)
from locus.tools.decorator import tool


def _llm_call(
    prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
    """Helper: real model call with timing/token banner — used by every Part.
    Uses slot B (a faster model when configured) since the commentary
    calls don't need the heavy specialist model."""
    agent = Agent(model=get_model_b(max_tokens=max_tokens), system_prompt=system)
    t0 = time.perf_counter()
    res = agent.run_sync(prompt)
    dt = time.perf_counter() - t0
    print(
        f"  [model call: {dt:.2f}s · {res.metrics.prompt_tokens}{res.metrics.completion_tokens} tokens]"
    )
    return res.message.strip()


async def main():
    print("=" * 60)
    print("Tutorial 17: Orchestrator Pattern")
    print("=" * 60)
    print()
    print_config()

    model = get_model()

    # =========================================================================
    # Part 1: Pre-built Specialists
    # =========================================================================
    print("\n=== Part 1: Pre-built Specialists ===\n")

    # Locus provides pre-built specialists for common domains
    log_analyst = create_log_analyst(model=model)
    metrics_analyst = create_metrics_analyst(model=model)
    trace_analyst = create_trace_analyst(model=model)
    code_analyst = create_code_analyst(model=model)

    print("Pre-built Specialists:")
    for specialist in [log_analyst, metrics_analyst, trace_analyst, code_analyst]:
        print(f"  - {specialist.name}")
        print(f"    Type: {specialist.specialist_type}")
        print(f"    Description: {specialist.description[:60]}...")
        print()

    # Run one of them on a tiny task — proves the pre-built specialists hit OCI.
    t0 = time.perf_counter()
    p1 = await log_analyst.execute(task="In one sentence, summarise what a log analyst does.")
    dt = time.perf_counter() - t0
    print(f"  [model call: {dt:.2f}s · log_analyst.execute()]")
    if p1.output:
        print(f"  Output: {p1.output[:160]}")

    # =========================================================================
    # Part 2: Custom Specialists
    # =========================================================================
    print("\n=== Part 2: Custom Specialists ===\n")

    # Create custom tools for the specialist
    @tool(name="check_database", description="Check database health and connections")
    async def check_database() -> str:
        return "Database: 45/50 connections used, avg query time 250ms"

    @tool(name="check_cache", description="Check cache hit rates")
    async def check_cache() -> str:
        return "Cache hit rate: 85%, memory usage: 2.1GB/4GB"

    # Create a custom specialist
    database_specialist = Specialist(
        name="Database Specialist",
        specialist_type="database_analyst",
        description="Analyzes database performance, connections, and queries",
        system_prompt="""You are a database specialist. Your expertise includes:
- Analyzing query performance
- Monitoring connection pools
- Identifying slow queries
- Recommending optimizations

When analyzing, look for connection leaks, slow queries, and lock contention.""",
        tools=[check_database, check_cache],
        max_iterations=5,
        confidence_threshold=0.8,
        model=model,
    )

    print(f"Custom Specialist: {database_specialist.name}")
    print(f"  Tools: {[t.name for t in database_specialist.tools]}")
    print(
        f"AI commentary: {_llm_call('In one sentence, why is a custom Specialist with domain tools better than a generic Agent for DB diagnostics?')}"
    )

    # =========================================================================
    # Part 3: Executing a Specialist
    # =========================================================================
    print("\n=== Part 3: Executing a Specialist ===\n")

    result = await database_specialist.execute(
        task="Analyze current database performance and identify issues",
        context={"incident_id": "INC-12345", "reported_issue": "Slow API responses"},
    )

    print("Specialist Result:")
    print(f"  Success: {result.success}")
    print(f"  Confidence: {result.confidence:.0%}")
    print(f"  Duration: {result.duration_ms:.0f}ms")
    if result.output:
        print(f"  Output: {result.output[:300]}...")

    # =========================================================================
    # Part 4: Creating an Orchestrator
    # =========================================================================
    print("\n=== Part 4: Creating an Orchestrator ===\n")

    # Create orchestrator with specialists
    orchestrator = create_orchestrator(
        name="Incident Analysis Orchestrator",
        specialists=[log_analyst, metrics_analyst, database_specialist],
        model=model,
    )

    print(f"Orchestrator: {orchestrator.name}")
    print("Registered specialists:")
    for spec_id, spec in orchestrator.specialists.items():
        print(f"  - {spec.name} ({spec_id})")
    # AI commentary call dropped — Part 7's full orchestration run\n    # exercises the same Orchestrator code path live.\n\n    # =========================================================================\n    # Part 5: Orchestrator Configuration\n    # =========================================================================\n    print("\n=== Part 5: Orchestrator Configuration ===\n")\n\n    # Configure orchestrator behavior. ``max_parallel_specialists``\n    # caps the asyncio.Semaphore that bounds the parallel fan-out —\n    # the orchestrator runs every routed specialist concurrently\n    # behind this gate (per-specialist exception isolation, retry on\n    # the empty-completion blip). Drop to 1 if you want the old\n    # serialised behaviour (e.g. to debug a flaky specialist).\n    orchestrator.max_parallel_specialists = 3\n    orchestrator.correlation_threshold = 0.7  # Correlation confidence\n\n    print(f"Max parallel specialists: {orchestrator.max_parallel_specialists}")\n    print(f"Correlation threshold: {orchestrator.correlation_threshold}")\n\n    # Custom system prompt for orchestration\n    custom_orchestrator = Orchestrator(\n        name="Custom Orchestrator",\n        description="Orchestrates analysis with custom logic",\n        system_prompt="""You coordinate specialist agents for incident analysis.\n\nWhen routing:\n1. For performance issues -> metrics + database specialists\n2. For error spikes -> log + trace specialists\n3. For unknown issues -> all specialists\n\nPrioritize based on urgency indicated in the task.""",\n        model=model,\n    )\n    custom_orchestrator.register_specialists([log_analyst, metrics_analyst])\n\n    print(f"\nCustom orchestrator with {len(custom_orchestrator.specialists)} specialists")\n    # The custom_orchestrator object above is the demo itself — no\n    # need for a separate LLM commentary call.\n\n    # =========================================================================\n    # Part 6: Routing Decisions\n    # =========================================================================\n    print("\n=== Part 6: Routing Decisions ===\n")\n\n    # Routing decisions determine which specialists to invoke\n    routing = RoutingDecision(\n        decision_type="invoke",\n        specialists=["log_analyst", "metrics_analyst"],\n        reasoning="Performance issue requires log and metrics analysis",\n        context={\n            "subtasks": {\n                "log_analyst": "Search for timeout errors in the last hour",\n                "metrics_analyst": "Check CPU and memory trends",\n            }\n        },\n    )\n\n    print("Routing Decision:")\n    print(f"  Type: {routing.decision_type}")\n    print(f"  Specialists: {routing.specialists}")\n    print(f"  Reasoning: {routing.reasoning}")\n    print(f"  Subtasks: {routing.context.get('subtasks', {})}")\n    # RoutingDecision is a typed object — the field set above is the demo.\n\n    # =========================================================================\n    # Part 7: Full Orchestration\n    # =========================================================================\n    print("\n=== Part 7: Full Orchestration ===\n")\n\n    # Execute the full orchestration workflow\n    orch_result = await orchestrator.execute(\n        task="API response times have increased from 200ms to 2000ms in the last 30 minutes",\n        context={"severity": "high", "affected_services": ["api-gateway", "user-service"]},\n    )\n\n    print("Orchestration Result:")\n    print(f"  Success: {orch_result.success}")\n    print(f"  Duration: {orch_result.duration_ms:.0f}ms")\n    # max_parallel_specialists=3 means the three routed specialists\n    # ran concurrently behind an asyncio.Semaphore, not back-to-back.\n    # With per-specialist budgets averaging ~5s, parallel finishes in\n    # ~5s; serial would take ~15s.\n    print(f"  Parallel cap: max_parallel_specialists={orchestrator.max_parallel_specialists}")\n    print(f"  Decisions made: {len(orch_result.decisions)}")\n\n    for i, decision in enumerate(orch_result.decisions):\n        print(f"\n  Decision {i + 1}: {decision.decision_type}")\n        if decision.specialists:\n            print(f"    Specialists: {decision.specialists}")\n\n    print("\nSpecialist Results:")\n    for spec_id, spec_result in orch_result.specialist_results.items():\n        status = "OK" if spec_result.success else f"ERROR: {spec_result.error}"\n        print(f"  {spec_id}: {status}")\n        if spec_result.output:\n            print(f"    Output preview: {spec_result.output[:100]}...")\n\n    if orch_result.summary:\n        print("\nFinal Summary:")\n        print(f"  {orch_result.summary[:500]}...")\n\n    # =========================================================================\n    # Part 8: Adding Specialists Dynamically\n    # =========================================================================\n    print("\n=== Part 8: Dynamic Specialist Registration ===\n")\n\n    # Specialists can be added at runtime\n    network_specialist = Specialist(\n        name="Network Analyst",\n        specialist_type="network_analyst",\n        description="Analyzes network connectivity and latency",\n        system_prompt="You analyze network issues including DNS, latency, and connectivity.",\n        model=model,\n    )\n\n    orchestrator.register_specialist(network_specialist)\n    print(f"Added specialist: {network_specialist.name}")\n    print(f"Total specialists: {len(orchestrator.specialists)}")\n    # Run the just-registered specialist on a one-shot task.\n    t0 = time.perf_counter()\n    p8 = await network_specialist.execute(\n        task="In one short sentence, what would you check first if a service had intermittent timeouts?",\n    )\n    dt = time.perf_counter() - t0\n    print(f"  [model call: {dt:.2f}s · network_specialist.execute()]")\n    if p8.output:\n        print(f"  Output: {p8.output[:160]}")\n\n    # =========================================================================\n    # Part 9: Orchestrator Patterns\n    # =========================================================================\n    print("\n=== Part 9: Common Patterns ===\n")\n\n    print("Pattern 1: Parallel Analysis")\n    print("  - Invoke multiple specialists simultaneously")\n    print("  - Correlate findings")\n    print("  - Produce unified summary")\n    print()\n\n    print("Pattern 2: Sequential Refinement")\n    print("  - Start with broad analysis")\n    print("  - Route to specific specialist based on findings")\n    print("  - Iterate until confident")\n    print()\n\n    print("Pattern 3: Hierarchical Routing")\n    print("  - High-level orchestrator routes to sub-orchestrators")\n    print("  - Each sub-orchestrator manages domain specialists")\n    print()\n\n    print("Pattern 4: Consensus Analysis")\n    print("  - Multiple specialists analyze the same data")\n    print("  - Compare and validate findings")\n    print("  - Flag disagreements for human review")\n\n    # =========================================================================\n    # Part 10: Best Practices\n    # =========================================================================\n    print("\n=== Part 10: Best Practices ===\n")\n\n    print("1. Give specialists focused, non-overlapping domains")\n    print("2. Use clear naming for specialist types")\n    print("3. Provide domain-specific system prompts")\n    print("4. Set appropriate parallel limits")\n    print("5. Include correlation logic in summarization")\n    print("6. Handle specialist failures gracefully")\n    print("7. Track specialist performance metrics")\n\n    # =========================================================================\n    print("\n" + "=" * 60)\n    print("Next: Tutorial 18 - Specialist Agents")\n    print("=" * 60)\n\n\nif __name__ == "__main__":\n    asyncio.run(main())\n