Skip to content

Cognitive Router

The cognitive router compiles natural-language tasks onto proven orchestration shapes. The LLM fills one typed GoalFrame; everything after that — protocol selection, policy gating, compilation — is rule-based.

Dispatch

Router

Router(*, extractor: Agent, compiler: CognitiveCompiler, on_frame: Callable[[GoalFrame], None] | None = None)

One-shot dispatcher: text in, :class:RunnableResult out.

Parameters

extractor: :class:~locus.Agent configured with output_schema=GoalFrame. Its parsed result drives selection. compiler: :class:CognitiveCompiler set up with protocols, capabilities, policy, and model. on_frame: Optional callback fired right after a frame is extracted. Useful for telemetry / workbench display.

Source code in src/locus/router/runtime.py
def __init__(
    self,
    *,
    extractor: Agent,
    compiler: CognitiveCompiler,
    on_frame: Callable[[GoalFrame], None] | None = None,
) -> None:
    self.extractor = extractor
    self.compiler = compiler
    self._on_frame = on_frame

extract async

extract(user_input: str, run_id: str | None = None) -> GoalFrame

Run the extractor and return the parsed :class:GoalFrame.

Emits router.frame.extracted on success or router.frame.failed on schema rejection, scoped to run_id when supplied. Raises :class:FrameExtractionError on failure — the compiler can't recover from that.

Source code in src/locus/router/runtime.py
async def extract(self, user_input: str, run_id: str | None = None) -> GoalFrame:
    """Run the extractor and return the parsed :class:`GoalFrame`.

    Emits ``router.frame.extracted`` on success or
    ``router.frame.failed`` on schema rejection, scoped to
    ``run_id`` when supplied. Raises :class:`FrameExtractionError`
    on failure — the compiler can't recover from that.
    """
    result: Any = await asyncio.to_thread(self.extractor.invoke, user_input)
    parsed = result.parsed
    if not isinstance(parsed, GoalFrame):
        err_msg = (
            "Extractor did not produce a GoalFrame. "
            f"parse_error={result.parse_error!r}, message={result.message!r}"
        )
        if run_id:
            await emit_frame_failed(run_id, err_msg)
        raise FrameExtractionError(err_msg)
    if run_id:
        await emit_frame_extracted(run_id, parsed)
    if self._on_frame is not None:
        self._on_frame(parsed)
    return parsed

dispatch async

dispatch(user_input: str, run_id: str | None = None) -> RunnableResult

Extract a frame, compile a runnable, execute it.

run_id scopes every emitted :class:StreamEvent to one cognitive dispatch. Defaults to a fresh uuid4 if omitted — callers that want their own correlation id (a request id, a Slack thread, etc.) should pass one in. The id is also attached to the :class:RunnableResult raw payload so downstream callers can correlate without re-parsing the bus.

Source code in src/locus/router/runtime.py
async def dispatch(self, user_input: str, run_id: str | None = None) -> RunnableResult:
    """Extract a frame, compile a runnable, execute it.

    ``run_id`` scopes every emitted :class:`StreamEvent` to one
    cognitive dispatch. Defaults to a fresh ``uuid4`` if omitted —
    callers that want their own correlation id (a request id, a
    Slack thread, etc.) should pass one in. The id is also
    attached to the :class:`RunnableResult` raw payload so
    downstream callers can correlate without re-parsing the bus.
    """
    rid = run_id or str(uuid4())
    # Pin the run_id on the current asyncio context so any
    # nested instrumentation (orchestrators, specialists,
    # pipelines, skills) auto-tags its events without us
    # threading the id through every signature.
    token = set_run_id(rid)
    try:
        frame = await self.extract(user_input, run_id=rid)
        runnable = await self.compiler.compile(frame, run_id=rid)
        protocol_id = getattr(runnable, "protocol_id", "?")
        inner = getattr(runnable, "inner", None)
        if protocol_id == "?" and inner is not None:
            protocol_id = getattr(inner, "protocol_id", "?")
        await emit_runnable_executing(rid, protocol_id)
        try:
            result = await runnable.execute(user_input)
        except Exception as exc:  # noqa: BLE001 — surface every failure on bus
            await emit_runnable_failed(rid, protocol_id, f"{type(exc).__name__}: {exc}")
            raise
        await emit_runnable_executed(rid, protocol_id, len(result.text or ""))
        return result
    finally:
        # Drain history and signal end-of-stream to subscribers so
        # the SSE consumer sees a clean termination, not a hung
        # connection. Then unbind the contextvar.
        await get_event_bus().close_stream(rid)
        reset_run_id(token)

