Multi-agent workflows built for production.

Describe the task. locus selects the protocol and coordinates the agents.

  • From idea to production agent in minutes, not weeks. Describe the task; locus selects the coordination pattern and assembles the network from eight production-tested protocols.
  • Self-critiquing agents with grounded outputs. Every turn is scored; every factual claim is verified against the source that produced it. Hallucinations caught at the source, not in production.
  • Full causal traceability. Every decision, tool call, and reasoning step is a typed event you can replay, audit, and debug — at 2 a.m. or in your compliance report.

Launch workbench · GitHub

pip install "locus-sdk[oci]"   # OCI GenAI · OpenAI · Anthropic · Ollama

Built inside Oracle · Used in production · Open source

from locus import Agent, tool
from locus.observability import run_context, get_event_bus

@tool(idempotent=True)
def get_metric(name: str) -> float:
    """Return the current value of a named SRE metric."""
    return monitoring.read(name)

@tool(idempotent=True)
def page_oncall(reason: str) -> str:
    """Page the on-call engineer. Fires exactly once per reason."""
    return pager.send(reason)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[get_metric, page_oncall],
    reflexion=True,   # self-evaluates every turn
)

async with run_context() as rid:
    result = agent.run_sync(
        "p99 latency spiked — investigate and page if critical."
    )

    async for ev in get_event_bus().subscribe(rid):
        match ev.event_type:
            case "agent.think":
                print("💭", ev.data["reasoning_preview"])
            case "agent.tool.started":
                print("🔧", ev.data["tool_name"])
            case "agent.tokens.used":
                print("🪙", ev.data["total_tokens"], "tokens")
            case "agent.terminate":
                print("✓", ev.data["final_message_preview"])

Six things you can ship

  • Multi-agent coordination


    Seven native patterns plus A2A: Composition (Sequential · Parallel · Loop), Orchestrator, Swarm, Handoff, StateGraph, Functional API, DeepAgent, and cross-process A2A. Every pattern shares the same Agent class and event stream.

  • Cognitive router


    locus.router is a meta-orchestration layer on top of locus's existing primitives. It decomposes a natural-language request into a typed GoalFrame, picks a Protocol from a typed registry, and compiles it onto a real Agent / SequentialPipeline / Orchestrator from the standard locus toolkit.

  • Grounded reasoning


    Reflexion, Grounding, and Causal are first-class Think → Execute → Reflect nodes. GSAR adds a four-way claim partition — grounded, ungrounded, contradicted, complementary — with tiered replanning.

  • In-process observability


    Opt-in EventBus with agent yield bridge. One run_context() streams 60+ canonical events from every layer — agent, multi-agent, router, RAG, memory, A2A. Zero allocations when unused.

  • Idempotent tools


    @tool(idempotent=True) deduplicates on (name, args) inside the Execute node. No double-charge, double-book, or double-page — even on model retry or checkpoint resume.

  • Termination algebra


    MaxIterations(10) | TextMention("DONE") & ConfidenceMet(0.9) is real Python — __or__ / __and__ overloads on typed classes. Greppable, unit-testable, serialisable.
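
A sketch of how an algebra like this hangs together, illustrative only and not the locus source (the real classes ship in locus.core.termination):

# Illustrative sketch, not locus source: typed conditions overload
# __and__ / __or__ so a & b | c builds an inspectable tree.
from dataclasses import dataclass

class Condition:
    def __and__(self, other: "Condition") -> "Condition":
        return And(self, other)

    def __or__(self, other: "Condition") -> "Condition":
        return Or(self, other)

@dataclass
class And(Condition):
    left: Condition
    right: Condition
    def met(self, state: dict) -> bool:
        return self.left.met(state) and self.right.met(state)

@dataclass
class Or(Condition):
    left: Condition
    right: Condition
    def met(self, state: dict) -> bool:
        return self.left.met(state) or self.right.met(state)

@dataclass
class MaxIterations(Condition):
    n: int
    def met(self, state: dict) -> bool:
        return state["iterations"] >= self.n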

Let locus pick the right coordination strategy

Describe your goal in plain language. locus classifies the task, selects the best coordination pattern, and assembles the right agents for you. You don't choose between parallel and sequential; locus does.

from locus.router import (
    Router, CognitiveCompiler, ProtocolRegistry,
    builtin_protocols, CapabilityIndex, PolicyGate, GoalFrame,
    Risk,  # risk-level enum used by PolicyGate below (assumed exported here)
)

# Register the 8 built-in protocols.
registry = ProtocolRegistry()
registry.register_many(builtin_protocols())

compiler = CognitiveCompiler(
    protocols=registry,
    capabilities=caps,          # annotated ToolRegistry view (e.g. a CapabilityIndex)
    policy=PolicyGate(          # risk thresholds
        max_risk=Risk.HIGH,
        require_approval_above=Risk.MEDIUM,
    ),
    model=model,                # any provider via get_model()
)
router = Router(
    extractor=Agent(model=model, output_schema=GoalFrame),
    compiler=compiler,
)

result = await router.dispatch("Diagnose the checkout API slowdown.")
print(result.protocol_id)   # "specialist_fanout"
print(result.text)          # findings from 3 parallel probes

Eight built-in protocols, each mapping to a different runtime shape:

Protocol                 | Compiled shape                                      | Canonical for
direct_response          | Single Agent                                        | ANSWER, EXPLAIN
plan_execute_validate    | SequentialPipeline (planner → executor → validator) | PLAN, BUILD, MODIFY
specialist_fanout        | ParallelPipeline of N tool-bound Agents             | DIAGNOSE, MONITOR
debate                   | Two debaters + judge Agent                          | COMPARE
codegen_test_validate    | LoopAgent (stops on PASS)                           | GENERATE_CODE
approval_gated_execution | Agent wrapped in approval interrupt                 | ESCALATE, REMEDIATE
a2a_delegate             | A2AClient.invoke (opt-in only)                      |
handoff_chain            | SequentialPipeline of one-tool Agents               | COORDINATE

Full reference · Tutorial

Agents that verify their own answers

Agents can hallucinate. locus catches it. After every response, it scores the reasoning and checks each claim against the actual tool output. Claims that don't hold up get dropped or sent back for re-research before the user ever sees them.

from locus import Agent
from locus.tools.decorator import tool

@tool
def search_web(query: str) -> str:
    """Search the web for facts."""
    return search_api.query(query)

@tool
def read_url(url: str) -> str:
    """Fetch and clean text from a URL."""
    return http.fetch_text(url)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search_web, read_url],
    reflexion=True,    # self-evaluate every turn
    grounding=True,    # verify claims against tool results
)

result = agent.run_sync("Summarise the Q3 earnings call. Cite every number.")
print(result.text)
print(f"grounding score: {result.grounding_score:.2f}")
# → grounding score: 0.94 — three claims grounded, one dropped (revenue mix)

Full reference · Tutorial

No double-booking. No duplicate emails.

AI models sometimes retry the same action — after a glitch, an ambiguous result, or a restart. Add one decorator and locus guarantees the action only happens once, no matter how many times the model tries. Book a flight, submit an invoice, page an engineer — safely.

from locus import Agent
from locus.tools.decorator import tool

@tool(idempotent=True)
def submit_po(vendor_id: str, line_items: list[dict]) -> dict:
    """Submit the PO. Re-fires within the run return the cached receipt."""
    return procurement.submit(vendor_id, line_items)

@tool(idempotent=True)
def email_cfo(po_id: str, body: str) -> str:
    """Send the CFO note. Same arguments → same delivery."""
    return mail.send(to="cfo@org.com", subject=f"PO {po_id}", body=body)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search_vendors, submit_po, email_cfo],
    system_prompt="Approve a vendor; submit the PO; email the CFO.",
)

result = agent.run_sync("Approve Acme for the $42k laptop refresh.")
# → PO-2847 submitted. CFO emailed once. Three model retries deduped on
#   the (name, kwargs) hash inside the ReAct loop's Execute node.
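
Under the hood the guarantee is memoisation on a stable key. A conceptual sketch of the idea, not locus internals:

# Conceptual sketch of (name, kwargs) dedupe, not the locus implementation.
import hashlib
import json

_receipts: dict[str, object] = {}

def call_once(name: str, func, /, **kwargs):
    """First call executes; repeats with the same args return the cached result."""
    key = hashlib.sha256(
        json.dumps([name, kwargs], sort_keys=True).encode()
    ).hexdigest()
    if key not in _receipts:
        _receipts[key] = func(**kwargs)   # the side effect fires exactly once
    return _receipts[key]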

Full reference · Tutorial

One conversation, many specialists

Pass a task from one specialist agent to another — triage to billing, billing to shipping — while the user sees a single, seamless reply. Each specialist is a separate agent, deployed independently by its own team.

from locus import create_handoff_agent, create_handoff_manager, HandoffReason

triage = create_handoff_agent(
    name="Triage",
    description="Routes incoming customer issues",
    system_prompt="Decide: Billing or Shipping. Then hand off.",
)
billing = create_handoff_agent(
    name="Billing",
    description="Resolves invoices, refunds, charges",
    system_prompt="Resolve the billing issue end-to-end.",
)
shipping = create_handoff_agent(
    name="Shipping",
    description="Tracks orders, reroutes shipments",
    system_prompt="Resolve the shipping issue end-to-end.",
)
triage.can_delegate_to = [billing.id, shipping.id]

desk = create_handoff_manager(
    agents=[triage, billing, shipping],
    max_chain=5,
)
# → [Triage → Billing] "Refunded $129. Confirmation RF-19340."

Full reference · Tutorial

See everything your agents are doing

Watch every step of every agent run in real time — what it thought, which tools it called, how many tokens it used. Wrap your code in one context manager to turn it on; remove it and there's zero performance cost. Connect to any monitoring system with a single OpenTelemetry hook.

from locus import Agent, tool
from locus.observability import run_context, get_event_bus

@tool
def get_metric(name: str) -> float:
    """Return the current value of a named metric."""
    return monitoring.read(name)

agent = Agent(model="oci:openai.gpt-5.5", tools=[get_metric])

async with run_context() as rid:
    result = agent.run_sync("What is the p99 latency right now?")

    async for ev in get_event_bus().subscribe(rid):
        match ev.event_type:
            case "agent.think":
                print("💭", ev.data["reasoning_preview"])
            case "agent.tool.started":
                print("🔧", ev.data["tool_name"], ev.data["span_id"])
            case "agent.tool.completed":
                print("   ↳", ev.data["output_preview"])
            case "agent.tokens.used":
                print("🪙", ev.data["total_tokens"], "tokens")
            case "agent.terminate":
                print("✓", ev.data["final_message_preview"])

Ten event prefixes, 60+ canonical types. subscribe(run_id) replays history, then goes live. subscribe_global() watches all concurrent runs. Slow consumers drop events; they never stall the publisher.
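
A global consumer, sketched under the assumption that subscribe_global() is an async iterator like subscribe(run_id):

# Sketch: one consumer across all concurrent runs. Only event_type is
# relied on here; it appears throughout the examples above.
from locus.observability import get_event_bus

async def firehose():
    async for ev in get_event_bus().subscribe_global():
        print(ev.event_type)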

Full reference · Event catalogue · Tutorial

Tell agents exactly when to stop

Define precisely when your agent should finish — when a specific tool ran, when it's confident enough, or after a safety cap. Combine rules with & and |. The conditions are typed objects you can unit-test, inspect, and version-control like any other code.

from locus import Agent
from locus.core.termination import (
    MaxIterations, ToolCalled, ConfidenceMet, TextMention,
)

termination = (
    (ToolCalled("submit_po") & ConfidenceMet(0.9))   # work done + confident
    | TextMention(r"\bDONE\b")                         # …or model says DONE
    | MaxIterations(15)                                # …or safety cap
)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search_vendors, submit_po],
    termination=termination,
)

result = agent.run_sync("Approve and submit the laptop PO.")
print(result.stop_reason)
# → ToolCalled('submit_po') and ConfidenceMet(0.92)

Full reference · Tutorial

Deploy in two lines of code

Turn any locus agent into a production API in two lines. You get streaming responses, per-user conversation history that no other user can access, and persistent threads across sessions — ready to run on any platform that supports Python.

import os
from locus import Agent
from locus.memory.backends import oci_bucket_checkpointer
from locus.server import AgentServer

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[lookup_invoice, refund],
    checkpointer=oci_bucket_checkpointer(
        bucket_name="support-threads",
        namespace="<your-tenancy>",
    ),
)

server = AgentServer(
    agent=agent,
    api_key=os.environ["LOCUS_SERVER_API_KEY"],
)
server.run(host="0.0.0.0", port=8080)

# $ curl -X POST http://localhost:8080/invoke \
#       -H "Authorization: Bearer $LOCUS_SERVER_API_KEY" \
#       -d '{"prompt":"Refund order ORD-42","thread_id":"user-c42"}'
# → {"message": "Refunded $129. Confirmation RF-19340.", "thread_id": "user-c42"}

Full reference · Tutorial

How every agent runs

Every locus agent runs the same four steps in a loop: decide what to do, do it, check the result, then decide whether to stop. This loop is identical whether you're running a single agent or a fleet of specialists working together.
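
Schematically, as an illustration rather than the locus source:

# Schematic of the agent loop; illustration only, not locus source.
while True:
    action = think(state)            # model picks a tool call or a final answer
    result = execute(action)         # tools run; idempotent calls dedupe here
    state = reflect(state, result)   # self-evaluation, grounding, causal trace
    if terminated(state):            # composed stop conditions checked last
        break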

Diagram: locus agent loop — Think → Execute → Reflect → Terminate, with idempotent dedupe at Execute, Reflexion and Causal at Reflect, and composable termination algebra at Terminate.

Node       | What it does
Think      | Model decides next action or final answer. Streams reasoning + tokens.
Execute    | Runs tool calls in parallel. @tool(idempotent=True) dedupes on (name, args) — safe on retry or restart.
Reflect    | Self-evaluates turn quality. Grounding scores claims vs tool results. Causal traces root cause from the evidence trail.
Terminate? | Typed stop conditions composed with | and &. Inspectable, unit-testable, serialisable.

from locus.core.termination import MaxIterations, ToolCalled, ConfidenceMet

terminate = (
    ToolCalled("submit_po") & ConfidenceMet(0.9)
) | MaxIterations(10)

Every node emits a typed, write-protected event. The same stream powers SSE in AgentServer, the OpenTelemetry telemetry hook, the structured logging hook, and your async for event in agent.run(...) consumer.
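
Consuming that stream straight from a run, per the async for consumer just named:

# Direct consumption of the typed event stream from a run.
async for event in agent.run("p99 latency spiked — investigate."):
    print(event.event_type)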

Read the full architecture reference →

Eight ways to coordinate agents

From simple parallel research to multi-team handoffs across services. Pick one pattern or combine them — they all use the same Agent class and stream into the same event feed.

  • Cognitive router


    NL → typed GoalFrame → typed protocol selection → compiled Agent / Pipeline / Orchestrator. The LLM fills a schema; the registry picks the shape.

    Cognitive router →

  • Orchestrator + Specialists


    One coordinator decides which expert handles each sub-task. Specialists run in parallel.

    Orchestrator →

  • Composition


    Linear chain · fan-out + merge. The simplest shape — describe the flow as a function.

    Composition →

  • Swarm


    Peer-to-peer task pool with SharedContext. Nobody is in charge. For open-ended research.

    Swarm →

  • Handoff


    Escalation desk. The conversation moves with full history; the previous owner is out of the loop.

    Handoff →

  • StateGraph


    Explicit nodes and edges. Cycles, conditional routing, subgraphs, per-node retry/cache.

    StateGraph →

  • Functional API


    @task and @entrypoint decorators with Send / SendBatch for map/reduce. Pythonic.

    Functional →

  • A2A protocol


    Cross-process / cross-runtime. Agents advertise capability via AgentCard; discovered over HTTP.

    A2A →

  • Agents as tools


    Wrap any agent as a tool another agent can call. Recursive, no special API.

    Composition →

What you get

Orchestration

🧭 Multi-agent reasoning orchestrator — Picks one of eight protocols (direct_response, plan_execute_validate, specialist_fanout, debate, codegen_test_validate, approval_gated_execution, a2a_delegate, handoff_chain) and instantiates the matching locus primitive. The LLM fills a schema; routing is rule-based and auditable.
🤝 Multi-agent patterns — Seven native patterns (Composition, Orchestrator, Swarm, Handoff, StateGraph, Functional, DeepAgent) plus cross-process A2A. Use them directly when you know what you need.
📡 Observability — Opt-in EventBus; one run_context() streams 60+ canonical events, no external broker. TelemetryHook exports OpenTelemetry traces + metrics to Grafana, Honeycomb, OCI APM. Zero overhead when unused.

Agent primitives

🧠 Reasoning — Reflexion + Grounding as first-class loop nodes. CausalChain for root-cause chains. GSAR (arXiv:2604.23366): four-way claim partition + tiered replanning.
🛂 Termination algebra — MaxIterations(10) | TextMention("DONE") & ConfidenceMet(0.9), real Python __or__/__and__ overloads. Greppable, unit-testable, serialisable.
🛡 Idempotent tools — @tool(idempotent=True) dedupes on (name, args) inside Execute. No double-charge on model retry or checkpoint resume.
💾 Durable memory — Four native checkpointers + five storage-backed (PostgreSQL, OpenSearch, Redis, SQLite, Oracle 26ai).

Deployment & integration

📡 Streaming + Server — Typed events for match consumers · SSE · drop-in FastAPI AgentServer.
🪝 Hooks — Logging · Telemetry · ModelRetry · Guardrails · Steering.
🔎 RAG — Seven vector stores · OCI Cohere + OpenAI embeddings · multimodal.
🪙 MCP both ways — MCPClient consumes external servers; LocusMCPServer exposes locus tools.
🌐 Multi-modal — web_search=, web_fetch=, image_generator=, speech_provider= on Agent(...).
📊 Evaluation — EvalCase / EvalRunner / EvalReport regression suites.
🧰 Models — OCI GenAI (V1 + SDK, 90+ models, OpenAI commercial + xAI Grok) · OpenAI · Anthropic · Ollama. One get_model() call, any provider.
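
The provider-agnostic entry point in practice, as a sketch; the exact import path for get_model() is an assumption:

# Sketch: get_model() is named above; its import path is assumed here.
from locus.models import get_model

model = get_model("oci:openai.gpt-5.5")   # same call shape for any provider id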

Hello, agent

from locus import Agent
from locus.tools.decorator import tool

@tool(idempotent=True)
def book_flight(flight_id: str, customer_id: str) -> dict:
    """Book a flight. Idempotent — re-fires return the cached receipt."""
    return billing.charge_and_book(flight_id, customer_id)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[book_flight],
    system_prompt="You are a travel concierge. Book the flight the user asks for.",
)

print(agent.run_sync("Book TK-12 for customer C-42").message)
# → Booked TK-12 for customer C-42. Confirmation BK-58291.

That's the entire interface. The model picks the tool. The tool charges once. The agent stops.

A three-agent vendor PO approval workflow against a live Oracle 26ai catalogue — Procurement and Compliance debate, hand off to an Approval Officer, the human approves, idempotent writes fire — runs end-to-end in the multi-agent and idempotency tutorials under examples/.

Source

locus is small enough to read end-to-end. No magic, no hidden registries, no import-time side-effects. A few entry points:

What                | Where
Agent + ReAct loop  | loop/nodes.py — ThinkNode:59, ExecuteNode:136, ReflectNode:260
Cognitive router    | router/runtime.py:42 · protocol.py · policy.py · compiler.py
Multi-agent shapes  | multiagent/orchestrator.py, swarm.py, handoff.py, graph.py, functional.py
Observability       | observability/event_bus.py · agent/runtime_loop.py:61 (@_bus_bridge)
Idempotent tools    | tools/decorator.py:113 · core/termination.py:39
OCI model transport | models/providers/oci/openai_compat.py:163

Full source map → Capabilities · API reference

Learn locus in an afternoon

The examples/ tree is 55 progressive tutorials. Every tutorial is one runnable file and adds exactly one idea on top of the previous.

Track 1 — basics (first hour)

#  | Tutorial      | What you learn
01 | basic agent   | Make an Agent, give it a model, run a prompt.
02 | agent + tools | Decorate a Python function with @tool. The model sees a typed contract.
03 | agent memory  | Conversations across runs — checkpointers, thread_id.
04 | streaming     | Stream typed events as the agent thinks, calls tools, terminates.
05 | hooks         | Lifecycle hooks — log every model call and every tool result.

Track 2 — graphs & state (06–10)

#  | Tutorial            | What you learn
06 | basic graph         | StateGraph — explicit nodes and edges over implicit ReAct.
07 | conditional routing | Branch on state — add_conditional_edges.
08 | state reducers      | Custom reducers for accumulating fields across nodes.
09 | human in the loop   | Pause the graph for human approval, resume on input.
10 | advanced patterns   | Send, broadcasts, subgraphs — map/reduce on agents.

Track 3 — multi-agent (11, 16–18, 25, 34, 36)

The six native patterns plus A2A: Swarm · Handoff · Orchestrator · Specialists · Composition · Functional · A2A.

Track 4 — reasoning, RAG, skills (13–15, 22–24, 32)

Structured output · Reasoning patterns · Playbooks · RAG basics · RAG providers · RAG agents · Skills.

Track 5 — production (12, 19–21, 26–30, 33, 35, 37–40)

MCP · Guardrails · Checkpoint backends · SSE streaming · Evaluation · Hooks advanced · Agent Server · Model providers · Guardrails advanced · Plugins · Steering · Graph advanced · Termination · Multi-modal providers · GSAR typed grounding · OCI Dedicated AI Cluster (DAC).

Track 6 — cognitive router + observability (41, 51–55)

DeepAgent · Cognitive router · Observability basics · Agent yield bridge · EventBus subscriber patterns · Full event catalogue.

Then deploy

When the agent is ready, ship it. AgentServer is a drop-in FastAPI wrapper; no extra glue.

from locus.server import AgentServer

server = AgentServer(agent=my_agent, cors_origins=["https://app.example.com"])
server.run(host="0.0.0.0", port=8080)

You get out of the box:

  • POST /invoke — synchronous run, full AgentResult JSON.
  • POST /stream — Server-Sent Events of every typed event.
  • GET / DELETE /threads/{id} — conversation persistence (with a checkpointer attached).
  • GET /health — liveness probe.
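
Streaming typed events over SSE might look like this, a sketch with the payload shape assumed to mirror /invoke:

$ curl -N -X POST http://localhost:8080/stream \
      -d '{"prompt":"Refund order ORD-42","thread_id":"user-c42"}'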

Deploys anywhere FastAPI runs:

  • OCI Functions — serverless, scale to zero.
  • OKE / Container Instances — docker build and ship.
  • OCI Compute — uvicorn locus.server:run --port 8080.
  • Kubernetes / EKS / Cloud Run — same Dockerfile.

Read the deploy concept →


Built inside Oracle. Used in production. Open to everyone.

locus turns hard agentic work — retries that don't double-charge, state that survives restarts, multi-agent flows that fit the problem, and reasoning that catches its own mistakes — into ordinary Python you can reason about, test, and ship.