Skip to content

Multi-agent workflows

Multi-agent workflows are what locus is for. Seven shapes you compose in one process or scale across a mesh, every shape backed by the same Agent class, the same event stream, and the same primitives. Pick a shape directly, or let the cognitive router select and compile the right one from a natural-language task description.

Seven multi-agent workflow shapes — Composition, Orchestrator + Specialists, Swarm, Handoff, StateGraph, Functional, A2A

Don't know which shape to use?

PRISM — the cognitive router extracts a typed GoalFrame from your task and selects a matching protocol from a typed registry. Eight built-in protocols, zero topology hand-writing.

What you can ship today

Every example below is a real examples/tutorial_NN_*.py file in the repo, runs end-to-end against the bundled MockModel (no creds), and upgrades to live OCI / OpenAI by setting one env var.

Workflow One line Code
41 DeepAgent — research factory create_deepagent with reflexion + grounding + subagent dispatch + deepagent.* SSE events. tutorial_41_deepagent.py
42 Map-reduce code review Scatter a diff to N reviewers via Send, reduce findings into one report. tutorial_42_map_reduce_code_review.py
43 Supervisor + critic loop Researcher → Writer → Critic, loop back to Writer until critic approves (cap'd revisions). tutorial_43_supervisor_critic_loop.py
44 Adversarial debate + judge PRO and CON argue across N rounds; Judge emits a typed Verdict via output_schema. tutorial_44_debate_with_judge.py
45 Multi-agent + human-in-the-loop Three patterns in one file: approval gate, human-as-tool, long-pause snapshot/resume. tutorial_45_multiagent_human_in_loop.py
46 On-call incident response Triage → 3 parallel investigators (logs / metrics / traces) → severity gate → page-the-human → mitigate → typed Postmortem. tutorial_46_incident_response.py
47 Tiered approval workflow Justifier → Vendor analyst → tier router (auto / manager / +finance / +CFO) → typed PurchaseOrder. Three stacked interrupt() gates on the top tier. tutorial_47_procurement_approval.py
48 Contract review + negotiation Parser → 3 parallel reviewers → negotiation gate → human counsel → Command(goto="sign_off") short-circuits when resolved. Cycles enabled. tutorial_48_contract_review.py

Pick a shape

Three questions get you to the right shape almost every time:

  1. Do agents need to talk across processes or runtimes? If yes, you want A2A. If no, everything else lives in one Python process.
  2. Does the flow have cycles or conditional routing? If yes, you want StateGraph. If it's a straight chain or fan-out, you want Composition.
  3. Do you want one coordinator picking the next agent, or peers collaborating without a central router? Coordinator → Orchestrator + Specialists. Peers → Swarm. A single agent passes the conversation onward → Handoff.

The decision tree below is the same questions in diagram form.

                ┌── do agents need to talk across processes / runtimes? ──┐
                │                                                         │
              yes ──→  A2A                                                no
                  ┌─── need explicit control flow? ───┐
                  │                                   │
                yes                                   no
                  │                                   │
        ┌─────────┴───────────┐         ┌─────────────┴────────────┐
        │                     │         │                          │
   linear / fan-out       cycles?     central router?         no router
   no cycles               yes          yes                     │
        │                  │            │                       │
   Composition         StateGraph   Orchestrator + Specialists   Swarm
                                                              one agent
                                                              hands off?
                                                             yes  │  no
                                                                Handoff

Writing your own glue (asyncio fan-out, retries, schedulers)? Use the Functional API (@task, @entrypoint) — a thin wrapper that brings agent runs into the ordinary asyncio universe.

The seven shapes

Pattern Best for Key class Source
Composition linear chains; fan-out + merge; revise-until-confidence SequentialPipeline, ParallelPipeline, LoopAgent agent/composition.py
Orchestrator + Specialists one router decides which expert handles each sub-task Orchestrator, Specialist multiagent/orchestrator.py
Swarm open-ended research; peer-to-peer; shared context Swarm, SharedContext multiagent/swarm.py
Handoff escalation desks; conversation moves with full history Handoff multiagent/handoff.py
StateGraph explicit DAG with cycles, conditional edges, subgraphs StateGraph, Node, Edge multiagent/graph.py
Functional map/reduce over agents; asyncio-native composition @task, @entrypoint multiagent/functional.py
A2A cross-process / cross-runtime; capability discovery A2AServer, A2AClient, AgentCard a2a/protocol.py

Workflow primitives

The pieces every shape is built from. Drop them into any graph node.

Send — scatter / map-reduce

from locus import Send

async def split(state):
    return [Send("worker", {"task": t}) for t in state["tasks"]]

Returning a list of Send from a node spawns parallel executions — no asyncio.gather, no shared mutable state. Each result lands in state[send.id] keyed by the send id. Used by tutorials 42, 46, 48.

interrupt() — pause for a human

from locus.core import interrupt

async def approval_node(state):
    response = interrupt({"question": "Ship it?", "options": ["yes", "no"]})
    return {"approved": response == "yes"}

interrupt() raises InterruptException; the graph catches it, snapshots state, and returns control to the caller. Resume by calling graph.execute(Command(resume="yes")). Used by tutorials 45, 46, 47, 48.

Command(goto=...) — explicit routing

from locus.core import goto

async def smart_router(state):
    if state["urgent"]:
        return goto("emergency", priority=10)   # skip ahead
    return {"score": compute_score(state)}      # normal flow

Return a Command from a node to override the default edge — useful for short-circuiting refinement loops or skipping straight to sign-off. Used by tutorial 48 to skip the negotiation loop when counsel says RESOLVED.

Agent(output_schema=...) — typed terminal artifacts

from pydantic import BaseModel
from locus import Agent, AgentConfig

class Verdict(BaseModel):
    winner: str
    confidence: float
    reasoning: str

agent = Agent(config=AgentConfig(model="oci:openai.gpt-5.5", output_schema=Verdict))
result = agent.run_sync("...")
verdict: Verdict = result.parsed   # validated Pydantic instance, not free text

When you need a typed artifact at the workflow boundary — Verdict, Postmortem, PurchaseOrder, ContractDecisionoutput_schema gives you a validated Pydantic instance. Used by tutorials 44, 46, 47, 48.

GraphConfig(allow_cycles=True) — refinement loops

from locus import GraphConfig, StateGraph

graph = StateGraph(config=GraphConfig(allow_cycles=True, max_iterations=20))
graph.add_edge("critic", "writer")   # loop edge — only legal with allow_cycles

Cycles are off by default (so you can't accidentally infinite-loop). Opt in with allow_cycles=True plus an iteration cap. Used by tutorials 43, 48.

Why these workflows ship to prod

The boring stuff that turns a demo into a product. Every primitive below works in any of the seven shapes — you don't pick "shape" or "production-ready", you get both.

Reflexion — catch a bad turn before the next one

agent = Agent(config=AgentConfig(model=..., reflexion=True))

reflexion=True self-evaluates every turn and feeds the next Think a sharper plan. → Reasoning concept

Grounding — verify claims against their source

agent = Agent(config=AgentConfig(model=..., grounding=True))

Each claim is scored against the tool result it came from; below-threshold claims get dropped or sent back. → Reasoning concept · GSAR for typed grounding.

Idempotent tools — side effects fire once

@tool(idempotent=True)
def book_flight(flight_id: str, customer_id: str) -> dict:
    return billing.charge_and_book(flight_id, customer_id)

The ReAct loop dedupes repeat calls on the (name, kwargs) hash — the model can't double-charge, double-book, or double-page. → Idempotency concept.

Checkpointing — survive every restart

agent = Agent(config=AgentConfig(
    model=...,
    checkpointer=OCIBucketBackend(bucket="...", namespace="..."),
))

Nine backends — one Protocol — and the graph snapshots state at every interrupt boundary. Pause for a human Friday afternoon, resume Monday morning from a different process. → Checkpointers.

Streaming events — every node visible

async for event in graph.stream(initial, mode=StreamMode.NODES):
    match event:
        case StreamEvent(node_id=n, mode=StreamMode.NODES):
            print(f"✓ {n}")

Every shape in this section emits the same typed events. SSE-ready, match-statement friendly, attributable to the specific specialist that produced them. → Streaming.

One event stream across all of them

All seven patterns plus A2A share the same typed event taxonomy. Consume directly from the generator, or use the opt-in EventBus to get per-component SSE events (agent.think, agent.tool.started, multiagent.orchestrator.routing, etc.) from every layer simultaneously:

from locus.observability import run_context, get_event_bus

async with run_context() as rid:
    result = orchestrator.run_sync("Plan Q3 launch.")

    async for ev in get_event_bus().subscribe(rid):
        match ev.event_type:
            case "multiagent.orchestrator.decision":
                print("coordinator →", ev.data["specialists_selected"])
            case "agent.tool.started":
                print("  🔧", ev.data["tool_name"])
            case "agent.terminate":
                print("  ✓", ev.data["final_message_preview"])

agent_name is set on every event so you can attribute output to the specialist that produced it. SSE streams from AgentServer carry the same shape — your front-end consumer is unchanged whether the back-end is a single agent, an orchestrator, a swarm, or an A2A mesh.

Observability — EventBus & SSE · SSE event catalogue

Mixing shapes

Nothing stops you running a Swarm whose members are themselves Orchestrators, with a StateGraph wrapping the whole thing for retry policy. The patterns compose; pick the shape that fits each layer of the problem.

See also

  • Agent Loop — the loop every agent in every shape runs.
  • Hooks — observe and steer across all of them.
  • Streaming — the typed event taxonomy.