RunnableResult

Bases: BaseModel

Normalized result envelope for any compiled router execution.

Goal Frame

GoalFrame

Bases: BaseModel

The single typed object the LLM produces.

Everything downstream of GoalFrame — protocol selection, capability binding, policy verdict, builder dispatch — is deterministic. The LLM never authors graph topology, only this schema.

TaskType

Bases: StrEnum

The kind of cognitive work the user is asking for.

Every protocol declares which TaskType values it can handle, so the registry can filter candidates by frame.primary_goal alone.

Risk

Bases: _OrderedStrEnum

Complexity

Bases: _OrderedStrEnum

Protocol registry

Protocol

Bases: BaseModel

Declarative description of an orchestration shape.

ProtocolRegistry

ProtocolRegistry()

Deterministic mapping from :class:GoalFrame to :class:Protocol.

Source code in src/locus/router/protocol.py
def __init__(self) -> None:
    self._protocols: dict[str, Protocol] = {}

filter_candidates

filter_candidates(frame: GoalFrame, *, available_capabilities: set[str] | None = None) -> list[Protocol]

Return every protocol that passes the three gates.

The shared filter for both select() (rule-based ranker) and any opt-in selector that wants to choose among already-qualified candidates. Gates:

  • frame.primary_goal in p.handles
  • p.risk_max >= frame.risk
  • set(p.requires_capabilities).issubset(available_capabilities)

Returns the survivors in registration order. Empty list is a valid result — callers raise :class:NoMatchingProtocolError when appropriate (this method is filter-only by design).

Source code in src/locus/router/protocol.py
def filter_candidates(
    self,
    frame: GoalFrame,
    *,
    available_capabilities: set[str] | None = None,
) -> list[Protocol]:
    """Return every protocol that passes the three gates.

    The shared filter for both ``select()`` (rule-based ranker) and
    any opt-in selector that wants to choose among already-qualified
    candidates. Gates:

    - ``frame.primary_goal in p.handles``
    - ``p.risk_max >= frame.risk``
    - ``set(p.requires_capabilities).issubset(available_capabilities)``

    Returns the survivors in registration order. Empty list is a
    valid result — callers raise :class:`NoMatchingProtocolError`
    when appropriate (this method is filter-only by design).
    """
    caps_available = available_capabilities or set()
    return [
        p
        for p in self._protocols.values()
        if frame.primary_goal in p.handles
        and p.risk_max >= frame.risk
        and set(p.requires_capabilities).issubset(caps_available)
    ]

select

select(frame: GoalFrame, *, available_capabilities: set[str] | None = None) -> Protocol

Pick the best protocol for frame.

Filters by handles ∋ primary_goalrisk_max ≥ frame.riskrequired_capabilities ⊆ available_capabilities, then ranks candidates by complexity-fit + cost.

Source code in src/locus/router/protocol.py
def select(
    self,
    frame: GoalFrame,
    *,
    available_capabilities: set[str] | None = None,
) -> Protocol:
    """Pick the best protocol for ``frame``.

    Filters by ``handles ∋ primary_goal`` ∧ ``risk_max ≥ frame.risk``
    ∧ ``required_capabilities ⊆ available_capabilities``, then ranks
    candidates by complexity-fit + cost.
    """
    if not self._protocols:
        raise NoMatchingProtocolError("No protocols registered.")

    caps_available = available_capabilities or set()
    candidates = self.filter_candidates(frame, available_capabilities=caps_available)
    if not candidates:
        raise NoMatchingProtocolError(
            f"No protocol handles primary_goal={frame.primary_goal!r} "
            f"at risk={frame.risk!r} with capabilities={sorted(caps_available)}.",
        )

    return min(candidates, key=lambda p: _rank_key(p, frame))

