Skip to content

Cognitive Router

locus.router compiles a natural-language request onto existing Locus primitives. The LLM never picks topology — it fills a typed GoalFrame; the router selects a protocol deterministically, and the compiler emits a real Agent / SequentialPipeline / ParallelPipeline / LoopAgent from a curated registry.

Pipeline:

natural-language input
      │
      ▼
Agent(output_schema=GoalFrame)     ← LLM fills typed schema only
      │ GoalFrame(primary_goal, domain, complexity, risk, …)
      ▼
ProtocolRegistry.select(frame)     ← typed filter + ranking
      │ Protocol (e.g. "specialist_fanout")
      ▼
PolicyGate.check(frame, protocol)  ← allow | require_approval | deny
      │
      ▼
CognitiveCompiler.compile(…)       ← emits Runnable adapter
      │ wraps real Agent / Pipeline / Orchestrator
      ▼
runnable.execute(task)
      │
      ▼
RunnableResult(text, protocol_id, frame)

What the notebook does:

  • Define a small capability set as annotated tools.
  • Register all 8 built-in protocols.
  • Load SKILL.md packages and tag them by domain so every emitted Agent gets the right catalog at runtime.
  • Stand up a Router with a GoalFrame extractor and a CognitiveCompiler.
  • Dispatch five distinct inputs that hit five different protocols (direct_response / plan_execute_validate / specialist_fanout / debate / codegen_test_validate) and print which protocol fired, the compiled runtime shape, and the result.

Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):

python examples/notebook_58_cognitive_router.py

Offline (uses fallback frames):

LOCUS_MODEL_PROVIDER=mock python examples/notebook_58_cognitive_router.py

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Notebook 58: Cognitive router — natural language to bounded graph.

locus.router compiles a natural-language request onto existing Locus
primitives. The LLM never picks topology — it fills a typed GoalFrame;
the router selects a protocol deterministically, and the compiler emits
a real Agent / SequentialPipeline / ParallelPipeline / LoopAgent from a
curated registry.

- Define a small capability set as annotated tools.
- Register all 8 built-in protocols.
- Load SKILL.md packages from examples/skills/ and tag them by domain
  so the compiler attaches the right ones to every emitted Agent.
- Stand up a Router with an Agent(output_schema=GoalFrame) extractor
  plus a CognitiveCompiler.
- Dispatch five distinct user inputs that hit five different protocols
  (direct_response / plan_execute_validate / specialist_fanout / debate /
  codegen_test_validate) and print which protocol fired, the runtime
  shape it compiled to, and the result.

