Skip to content

Human-in-the-Loop

Pause a graph mid-execution, ask a human, then resume with their answer. interrupt(payload) pauses the running node and returns control to the caller with result.is_interrupted = True. The caller inspects the payload, gets a response (web form, CLI prompt, Slack reply — whatever makes sense), then calls graph.execute(Command(update=..., resume=...)) to continue. The same node restarts; its interrupt() call now returns the supplied response.

What you'll see:

  • interrupt(payload) — surface a payload to the caller.
  • Command(update=..., resume=...) — resume execution.
  • Multiple interrupts in one workflow.
  • Conditional interrupts (only ask for higher-risk cases).
  • graph.config.interrupt_before = [...] — pause before specific nodes without modifying them.

This notebook doesn't call any LLM, so the model provider doesn't matter:

LOCUS_MODEL_PROVIDER=mock python examples/notebook_25_human_in_the_loop.py

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Pause a graph mid-execution, ask a human, then resume with their answer.

`interrupt(payload)` pauses the running node and returns control to the
caller with `result.is_interrupted = True`. The caller inspects the
payload, gets a response (web form, CLI prompt, Slack reply — whatever
makes sense), then calls `graph.execute(Command(update=..., resume=...))`
to continue. The same node restarts; its `interrupt()` call now
returns the supplied response.

- `interrupt(payload)` — pause and surface a payload to the caller.
- `Command(update=..., resume=...)` — resume execution with a response.
- Multiple interrupts in one workflow.
- Conditional interrupts (only ask for higher-risk cases).
- `graph.config.interrupt_before = [...]` — pause before specific nodes.

Run it:
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_25_human_in_the_loop.py

