Multi-agent workflows built for production.¶
Describe the task. locus selects the protocol and coordinates the agents.
direct answer · pipeline · fan‑out · debate · code + test · approval gate · A2A · handoff
- From idea to production agent in minutes, not weeks. Describe the task; locus selects the coordination pattern and assembles the network from eight production-tested protocols.
- Self-critiquing agents with grounded outputs. Every turn is scored; every factual claim is verified against the source that produced it. Hallucinations caught at the source, not in production.
- Full causal traceability. Every decision, tool call, and reasoning step is a typed event you can replay, audit, and debug — at 2 a.m. or in your compliance report.
Built inside Oracle · Used in production · Open source
from locus import Agent, tool
from locus.observability import run_context, get_event_bus

@tool(idempotent=True)
def get_metric(name: str) -> float:
    """Return the current value of a named SRE metric."""
    return monitoring.read(name)

@tool(idempotent=True)
def page_oncall(reason: str) -> str:
    """Page the on-call engineer. Fires exactly once per reason."""
    return pager.send(reason)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[get_metric, page_oncall],
    reflexion=True,  # self-evaluates every turn
)

async with run_context() as rid:
    result = agent.run_sync(
        "p99 latency spiked — investigate and page if critical."
    )
    async for ev in get_event_bus().subscribe(rid):
        match ev.event_type:
            case "agent.think":
                print("💭", ev.data["reasoning_preview"])
            case "agent.tool.started":
                print("🔧", ev.data["tool_name"])
            case "agent.tokens.used":
                print("🪙", ev.data["total_tokens"], "tokens")
            case "agent.terminate":
                print("✓", ev.data["final_message_preview"])
Six things you can ship¶
- Seven native patterns plus A2A: composition (Sequential, Parallel, Loop), Orchestrator, Swarm, Handoff, StateGraph, Functional API, DeepAgent, and cross-process A2A. Every pattern shares the same Agent class and event stream.
- locus.router is a meta-orchestration layer on top of locus's existing primitives. It decomposes a natural-language request into a typed GoalFrame, picks a Protocol from a typed registry, and compiles it onto a real Agent / SequentialPipeline / Orchestrator from the standard locus toolkit.
- Reflexion, Grounding, and Causal are first-class Think → Execute → Reflect nodes. GSAR adds a four-way claim partition — grounded, ungrounded, contradicted, complementary — with tiered replanning; see the sketch after this list.
- Opt-in EventBus with agent yield bridge. One run_context() streams 60+ canonical events from every layer — agent, multi-agent, router, RAG, memory, A2A. Zero allocations when unused.
- @tool(idempotent=True) deduplicates on (name, args) inside the Execute node. No double-charge, double-book, or double-page — even on model retry or checkpoint resume.
- MaxIterations(10) | TextMention("DONE") & ConfidenceMet(0.9) is real Python — __or__/__and__ overloads on typed classes. Greppable, unit-testable, serialisable.
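The four-way GSAR partition is easiest to see as data. A minimal sketch of the idea, with hypothetical types for illustration (not the locus source):

from dataclasses import dataclass
from enum import Enum

class ClaimStatus(Enum):
    GROUNDED = "grounded"            # entailed by a tool result
    UNGROUNDED = "ungrounded"        # no supporting evidence found
    CONTRADICTED = "contradicted"    # evidence says the opposite
    COMPLEMENTARY = "complementary"  # consistent, but adds unverified detail

@dataclass
class ClaimVerdict:
    claim: str
    status: ClaimStatus
    evidence: str | None = None  # the tool output that supports or refutes it

def replan_tier(verdicts: list[ClaimVerdict]) -> str:
    """Tiered replanning: the worst status in the batch decides the reaction."""
    statuses = {v.status for v in verdicts}
    if ClaimStatus.CONTRADICTED in statuses:
        return "re-research"  # evidence conflicts: go back to the tools
    if ClaimStatus.UNGROUNDED in statuses:
        return "drop-claims"  # unsupported: strip before the user sees them
    return "accept"           # grounded and complementary claims pass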
Let locus pick the right coordination strategy¶
Describe your goal in plain language. locus classifies the task, selects the best coordination pattern, and assembles the right agents for you. You don't choose between parallel and sequential; locus does.
from locus.router import (
    Router, CognitiveCompiler, ProtocolRegistry,
    builtin_protocols, CapabilityIndex, PolicyGate, GoalFrame, Risk,
)

# Register the 8 built-in protocols.
registry = ProtocolRegistry()
registry.register_many(builtin_protocols())

compiler = CognitiveCompiler(
    protocols=registry,
    capabilities=caps,    # annotated ToolRegistry view
    policy=PolicyGate(    # risk thresholds
        max_risk=Risk.HIGH,
        require_approval_above=Risk.MEDIUM,
    ),
    model=model,
)

router = Router(
    extractor=Agent(model=model, output_schema=GoalFrame),
    compiler=compiler,
)

result = await router.dispatch("Diagnose the checkout API slowdown.")
print(result.protocol_id)  # "specialist_fanout"
print(result.text)         # findings from 3 parallel probes
Eight built-in protocols, each mapping to a different runtime shape:
| Protocol | Compiled shape | Canonical for |
|---|---|---|
| direct_response | Single Agent | ANSWER, EXPLAIN |
| plan_execute_validate | SequentialPipeline (planner → executor → validator) | PLAN, BUILD, MODIFY |
| specialist_fanout | ParallelPipeline of N tool-bound Agents | DIAGNOSE, MONITOR |
| debate | Two debaters + judge Agent | COMPARE |
| codegen_test_validate | LoopAgent (stops on PASS) | GENERATE_CODE |
| approval_gated_execution | Agent wrapped in approval interrupt | ESCALATE, REMEDIATE |
| a2a_delegate | A2AClient.invoke (opt-in only) | — |
| handoff_chain | SequentialPipeline of one-tool Agents | COORDINATE |
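The routing itself is deterministic: the LLM only fills the GoalFrame schema, and a lookup from task class to protocol does the rest. A minimal sketch of that lookup, using a hypothetical plain-dict table rather than the typed, policy-gated registry:

# Hypothetical task-class → protocol table; the real ProtocolRegistry is
# typed and policy-gated, but the lookup has this shape.
PROTOCOL_FOR_TASK = {
    "ANSWER": "direct_response",
    "EXPLAIN": "direct_response",
    "PLAN": "plan_execute_validate",
    "BUILD": "plan_execute_validate",
    "MODIFY": "plan_execute_validate",
    "DIAGNOSE": "specialist_fanout",
    "MONITOR": "specialist_fanout",
    "COMPARE": "debate",
    "GENERATE_CODE": "codegen_test_validate",
    "ESCALATE": "approval_gated_execution",
    "REMEDIATE": "approval_gated_execution",
    "COORDINATE": "handoff_chain",
}

def select_protocol(task_class: str) -> str:
    # Deterministic and auditable: same task class, same protocol.
    return PROTOCOL_FOR_TASK.get(task_class, "direct_response")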
Agents that verify their own answers¶
Agents can hallucinate. locus catches it. After every response, it scores the reasoning and checks each claim against the actual tool output. Claims that don't hold up get dropped or sent back for re-research before the user ever sees them.
from locus import Agent
from locus.tools.decorator import tool

@tool
def search_web(query: str) -> str:
    """Search the web for facts."""
    return search_api.query(query)

@tool
def read_url(url: str) -> str:
    """Fetch and clean text from a URL."""
    return http.fetch_text(url)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search_web, read_url],
    reflexion=True,  # self-evaluate every turn
    grounding=True,  # verify claims against tool results
)

result = agent.run_sync("Summarise the Q3 earnings call. Cite every number.")
print(result.text)
print(f"grounding score: {result.grounding_score:.2f}")
# → grounding score: 0.94 — three claims grounded, one dropped (revenue mix)
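In an application you can gate on that score before the answer ships. A hedged example; publish and escalate_for_review stand in for your own handlers, and the 0.8 threshold is an arbitrary illustration, not a locus default:

result = agent.run_sync("Summarise the Q3 earnings call. Cite every number.")

# Gate the reply on how well its claims held up against tool output.
if result.grounding_score >= 0.8:
    publish(result.text)              # claims verified: ship it
else:
    escalate_for_review(result.text)  # too many dropped claims: human check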
No double-booking. No duplicate emails¶
AI models sometimes retry the same action — after a glitch, an ambiguous result, or a restart. Add one decorator and locus guarantees the action only happens once, no matter how many times the model tries. Book a flight, submit an invoice, page an engineer — safely.
from locus import Agent
from locus.tools.decorator import tool

@tool(idempotent=True)
def submit_po(vendor_id: str, line_items: list[dict]) -> dict:
    """Submit the PO. Repeat calls within the run return the cached receipt."""
    return procurement.submit(vendor_id, line_items)

@tool(idempotent=True)
def email_cfo(po_id: str, body: str) -> str:
    """Send the CFO note. Same arguments → same delivery."""
    return mail.send(to="cfo@org.com", subject=f"PO {po_id}", body=body)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search_vendors, submit_po, email_cfo],
    system_prompt="Approve a vendor; submit the PO; email the CFO.",
)

result = agent.run_sync("Approve Acme for the $42k laptop refresh.")
# → PO-2847 submitted. CFO emailed once. Three model retries deduped on
#   the (name, kwargs) hash inside the ReAct loop's Execute node.
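Conceptually, the dedup key is just a stable hash of the tool name and its canonicalised arguments. A sketch of the idea (illustrative, not the locus source):

import hashlib
import json

_receipts: dict[str, object] = {}  # per-run cache of first results

def idempotency_key(tool_name: str, kwargs: dict) -> str:
    """Stable key: same tool + same canonicalised args → same hash."""
    canonical = json.dumps(kwargs, sort_keys=True, default=str)
    return hashlib.sha256(f"{tool_name}:{canonical}".encode()).hexdigest()

def run_once(tool_name: str, fn, **kwargs):
    """Execute on first call; return the cached receipt on any retry."""
    key = idempotency_key(tool_name, kwargs)
    if key not in _receipts:       # first call: real side effect
        _receipts[key] = fn(**kwargs)
    return _receipts[key]          # retries: cached receipt, no side effect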
One conversation, many specialists¶
Pass a task from one specialist agent to another — triage to billing, billing to shipping — while the user sees a single, seamless reply. Each specialist is a separate agent, deployed independently by its own team.
from locus import create_handoff_agent, create_handoff_manager, HandoffReason

triage = create_handoff_agent(
    name="Triage",
    description="Routes incoming customer issues",
    system_prompt="Decide: Billing or Shipping. Then hand off.",
)
billing = create_handoff_agent(
    name="Billing",
    description="Resolves invoices, refunds, charges",
    system_prompt="Resolve the billing issue end-to-end.",
)
shipping = create_handoff_agent(
    name="Shipping",
    description="Tracks orders, reroutes shipments",
    system_prompt="Resolve the shipping issue end-to-end.",
)

triage.can_delegate_to = [billing.id, shipping.id]

desk = create_handoff_manager(
    agents=[triage, billing, shipping],
    max_chain=5,
)
# → [Triage → Billing] "Refunded $129. Confirmation RF-19340."
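The run call itself is elided above. Assuming the manager exposes the same run_sync surface as Agent (an assumption; the handoff tutorial is authoritative), the exchange that produced that reply would look like:

# Hypothetical invocation: run_sync on the manager is assumed, not confirmed.
result = desk.run_sync("I was charged twice for order ORD-42. Refund one.")
print(result.text)
# → "Refunded $129. Confirmation RF-19340."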
See everything your agents are doing¶
Watch every step of every agent run in real time — what it thought, which tools it called, how many tokens it used. Wrap your code in one context manager to turn it on; remove it and there's zero performance cost. Connect to any monitoring system with a single OpenTelemetry hook.
from locus import Agent, tool
from locus.observability import run_context, get_event_bus

@tool
def get_metric(name: str) -> float:
    """Return the current value of a named metric."""
    return monitoring.read(name)

agent = Agent(model="oci:openai.gpt-5.5", tools=[get_metric])

async with run_context() as rid:
    result = agent.run_sync("What is the p99 latency right now?")
    async for ev in get_event_bus().subscribe(rid):
        match ev.event_type:
            case "agent.think":
                print("💭", ev.data["reasoning_preview"])
            case "agent.tool.started":
                print("🔧", ev.data["tool_name"], ev.data["span_id"])
            case "agent.tool.completed":
                print(" ↳", ev.data["output_preview"])
            case "agent.tokens.used":
                print("🪙", ev.data["total_tokens"], "tokens")
            case "agent.terminate":
                print("✓", ev.data["final_message_preview"])
Ten event prefixes, 60+ canonical types. subscribe(run_id) replays history, then goes live. subscribe_global() watches all concurrent runs (see the sketch below). Slow consumers have events dropped rather than ever stalling the publisher.
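A fleet-wide consumer is the same loop over subscribe_global(). A sketch, assuming each event exposes its run id as ev.run_id (an assumed field name):

from locus.observability import get_event_bus

async def monitor_all_runs() -> None:
    """Watch every concurrent run; record the latest token total per run."""
    totals: dict[str, int] = {}
    async for ev in get_event_bus().subscribe_global():
        if ev.event_type == "agent.tokens.used":
            # total_tokens is the running total for that run, as in the
            # per-run example above; ev.run_id is an assumed field.
            totals[ev.run_id] = ev.data["total_tokens"]
            print(ev.run_id, "→", totals[ev.run_id], "tokens")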
Full reference · Event catalogue · Tutorial
Tell agents exactly when to stop¶
Define precisely when your agent should finish — when a specific tool ran, when it's confident enough, or after a safety cap. Combine rules with & and |. The conditions are typed objects you can unit-test, inspect, and version-control like any other code.
from locus import Agent
from locus.core.termination import (
    MaxIterations, ToolCalled, ConfidenceMet, TextMention,
)

termination = (
    (ToolCalled("submit_po") & ConfidenceMet(0.9))  # work done + confident
    | TextMention(r"\bDONE\b")                      # …or model says DONE
    | MaxIterations(15)                             # …or safety cap
)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search_vendors, submit_po],
    termination=termination,
)

result = agent.run_sync("Approve and submit the laptop PO.")
print(result.stop_reason)
# → ToolCalled('submit_po') and ConfidenceMet(0.92)
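The algebra behind this is ordinary operator overloading. A self-contained sketch of the shape (illustrative only; these classes shadow, and are not, the locus ones):

from dataclasses import dataclass

class Condition:
    """Base class: & and | build typed, inspectable composite conditions."""
    def __and__(self, other: "Condition") -> "Condition":
        return AndCondition(self, other)
    def __or__(self, other: "Condition") -> "Condition":
        return OrCondition(self, other)

@dataclass
class AndCondition(Condition):
    left: Condition
    right: Condition

@dataclass
class OrCondition(Condition):
    left: Condition
    right: Condition

@dataclass
class MaxIterations(Condition):
    limit: int

@dataclass
class TextMention(Condition):
    pattern: str

# Composition is plain data: unit-testable, greppable, serialisable.
stop = MaxIterations(10) | TextMention("DONE")
assert isinstance(stop, OrCondition)
assert stop.left == MaxIterations(10)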
Deploy in two lines of code¶
Turn any locus agent into a production API in two lines. You get streaming responses, per-user conversation history that no other user can access, and persistent threads across sessions — ready to run on any platform that supports Python.
import os

from locus import Agent
from locus.memory.backends import oci_bucket_checkpointer
from locus.server import AgentServer

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[lookup_invoice, refund],
    checkpointer=oci_bucket_checkpointer(
        bucket_name="support-threads",
        namespace="<your-tenancy>",
    ),
)

server = AgentServer(
    agent=agent,
    api_key=os.environ["LOCUS_SERVER_API_KEY"],
)
server.run(host="0.0.0.0", port=8080)

# $ curl -X POST http://localhost:8080/invoke \
#     -H "Authorization: Bearer $LOCUS_SERVER_API_KEY" \
#     -d '{"prompt":"Refund order ORD-42","thread_id":"user-c42"}'
# → {"message": "Refunded $129. Confirmation RF-19340.", "thread_id": "user-c42"}
How every agent runs¶
Every locus agent runs the same four steps in a loop: decide what to do, do it, check the result, then decide whether to stop. This loop is identical whether you're running a single agent or a fleet of specialists working together.
| Node | What it does |
|---|---|
| Think | Model decides next action or final answer. Streams reasoning + tokens. |
| Execute | Runs tool calls in parallel. @tool(idempotent=True) dedupes on (name, args) — safe on retry or restart. |
| Reflect | Self-evaluates turn quality. Grounding scores claims vs tool results. Causal traces root cause from the evidence trail. |
| Terminate? | Typed stop conditions composed with \| and &. Inspectable, unit-testable, serialisable. |
from locus.core.termination import MaxIterations, ToolCalled, ConfidenceMet

terminate = (
    ToolCalled("submit_po") & ConfidenceMet(0.9)
) | MaxIterations(10)
Every node emits a typed, write-protected event. The same stream
powers SSE in AgentServer, the OpenTelemetry telemetry hook, the
structured logging hook, and your async for event in agent.run(...)
consumer.
Read the full architecture reference →
Nine ways to coordinate agents¶
From simple parallel research to multi-team handoffs across services. Pick one pattern or combine them — they all use the same Agent class and stream into the same event feed.
- Cognitive router — NL → typed GoalFrame → typed protocol selection → compiled Agent / Pipeline / Orchestrator. The LLM fills a schema; the registry picks the shape.
- Orchestrator + Specialists — One coordinator decides which expert handles each sub-task. Specialists run in parallel.
- Composition — Linear chain · fan-out + merge. The simplest shape — describe the flow as a function.
- Swarm — Peer-to-peer task pool with SharedContext. Nobody is in charge. For open-ended research.
- Handoff — Escalation desk. The conversation moves with full history; the previous owner is out of the loop.
- StateGraph — Explicit nodes and edges. Cycles, conditional routing, subgraphs, per-node retry/cache.
- Functional API — @task and @entrypoint decorators with Send/SendBatch for map/reduce. Pythonic.
- A2A protocol — Cross-process / cross-runtime. Agents advertise capability via AgentCard; discovered over HTTP.
- Agents as tools — Wrap any agent as a tool another agent can call. Recursive, no special API.
What you get¶
Orchestration¶
| Capability | Details |
|---|---|
| 🧭 Multi-agent reasoning orchestrator | Picks one of eight protocols (direct_response, plan_execute_validate, specialist_fanout, debate, codegen_test_validate, approval_gated_execution, a2a_delegate, handoff_chain) and instantiates the matching locus primitive. The LLM fills a schema; routing is rule-based and auditable. |
| 🤝 Multi-agent patterns | Seven native patterns — Composition, Orchestrator, Swarm, Handoff, StateGraph, Functional, DeepAgent — plus cross-process A2A. Use them directly when you know what you need. |
| 📡 Observability | Opt-in EventBus — one run_context() streams 60+ canonical events, no external broker. TelemetryHook exports OpenTelemetry traces + metrics to Grafana, Honeycomb, OCI APM. Zero overhead when unused. |
Agent primitives¶
| Capability | Details |
|---|---|
| 🧠 Reasoning | Reflexion + Grounding as first-class loop nodes. CausalChain for root-cause chains. GSAR (arXiv:2604.23366): four-way claim partition + tiered replanning. |
| 🛂 Termination algebra | MaxIterations(10) \| TextMention("DONE") & ConfidenceMet(0.9) — real Python __or__/__and__ overloads. Greppable, unit-testable, serialisable. |
| 🛡 Idempotent tools | @tool(idempotent=True) dedupes on (name, args) inside Execute. No double-charge on model retry or checkpoint resume. |
| 💾 Durable memory | Four native checkpointers + five storage-backed (PostgreSQL, OpenSearch, Redis, SQLite, Oracle 26ai). |
Deployment & integration¶
| Capability | Details |
|---|---|
| 📡 Streaming + Server | Typed events for match consumers · SSE · drop-in FastAPI AgentServer. |
| 🪝 Hooks | Logging · Telemetry · ModelRetry · Guardrails · Steering. |
| 🔎 RAG | Seven vector stores · OCI Cohere + OpenAI embeddings · multimodal. |
| 🪙 MCP both ways | MCPClient consumes external servers. LocusMCPServer exposes locus tools. |
| 🌐 Multi-modal | web_search=, web_fetch=, image_generator=, speech_provider= on Agent(...). |
| 📊 Evaluation | EvalCase / EvalRunner / EvalReport regression suites. |
| 🧰 Models | OCI GenAI (V1 + SDK, 90+ models, OpenAI commercial + xAI Grok) · OpenAI · Anthropic · Ollama. One get_model() call, any provider. |
Hello, agent¶
from locus import Agent
from locus.tools.decorator import tool

@tool(idempotent=True)
def book_flight(flight_id: str, customer_id: str) -> dict:
    """Book a flight. Idempotent — repeat calls return the cached receipt."""
    return billing.charge_and_book(flight_id, customer_id)

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[book_flight],
    system_prompt="You are a travel concierge. Book the flight the user asks for.",
)

print(agent.run_sync("Book TK-12 for customer C-42").message)
That's the entire interface. The model picks the tool. The tool charges once. The agent stops.
A three-agent vendor PO approval workflow against a live Oracle 26ai
catalogue — Procurement and Compliance debate, hand off to an Approval
Officer, the human approves, idempotent writes fire — runs end-to-end
in the multi-agent and idempotency tutorials under
examples/.
Source¶
locus is small enough to read end-to-end. No magic, no hidden registries, no import-time side-effects. A few entry points:
| What | Where |
|---|---|
| Agent + ReAct loop | loop/nodes.py — ThinkNode:59, ExecuteNode:136, ReflectNode:260 |
| Cognitive router | router/runtime.py:42 → protocol.py → policy.py → compiler.py |
| Multi-agent shapes | multiagent/ — orchestrator.py, swarm.py, handoff.py, graph.py, functional.py |
| Observability | observability/event_bus.py · agent/runtime_loop.py:61 (@_bus_bridge) |
| Idempotent tools | tools/decorator.py:113 · core/termination.py:39 |
| OCI model transport | models/providers/oci/openai_compat.py:163 |
Full source map → Capabilities · API reference
Learn locus in an afternoon¶
The examples/
tree is 55 progressive tutorials. Every tutorial is one runnable
file and adds exactly one idea on top of the previous.
Track 1 — basics (first hour)¶
| # | What you learn |
|---|---|
| 01 basic agent | Make an Agent, give it a model, run a prompt. |
| 02 agent + tools | Decorate a Python function with @tool. The model sees a typed contract. |
| 03 agent memory | Conversations across runs — checkpointers, thread_id. |
| 04 streaming | Stream typed events as the agent thinks, calls tools, terminates. |
| 05 hooks | Lifecycle hooks — log every model call and every tool result. |
Track 2 — graphs & state (06–10)¶
| # | What you learn |
|---|---|
| 06 basic graph | StateGraph — explicit nodes and edges over implicit ReAct. |
| 07 conditional routing | Branch on state — add_conditional_edges. |
| 08 state reducers | Custom reducers for accumulating fields across nodes. |
| 09 human in the loop | Pause the graph for human approval, resume on input. |
| 10 advanced patterns | Send, broadcasts, subgraphs — map/reduce on agents. |
Track 3 — multi-agent (11, 16–18, 25, 34, 36)¶
The six native patterns plus A2A: Swarm · Handoff · Orchestrator · Specialists · Composition · A2A · Functional.
Track 4 — reasoning, RAG, skills (13–15, 22–24, 32)¶
Structured output · Reasoning patterns · Playbooks · RAG basics · RAG providers · RAG agents · Skills.
Track 5 — production (12, 19–21, 26–30, 33, 35, 37–40)¶
MCP · Guardrails · Checkpoint backends · SSE streaming · Evaluation · Hooks advanced · Agent Server · Model providers · Guardrails advanced · Plugins · Steering · Graph advanced · Termination · Multi-modal providers · GSAR typed grounding · OCI Dedicated AI Cluster (DAC).
Track 6 — cognitive router + observability (41, 51–55)¶
DeepAgent · Cognitive router · Observability basics · Agent yield bridge · EventBus subscriber patterns · Full event catalogue.
Then deploy¶
When the agent is ready, ship it. AgentServer is a drop-in FastAPI
wrapper; no extra glue.
from locus.server import AgentServer
server = AgentServer(agent=my_agent, cors_origins=["https://app.example.com"])
server.run(host="0.0.0.0", port=8080)
You get out of the box:
- POST /invoke — synchronous run, full AgentResult JSON.
- POST /stream — Server-Sent Events of every typed event (client sketch below).
- GET / DELETE /threads/{id} — conversation persistence (with a checkpointer attached).
- GET /health — liveness probe.
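A client can consume the stream with any HTTP library. For example, with httpx (the payload mirrors the /invoke curl above; the exact SSE event payload shape is an assumption):

import os

import httpx

# Stream typed events from a running AgentServer via POST /stream.
with httpx.stream(
    "POST",
    "http://localhost:8080/stream",
    headers={"Authorization": f"Bearer {os.environ['LOCUS_SERVER_API_KEY']}"},
    json={"prompt": "Refund order ORD-42", "thread_id": "user-c42"},
    timeout=None,  # SSE connections stay open
) as response:
    for line in response.iter_lines():
        if line.startswith("data:"):  # SSE data frames
            print(line.removeprefix("data:").strip())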
Deploys anywhere FastAPI runs:
- OCI Functions — serverless, scale to zero.
- OKE / Container Instances — docker build and ship.
- OCI Compute — uvicorn locus.server:run --port 8080.
- Kubernetes / EKS / Cloud Run — same Dockerfile.
Built inside Oracle. Used in production. Open to everyone.
locus turns hard agentic work — retries that don't double-charge, state that survives restarts, multi-agent flows that fit the problem, and reasoning that catches its own mistakes — into ordinary Python you can reason about, test, and ship.