builtin_protocols

builtin_protocols() -> list[Protocol]

The full eight-protocol catalogue. Compose your own :class:ProtocolRegistry from these (or a subset).

Source code in src/locus/router/protocol.py
def builtin_protocols() -> list[Protocol]:
    """The full eight-protocol catalogue. Compose your own
    :class:`ProtocolRegistry` from these (or a subset)."""
    return [
        Protocol(
            id="direct_response",
            description="Single-agent answer for direct questions and explanations.",
            handles=[TaskType.ANSWER, TaskType.EXPLAIN, TaskType.RESEARCH],
            primary_for=[TaskType.ANSWER, TaskType.EXPLAIN],
            risk_max=Risk.LOW,
            cost="low",
            latency="low",
            builder=_build_direct_response,
        ),
        Protocol(
            id="plan_execute_validate",
            description="Three-stage pipeline for tasks needing structure and validation.",
            handles=[
                TaskType.PLAN,
                TaskType.BUILD,
                TaskType.MODIFY,
                TaskType.GENERATE_CODE,
                TaskType.REMEDIATE,
            ],
            primary_for=[TaskType.PLAN, TaskType.BUILD, TaskType.MODIFY],
            risk_max=Risk.MEDIUM,
            cost="medium",
            latency="medium",
            builder=_build_plan_execute_validate,
        ),
        Protocol(
            id="specialist_fanout",
            description="Fan out to multiple specialists and correlate findings.",
            handles=[
                TaskType.DIAGNOSE,
                TaskType.COMPARE,
                TaskType.MONITOR,
                TaskType.COORDINATE,
                TaskType.RESEARCH,
            ],
            primary_for=[TaskType.DIAGNOSE, TaskType.MONITOR, TaskType.RESEARCH],
            risk_max=Risk.MEDIUM,
            cost="high",
            latency="high",
            builder=_build_specialist_fanout,
        ),
        Protocol(
            id="debate",
            description="Two debaters argue opposing positions; a judge picks the winner.",
            handles=[TaskType.COMPARE, TaskType.RESEARCH],
            primary_for=[TaskType.COMPARE],
            risk_max=Risk.LOW,
            cost="high",
            latency="high",
            builder=_build_debate,
        ),
        Protocol(
            id="codegen_test_validate",
            description=(
                "Loop a tool-using agent that must declare PASS/FAIL on every "
                "iteration; stops when output starts with PASS."
            ),
            handles=[TaskType.GENERATE_CODE, TaskType.BUILD],
            primary_for=[TaskType.GENERATE_CODE],
            risk_max=Risk.MEDIUM,
            cost="medium",
            latency="high",
            builder=_build_codegen_test_validate,
        ),
        Protocol(
            id="approval_gated_execution",
            description=(
                "Single-agent execution behind a forced approval interrupt — "
                "for risky REMEDIATE / MODIFY / ESCALATE actions."
            ),
            handles=[
                TaskType.REMEDIATE,
                TaskType.MODIFY,
                TaskType.ESCALATE,
            ],
            primary_for=[TaskType.ESCALATE, TaskType.REMEDIATE],
            risk_max=Risk.HIGH,
            cost="medium",
            latency="medium",
            builder=_build_approval_gated_execution,
        ),
        Protocol(
            id="a2a_delegate",
            description=(
                "Forward the task to a remote agent via the A2A protocol. "
                "Requires BuilderContext.a2a_endpoint to be set at compile time."
            ),
            handles=[TaskType.COORDINATE, TaskType.ESCALATE],
            primary_for=[],  # opt-in only
            risk_max=Risk.MEDIUM,
            cost="medium",
            latency="high",
            builder=_build_a2a_delegate,
        ),
        Protocol(
            id="handoff_chain",
            description=(
                "Sequential chain of one-tool agents — each link adds one fact "
                "and hands off to the next."
            ),
            handles=[TaskType.PLAN, TaskType.RESEARCH, TaskType.COORDINATE],
            primary_for=[TaskType.COORDINATE],
            risk_max=Risk.MEDIUM,
            cost="medium",
            latency="high",
            builder=_build_handoff_chain,
        ),
    ]

