Tutorial 51: Cognitive router — bounded graph generation

locus.router is a meta-orchestration layer that compiles natural language onto existing locus primitives. The LLM never picks topology — it fills a typed GoalFrame; the router then selects a protocol deterministically from a typed registry, and the compiler emits a real Agent / SequentialPipeline / ParallelPipeline / LoopAgent from a curated registry.

Pipeline:

natural-language input
      │
      ▼
Agent(output_schema=GoalFrame)     ← LLM fills typed schema only
      │ GoalFrame(primary_goal, domain, complexity, risk, …)
      ▼
ProtocolRegistry.select(frame)     ← typed filter + ranking
      │ Protocol (e.g. "specialist_fanout")
      ▼
PolicyGate.check(frame, protocol)  ← allow | require_approval | deny
      │
      ▼
CognitiveCompiler.compile(…)       ← emits Runnable adapter
      │ wraps real Agent / Pipeline / Orchestrator
      ▼
runnable.execute(task)
      │
      ▼
RunnableResult(text, protocol_id, frame)
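
The deterministic middle of this pipeline can be sketched in plain Python. This is an illustrative toy, not the real locus.router API: `ToyFrame`, `PROTOCOLS`, and `select` are invented names, but they show the key property — protocol selection is a pure, reproducible function of the typed frame, with no LLM in the loop.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyFrame:
    goal: str                          # e.g. "answer", "plan", "diagnose"
    complexity: str                    # "low" | "medium" | "high"
    capabilities: tuple[str, ...] = ()


# Each protocol declares which frames it can serve. Ordering encodes
# specificity: more specific shapes are checked first.
PROTOCOLS = {
    "specialist_fanout": lambda f: f.goal == "diagnose" and len(f.capabilities) > 1,
    "plan_execute_validate": lambda f: f.goal == "plan",
    "direct_response": lambda f: f.goal == "answer" and f.complexity == "low",
}


def select(frame: ToyFrame) -> str:
    """Pick a protocol id from the frame alone — rule-based, no LLM."""
    for pid, matches in PROTOCOLS.items():
        if matches(frame):
            return pid
    return "direct_response"  # safe fallback for unmatched frames


# Same frame in, same protocol out — every time.
print(select(ToyFrame("diagnose", "high", ("metric_probe", "alert_list"))))
```

Because `select` never calls a model, the topology decision is auditable and replayable; only the one `GoalFrame` extraction step is stochastic.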

This tutorial covers:

  1. Defining a small capability set (annotated tools).
  2. Registering all 8 built-in protocols.
  3. Loading SKILL.md packages and tagging them by domain so every emitted Agent gets the right catalog at runtime.
  4. Standing up a Router with a GoalFrame extractor and a CognitiveCompiler.
  5. Dispatching five distinct inputs that hit five different protocols (direct_response / plan_execute_validate / specialist_fanout / debate / codegen_test_validate) and printing which protocol fired, the compiled runtime shape, and the result.

Why this is differentiated:

  • The LLM never touches orchestration topology — it fills exactly one typed schema. Everything downstream (protocol selection, policy gating, compilation) is rule-based and reproducible.
  • Eight built-in protocols cover the cardinal shapes: single-Agent, 3-stage SequentialPipeline, ParallelPipeline fanout, debate + judge, PASS/FAIL LoopAgent, approval-gated execution, A2A delegation, one-tool handoff chain.
  • Adding a domain (observability, codegen, support) is one CapabilityIndex swap — no new protocol needed.
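
The last bullet can be sketched as follows. Again a toy, not the real CapabilityIndex API: the point is that the protocol set stays fixed while the capability catalogue is the only thing that changes per domain.

```python
# Two domain catalogues: capability id -> description.
OBSERVABILITY = {
    "metric_probe": "latest value of a named metric",
    "alert_list": "recent alerts in a time window",
}
CODEGEN = {
    "run_tests": "execute the generated test suite",
    "lint": "static-check the generated code",
}


def build_fanout(capabilities: dict[str, str]) -> list[str]:
    """One specialist per capability — same fanout shape, any domain."""
    return [f"Agent(tool={cap_id})" for cap_id in capabilities]


# Swapping domains is swapping the catalogue; no new protocol needed.
print(build_fanout(OBSERVABILITY))
print(build_fanout(CODEGEN))
```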

Run:

python examples/tutorial_51_cognitive_router.py

Difficulty: Intermediate

Prerequisites: tutorial_01_basic_agent (Agent), tutorial_11_swarm_multiagent (Orchestrator), tutorial_13_structured_output (structured output)

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Tutorial 51: cognitive router — bounded graph generation.

``locus.router`` is a meta-orchestration layer that compiles natural
language onto existing locus primitives. The LLM never picks topology —
it fills a typed ``GoalFrame``; the router selects a protocol
deterministically and the compiler emits a real ``Agent`` /
``SequentialPipeline`` / ``ParallelPipeline`` / ``LoopAgent`` from a
curated registry.

This tutorial covers:

1. Defining a small capability set (annotated tools).
2. Registering all 8 builtin protocols.
3. Loading SKILL.md packages from ``examples/skills/`` and tagging
   them by domain so the compiler attaches the right ones to every
   emitted Agent.
4. Standing up a Router with an ``Agent(output_schema=GoalFrame)``
   extractor and a ``CognitiveCompiler``.
5. Dispatching five distinct user inputs that hit five different
   protocols (direct_response / plan_execute_validate /
   specialist_fanout / debate / codegen_test_validate) and printing
   which protocol fired, the runtime shape it compiled to, and the
   result.

Difficulty: intermediate. Prerequisites: tutorials 01 (Agent), 11
(Orchestrator), and 13 (structured output).

Run with:
    python examples/tutorial_51_cognitive_router.py
"""

from __future__ import annotations

import asyncio
from pathlib import Path

from config import get_model

from locus import Agent, tool
from locus.router import (
    CapabilityIndex,
    CognitiveCompiler,
    Complexity,
    GoalFrame,
    PolicyGate,
    ProtocolRegistry,
    Risk,
    Router,
    SkillIndex,
    TaskType,
    builtin_protocols,
)
from locus.router.runtime import FrameExtractionError
from locus.skills import Skill
from locus.tools.registry import create_registry


# Per-protocol-id description of the runtime shape the builder emits.
# Reproduces the same strings the workbench shows in the Protocols tab
# so the tutorial stays a single source of truth.
RUNTIME_SHAPES: dict[str, str] = {
    "direct_response": "Agent (single call)",
    "plan_execute_validate": "SequentialPipeline of 3 Agents (planner → executor → validator)",
    "specialist_fanout": "ParallelPipeline of N tool-bound Agents",
    "debate": "ParallelPipeline of 2 debaters + judge Agent",
    "codegen_test_validate": "LoopAgent (PASS/FAIL stop)",
    "approval_gated_execution": "Agent wrapped in approval interrupt",
    "a2a_delegate": "A2AClient.invoke against remote endpoint",
    "handoff_chain": "SequentialPipeline of one-tool Agents",
}


# ---------------------------------------------------------------------------
# Capabilities — three small tools the router can bind to specialists.
# ---------------------------------------------------------------------------


@tool
def kb_search(query: str) -> str:
    """Search the knowledge base for a topic and return a short summary."""
    canned = {
        "rag": "RAG = retrieve documents from a vector store and condition the LLM on them.",
        "router": "locus.router compiles a typed GoalFrame onto bounded orchestration shapes.",
        "locus": "Locus is Oracle's zero-LangChain agentic SDK with native multi-agent shapes.",
    }
    return canned.get(query.lower(), f"No KB entry for {query!r}; suggest a richer query.")


@tool
def get_metric(name: str) -> str:
    """Return the latest value of a named metric (mocked)."""
    metrics = {
        "cpu": "cpu=87% (warn threshold 80%)",
        "latency_p99": "latency_p99=420ms (slo 300ms — breach)",
        "errors_5xx": "errors_5xx=0.4% (within 1% budget)",
    }
    return metrics.get(name.lower(), f"No metric named {name!r}.")


@tool
def list_alerts(window_minutes: int = 30) -> str:
    """List recent alerts inside the given window."""
    if window_minutes <= 0:
        return "no window provided"
    return (
        "alert_id=A-101 sev=high svc=checkout latency_p99 breach\n"
        "alert_id=A-102 sev=medium svc=catalog cpu_warn"
    )


# ---------------------------------------------------------------------------
# Router setup — capabilities, protocols, gate, compiler, router.
# ---------------------------------------------------------------------------


def _load_skills() -> SkillIndex:
    """Load every SKILL.md package under examples/skills/ and tag each
    skill with the domain its frontmatter declares (or "global" when
    absent — those then appear in every domain's catalogue)."""
    idx = SkillIndex()
    skills_dir = (Path(__file__).resolve().parent / "skills").resolve()
    if not skills_dir.is_dir():
        return idx
    for skill in Skill.from_directory(skills_dir):
        domain = (skill.metadata or {}).get("domain", "")
        idx.register(skill, domain=domain)
    return idx


def build_router() -> Router:
    """Stand up a fully configured Router for this demo."""
    model = get_model()

    tools = create_registry(kb_search, get_metric, list_alerts)
    capabilities = CapabilityIndex(tools)
    capabilities.annotate(
        "kb_search",
        tool_name="kb_search",
        description="Look up a topic in the knowledge base.",
        domain="research",
    )
    capabilities.annotate(
        "metric_probe",
        tool_name="get_metric",
        description="Return the latest value of a named metric.",
        domain="observability",
    )
    capabilities.annotate(
        "alert_list",
        tool_name="list_alerts",
        description="List recent alerts in a time window.",
        domain="observability",
    )

    protocols = ProtocolRegistry()
    protocols.register_many(builtin_protocols())

    skills = _load_skills()

    extractor = Agent(
        model=model,
        system_prompt=(
            "You are a goal-frame extractor for the cognitive router. Given the "
            "user's request, fill the provided schema.\n\n"
            "Rules:\n"
            "1. Pick the single best primary_goal that matches the user's verb.\n"
            "2. Risk: LOW for read-only/info; MEDIUM for build/modify/plan tasks; "
            "HIGH only for irreversible production operations.\n"
            "3. Complexity: LOW for one-step; MEDIUM for multi-step; HIGH for "
            "fan-out across multiple specialists.\n"
            "4. required_capabilities must come from this set; do not invent ids:\n"
            "     - kb_search   (knowledge-base lookup)\n"
            "     - metric_probe (latest value of a named metric)\n"
            "     - alert_list  (recent alerts)\n"
            "If the request needs a tool that isn't in this list, leave "
            "required_capabilities empty."
        ),
        output_schema=GoalFrame,
    )
    compiler = CognitiveCompiler(
        protocols=protocols,
        capabilities=capabilities,
        policy=PolicyGate(),
        model=model,
        skills=skills,
    )
    print(
        f"[router] {len(protocols.all())} protocols · {len(capabilities)} capabilities · "
        f"{len(skills)} skills",
    )
    return Router(
        extractor=extractor,
        compiler=compiler,
        on_frame=lambda f: print(
            f"  [frame] goal={f.primary_goal.value} domain={f.domain} "
            f"complexity={f.complexity.value} risk={f.risk.value} "
            f"caps={f.required_capabilities}",
        ),
    )


# ---------------------------------------------------------------------------
# Demo — five different inputs land on five different protocols.
# ---------------------------------------------------------------------------


# Each entry: (prompt, fallback_frame). The fallback is used only when
# the extractor can't produce a valid GoalFrame — which happens with
# the default mock model. Against any real provider the extractor
# fills the frame from the prompt and the fallback is unused.
DEMO_CASES: tuple[tuple[str, GoalFrame], ...] = (
    (
        "What does the locus router do in the context of this SDK?",
        GoalFrame(
            primary_goal=TaskType.ANSWER,
            domain="research",
            complexity=Complexity.LOW,
            risk=Risk.LOW,
        ),
    ),
    (
        "Write a three-step plan to migrate our checkout service to a new "
        "payment provider, including a validation step.",
        GoalFrame(
            primary_goal=TaskType.PLAN,
            domain="engineering",
            complexity=Complexity.MEDIUM,
            risk=Risk.MEDIUM,
            success_criteria=["plan has 3 numbered steps", "validation step present"],
        ),
    ),
    (
        "Diagnose what's slowing down checkout right now — pull recent "
        "alerts and the latency_p99 metric, correlate them.",
        GoalFrame(
            primary_goal=TaskType.DIAGNOSE,
            domain="observability",
            complexity=Complexity.HIGH,
            risk=Risk.MEDIUM,
            required_capabilities=["metric_probe", "alert_list"],
        ),
    ),
    (
        "Compare two approaches to handling rate limits: token bucket vs "
        "sliding window. Which is better for a high-traffic API?",
        GoalFrame(
            primary_goal=TaskType.COMPARE,
            domain="engineering",
            complexity=Complexity.HIGH,
            risk=Risk.LOW,
        ),
    ),
    (
        "Generate a Python function that returns the SHA-256 of a string, "
        "with a doctest that verifies it on the empty string.",
        GoalFrame(
            primary_goal=TaskType.GENERATE_CODE,
            domain="engineering",
            complexity=Complexity.MEDIUM,
            risk=Risk.MEDIUM,
        ),
    ),
)


async def _dispatch_with_fallback(
    router: Router, prompt: str, fallback: GoalFrame
) -> tuple[str, str]:
    """Run ``router.dispatch`` and fall back to the hand-built frame on parse failure.

    Returns ``(protocol_id, text)``. The fallback path keeps the tutorial
    runnable against the default mock model — the extractor only works
    against a real provider, but the deterministic core (selection +
    compile + execute) works either way.
    """
    try:
        result = await router.dispatch(prompt)
        return result.protocol_id, result.text
    except FrameExtractionError as exc:
        print(f"  [extractor unavailable: {exc}; using fallback frame]")
        runnable = await router.compiler.compile(fallback)
        result = await runnable.execute(prompt)
        return result.protocol_id, result.text


async def run_demo() -> None:
    router = build_router()
    fired: dict[str, int] = {}
    for i, (prompt, fallback) in enumerate(DEMO_CASES, start=1):
        print(f"\n=== Prompt {i} ===\n  {prompt}")
        try:
            protocol_id, text = await _dispatch_with_fallback(router, prompt, fallback)
        except Exception as exc:  # noqa: BLE001 — demo: surface every failure
            print(f"  [error] {type(exc).__name__}: {exc}")
            continue
        fired[protocol_id] = fired.get(protocol_id, 0) + 1
        shape = RUNTIME_SHAPES.get(protocol_id, "(unknown shape)")
        print(f"  [protocol] {protocol_id}")
        print(f"  [shape]    {shape}")
        head = "\n  ".join(text.strip().splitlines()[:6])
        print(f"  [result]\n  {head}")
    print("\n=== Protocol histogram ===")
    for pid, n in sorted(fired.items()):
        print(f"  {pid:28s} × {n}")


if __name__ == "__main__":
    asyncio.run(run_demo())