This notebook doesn't call any LLM, so the model provider doesn't
matter. The default is still OCI GenAI when ~/.oci/config exists.
"""

import asyncio

from locus.core import Command, interrupt
from locus.multiagent import END, START, StateGraph


# =============================================================================
# Part 1: One interrupt
# =============================================================================


async def example_basic_interrupt():
    """Pause before a destructive action and wait for a yes/no response."""
    print("=== Part 1: One interrupt ===\n")

    graph = StateGraph()

    async def prepare(inputs):
        return {"action": "delete", "target": inputs.get("file", "data.txt")}

    async def request_approval(inputs):
        # interrupt() pauses the node, surfaces `payload` to the caller via
        # result.interrupt, and returns the value passed to resume= when the
        # graph is re-executed.
        response = interrupt(
            {
                "question": f"Approve {inputs['action']} on {inputs['target']}?",
                "options": ["yes", "no"],
            }
        )
        return {"approved": response == "yes", "response": response}

    async def execute_action(inputs):
        if inputs.get("approved"):
            return {"result": f"Executed {inputs['action']} on {inputs['target']}"}
        return {"result": "Action cancelled"}

    graph.add_node("prepare", prepare)
    graph.add_node("approval", request_approval)
    graph.add_node("execute", execute_action)

    graph.add_edge(START, "prepare")
    graph.add_edge("prepare", "approval")
    graph.add_edge("approval", "execute")
    graph.add_edge("execute", END)

    print("Starting workflow...")
    result = await graph.execute({"file": "important.txt"})

    if result.is_interrupted:
        print(f"PAUSED at: {result.interrupt.node_id}")
        print(f"Question: {result.interrupt.interrupt.payload['question']}")

        print("User responds: 'yes'")
        result = await graph.execute(Command(update=result.final_state, resume="yes"))

        print(f"Result: {result.final_state.get('result')}")
    print()


# =============================================================================
# Part 2: Several interrupts in a row
# =============================================================================


async def example_multi_step():
    """A multi-question form modelled as one interrupt per question."""
    print("=== Part 2: Several interrupts in a row ===\n")

    graph = StateGraph()

    async def ask_name(inputs):
        name = interrupt({"question": "What is your name?", "type": "text"})
        return {"name": name}

    async def ask_email(inputs):
        email = interrupt({"question": f"Hi {inputs['name']}, what's your email?"})
        return {"email": email}

    async def confirm(inputs):
        confirmed = interrupt(
            {
                "question": f"Confirm: {inputs['name']} <{inputs['email']}>?",
                "options": ["confirm", "cancel"],
            }
        )
        return {"confirmed": confirmed == "confirm"}

    async def complete(inputs):
        if inputs.get("confirmed"):
            return {"status": "Account created", "user": inputs["name"]}
        return {"status": "Cancelled"}

    graph.add_node("name", ask_name)
    graph.add_node("email", ask_email)
    graph.add_node("confirm", confirm)
    graph.add_node("complete", complete)

    graph.add_edge(START, "name")
    graph.add_edge("name", "email")
    graph.add_edge("email", "confirm")
    graph.add_edge("confirm", "complete")
    graph.add_edge("complete", END)

    responses = ["Alice", "alice@example.com", "confirm"]

    print("Registration flow:")
    result = await graph.execute({})

    for response in responses:
        if result.is_interrupted:
            print(f"  Q: {result.interrupt.interrupt.payload['question']}")
            print(f"  A: {response}")
            result = await graph.execute(Command(update=result.final_state, resume=response))
        else:
            break

    print(f"\nFinal: {result.final_state.get('status')}")
    print()


# =============================================================================
# Part 3: Interrupt only when it matters
# =============================================================================


async def example_conditional_interrupt():
    """Auto-approve low-risk cases; pause only for medium and high risk."""
    print("=== Part 3: Interrupt only when it matters ===\n")

    graph = StateGraph()

    async def assess_risk(inputs):
        amount = inputs.get("amount", 0)
        if amount < 100:
            risk = "low"
        elif amount < 1000:
            risk = "medium"
        else:
            risk = "high"
        return {"amount": amount, "risk": risk}

    async def maybe_approve(inputs):
        risk = inputs.get("risk")
        if risk == "low":
            return {"approved": True, "approver": "auto"}

        required = "manager" if risk == "medium" else "executive"
        response = interrupt(
            {
                "message": f"${inputs['amount']} requires {required} approval",
                "risk": risk,
            }
        )
        return {"approved": response == "approve", "approver": required}

    async def process(inputs):
        if inputs.get("approved"):
            return {"result": f"Transaction approved by {inputs['approver']}"}
        return {"result": "Transaction rejected"}

    graph.add_node("assess", assess_risk)
    graph.add_node("approve", maybe_approve)
    graph.add_node("process", process)

    graph.add_edge(START, "assess")
    graph.add_edge("assess", "approve")
    graph.add_edge("approve", "process")
    graph.add_edge("process", END)

    test_cases = [
        (50, None),
        (500, "approve"),
        (5000, "approve"),
    ]

    for amount, user_response in test_cases:
        print(f"Processing ${amount}...")
        result = await graph.execute({"amount": amount})

        if result.is_interrupted:
            print(f"  Needs approval: {result.interrupt.interrupt.payload['risk']} risk")
            result = await graph.execute(Command(update=result.final_state, resume=user_response))

        print(f"  -> {result.final_state.get('result')}")
    print()


# =============================================================================
# Part 4: interrupt_before — pause without modifying the node
# =============================================================================


async def example_interrupt_before():
    """Pause before listed nodes without putting interrupt() inside them."""
    print("=== Part 4: interrupt_before — pause without modifying the node ===\n")

    graph = StateGraph()

    async def prepare(inputs):
        return {"data": inputs.get("data", "sample"), "prepared": True}

    async def deploy(inputs):
        return {"deployed": True, "target": inputs.get("environment")}

    async def verify(inputs):
        return {"verified": True}

    graph.add_node("prepare", prepare)
    graph.add_node("deploy", deploy)
    graph.add_node("verify", verify)

    graph.add_edge(START, "prepare")
    graph.add_edge("prepare", "deploy")
    graph.add_edge("deploy", "verify")
    graph.add_edge("verify", END)

    # Pause before any node in this list. Useful when the sensitive step
    # is third-party code you can't edit to call interrupt() directly.
    graph.config.interrupt_before = ["deploy"]

    print("Deploying to production...")
    result = await graph.execute({"environment": "production", "data": "v2.0"})

    if result.is_interrupted:
        print(f"PAUSED before: {result.interrupt.node_id}")
        print(f"Current state: prepared={result.final_state.get('prepared')}")
        print("\nResume with graph.execute(Command(update=..., resume=...)).")
    print()


# =============================================================================
# Part 5: A two-stage approval workflow
# =============================================================================


async def example_complete_workflow():
    """Technical review then manager sign-off, each its own interrupt."""
    print("=== Part 5: A two-stage approval workflow ===\n")

    graph = StateGraph()

    async def create_request(inputs):
        return {
            "request_id": "REQ-001",
            "type": inputs.get("type", "change"),
            "description": inputs.get("description", ""),
            "status": "pending",
        }

    async def technical_review(inputs):
        approval = interrupt(
            {
                "step": "Technical Review",
                "request": inputs["request_id"],
                "description": inputs["description"],
                "question": "Is this technically feasible?",
            }
        )
        return {
            "tech_approved": approval == "approve",
            "tech_comments": "Reviewed by engineering",
        }

    async def manager_approval(inputs):
        if not inputs.get("tech_approved"):
            return {"status": "rejected", "reason": "Technical review failed"}

        approval = interrupt(
            {
                "step": "Manager Approval",
                "request": inputs["request_id"],
                "question": "Approve this change request?",
            }
        )
        return {
            "manager_approved": approval == "approve",
            "status": "approved" if approval == "approve" else "rejected",
        }

    async def finalize(inputs):
        status = inputs.get("status")
        return {
            "final_status": status,
            "message": f"Request {inputs['request_id']}: {status}",
        }

    graph.add_node("create", create_request)
    graph.add_node("tech", technical_review)
    graph.add_node("manager", manager_approval)
    graph.add_node("finalize", finalize)

    graph.add_edge(START, "create")
    graph.add_edge("create", "tech")
    graph.add_edge("tech", "manager")
    graph.add_edge("manager", "finalize")
    graph.add_edge("finalize", END)

    print("Change Request Workflow")
    print("-" * 30)

    result = await graph.execute(
        {
            "type": "change",
            "description": "Update database schema",
        }
    )

    approvals = ["approve", "approve"]
    approval_idx = 0

    while result.is_interrupted and approval_idx < len(approvals):
        step = result.interrupt.interrupt.payload.get("step", "Unknown")
        question = result.interrupt.interrupt.payload.get("question", "")
        print(f"\n{step}: {question}")
        print(f"  -> {approvals[approval_idx]}")

        result = await graph.execute(
            Command(update=result.final_state, resume=approvals[approval_idx])
        )
        approval_idx += 1

    print(f"\nResult: {result.final_state.get('message')}")
    print()


# =============================================================================
# Main
# =============================================================================


async def main():
    print("=" * 60)
    print("Notebook 20: Human-in-the-loop")
    print("=" * 60)
    print()

    await example_basic_interrupt()
    await example_multi_step()
    await example_conditional_interrupt()
    await example_interrupt_before()
    await example_complete_workflow()

    print("=" * 60)
    print("Next: Notebook 21 — Advanced patterns")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())