Policy gate

PolicyGate

PolicyGate(*, max_risk: Risk = Risk.HIGH, require_approval_above: Risk = Risk.MEDIUM)

Pre-flight risk check for a (frame, protocol) pair.

Two thresholds:

  • max_risk — anything strictly above is denied.
  • require_approval_above — anything strictly above (and within max_risk) is allowed but flagged for an approval interrupt.

Defaults: max_risk=Risk.HIGH (nothing denied by default) and require_approval_above=Risk.MEDIUM (HIGH-risk frames need explicit approval).

Source code in src/locus/router/policy.py
def __init__(
    self,
    *,
    max_risk: Risk = Risk.HIGH,
    require_approval_above: Risk = Risk.MEDIUM,
) -> None:
    self._max_risk = max_risk
    self._approval_above = require_approval_above

PolicyVerdict

Bases: BaseModel

One of three outcomes from :meth:PolicyGate.check.

Compiler

CognitiveCompiler

CognitiveCompiler(*, protocols: ProtocolRegistry, capabilities: CapabilityIndex, policy: PolicyGate, model: Any, skills: SkillIndex | None = None, a2a_endpoint: str | None = None, on_approval: ApprovalCallback | None = None, protocol_picker: LLMProtocolPicker | None = None)

Glue between protocols, capabilities, policy, and the model.

Parameters

protocols: :class:ProtocolRegistry populated with built-in or custom protocols. capabilities: :class:CapabilityIndex over the surrounding ToolRegistry. The index resolves capability ids to real tools at compile time. policy: :class:PolicyGate that runs between selection and build. model: A locus model instance (or model string) injected into every builder. Builders pass it to :class:~locus.Agent / specialist constructors. skills: Optional :class:SkillIndex. When provided, every emitted :class:Agent is configured with a :class:~locus.skills.SkillsPlugin containing the skills tagged for frame.domain (plus any globally-tagged skills). The agent loop's L1 / L2 / L3 progressive disclosure surfaces them at runtime. on_approval: Optional async callback fired when the verdict requires approval. Defaults to denying — wire your workbench / CLI approval flow here. protocol_picker: Optional :class:LLMProtocolPicker. When present, the compiler delegates the last-mile protocol pick to the model whenever the filter leaves more than one candidate. When absent (the default), selection is the deterministic :func:_rank_key-based ranker. The picker is the only part of the routing pipeline that uses the model; everything else — filtering, policy gating, capability binding, builder dispatch — remains rule-based.

Source code in src/locus/router/compiler.py
def __init__(
    self,
    *,
    protocols: ProtocolRegistry,
    capabilities: CapabilityIndex,
    policy: PolicyGate,
    model: Any,
    skills: SkillIndex | None = None,
    a2a_endpoint: str | None = None,
    on_approval: ApprovalCallback | None = None,
    protocol_picker: LLMProtocolPicker | None = None,
) -> None:
    self.protocols = protocols
    self.capabilities = capabilities
    self.policy = policy
    self.model = model
    self.skills = skills
    self.a2a_endpoint = a2a_endpoint
    self._on_approval: ApprovalCallback = on_approval or _default_deny
    self.protocol_picker = protocol_picker

compile async

compile(frame: GoalFrame, run_id: str | None = None) -> Runnable

Pick a protocol, run the gate, build the runnable.

run_id (when provided) scopes every emitted :class:StreamEvent so the workbench's SSE consumer can correlate selection / verdict / compile events with one cognitive dispatch.