Run it
    # Default: OCI Generative AI auto-detected from ~/.oci/config
    python examples/notebook_58_cognitive_router.py

    # Offline / no credentials (uses fallback frames):
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_58_cognitive_router.py
"""

from __future__ import annotations

import asyncio
from pathlib import Path

from config import get_model

from locus.agent import Agent
from locus.router import (
    CapabilityIndex,
    CognitiveCompiler,
    Complexity,
    GoalFrame,
    PolicyGate,
    ProtocolRegistry,
    Risk,
    Router,
    SkillIndex,
    TaskType,
    builtin_protocols,
)
from locus.router.runtime import FrameExtractionError
from locus.skills import Skill
from locus.tools import tool
from locus.tools.registry import create_registry


# Per-protocol-id description of the runtime shape the builder emits.
# Kept identical to the workbench Protocols tab so the notebook doesn't
# drift from the UI.
#
# Keys are protocol ids as reported by ``result.protocol_id``; values are
# display-only strings. ``run_demo`` reads this table with ``.get`` and
# prints "(unknown shape)" for an id that is missing here, so a gap in the
# table changes the printed label rather than raising.
RUNTIME_SHAPES: dict[str, str] = {
    "direct_response": "Agent (single call)",
    "plan_execute_validate": "SequentialPipeline of 3 Agents (planner → executor → validator)",
    "specialist_fanout": "ParallelPipeline of N tool-bound Agents",
    "debate": "ParallelPipeline of 2 debaters + judge Agent",
    "codegen_test_validate": "LoopAgent (PASS/FAIL stop)",
    "approval_gated_execution": "Agent wrapped in approval interrupt",
    "a2a_delegate": "A2AClient.invoke against remote endpoint",
    "handoff_chain": "SequentialPipeline of one-tool Agents",
}


# Three small tools the router can bind to specialist agents.


@tool
def kb_search(query: str) -> str:
    """Search the knowledge base for a topic and return a short summary."""
    # NOTE(review): the docstring above is deliberately left as-is — tool
    # decorators commonly expose it to the model as the tool description;
    # confirm for locus.tools.tool before rewording it.
    #
    # The "knowledge base" is a canned in-memory table. Lookup is an exact
    # match on the lower-cased query; anything else gets a hint message that
    # echoes the query as the caller typed it.
    topic = query.lower()
    entries = {
        "rag": "RAG = retrieve documents from a vector store and condition the LLM on them.",
        "router": "locus.router compiles a typed GoalFrame onto bounded orchestration shapes.",
        "locus": "Locus is Oracle's zero-LangChain agentic SDK with native multi-agent shapes.",
    }
    if topic in entries:
        return entries[topic]
    return f"No KB entry for {query!r}; suggest a richer query."


@tool
def get_metric(name: str) -> str:
    """Return the latest value of a named metric (mocked)."""
    # NOTE(review): docstring kept verbatim — it may double as the tool
    # description seen by the model; confirm for locus.tools.tool.
    #
    # Fixed readings keyed by lower-case metric name. An unknown name is
    # reported back to the caller instead of raising.
    readings = {
        "cpu": "cpu=87% (warn threshold 80%)",
        "latency_p99": "latency_p99=420ms (slo 300ms — breach)",
        "errors_5xx": "errors_5xx=0.4% (within 1% budget)",
    }
    try:
        return readings[name.lower()]
    except KeyError:
        return f"No metric named {name!r}."


@tool
def list_alerts(window_minutes: int = 30) -> str:
    """List recent alerts inside the given window."""
    # NOTE(review): docstring kept verbatim — it may double as the tool
    # description seen by the model; confirm for locus.tools.tool.
    #
    # The alert list is canned: any positive window returns the same two
    # alerts, one per line. A zero or negative window is rejected with a
    # short message rather than an exception.
    if window_minutes > 0:
        alert_lines = (
            "alert_id=A-101 sev=high svc=checkout latency_p99 breach",
            "alert_id=A-102 sev=medium svc=catalog cpu_warn",
        )
        return "\n".join(alert_lines)
    return "no window provided"


# Router setup: capabilities, protocols, policy gate, compiler, router.


def _load_skills() -> SkillIndex:
    """Build a SkillIndex from the ``skills`` directory next to this file.

    Each skill yielded by ``Skill.from_directory`` is registered under the
    ``domain`` value found in its metadata. When the metadata is missing or
    has no ``domain`` key, an empty string is passed as the domain.
    NOTE(review): presumably SkillIndex treats an empty domain as "applies
    to every domain" — confirm against SkillIndex.register.

    Returns:
        The populated index, or an empty one when the directory does not exist.
    """
    index = SkillIndex()
    skills_root = (Path(__file__).resolve().parent / "skills").resolve()
    if skills_root.is_dir():
        for entry in Skill.from_directory(skills_root):
            # metadata may be None/empty, so normalise before the lookup.
            meta = entry.metadata or {}
            index.register(entry, domain=meta.get("domain", ""))
    return index


def build_router() -> Router:
    """Assemble the Router used by this demo.

    Steps, in order: fetch the model, wrap the three demo tools in a
    CapabilityIndex and annotate each one, register the built-in protocols,
    load skills, create the GoalFrame extractor Agent, and create the
    CognitiveCompiler. A one-line summary of the protocol / capability /
    skill counts is printed before the Router is returned.

    Returns:
        A Router whose ``on_frame`` callback prints every extracted frame.
    """
    model = get_model()

    capabilities = CapabilityIndex(create_registry(kb_search, get_metric, list_alerts))
    # (capability id, backing tool name, description, domain). The capability
    # ids are the ones the extractor prompt below tells the model to choose from.
    capability_specs = (
        ("kb_search", "kb_search", "Look up a topic in the knowledge base.", "research"),
        (
            "metric_probe",
            "get_metric",
            "Return the latest value of a named metric.",
            "observability",
        ),
        ("alert_list", "list_alerts", "List recent alerts in a time window.", "observability"),
    )
    for capability_id, tool_name, description, domain in capability_specs:
        capabilities.annotate(
            capability_id,
            tool_name=tool_name,
            description=description,
            domain=domain,
        )

    protocols = ProtocolRegistry()
    protocols.register_many(builtin_protocols())

    skills = _load_skills()

    # The extractor only fills the GoalFrame schema; the prompt restricts
    # required_capabilities to the three ids annotated above.
    extractor_prompt = (
        "You are a goal-frame extractor for the cognitive router. Given the "
        "user's request, fill the provided schema.\n\n"
        "Rules:\n"
        "1. Pick the single best primary_goal that matches the user's verb.\n"
        "2. Risk: LOW for read-only/info; MEDIUM for build/modify/plan tasks; "
        "HIGH only for irreversible production operations.\n"
        "3. Complexity: LOW for one-step; MEDIUM for multi-step; HIGH for "
        "fan-out across multiple specialists.\n"
        "4. required_capabilities must come from this set; do not invent ids:\n"
        "     - kb_search   (knowledge-base lookup)\n"
        "     - metric_probe (latest value of a named metric)\n"
        "     - alert_list  (recent alerts)\n"
        "If the request needs a tool that isn't in this list, leave "
        "required_capabilities empty."
    )
    extractor = Agent(
        model=model,
        system_prompt=extractor_prompt,
        output_schema=GoalFrame,
    )

    compiler = CognitiveCompiler(
        protocols=protocols,
        capabilities=capabilities,
        policy=PolicyGate(),
        model=model,
        skills=skills,
    )

    protocol_count = len(protocols.all())
    capability_count = len(capabilities)
    skill_count = len(skills)
    print(
        f"[router] {protocol_count} protocols · {capability_count} capabilities · "
        f"{skill_count} skills",
    )

    def _print_frame(f: GoalFrame) -> None:
        # Trace callback: show the typed frame the router is working from.
        print(
            f"  [frame] goal={f.primary_goal.value} domain={f.domain} "
            f"complexity={f.complexity.value} risk={f.risk.value} "
            f"caps={f.required_capabilities}",
        )

    return Router(extractor=extractor, compiler=compiler, on_frame=_print_frame)


# Demo cases. Each entry: (prompt, fallback_frame). The fallback is used
# only when the extractor can't produce a valid GoalFrame — i.e. against
# the mock provider. A real provider fills the frame from the prompt.
# Each prompt is paired with a hand-built GoalFrame. ``run_demo`` iterates the
# cases in this order. Which protocol a frame lands on is decided by the router
# at runtime, so the "presumably" notes below describe intent only — confirm
# against an actual run.
DEMO_CASES: tuple[tuple[str, GoalFrame], ...] = (
    # Simple question: low complexity, low risk, no capabilities requested.
    # Presumably the direct_response case.
    (
        "What does the locus router do in the context of this SDK?",
        GoalFrame(
            primary_goal=TaskType.ANSWER,
            domain="research",
            complexity=Complexity.LOW,
            risk=Risk.LOW,
        ),
    ),
    # Planning request with explicit success criteria.
    # Presumably the plan_execute_validate case.
    (
        "Write a three-step plan to migrate our checkout service to a new "
        "payment provider, including a validation step.",
        GoalFrame(
            primary_goal=TaskType.PLAN,
            domain="engineering",
            complexity=Complexity.MEDIUM,
            risk=Risk.MEDIUM,
            success_criteria=["plan has 3 numbered steps", "validation step present"],
        ),
    ),
    # Diagnosis that names two capability ids (the ones build_router annotates
    # for get_metric and list_alerts). Presumably the specialist_fanout case.
    (
        "Diagnose what's slowing down checkout right now — pull recent "
        "alerts and the latency_p99 metric, correlate them.",
        GoalFrame(
            primary_goal=TaskType.DIAGNOSE,
            domain="observability",
            complexity=Complexity.HIGH,
            risk=Risk.MEDIUM,
            required_capabilities=["metric_probe", "alert_list"],
        ),
    ),
    # Two-sided comparison. Presumably the debate case.
    (
        "Compare two approaches to handling rate limits: token bucket vs "
        "sliding window. Which is better for a high-traffic API?",
        GoalFrame(
            primary_goal=TaskType.COMPARE,
            domain="engineering",
            complexity=Complexity.HIGH,
            risk=Risk.LOW,
        ),
    ),
    # Code generation with a built-in check (doctest).
    # Presumably the codegen_test_validate case.
    (
        "Generate a Python function that returns the SHA-256 of a string, "
        "with a doctest that verifies it on the empty string.",
        GoalFrame(
            primary_goal=TaskType.GENERATE_CODE,
            domain="engineering",
            complexity=Complexity.MEDIUM,
            risk=Risk.MEDIUM,
        ),
    ),
)


async def _dispatch_with_fallback(
    router: Router, prompt: str, fallback: GoalFrame
) -> tuple[str, str]:
    """Dispatch ``prompt`` through the router, retrying with ``fallback`` if needed.

    The normal path is ``router.dispatch(prompt)``. If that raises
    FrameExtractionError, a notice is printed and the hand-built
    ``fallback`` frame is compiled and executed directly via
    ``router.compiler``, so the demo still produces a result when the
    extractor cannot return a valid frame (e.g. with the mock provider).

    Args:
        router: Router to dispatch through; its compiler serves the fallback path.
        prompt: The user's natural-language request.
        fallback: Frame to compile when extraction fails.

    Returns:
        ``(protocol_id, text)`` taken from whichever result was produced.
    """
    try:
        outcome = await router.dispatch(prompt)
    except FrameExtractionError as exc:
        print(f"  [extractor unavailable: {exc}; using fallback frame]")
        # Skip extraction entirely: compile the known-good frame and run it.
        compiled = await router.compiler.compile(fallback)
        outcome = await compiled.execute(prompt)
    return outcome.protocol_id, outcome.text


async def run_demo() -> None:
    """Run every demo case through the router and print a per-protocol tally.

    For each prompt this prints the prompt, the protocol id that handled it,
    the matching entry from RUNTIME_SHAPES, and the first six lines of the
    result text. A case that raises is reported as ``[error]`` and left out
    of the tally; the remaining cases still run.
    """
    router = build_router()
    tally: dict[str, int] = {}
    for number, (prompt, fallback) in enumerate(DEMO_CASES, start=1):
        print(f"\n=== Prompt {number} ===\n  {prompt}")
        try:
            protocol_id, text = await _dispatch_with_fallback(router, prompt, fallback)
        except Exception as exc:  # noqa: BLE001 — surface every failure for the demo
            print(f"  [error] {type(exc).__name__}: {exc}")
        else:
            tally[protocol_id] = tally.get(protocol_id, 0) + 1
            shape = RUNTIME_SHAPES.get(protocol_id, "(unknown shape)")
            print(f"  [protocol] {protocol_id}")
            print(f"  [shape]    {shape}")
            # Keep the console readable: show at most the first six lines.
            preview_lines = text.strip().splitlines()[:6]
            preview = "\n  ".join(preview_lines)
            print(f"  [result]\n  {preview}")
    print("\n=== Protocol histogram ===")
    for pid in sorted(tally):
        print(f"  {pid:28s} × {tally[pid]}")


# Script entry point: drive the async demo to completion when run directly
# (not on import).
if __name__ == "__main__":
    asyncio.run(run_demo())