Specialist Agents¶
Notebook 27 introduced the Specialist as the worker an orchestrator hands tasks to. This notebook dives into the Specialist itself: how to narrow a model's failure surface with a focused system prompt, a hand-picked tool set, optional playbooks, and a confidence threshold.
This notebook covers:
Specialist— a LocusAgentwith role metadata (specialist_type,description), a tool list, and aconfidence_threshold.Playbook+PlaybookStep— encode a procedure: preconditions, ordered steps with required tools and expected outputs, plus failure handling.specialist.select_playbook(task)— picks one playbook from a pool by matching the task description.- Pre-built helpers (
create_log_analyst,create_metrics_analyst,create_trace_analyst,create_code_analyst) for common observability domains.
Prerequisites¶
- Notebook 08 (Agent basics).
- Notebook 27 (Orchestrator) — Specialists are the workers it routes to.
Run¶
The default provider is OCI Generative AI. With ~/.oci/config
present the specialists talk to a live OCI model; canonical picks are
openai.gpt-4.1 or meta.llama-3.3-70b-instruct. Set
LOCUS_MODEL_PROVIDER=mock for offline runs.
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 28: specialist agents — domain focus, tools, and playbooks.
Notebook 27 introduced the Specialist as the worker an orchestrator
hands tasks to. This notebook dives into the Specialist itself: how to
narrow a model's failure surface with a focused system prompt, a
hand-picked tool set, optional playbooks, and a confidence threshold.
- A ``Specialist`` is a Locus ``Agent`` with role metadata
(``specialist_type``, ``description``), a tool list, and a
``confidence_threshold`` that gates "good enough" answers.
- ``Playbook`` and ``PlaybookStep`` encode standard procedures —
preconditions, ordered steps with required tools and expected
outputs, plus failure handling.
- A specialist can carry multiple playbooks; ``select_playbook(task)``
picks one based on task description matching.
- Pre-built helpers (``create_log_analyst``, ``create_metrics_analyst``,
``create_trace_analyst``, ``create_code_analyst``) ship with sensible
defaults for common observability domains.
Run it:
.venv/bin/python examples/notebook_33_specialist_agents.py
The default provider is OCI Generative AI. With ``~/.oci/config``
present the specialists talk to a live OCI model (canonical pick:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). Set
``LOCUS_MODEL_PROVIDER=mock`` for offline runs.
Prerequisites:
- Notebook 08 (Agent basics).
- Notebook 27 (Orchestrator) — Specialists are the workers it routes to.
"""
import asyncio
import time
from config import get_model, print_config
from locus.agent import Agent
from locus.multiagent.specialist import (
Playbook,
PlaybookStep,
Specialist,
create_code_analyst,
create_log_analyst,
create_metrics_analyst,
create_trace_analyst,
)
from locus.tools.decorator import tool
def _llm_call(
prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
"""One model call with a timing/token banner — used for commentary."""
agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
t0 = time.perf_counter()
res = agent.run_sync(prompt)
dt = time.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {res.metrics.prompt_tokens}→{res.metrics.completion_tokens} tokens]"
)
return res.message.strip()
async def main():
print("=" * 60)
print("Notebook 28: specialist agents — domain focus, tools, playbooks")
print("=" * 60)
print()
print_config()
model = get_model()
# =========================================================================
# Part 1: anatomy of a Specialist
# =========================================================================
print("\n=== Part 1: Specialist Anatomy ===\n")
# A specialist pairs a focused system prompt with domain tools,
# optional playbooks, and a confidence threshold.
specialist = Specialist(
name="API Specialist",
specialist_type="api_analyst",
description="Analyzes API performance, errors, and patterns",
system_prompt="""You are an API analysis specialist. Your expertise:
1. Analyzing HTTP status codes and error rates
2. Identifying slow endpoints
3. Detecting anomalous traffic patterns
4. Recommending API optimizations
When analyzing:
- Check error rates by endpoint
- Look for latency outliers
- Identify authentication issues
- Note rate limiting triggers""",
max_iterations=10,
confidence_threshold=0.85,
model=model,
)
print(f"Specialist: {specialist.name}")
print(f" Type: {specialist.specialist_type}")
print(f" Max iterations: {specialist.max_iterations}")
print(f" Confidence threshold: {specialist.confidence_threshold}")
t0 = time.perf_counter()
p1 = await specialist.execute(task="In one sentence, what is your specialty?")
dt = time.perf_counter() - t0
print(f" [model call: {dt:.2f}s · specialist.execute()]")
if p1.output:
print(f" Smoke output: {p1.output[:160]}")
# =========================================================================
# Part 2: hand-pick tools for the domain
# =========================================================================
print("\n=== Part 2: Domain Tools ===\n")
@tool(name="get_endpoint_stats", description="Get statistics for an API endpoint")
async def get_endpoint_stats(endpoint: str) -> str:
return f"Endpoint {endpoint}: 1000 req/min, 2.5% error rate, p99=450ms"
@tool(name="get_error_breakdown", description="Get error breakdown by status code")
async def get_error_breakdown() -> str:
return "Errors: 400=15%, 401=5%, 403=2%, 500=75%, 503=3%"
@tool(name="get_top_slow_endpoints", description="Get slowest API endpoints")
async def get_top_slow_endpoints() -> str:
return "Slowest: /api/users (800ms), /api/search (650ms), /api/reports (500ms)"
specialist = Specialist(
name="API Specialist",
specialist_type="api_analyst",
description="Analyzes API performance, errors, and patterns",
system_prompt="You analyze API behavior and performance.",
tools=[get_endpoint_stats, get_error_breakdown, get_top_slow_endpoints],
model=model,
)
print(f"Tools available: {[t.name for t in specialist.tools]}")
print(
f"AI commentary: {_llm_call('In one sentence, why does giving a Specialist domain-specific tools dramatically narrow its failure surface?')}"
)
# =========================================================================
# Part 3: encode a procedure as a Playbook
# =========================================================================
print("\n=== Part 3: Specialist Playbooks ===\n")
api_debug_playbook = Playbook(
name="API Debug Procedure",
description="Standard procedure for debugging API issues",
preconditions=[
"Incident ticket exists",
"Basic metrics are accessible",
],
steps=[
PlaybookStep(
instruction="Check overall API health metrics",
required_tools=["get_endpoint_stats"],
expected_output="Current request rate and error percentages",
),
PlaybookStep(
instruction="Analyze error distribution",
required_tools=["get_error_breakdown"],
expected_output="Breakdown of errors by type",
on_failure="Escalate if unable to get error data",
),
PlaybookStep(
instruction="Identify slow endpoints",
required_tools=["get_top_slow_endpoints"],
expected_output="List of endpoints exceeding latency threshold",
),
],
success_criteria="Root cause identified or escalation path determined",
)
specialist.playbooks.append(api_debug_playbook)
print(f"Playbook: {api_debug_playbook.name}")
print(f" Preconditions: {api_debug_playbook.preconditions}")
print(f" Steps: {len(api_debug_playbook.steps)}")
print(f" Success criteria: {api_debug_playbook.success_criteria}")
# ``to_prompt()`` renders the playbook as the text block injected
# into the system prompt when the playbook is selected.
playbook_prompt = api_debug_playbook.to_prompt()
print("\nPlaybook prompt:")
print("-" * 40)
print(playbook_prompt[:500] + "...")
print(
f"AI commentary: {_llm_call('In one sentence, when does attaching a Playbook to a Specialist matter most?')}"
)
# =========================================================================
# Part 4: pick the right playbook from a pool
# =========================================================================
print("\n=== Part 4: Playbook Selection ===\n")
performance_playbook = Playbook(
name="Performance Optimization",
description="Procedure for optimizing API performance",
steps=[
PlaybookStep(instruction="Profile slow endpoints"),
PlaybookStep(instruction="Identify bottlenecks"),
PlaybookStep(instruction="Recommend optimizations"),
],
)
security_playbook = Playbook(
name="Security Analysis",
description="Procedure for analyzing security issues",
steps=[
PlaybookStep(instruction="Check authentication failures"),
PlaybookStep(instruction="Review access patterns"),
PlaybookStep(instruction="Identify suspicious activity"),
],
)
specialist.playbooks.extend([performance_playbook, security_playbook])
# select_playbook matches the task description against each playbook's
# name and description; it returns None if nothing fits.
tasks = [
"Debug the API errors we're seeing",
"Optimize the slow /api/search endpoint",
"Check for unauthorized access attempts",
]
for task in tasks:
selected = specialist.select_playbook(task)
if selected:
print(f"Task: '{task[:40]}...'")
print(f" Selected playbook: {selected.name}")
print(
f"AI commentary: {_llm_call('In one sentence, why is automatic playbook selection by task description risky and how do you mitigate it?')}"
)
# =========================================================================
# Part 5: drive the specialist end-to-end
# =========================================================================
print("\n=== Part 5: Executing Specialists ===\n")
result = await specialist.execute(
task="API error rates have spiked in the last hour. Investigate and identify the cause.",
context={
"incident_id": "INC-2024-001",
"affected_services": ["api-gateway"],
"start_time": "2024-01-15T10:00:00Z",
},
)
print("Execution Result:")
print(f" Success: {result.success}")
print(f" Confidence: {result.confidence:.0%}")
print(f" Duration: {result.duration_ms:.0f}ms")
if result.output:
print(f" Output: {result.output[:300]}...")
if result.error:
print(f" Error: {result.error}")
# =========================================================================
# Part 6: pre-built specialists for common domains
# =========================================================================
print("\n=== Part 6: Pre-built Specialists ===\n")
log_analyst = create_log_analyst(model=model)
metrics_analyst = create_metrics_analyst(model=model)
trace_analyst = create_trace_analyst(model=model)
code_analyst = create_code_analyst(model=model)
specialists = [log_analyst, metrics_analyst, trace_analyst, code_analyst]
print("Pre-built Specialists:")
for spec in specialists:
print(f"\n {spec.name}")
print(f" Type: {spec.specialist_type}")
prompt_preview = spec.system_prompt.split("\n")[0]
print(f" Focus: {prompt_preview[:60]}...")
t0 = time.perf_counter()
p6 = await metrics_analyst.execute(task="In one sentence, what does a metrics analyst do?")
dt = time.perf_counter() - t0
print(f"\n [model call: {dt:.2f}s · metrics_analyst.execute()]")
if p6.output:
print(f" Output: {p6.output[:160]}")
# =========================================================================
# Part 7: extend a pre-built specialist with your own tools
# =========================================================================
print("\n=== Part 7: Custom Tools Integration ===\n")
@tool(name="search_logs", description="Search logs for patterns")
async def search_logs(pattern: str, timerange: str = "1h") -> str:
return f"Found 42 matches for '{pattern}' in last {timerange}"
@tool(name="get_error_logs", description="Get recent error logs")
async def get_error_logs(limit: int = 10) -> str:
return f"Retrieved {limit} most recent error logs"
custom_log_analyst = create_log_analyst(
model=model,
tools=[search_logs, get_error_logs],
)
print(f"Custom log analyst tools: {[t.name for t in custom_log_analyst.tools]}")
log_result = await custom_log_analyst.execute(
task="Search for NullPointerException errors in the last hour",
)
print("Log analysis result:")
print(f" Confidence: {log_result.confidence:.0%}")
if log_result.output:
print(f" Output: {log_result.output[:200]}...")
# =========================================================================
# Part 8: how confidence is estimated
# =========================================================================
print("\n=== Part 8: Confidence Estimation ===\n")
# The built-in estimator scans the response for hedging vs.
# certainty markers — a rough proxy that you'd typically replace
# with a domain-specific scorer in production.
responses = [
("definitely the root cause", "High confidence markers"),
("might be related to", "Low confidence markers"),
("confirmed by the logs", "Verification markers"),
("unclear what is causing", "Uncertainty markers"),
]
print("Confidence markers in responses:")
for response, description in responses:
confidence = specialist._estimate_confidence(response)
print(f" '{response}' -> {confidence:.0%} ({description})")
print(
f"AI commentary: {_llm_call('In one sentence, why is keyword-based confidence estimation only a rough proxy?')}"
)
# =========================================================================
# Part 9: common specialist shapes
# =========================================================================
print("\n=== Part 9: Specialist Patterns ===\n")
print("Pattern 1: Domain Expert")
print(" Focused prompt + domain tools + high confidence threshold.")
print()
print("Pattern 2: Procedure Follower")
print(" Playbook-driven; each step validated against expected output.")
print()
print("Pattern 3: Adaptive Analyst")
print(" Multiple playbooks; the right one selected per task.")
print()
print("Pattern 4: Pipeline Stage")
print(" Drops into a larger workflow; structured output, context in/out.")
print(
f"AI suggestion: {_llm_call('Suggest one extra Specialist pattern not in the four listed above. One short sentence.')}"
)
# =========================================================================
# Part 10: assemble an incident-response team
# =========================================================================
print("\n=== Part 10: Specialist Teams ===\n")
def create_incident_response_team(model):
"""One triage specialist plus three pre-built analysts."""
return {
"triage": Specialist(
name="Triage Specialist",
specialist_type="triage",
description="Initial incident assessment and severity classification",
system_prompt="Assess incidents and determine severity and routing.",
model=model,
),
"logs": create_log_analyst(model=model),
"metrics": create_metrics_analyst(model=model),
"code": create_code_analyst(model=model),
}
team = create_incident_response_team(model)
print("Incident Response Team:")
for role, spec in team.items():
print(f" {role}: {spec.name}")
t0 = time.perf_counter()
p10 = await team["triage"].execute(
task="In one sentence, classify this incident: 'API p99 jumped from 30ms to 800ms after a deploy.'",
)
dt = time.perf_counter() - t0
print(f" [model call: {dt:.2f}s · triage.execute()]")
if p10.output:
print(f" Triage verdict: {p10.output[:160]}")
# =========================================================================
print("\n" + "=" * 60)
print("Next: Notebook 29 — A2A Protocol")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())