Source code in src/locus/router/compiler.py
async def compile(self, frame: GoalFrame, run_id: str | None = None) -> Runnable:
    """Pick a protocol, run the gate, build the runnable.

    ``run_id`` (when provided) scopes every emitted
    :class:`StreamEvent` so the workbench's SSE consumer can
    correlate selection / verdict / compile events with one
    cognitive dispatch.
    """
    available = {c.id for c in self.capabilities.all()}
    candidates = self.protocols.filter_candidates(frame, available_capabilities=available)
    if not candidates:
        err = NoMatchingProtocolError(
            f"No protocol handles primary_goal={frame.primary_goal!r} "
            f"at risk={frame.risk!r} with capabilities={sorted(available)}.",
        )
        if run_id:
            await emit_protocol_no_match(run_id, frame, str(err))
        raise err

    protocol, method, rationale = await self._pick_protocol(frame, candidates, run_id=run_id)

    if run_id:
        await emit_protocol_selected(
            run_id, frame, protocol, method=method, rationale=rationale
        )

    verdict = self.policy.check(frame, protocol)
    if run_id:
        await emit_policy_verdict(run_id, frame, protocol, verdict)
    if not verdict.allow:
        raise PolicyDeniedError(verdict.reason)

    # Intersect with the actual registry — the LLM extractor sometimes
    # hallucinates capability ids that don't exist. Strict lookup
    # would crash the whole dispatch on a single bad id; lenient
    # filtering is the right call at this boundary because the
    # protocol registry's selection step has *already* checked that
    # every protocol-required capability is available, so the only
    # things being dropped here are extractor-suggested extras.
    requested = list(frame.required_capabilities)
    valid = [cid for cid in requested if cid in available]
    caps = self.capabilities.lookup(valid) if valid else []
    runnable = protocol.builder(frame, caps, self._build_context())

    if verdict.require_approval:
        runnable = _ApprovalRunnable(
            inner=runnable,
            frame=frame,
            verdict=verdict,
            callback=self._on_approval,
        )

    if run_id:
        await emit_runnable_compiled(
            run_id,
            protocol_id=protocol.id,
            runnable_type=type(runnable).__name__,
            capability_count=len(caps),
        )
    return runnable

Capabilities

Capability

Bases: BaseModel

A tool (or human action) tagged with router-relevant metadata.

is_human property

is_human: bool

True if this capability is the human-approval sentinel.

CapabilityIndex

CapabilityIndex(tools: ToolRegistry)

View over a :class:ToolRegistry that adds router metadata.

Not a replacement for ToolRegistry — the underlying Tool instances live there. CapabilityIndex only holds the metadata overlay (domain, risk) and resolves capabilities back to tools on demand.

Source code in src/locus/router/capability.py
def __init__(self, tools: ToolRegistry) -> None:
    self._tools = tools
    self._caps: dict[str, Capability] = {}

annotate

annotate(cap_id: str, *, tool_name: str, description: str, domain: str, risk: Risk = Risk.LOW) -> Capability

Register router metadata for an existing tool (or a human step).

tool_name must already exist in the underlying :class:ToolRegistry, except for the $human sentinel which is treated as a non-tool capability.

Source code in src/locus/router/capability.py
def annotate(
    self,
    cap_id: str,
    *,
    tool_name: str,
    description: str,
    domain: str,
    risk: Risk = Risk.LOW,
) -> Capability:
    """Register router metadata for an existing tool (or a human step).

    ``tool_name`` must already exist in the underlying
    :class:`ToolRegistry`, except for the ``$human`` sentinel which is
    treated as a non-tool capability.
    """
    if tool_name != HUMAN_SENTINEL and tool_name not in self._tools:
        available = sorted(self._tools.list_tools())
        raise KeyError(
            f"Capability {cap_id!r} references unknown tool {tool_name!r}. "
            f"Register the tool first. Available: {available}",
        )
    if cap_id in self._caps:
        raise ValueError(f"Capability already registered: {cap_id!r}")
    cap = Capability(
        id=cap_id,
        description=description,
        domain=domain,
        risk=risk,
        tool_name=tool_name,
    )
    self._caps[cap_id] = cap
    return cap

lookup

lookup(ids: list[str]) -> list[Capability]

Resolve a list of capability ids; raises if any are missing.

