Tutorial 45: Multi-agent workflows with human-in-the-loop¶
Tutorial 09 covered HITL with a single agent. Real production agentic systems rarely look like that — most of them have:
1. A triage agent that classifies incoming work.
2. Specialist agents that do the work.
3. A *human approval gate* before any irreversible action.
4. An *escalation path* when the agents can't decide.
This tutorial walks through three patterns that combine multi-agent workflows
with human input. All three use the interrupt() primitive so the
graph pauses, returns control to the caller, then resumes when the
human responds — no busy-waiting, no callback hell.
Patterns covered:
- Pattern A — Approval gate: a Triage agent classifies a refund request, a Specialist drafts the response, a human approves before it ships.
- Pattern B — Human-as-tool: when the Triage agent isn't confident, it asks the human a structured question rather than guessing. The human's answer becomes part of state for downstream specialists.
- Pattern C — Multi-step interrupt + checkpoint: the graph saves state across an interrupt boundary so a human can come back hours later (different process / different caller) and the workflow picks up where it left off.
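Pattern C hinges on the state snapshot being durable. The sketch below is a simplified, hypothetical model of that persistence step — the state keys and the "fresh-model-handle" value are illustrative assumptions, not Locus API. It shows the one real constraint: only the JSON-able parts of the snapshot get persisted, and the live model handle is dropped and re-attached on resume, just as the tutorial's Pattern C code does.

```python
import json

# Assumed snapshot shape for illustration; in the tutorial it comes from
# the paused graph's final_state.
snapshot = {
    "request": "Refund for order #42",
    "category": "refund",
    "draft": "Your refund is on its way.",
    "__model__": object(),  # live handle — not serialisable
}

# Persist only the JSON-able parts (Redis / Postgres / a queue in production).
durable = json.dumps({k: v for k, v in snapshot.items() if k != "__model__"})

# Hours later, possibly in a different process: load and re-attach a model.
restored = {**json.loads(durable), "__model__": "fresh-model-handle"}
```

The same shape works for any store that can hold a string blob; the only Locus-specific step is passing `restored` back in via `Command(resume=..., update=restored)`.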
What's differentiated about Locus here:
- interrupt() is a function-level primitive — no need to wire a separate "wait-for-human" node type. Any node can pause.
- The graph executor returns an InterruptState that carries the full workflow state. Resume by calling graph.execute(Command(resume=...)). State doesn't have to live in a global anywhere.
- Combine with a checkpointer and the workflow can pause for hours/days while preserving every specialist's context.
- Set LOCUS_MODEL_PROVIDER=oci|openai to drive real specialists.
Run:
python examples/tutorial_45_multiagent_human_in_loop.py
Difficulty: Advanced
Prerequisites: tutorial_09_human_in_the_loop, tutorial_43 (this series)
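Before reading the source, it helps to see the interrupt mechanism in miniature. The sketch below is a hypothetical, simplified model of the pause/resume cycle — not the real Locus implementation. A node calls interrupt(); on the first pass an exception carrying the payload propagates to the executor, which snapshots state and hands control back; on resume, interrupt() returns the human's answer instead.

```python
class _Interrupt(Exception):
    """Carries the payload a node wants to show the human."""
    def __init__(self, payload):
        self.payload = payload

_resume_value = None  # set by the executor before re-running the node

def interrupt(payload):
    # First pass: no resume value yet, so pause by raising.
    if _resume_value is None:
        raise _Interrupt(payload)
    # Resume pass: hand the human's answer back to the node.
    return _resume_value

def approval_node(state):
    answer = interrupt({"question": "Approve?", "draft": state["draft"]})
    return {**state, "approved": answer == "yes"}

def execute(node, state, resume=None):
    global _resume_value
    _resume_value = resume
    try:
        return ("done", node(state))
    except _Interrupt as exc:
        # Snapshot state and return control to the caller.
        return ("paused", {"payload": exc.payload, "snapshot": state})

status, out = execute(approval_node, {"draft": "Refund approved."})
# status == "paused"; out["payload"] is what the human sees
status, out = execute(approval_node, out["snapshot"], resume="yes")
# status == "done"; out["approved"] is True
```

The real Locus executor does the same dance across a whole graph: the snapshot lands in an InterruptState, and the resume value travels in via Command(resume=...).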
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Tutorial 45: Multi-agent workflows with human-in-the-loop.
Tutorial 09 covered HITL with a single agent. Real production agentic
systems rarely look like that — most of them have:
1. A triage agent that classifies incoming work.
2. Specialist agents that do the work.
3. A *human approval gate* before any irreversible action.
4. An *escalation path* when the agents can't decide.
This tutorial walks through three patterns that combine multi-agent workflows
with human input. All three use the ``interrupt()`` primitive so the
graph **pauses, returns control to the caller, then resumes** when the
human responds — no busy-waiting, no callback hell.
Patterns covered:
* **Pattern A — Approval gate**: a Triage agent classifies a refund
request, a Specialist drafts the response, a human approves before
it ships.
* **Pattern B — Human-as-tool**: when the Triage agent isn't confident,
it asks the human a structured question rather than guessing. The
human's answer becomes part of state for downstream specialists.
* **Pattern C — Multi-step interrupt + checkpoint**: the graph saves
state across an interrupt boundary so a human can come back hours
later (different process / different caller) and the workflow
picks up where it left off.
What's differentiated about Locus here:
* ``interrupt()`` is a function-level primitive — no need to wire a
separate "wait-for-human" node type. Any node can pause.
* The graph executor returns an ``InterruptState`` that carries the
full workflow state. Resume by calling ``graph.execute(Command(
resume=...))``. State doesn't have to live in a global anywhere.
* Combine with a ``checkpointer`` and the workflow can pause for
hours/days while preserving every specialist's context.
* Set ``LOCUS_MODEL_PROVIDER=oci|openai`` to drive real specialists.
Run::
python examples/tutorial_45_multiagent_human_in_loop.py
Difficulty: Advanced
Prerequisites: tutorial_09_human_in_the_loop, tutorial_43 (this series)
"""
from __future__ import annotations
import asyncio
from typing import Any
from config import get_model
from locus.agent import Agent, AgentConfig
from locus.core import Command, interrupt
from locus.core.events import TerminateEvent
from locus.multiagent.graph import END, START, StateGraph
# ---------------------------------------------------------------------------
# Specialists
# ---------------------------------------------------------------------------
def _make_agent(role: str, system_prompt: str, model: Any) -> Agent:
return Agent(
config=AgentConfig(
agent_id=f"agent-{role}",
model=model,
system_prompt=system_prompt,
max_iterations=2,
max_tokens=300,
)
)
TRIAGE_PROMPT = (
"You are a customer-support triage agent. Read the request and "
"respond with EXACTLY ONE of: refund, billing, technical, escalate. "
"Use 'escalate' only when the request is ambiguous or requires "
"manager judgment."
)
REFUND_PROMPT = (
"You are a refund specialist. Draft a polite, concise reply confirming "
"the refund will be processed. Two sentences max."
)
async def _run_agent(agent: Agent, prompt: str) -> str:
final = ""
async for event in agent.run(prompt):
if isinstance(event, TerminateEvent):
final = event.final_message or ""
return final.strip()
# ---------------------------------------------------------------------------
# Pattern A — Approval gate
# ---------------------------------------------------------------------------
async def triage_node(state: dict[str, Any]) -> dict[str, Any]:
agent = _make_agent("triage", TRIAGE_PROMPT, state["__model__"])
category = await _run_agent(agent, f"Customer request: {state['request']!r}")
return {"category": category.strip().lower().split()[0] if category else "escalate"}
async def draft_refund_node(state: dict[str, Any]) -> dict[str, Any]:
agent = _make_agent("refund", REFUND_PROMPT, state["__model__"])
draft = await _run_agent(
agent, f"Customer request: {state['request']!r}\nDraft the refund response."
)
return {"draft": draft}
async def human_approval_node(state: dict[str, Any]) -> dict[str, Any]:
"""Pause the graph until a human approves or rejects the draft.
``interrupt()`` raises ``InterruptException`` here, the graph
catches it, snapshots state, and returns an ``InterruptState`` to
the caller. When the caller calls ``graph.execute(Command(resume=
'yes'|'no'))`` we land back in this node, and ``interrupt()``
returns the resume value.
"""
response = interrupt(
{
"type": "approval",
"question": "Approve this refund response?",
"draft": state.get("draft", ""),
"options": ["yes", "no"],
}
)
return {"approved": response == "yes", "human_response": response}
async def send_or_cancel_node(state: dict[str, Any]) -> dict[str, Any]:
if state.get("approved"):
return {"result": "✓ Sent to customer", "outcome": "sent"}
return {"result": "✗ Cancelled by human reviewer", "outcome": "cancelled"}
def build_approval_graph() -> StateGraph:
g = StateGraph(name="hitl-approval-gate")
g.add_node("triage", triage_node)
g.add_node("draft", draft_refund_node)
g.add_node("approve", human_approval_node)
g.add_node("send", send_or_cancel_node)
g.add_edge(START, "triage")
g.add_edge("triage", "draft")
g.add_edge("draft", "approve")
g.add_edge("approve", "send")
g.add_edge("send", END)
return g
async def demo_pattern_a(model: Any) -> None:
print("\n=== Pattern A: Approval gate ===\n")
graph = build_approval_graph()
initial = {"request": "I want a refund for order #42 — it never shipped.", "__model__": model}
# First execute — runs triage, draft, then pauses at approve.
result = await graph.execute(initial)
if not result.interrupt:
print(f" ✗ unexpected: graph completed without interrupt: {result.final_state}")
return
payload = result.interrupt.interrupt.payload
print(f" ⏸ Paused at: {result.interrupt.node_id}")
print(f" Question: {payload.get('question')}")
print(f" Draft: {payload.get('draft')}")
# Simulate the human approving.
print(" ▶ Human responds: 'yes'")
final = await graph.execute(
Command(resume="yes", update=result.final_state),
)
print(f" ✓ Final outcome: {final.final_state.get('result')}")
# ---------------------------------------------------------------------------
# Pattern B — Human-as-tool (escalation when triage isn't confident)
# ---------------------------------------------------------------------------
async def smart_triage_node(state: dict[str, Any]) -> dict[str, Any]:
"""Triage with an *escalate* fallback that asks the human directly."""
valid = {"refund", "billing", "technical"}
agent = _make_agent("triage", TRIAGE_PROMPT, state["__model__"])
raw = await _run_agent(agent, f"Customer request: {state['request']!r}")
first = (raw.lower().split() or ["escalate"])[0]
# Treat anything that isn't one of the explicit categories — including
# the mock model's filler text — as 'escalate'. That's what we want
# in production too: never run a specialist with a bogus category.
category = first if first in valid else "escalate"
if category == "escalate":
# Human-as-tool: the agent admits it doesn't know; ask the person.
category = interrupt(
{
"type": "escalation",
"question": (
f"Triage agent is not confident. Pick a category for: {state['request']!r}"
),
"options": ["refund", "billing", "technical", "drop"],
}
)
return {"category": category}
async def route_node(state: dict[str, Any]) -> dict[str, Any]:
return {"final_category": state.get("category", "drop")}
def build_escalation_graph() -> StateGraph:
g = StateGraph(name="hitl-escalation")
g.add_node("triage", smart_triage_node)
g.add_node("route", route_node)
g.add_edge(START, "triage")
g.add_edge("triage", "route")
g.add_edge("route", END)
return g
async def demo_pattern_b(model: Any) -> None:
print("\n=== Pattern B: Human-as-tool (escalation) ===\n")
graph = build_escalation_graph()
initial = {
"request": "weird flickering on the dashboard but only on Tuesdays?",
"__model__": model,
}
result = await graph.execute(initial)
if result.interrupt:
payload = result.interrupt.interrupt.payload
        print("  ⏸ Triage escalated. Asking human:")
print(f" {payload.get('question')}")
print(" ▶ Human responds: 'technical'")
final = await graph.execute(Command(resume="technical", update=result.final_state))
print(f" ✓ Routed to: {final.final_state.get('final_category')}")
else:
# Triage was confident — no human needed.
print(f" ✓ Triage confident ({result.final_state.get('category')}) — no escalation")
# ---------------------------------------------------------------------------
# Pattern C — Long-pause workflow with checkpointing
# ---------------------------------------------------------------------------
async def demo_pattern_c(model: Any) -> None:
"""Long-pause workflow: save the workflow state, resume later.
The simple in-memory case is just "hold the ``InterruptState`` from
the first ``execute`` call somewhere durable, then call ``execute``
again with ``Command(resume=...)`` when the human responds." The
``InterruptState`` includes a ``state_snapshot`` that has every
upstream node's output, so the resumed call has full context.
For multi-process / multi-day workflows you'd swap the in-memory
snapshot for a checkpointer (Redis / Postgres / Oracle / OCI Bucket).
The graph's built-in checkpointer hook expects an AgentState — for
pure-graph workflows like this one, persisting the
``InterruptState`` to your own store is the simpler path.
"""
print("\n=== Pattern C: Long-pause workflow (snapshot + resume) ===\n")
graph = build_approval_graph()
initial = {"request": "Refund for order #42 — never arrived.", "__model__": model}
paused = await graph.execute(initial)
if not paused.interrupt:
print(" ✗ unexpected: workflow completed without pause")
return
snapshot_state = paused.final_state
print(f" ⏸ Paused at {paused.interrupt.node_id}")
print(
f" Snapshot has {len(snapshot_state)} state keys — persist these "
"to Redis / Postgres / a queue / etc."
)
# ... time passes; reviewer comes back ...
print(" ▶ Hours later: load snapshot, resume with the human's answer")
# Re-attach the model object — the snapshot has the JSON-able parts
# only. Real production code would rebuild the model from config too.
resumed = await graph.execute(
Command(resume="yes", update={**snapshot_state, "__model__": model}),
)
print(f" ✓ Resumed and finished: {resumed.final_state.get('result')}")
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
async def main() -> None:
print("Tutorial 45: Multi-agent + human-in-the-loop")
print("=" * 60)
model = get_model()
await demo_pattern_a(model)
await demo_pattern_b(model)
await demo_pattern_c(model)
if __name__ == "__main__":
asyncio.run(main())