Tutorial 51: Cognitive router — bounded graph generation¶
locus.router is a meta-orchestration layer that compiles natural language
onto existing locus primitives. The LLM never picks topology — it fills a typed
GoalFrame; the router selects a protocol deterministically from a typed
registry, and the compiler emits a real Agent / SequentialPipeline /
ParallelPipeline / LoopAgent from a curated registry.
Pipeline::
    natural-language input
            │
            ▼
    Agent(output_schema=GoalFrame)       ← LLM fills typed schema only
            │  GoalFrame(primary_goal, domain, complexity, risk, …)
            ▼
    ProtocolRegistry.select(frame)       ← typed filter + ranking
            │  Protocol (e.g. "specialist_fanout")
            ▼
    PolicyGate.check(frame, protocol)    ← allow | require_approval | deny
            │
            ▼
    CognitiveCompiler.compile(…)         ← emits Runnable adapter
            │  wraps real Agent / Pipeline / Orchestrator
            ▼
    runnable.execute(task)
            │
            ▼
    RunnableResult(text, protocol_id, frame)
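At runtime the whole pipeline collapses to a single call. A minimal sketch, assuming the build_router helper from the full listing below and a provider-backed model; the dispatch call and result fields are exactly those used in the source::

    import asyncio

    async def main() -> None:
        router = build_router()          # protocols + capabilities + compiler wired up
        result = await router.dispatch(
            "Diagnose what's slowing down checkout right now."
        )
        print(result.protocol_id)        # e.g. "specialist_fanout"
        print(result.text)               # output of the compiled runnable

    asyncio.run(main())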
This tutorial covers:
- Defining a small capability set (annotated tools).
- Registering all 8 built-in protocols.
- Loading SKILL.md packages and tagging them by domain so every emitted Agent gets the right catalog at runtime.
- Standing up a Router with a GoalFrame extractor and a CognitiveCompiler (a hand-built frame is sketched after this list).
- Dispatching five distinct inputs that hit five different protocols (direct_response / plan_execute_validate / specialist_fanout / debate / codegen_test_validate) and printing which protocol fired, the compiled runtime shape, and the result.
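The frame at the center of all this is small enough to build by hand, which is also how the demo stays runnable against the mock model: compile a frame directly and skip the extractor. A minimal sketch (run inside an async context), using only fields and calls that appear in the source listing::

    router = build_router()
    frame = GoalFrame(
        primary_goal=TaskType.DIAGNOSE,
        domain="observability",
        complexity=Complexity.HIGH,
        risk=Risk.MEDIUM,
        required_capabilities=["metric_probe", "alert_list"],
    )
    # Selection, gating, and compilation are rule-based; no LLM call here.
    runnable = await router.compiler.compile(frame)
    result = await runnable.execute("Diagnose checkout latency")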
Why this is differentiated:
- The LLM never touches orchestration topology — it fills exactly one typed schema. Everything downstream (protocol selection, policy gating, compilation) is rule-based and reproducible.
- Eight built-in protocols cover the cardinal shapes: single-Agent, 3-stage SequentialPipeline, ParallelPipeline fanout, debate + judge, PASS/FAIL LoopAgent, approval-gated execution, A2A delegation, one-tool handoff chain.
- Adding a domain (observability, codegen, support) is one CapabilityIndex swap — no new protocol needed (see the sketch after this list).
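A sketch of that swap for a support domain. The @tool / create_registry / annotate calls match the source listing below; ticket_search itself is hypothetical and does not exist in this repo::

    @tool
    def ticket_search(query: str) -> str:
        """Look up recent support tickets matching a query (hypothetical)."""
        return f"2 open tickets match {query!r}."

    # Same shape as the observability setup in build_router(), new domain tag.
    support_caps = CapabilityIndex(create_registry(ticket_search))
    support_caps.annotate(
        "ticket_search",
        tool_name="ticket_search",
        description="Look up recent support tickets.",
        domain="support",
    )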
Run::
python examples/tutorial_51_cognitive_router.py
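Illustrative tail of a run against a real provider. The histogram format comes from run_demo in the listing below; the counts follow from the five demo cases, though the exact protocols fired depend on the extracted frames::

    === Protocol histogram ===
      codegen_test_validate        × 1
      debate                       × 1
      direct_response              × 1
      plan_execute_validate        × 1
      specialist_fanout            × 1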
Difficulty: Intermediate
Prerequisites: tutorial_01_basic_agent (Agent), tutorial_11_swarm_multiagent (Orchestrator), tutorial_13_structured_output (structured output)
See also¶
- Tutorial 59 — emergent routing — opt-in LLMProtocolPicker for deployments where the rule-based ranker doesn't resolve protocol ambiguity well.
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Tutorial 51: cognitive router — bounded graph generation.
``locus.router`` is a meta-orchestration layer that compiles natural
language onto existing locus primitives. The LLM never picks topology —
it fills a typed ``GoalFrame``; the router selects a protocol
deterministically and the compiler emits a real ``Agent`` /
``SequentialPipeline`` / ``ParallelPipeline`` / ``LoopAgent`` from a
curated registry.
This tutorial covers:
1. Defining a small capability set (annotated tools).
2. Registering all 8 built-in protocols.
3. Loading SKILL.md packages from ``examples/skills/`` and tagging
them by domain so the compiler attaches the right ones to every
emitted Agent.
4. Standing up a Router with an ``Agent(output_schema=GoalFrame)``
extractor and a ``CognitiveCompiler``.
5. Dispatching five distinct user inputs that hit five different
protocols (answer / plan / diagnose / debate / codegen) and
printing which protocol fired, the runtime shape it compiled to,
and the result.
Difficulty: intermediate. Prerequisites: tutorials 01 (Agent), 11
(Orchestrator), and 13 (structured output).
Run with:
python examples/tutorial_51_cognitive_router.py
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from config import get_model
from locus import Agent, tool
from locus.router import (
CapabilityIndex,
CognitiveCompiler,
Complexity,
GoalFrame,
PolicyGate,
ProtocolRegistry,
Risk,
Router,
SkillIndex,
TaskType,
builtin_protocols,
)
from locus.router.runtime import FrameExtractionError
from locus.skills import Skill
from locus.tools.registry import create_registry
# Per-protocol-id description of the runtime shape the builder emits.
# Reproduces the same strings the workbench shows in the Protocols tab
# so the tutorial stays a single source of truth.
RUNTIME_SHAPES: dict[str, str] = {
"direct_response": "Agent (single call)",
"plan_execute_validate": "SequentialPipeline of 3 Agents (planner → executor → validator)",
"specialist_fanout": "ParallelPipeline of N tool-bound Agents",
"debate": "ParallelPipeline of 2 debaters + judge Agent",
"codegen_test_validate": "LoopAgent (PASS/FAIL stop)",
"approval_gated_execution": "Agent wrapped in approval interrupt",
"a2a_delegate": "A2AClient.invoke against remote endpoint",
"handoff_chain": "SequentialPipeline of one-tool Agents",
}
# ---------------------------------------------------------------------------
# Capabilities — three small tools the router can bind to specialists.
# ---------------------------------------------------------------------------
@tool
def kb_search(query: str) -> str:
"""Search the knowledge base for a topic and return a short summary."""
canned = {
"rag": "RAG = retrieve documents from a vector store and condition the LLM on them.",
"router": "locus.router compiles a typed GoalFrame onto bounded orchestration shapes.",
"locus": "Locus is Oracle's zero-LangChain agentic SDK with native multi-agent shapes.",
}
return canned.get(query.lower(), f"No KB entry for {query!r}; suggest a richer query.")
@tool
def get_metric(name: str) -> str:
"""Return the latest value of a named metric (mocked)."""
metrics = {
"cpu": "cpu=87% (warn threshold 80%)",
"latency_p99": "latency_p99=420ms (slo 300ms — breach)",
"errors_5xx": "errors_5xx=0.4% (within 1% budget)",
}
return metrics.get(name.lower(), f"No metric named {name!r}.")
@tool
def list_alerts(window_minutes: int = 30) -> str:
"""List recent alerts inside the given window."""
    if window_minutes <= 0:
        return "invalid window: window_minutes must be positive"
return (
"alert_id=A-101 sev=high svc=checkout latency_p99 breach\n"
"alert_id=A-102 sev=medium svc=catalog cpu_warn"
)
# ---------------------------------------------------------------------------
# Router setup — capabilities, protocols, gate, compiler, router.
# ---------------------------------------------------------------------------
def _load_skills() -> SkillIndex:
"""Load every SKILL.md package under examples/skills/ and tag each
skill with the domain its frontmatter declares (or "global" when
absent — those then appear in every domain's catalogue)."""
idx = SkillIndex()
skills_dir = (Path(__file__).resolve().parent / "skills").resolve()
if not skills_dir.is_dir():
return idx
for skill in Skill.from_directory(skills_dir):
domain = (skill.metadata or {}).get("domain", "")
idx.register(skill, domain=domain)
return idx
def build_router() -> Router:
"""Stand up a fully configured Router for this demo."""
model = get_model()
tools = create_registry(kb_search, get_metric, list_alerts)
capabilities = CapabilityIndex(tools)
capabilities.annotate(
"kb_search",
tool_name="kb_search",
description="Look up a topic in the knowledge base.",
domain="research",
)
capabilities.annotate(
"metric_probe",
tool_name="get_metric",
description="Return the latest value of a named metric.",
domain="observability",
)
capabilities.annotate(
"alert_list",
tool_name="list_alerts",
description="List recent alerts in a time window.",
domain="observability",
)
protocols = ProtocolRegistry()
protocols.register_many(builtin_protocols())
skills = _load_skills()
extractor = Agent(
model=model,
system_prompt=(
"You are a goal-frame extractor for the cognitive router. Given the "
"user's request, fill the provided schema.\n\n"
"Rules:\n"
"1. Pick the single best primary_goal that matches the user's verb.\n"
"2. Risk: LOW for read-only/info; MEDIUM for build/modify/plan tasks; "
"HIGH only for irreversible production operations.\n"
"3. Complexity: LOW for one-step; MEDIUM for multi-step; HIGH for "
"fan-out across multiple specialists.\n"
"4. required_capabilities must come from this set; do not invent ids:\n"
" - kb_search (knowledge-base lookup)\n"
" - metric_probe (latest value of a named metric)\n"
" - alert_list (recent alerts)\n"
"If the request needs a tool that isn't in this list, leave "
"required_capabilities empty."
),
output_schema=GoalFrame,
)
compiler = CognitiveCompiler(
protocols=protocols,
capabilities=capabilities,
policy=PolicyGate(),
model=model,
skills=skills,
)
print(
f"[router] {len(protocols.all())} protocols · {len(capabilities)} capabilities · "
f"{len(skills)} skills",
)
return Router(
extractor=extractor,
compiler=compiler,
on_frame=lambda f: print(
f" [frame] goal={f.primary_goal.value} domain={f.domain} "
f"complexity={f.complexity.value} risk={f.risk.value} "
f"caps={f.required_capabilities}",
),
)
# ---------------------------------------------------------------------------
# Demo — five different inputs land on five different protocols.
# ---------------------------------------------------------------------------
# Each entry: (prompt, fallback_frame). The fallback is used only when
# the extractor can't produce a valid GoalFrame — which happens with
# the default mock model. Against any real provider the extractor
# fills the frame from the prompt and the fallback is unused.
DEMO_CASES: tuple[tuple[str, GoalFrame], ...] = (
(
"What does the locus router do in the context of this SDK?",
GoalFrame(
primary_goal=TaskType.ANSWER,
domain="research",
complexity=Complexity.LOW,
risk=Risk.LOW,
),
),
(
"Write a three-step plan to migrate our checkout service to a new "
"payment provider, including a validation step.",
GoalFrame(
primary_goal=TaskType.PLAN,
domain="engineering",
complexity=Complexity.MEDIUM,
risk=Risk.MEDIUM,
success_criteria=["plan has 3 numbered steps", "validation step present"],
),
),
(
"Diagnose what's slowing down checkout right now — pull recent "
"alerts and the latency_p99 metric, correlate them.",
GoalFrame(
primary_goal=TaskType.DIAGNOSE,
domain="observability",
complexity=Complexity.HIGH,
risk=Risk.MEDIUM,
required_capabilities=["metric_probe", "alert_list"],
),
),
(
"Compare two approaches to handling rate limits: token bucket vs "
"sliding window. Which is better for a high-traffic API?",
GoalFrame(
primary_goal=TaskType.COMPARE,
domain="engineering",
complexity=Complexity.HIGH,
risk=Risk.LOW,
),
),
(
"Generate a Python function that returns the SHA-256 of a string, "
"with a doctest that verifies it on the empty string.",
GoalFrame(
primary_goal=TaskType.GENERATE_CODE,
domain="engineering",
complexity=Complexity.MEDIUM,
risk=Risk.MEDIUM,
),
),
)
async def _dispatch_with_fallback(
router: Router, prompt: str, fallback: GoalFrame
) -> tuple[str, str]:
"""Run ``router.dispatch`` and fall back to the hand-built frame on parse failure.
Returns ``(protocol_id, text)``. The fallback path keeps the tutorial
runnable against the default mock model — the extractor only works
against a real provider, but the deterministic core (selection +
compile + execute) works either way.
"""
try:
result = await router.dispatch(prompt)
return result.protocol_id, result.text
except FrameExtractionError as exc:
print(f" [extractor unavailable: {exc}; using fallback frame]")
runnable = await router.compiler.compile(fallback)
result = await runnable.execute(prompt)
return result.protocol_id, result.text
async def run_demo() -> None:
router = build_router()
fired: dict[str, int] = {}
for i, (prompt, fallback) in enumerate(DEMO_CASES, start=1):
print(f"\n=== Prompt {i} ===\n {prompt}")
try:
protocol_id, text = await _dispatch_with_fallback(router, prompt, fallback)
except Exception as exc: # noqa: BLE001 — demo: surface every failure
print(f" [error] {type(exc).__name__}: {exc}")
continue
fired[protocol_id] = fired.get(protocol_id, 0) + 1
shape = RUNTIME_SHAPES.get(protocol_id, "(unknown shape)")
print(f" [protocol] {protocol_id}")
print(f" [shape] {shape}")
head = "\n ".join(text.strip().splitlines()[:6])
print(f" [result]\n {head}")
print("\n=== Protocol histogram ===")
for pid, n in sorted(fired.items()):
print(f" {pid:28s} × {n}")
if __name__ == "__main__":
asyncio.run(run_demo())