Source code in src/locus/router/capability.py
def lookup(self, ids: list[str]) -> list[Capability]:
    """Resolve a list of capability ids; raises if any are missing."""
    missing = [i for i in ids if i not in self._caps]
    if missing:
        available = sorted(self._caps.keys())
        raise KeyError(
            f"Unknown capability ids: {missing}. Available: {available}",
        )
    return [self._caps[i] for i in ids]

for_domain

for_domain(domain: str) -> list[Capability]

All capabilities tagged with domain.

Source code in src/locus/router/capability.py
def for_domain(self, domain: str) -> list[Capability]:
    """All capabilities tagged with ``domain``."""
    return [c for c in self._caps.values() if c.domain == domain]

all

all() -> list[Capability]

All registered capabilities, in registration order.

Source code in src/locus/router/capability.py
def all(self) -> list[Capability]:
    """All registered capabilities, in registration order."""
    return list(self._caps.values())

resolve_tool

resolve_tool(cap: Capability) -> Tool

Return the underlying :class:Tool. Raises for human capabilities.

Source code in src/locus/router/capability.py
def resolve_tool(self, cap: Capability) -> Tool:
    """Return the underlying :class:`Tool`. Raises for human capabilities."""
    if cap.is_human:
        raise ValueError(
            f"Capability {cap.id!r} is a human-approval step, not a Tool.",
        )
    return self._tools.get_or_raise(cap.tool_name)

SkillIndex

SkillIndex()

A domain-tagged view over a list of :class:Skill instances.

Source code in src/locus/router/skill_index.py
def __init__(self) -> None:
    self._skills: dict[str, Skill] = {}
    self._domains: dict[str, str] = {}  # skill_name -> domain (empty = global)

register

register(skill: Skill, *, domain: str = '') -> None

Add a skill to the index with an optional domain tag.

domain="" (the default) means "applies to every domain"; :meth:for_domain always returns these alongside domain-specific matches.

Source code in src/locus/router/skill_index.py
def register(self, skill: Skill, *, domain: str = "") -> None:
    """Add a skill to the index with an optional domain tag.

    ``domain=""`` (the default) means "applies to every domain";
    :meth:`for_domain` always returns these alongside domain-specific
    matches.
    """
    if skill.name in self._skills:
        raise ValueError(f"Skill already registered: {skill.name!r}")
    self._skills[skill.name] = skill
    self._domains[skill.name] = domain

register_many

register_many(skills: list[Skill], *, domain: str = '') -> None

Register multiple skills under the same domain.

Source code in src/locus/router/skill_index.py
def register_many(self, skills: list[Skill], *, domain: str = "") -> None:
    """Register multiple skills under the same domain."""
    for skill in skills:
        self.register(skill, domain=domain)

get

get(name: str) -> Skill

Return a skill by name. Raises :class:KeyError if missing.

Source code in src/locus/router/skill_index.py
def get(self, name: str) -> Skill:
    """Return a skill by name. Raises :class:`KeyError` if missing."""
    if name not in self._skills:
        available = sorted(self._skills.keys())
        raise KeyError(
            f"Unknown skill {name!r}. Available: {available}",
        )
    return self._skills[name]

for_domain

for_domain(domain: str) -> list[Skill]

Skills tagged with domain, plus every globally-tagged skill.

The router calls this with :attr:GoalFrame.domain to pick the catalogue handed to each compiled :class:Agent. Globally-tagged skills (registered with domain="") are always included so a common catalogue (e.g. "communication tone", "safety checks") is available everywhere.

Source code in src/locus/router/skill_index.py
def for_domain(self, domain: str) -> list[Skill]:
    """Skills tagged with ``domain``, plus every globally-tagged skill.

    The router calls this with :attr:`GoalFrame.domain` to pick the
    catalogue handed to each compiled :class:`Agent`. Globally-tagged
    skills (registered with ``domain=""``) are always included so a
    common catalogue (e.g. "communication tone", "safety checks") is
    available everywhere.
    """
    return [self._skills[name] for name, d in self._domains.items() if d in (domain, "")]

all

all() -> list[Skill]

Every registered skill, in registration order.

Source code in src/locus/router/skill_index.py
def all(self) -> list[Skill]:
    """Every registered skill, in registration order."""
    return list(self._skills.values())