Orchestrator Pattern
An orchestrator routes a task to a chosen set of specialist agents, runs them in parallel under a semaphore, then correlates their outputs into a single summary. Compared with a swarm (Notebook 25), the decision of who does what is centralised here instead of emerging from capability tags.
This notebook covers:
- Specialist: a domain-focused agent with tools, a system prompt, and a confidence threshold. Locus ships pre-built ones for logs, metrics, traces, and code.
- Orchestrator: registers specialists, emits RoutingDecision objects, and runs the chosen ones concurrently behind max_parallel_specialists (an asyncio.Semaphore).
- RoutingDecision: the typed object the planner returns, naming which specialists run, which sub-task each one receives, and the reasoning.
- OrchestrationResult: each specialist's output, the decisions trail, and a correlated summary.
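The flow described above (route, fan out under a semaphore, correlate) can be sketched with plain asyncio and no Locus imports. This is a minimal illustration, not the Locus implementation: ToyRoutingDecision, toy_specialist, and toy_orchestrate are hypothetical names invented for this sketch, and the "correlation" step is simple concatenation.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyRoutingDecision:
    """Hypothetical stand-in for a routing decision: who runs, on what, and why."""

    specialists: list[str]
    subtasks: dict[str, str]
    reasoning: str


async def toy_specialist(name: str, subtask: str) -> str:
    """Pretend specialist: sleeps briefly instead of calling a model."""
    await asyncio.sleep(0.05)
    return f"{name} finished: {subtask}"


async def toy_orchestrate(decision: ToyRoutingDecision, max_parallel: int = 2) -> dict:
    """Run the chosen specialists concurrently, at most max_parallel at a time."""
    semaphore = asyncio.Semaphore(max_parallel)
    running = 0
    peak = 0

    async def run_one(name: str) -> tuple[str, str]:
        nonlocal running, peak
        async with semaphore:  # blocks once max_parallel specialists are active
            running += 1
            peak = max(peak, running)
            try:
                output = await toy_specialist(name, decision.subtasks[name])
            finally:
                running -= 1
        return name, output

    pairs = await asyncio.gather(*(run_one(n) for n in decision.specialists))
    outputs = dict(pairs)
    # Correlation is plain concatenation here; a real summariser would do more.
    summary = " | ".join(outputs[n] for n in decision.specialists)
    return {"outputs": outputs, "summary": summary, "peak_concurrency": peak}
```

Calling asyncio.run(toy_orchestrate(decision, max_parallel=2)) with three specialists runs two at once and the third when a slot frees up; setting max_parallel=1 serialises the run.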
Prerequisites
- Notebook 08 (Agent basics).
- Notebook 25 (Swarm) for the unsupervised counterpoint.
Run
The default provider is OCI Generative AI. When ~/.oci/config is
present, the agents talk to a live OCI model; the canonical picks are
openai.gpt-4.1 or meta.llama-3.3-70b-instruct. Set
LOCUS_MODEL_PROVIDER=mock for offline runs.
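One way to apply the offline setting without touching the shell is to build the environment in Python and hand it to a child process. The helper below is a hypothetical convenience, not part of Locus; only the variable name LOCUS_MODEL_PROVIDER and the value mock come from the text above.

```python
import os


def offline_env(base=None):
    """Return a copy of the environment with the mock provider selected.

    base defaults to the current process environment; the original mapping
    is never modified.
    """
    env = dict(os.environ if base is None else base)
    env["LOCUS_MODEL_PROVIDER"] = "mock"  # variable name and value from the docs above
    return env
```

The result can be passed as the env argument of subprocess.run when launching the notebook script, which keeps the parent process's own provider setting unchanged.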
Source
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 27: orchestrator — one supervisor, many specialists, parallel fan-out.

An orchestrator routes a task to a chosen set of specialist agents, runs
them in parallel under a semaphore, then correlates their outputs into a
single summary. Compared with a swarm (Notebook 25), the decision of who
does what is centralised here instead of emerging from capability tags.

- ``Specialist`` is a domain-focused agent with tools, a system prompt,
  and a confidence threshold. Locus ships pre-built ones for logs,
  metrics, traces, and code.
- ``Orchestrator`` registers specialists, emits ``RoutingDecision``
  objects, and runs the chosen specialists concurrently behind
  ``max_parallel_specialists`` (an asyncio.Semaphore).
- ``RoutingDecision`` is the typed object the orchestrator's planner
  returns — which specialists, which sub-task per specialist, and the
  reasoning.
- The final ``OrchestrationResult`` carries each specialist's output,
  the decisions trail, and a correlated summary.

Run it:

    .venv/bin/python examples/notebook_32_orchestrator_pattern.py

The default provider is OCI Generative AI. With ``~/.oci/config``
present the agents talk to a live OCI model (canonical pick:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). Set
``LOCUS_MODEL_PROVIDER=mock`` for offline runs.

This notebook uses Locus's "Model B" slot (``LOCUS_MODEL_ID_B``) for the
short commentary calls — set a cheaper model there to cut runtime; falls
back to Model A when unset.

Prerequisites:

- Notebook 08 (Agent basics).
- Notebook 25 (Swarm) for the unsupervised counterpoint.
"""
import asyncio
import time

from config import get_model, get_model_b, print_config

from locus.agent import Agent
from locus.multiagent import (
    Orchestrator,
    RoutingDecision,
    Specialist,
    create_code_analyst,
    create_log_analyst,
    create_metrics_analyst,
    create_orchestrator,
    create_trace_analyst,
)
from locus.tools.decorator import tool


def _llm_call(
    prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
    """One model call with a timing/token banner — used for commentary."""
    # Slot B (a cheaper/faster model) is enough for short commentary;
    # falls back to slot A when LOCUS_MODEL_ID_B is unset.
    agent = Agent(model=get_model_b(max_tokens=max_tokens), system_prompt=system)
    t0 = time.perf_counter()
    res = agent.run_sync(prompt)
    dt = time.perf_counter() - t0
    print(
        f" [model call: {dt:.2f}s · {res.metrics.prompt_tokens}→{res.metrics.completion_tokens} tokens]"
    )
    return res.message.strip()


async def main():
    print("=" * 60)
    print("Notebook 27: orchestrator — supervisor + specialists + parallel fan-out")
    print("=" * 60)
    print()
    print_config()
    model = get_model()

    # =========================================================================
    # Part 1: pre-built specialists
    # =========================================================================
    print("\n=== Part 1: Pre-built Specialists ===\n")
    log_analyst = create_log_analyst(model=model)
    metrics_analyst = create_metrics_analyst(model=model)
    trace_analyst = create_trace_analyst(model=model)
    code_analyst = create_code_analyst(model=model)
    print("Pre-built Specialists:")
    for specialist in [log_analyst, metrics_analyst, trace_analyst, code_analyst]:
        print(f" - {specialist.name}")
        print(f" Type: {specialist.specialist_type}")
        print(f" Description: {specialist.description[:60]}...")
        print()
    t0 = time.perf_counter()
    p1 = await log_analyst.execute(task="In one sentence, summarise what a log analyst does.")
    dt = time.perf_counter() - t0
    print(f" [model call: {dt:.2f}s · log_analyst.execute()]")
    if p1.output:
        print(f" Output: {p1.output[:160]}")

    # =========================================================================
    # Part 2: build a custom specialist with its own tools
    # =========================================================================
    print("\n=== Part 2: Custom Specialists ===\n")

    @tool(name="check_database", description="Check database health and connections")
    async def check_database() -> str:
        return "Database: 45/50 connections used, avg query time 250ms"

    @tool(name="check_cache", description="Check cache hit rates")
    async def check_cache() -> str:
        return "Cache hit rate: 85%, memory usage: 2.1GB/4GB"

    database_specialist = Specialist(
        name="Database Specialist",
        specialist_type="database_analyst",
        description="Analyzes database performance, connections, and queries",
        system_prompt="""You are a database specialist. Your expertise includes:
- Analyzing query performance
- Monitoring connection pools
- Identifying slow queries
- Recommending optimizations
When analyzing, look for connection leaks, slow queries, and lock contention.""",
        tools=[check_database, check_cache],
        max_iterations=5,
        confidence_threshold=0.8,
        model=model,
    )
    print(f"Custom Specialist: {database_specialist.name}")
    print(f" Tools: {[t.name for t in database_specialist.tools]}")
    print(
        f"AI commentary: {_llm_call('In one sentence, why is a custom Specialist with domain tools better than a generic Agent for DB diagnostics?')}"
    )

    # =========================================================================
    # Part 3: run one specialist on its own
    # =========================================================================
    print("\n=== Part 3: Executing a Specialist ===\n")
    result = await database_specialist.execute(
        task="Analyze current database performance and identify issues",
        context={"incident_id": "INC-12345", "reported_issue": "Slow API responses"},
    )
    print("Specialist Result:")
    print(f" Success: {result.success}")
    print(f" Confidence: {result.confidence:.0%}")
    print(f" Duration: {result.duration_ms:.0f}ms")
    if result.output:
        print(f" Output: {result.output[:300]}...")

    # =========================================================================
    # Part 4: wire the orchestrator
    # =========================================================================
    print("\n=== Part 4: Creating an Orchestrator ===\n")
    orchestrator = create_orchestrator(
        name="Incident Analysis Orchestrator",
        specialists=[log_analyst, metrics_analyst, database_specialist],
        model=model,
    )
    print(f"Orchestrator: {orchestrator.name}")
    print("Registered specialists:")
    for spec_id, spec in orchestrator.specialists.items():
        print(f" - {spec.name} ({spec_id})")

    # =========================================================================
    # Part 5: orchestrator tuning knobs
    # =========================================================================
    print("\n=== Part 5: Orchestrator Configuration ===\n")
    # max_parallel_specialists caps the asyncio.Semaphore that bounds
    # fan-out. Drop to 1 to serialise (useful when debugging a flaky
    # specialist).
    orchestrator.max_parallel_specialists = 3
    orchestrator.correlation_threshold = 0.7
    print(f"Max parallel specialists: {orchestrator.max_parallel_specialists}")
    print(f"Correlation threshold: {orchestrator.correlation_threshold}")
    custom_orchestrator = Orchestrator(
        name="Custom Orchestrator",
        description="Orchestrates analysis with custom logic",
        system_prompt="""You coordinate specialist agents for incident analysis.
When routing:
1. For performance issues -> metrics + database specialists
2. For error spikes -> log + trace specialists
3. For unknown issues -> all specialists
Prioritize based on urgency indicated in the task.""",
        model=model,
    )
    custom_orchestrator.register_specialists([log_analyst, metrics_analyst])
    print(f"\nCustom orchestrator with {len(custom_orchestrator.specialists)} specialists")

    # =========================================================================
    # Part 6: anatomy of a RoutingDecision
    # =========================================================================
    print("\n=== Part 6: Routing Decisions ===\n")
    routing = RoutingDecision(
        decision_type="invoke",
        specialists=["log_analyst", "metrics_analyst"],
        reasoning="Performance issue requires log and metrics analysis",
        context={
            "subtasks": {
                "log_analyst": "Search for timeout errors in the last hour",
                "metrics_analyst": "Check CPU and memory trends",
            }
        },
    )
    print("Routing Decision:")
    print(f" Type: {routing.decision_type}")
    print(f" Specialists: {routing.specialists}")
    print(f" Reasoning: {routing.reasoning}")
    print(f" Subtasks: {routing.context.get('subtasks', {})}")

    # =========================================================================
    # Part 7: end-to-end orchestration
    # =========================================================================
    print("\n=== Part 7: Full Orchestration ===\n")
    orch_result = await orchestrator.execute(
        task="API response times have increased from 200ms to 2000ms in the last 30 minutes",
        context={"severity": "high", "affected_services": ["api-gateway", "user-service"]},
    )
    print("Orchestration Result:")
    print(f" Success: {orch_result.success}")
    print(f" Duration: {orch_result.duration_ms:.0f}ms")
    # The three specialists ran concurrently behind the semaphore — not
    # serially. Per-specialist budgets average ~5s, so parallel ~= 5s
    # vs serial ~= 15s.
    print(f" Parallel cap: max_parallel_specialists={orchestrator.max_parallel_specialists}")
    print(f" Decisions made: {len(orch_result.decisions)}")
    for i, decision in enumerate(orch_result.decisions):
        print(f"\n Decision {i + 1}: {decision.decision_type}")
        if decision.specialists:
            print(f" Specialists: {decision.specialists}")
    print("\nSpecialist Results:")
    for spec_id, spec_result in orch_result.specialist_results.items():
        status = "OK" if spec_result.success else f"ERROR: {spec_result.error}"
        print(f" {spec_id}: {status}")
        if spec_result.output:
            print(f" Output preview: {spec_result.output[:100]}...")
    if orch_result.summary:
        print("\nFinal Summary:")
        print(f" {orch_result.summary[:500]}...")

    # =========================================================================
    # Part 8: register a specialist at runtime
    # =========================================================================
    print("\n=== Part 8: Dynamic Specialist Registration ===\n")
    network_specialist = Specialist(
        name="Network Analyst",
        specialist_type="network_analyst",
        description="Analyzes network connectivity and latency",
        system_prompt="You analyze network issues including DNS, latency, and connectivity.",
        model=model,
    )
    orchestrator.register_specialist(network_specialist)
    print(f"Added specialist: {network_specialist.name}")
    print(f"Total specialists: {len(orchestrator.specialists)}")
    t0 = time.perf_counter()
    p8 = await network_specialist.execute(
        task="In one short sentence, what would you check first if a service had intermittent timeouts?",
    )
    dt = time.perf_counter() - t0
    print(f" [model call: {dt:.2f}s · network_specialist.execute()]")
    if p8.output:
        print(f" Output: {p8.output[:160]}")

    # =========================================================================
    # Part 9: common orchestration shapes
    # =========================================================================
    print("\n=== Part 9: Common Patterns ===\n")
    print("Pattern 1: Parallel Analysis")
    print(" Invoke multiple specialists at once, correlate, produce one summary.")
    print()
    print("Pattern 2: Sequential Refinement")
    print(" Broad pass first, then route to a specific specialist based on findings.")
    print()
    print("Pattern 3: Hierarchical Routing")
    print(" Top-level orchestrator routes to sub-orchestrators per domain.")
    print()
    print("Pattern 4: Consensus Analysis")
    print(" Multiple specialists analyse the same data; flag disagreements.")

    # =========================================================================
    # Part 10: things to keep in mind
    # =========================================================================
    print("\n=== Part 10: Best Practices ===\n")
    print("1. Give specialists focused, non-overlapping domains.")
    print("2. Use clear specialist_type names — they show up in audit logs.")
    print("3. Write domain-specific system prompts.")
    print("4. Set max_parallel_specialists to match your provider quota.")
    print("5. Include correlation logic in the summariser.")
    print("6. Handle specialist failures gracefully — one bad specialist shouldn't kill the run.")
    print("7. Track per-specialist confidence and duration.")
    print("\n" + "=" * 60)
    print("Next: Notebook 28 — Specialist Agents")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())
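Best practice 6 in Part 10 (one bad specialist should not kill the run) can be illustrated with plain asyncio. The sketch below is an assumption about how such isolation might be done, not a description of what Locus does internally: ToyResult, healthy, flaky, and fan_out are hypothetical names, and the specialists are ordinary coroutines rather than Locus Specialist objects.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyResult:
    """Hypothetical per-specialist record with success, output, and error fields."""

    success: bool
    output: Optional[str] = None
    error: Optional[str] = None


async def healthy(task: str) -> str:
    await asyncio.sleep(0.01)
    return f"analysed: {task}"


async def flaky(task: str) -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("upstream timeout")


async def fan_out(task: str, specialists: dict, max_parallel: int = 3) -> dict:
    """Run every specialist under a semaphore and record failures instead of raising."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def guarded(fn):
        async with semaphore:
            return await fn(task)

    names = list(specialists)
    # return_exceptions=True makes gather hand back exceptions as values,
    # so one failing specialist does not cancel or hide the others.
    raw = await asyncio.gather(
        *(guarded(specialists[n]) for n in names), return_exceptions=True
    )
    results = {}
    for name, value in zip(names, raw):
        if isinstance(value, Exception):
            results[name] = ToyResult(success=False, error=str(value))
        else:
            results[name] = ToyResult(success=True, output=value)
    return results
```

With this shape, the caller can still build a summary from the specialists that succeeded and report the failed ones alongside it, which mirrors the success and error fields printed in Part 7.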