Skip to content

Tutorial 09: Human-in-the-Loop

This tutorial covers:

  • Pausing graphs for human input
  • The interrupt() function
  • Resuming execution with responses
  • Approval workflows

Prerequisites: Tutorial 08 (State Reducers) Difficulty: Advanced

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 09: Human-in-the-Loop

This tutorial covers:
- Pausing graphs for human input
- The interrupt() function
- Resuming execution with responses
- Approval workflows

Prerequisites: Tutorial 08 (State Reducers)
Difficulty: Advanced
"""

import asyncio

from locus.core import Command, interrupt
from locus.multiagent import END, START, StateGraph


# =============================================================================
# Part 1: Basic Interrupt
# =============================================================================


async def example_basic_interrupt():
    """Pause execution and wait for human input."""
    print("=== Part 1: Basic Interrupt ===\n")

    graph = StateGraph()

    async def prepare(inputs):
        return {"action": "delete", "target": inputs.get("file", "data.txt")}

    async def request_approval(inputs):
        # interrupt() pauses execution and returns when resumed
        response = interrupt(
            {
                "question": f"Approve {inputs['action']} on {inputs['target']}?",
                "options": ["yes", "no"],
            }
        )
        return {"approved": response == "yes", "response": response}

    async def execute_action(inputs):
        if inputs.get("approved"):
            return {"result": f"Executed {inputs['action']} on {inputs['target']}"}
        return {"result": "Action cancelled"}

    graph.add_node("prepare", prepare)
    graph.add_node("approval", request_approval)
    graph.add_node("execute", execute_action)

    graph.add_edge(START, "prepare")
    graph.add_edge("prepare", "approval")
    graph.add_edge("approval", "execute")
    graph.add_edge("execute", END)

    # First execution - will pause at approval
    print("Starting workflow...")
    result = await graph.execute({"file": "important.txt"})

    if result.is_interrupted:
        print(f"PAUSED at: {result.interrupt.node_id}")
        print(f"Question: {result.interrupt.interrupt.payload['question']}")

        # Simulate human providing "yes"
        print("User responds: 'yes'")

        # Resume with the response
        result = await graph.execute(Command(update=result.final_state, resume="yes"))

        print(f"Result: {result.final_state.get('result')}")
    print()


# =============================================================================
# Part 2: Multi-Step Approval
# =============================================================================


async def example_multi_step():
    """Multiple interrupt points in a workflow."""
    print("=== Part 2: Multi-Step Approval ===\n")

    graph = StateGraph()

    async def ask_name(inputs):
        name = interrupt({"question": "What is your name?", "type": "text"})
        return {"name": name}

    async def ask_email(inputs):
        email = interrupt({"question": f"Hi {inputs['name']}, what's your email?"})
        return {"email": email}

    async def confirm(inputs):
        confirmed = interrupt(
            {
                "question": f"Confirm: {inputs['name']} <{inputs['email']}>?",
                "options": ["confirm", "cancel"],
            }
        )
        return {"confirmed": confirmed == "confirm"}

    async def complete(inputs):
        if inputs.get("confirmed"):
            return {"status": "Account created", "user": inputs["name"]}
        return {"status": "Cancelled"}

    graph.add_node("name", ask_name)
    graph.add_node("email", ask_email)
    graph.add_node("confirm", confirm)
    graph.add_node("complete", complete)

    graph.add_edge(START, "name")
    graph.add_edge("name", "email")
    graph.add_edge("email", "confirm")
    graph.add_edge("confirm", "complete")
    graph.add_edge("complete", END)

    # Simulate the full flow
    responses = ["Alice", "alice@example.com", "confirm"]

    print("Registration flow:")
    result = await graph.execute({})

    for response in responses:
        if result.is_interrupted:
            print(f"  Q: {result.interrupt.interrupt.payload['question']}")
            print(f"  A: {response}")
            result = await graph.execute(Command(update=result.final_state, resume=response))
        else:
            break

    print(f"\nFinal: {result.final_state.get('status')}")
    print()


# =============================================================================
# Part 3: Conditional Interrupts
# =============================================================================


async def example_conditional_interrupt():
    """Only interrupt when certain conditions are met."""
    print("=== Part 3: Conditional Interrupts ===\n")

    graph = StateGraph()

    async def assess_risk(inputs):
        amount = inputs.get("amount", 0)
        if amount < 100:
            risk = "low"
        elif amount < 1000:
            risk = "medium"
        else:
            risk = "high"
        return {"amount": amount, "risk": risk}

    async def maybe_approve(inputs):
        risk = inputs.get("risk")

        # Only interrupt for medium/high risk
        if risk == "low":
            return {"approved": True, "approver": "auto"}

        # High risk needs manager approval
        required = "manager" if risk == "medium" else "executive"
        response = interrupt(
            {
                "message": f"${inputs['amount']} requires {required} approval",
                "risk": risk,
            }
        )
        return {"approved": response == "approve", "approver": required}

    async def process(inputs):
        if inputs.get("approved"):
            return {"result": f"Transaction approved by {inputs['approver']}"}
        return {"result": "Transaction rejected"}

    graph.add_node("assess", assess_risk)
    graph.add_node("approve", maybe_approve)
    graph.add_node("process", process)

    graph.add_edge(START, "assess")
    graph.add_edge("assess", "approve")
    graph.add_edge("approve", "process")
    graph.add_edge("process", END)

    # Test different amounts
    test_cases = [
        (50, None),  # Low risk - auto approved
        (500, "approve"),  # Medium risk - manager approval
        (5000, "approve"),  # High risk - executive approval
    ]

    for amount, user_response in test_cases:
        print(f"Processing ${amount}...")
        result = await graph.execute({"amount": amount})

        if result.is_interrupted:
            print(f"  Needs approval: {result.interrupt.interrupt.payload['risk']} risk")
            result = await graph.execute(Command(update=result.final_state, resume=user_response))

        print(f"  -> {result.final_state.get('result')}")
    print()


# =============================================================================
# Part 4: interrupt_before Configuration
# =============================================================================


async def example_interrupt_before():
    """Use config to interrupt before specific nodes."""
    print("=== Part 4: interrupt_before ===\n")

    graph = StateGraph()

    async def prepare(inputs):
        return {"data": inputs.get("data", "sample"), "prepared": True}

    async def deploy(inputs):
        # This is a sensitive operation
        return {"deployed": True, "target": inputs.get("environment")}

    async def verify(inputs):
        return {"verified": True}

    graph.add_node("prepare", prepare)
    graph.add_node("deploy", deploy)
    graph.add_node("verify", verify)

    graph.add_edge(START, "prepare")
    graph.add_edge("prepare", "deploy")
    graph.add_edge("deploy", "verify")
    graph.add_edge("verify", END)

    # Configure to interrupt before deploy node
    graph.config.interrupt_before = ["deploy"]

    print("Deploying to production...")
    result = await graph.execute({"environment": "production", "data": "v2.0"})

    if result.is_interrupted:
        print(f"PAUSED before: {result.interrupt.node_id}")
        print(f"Current state: prepared={result.final_state.get('prepared')}")
        print("\nThis allows review before sensitive operations!")
    print()


# =============================================================================
# Part 5: Complete Approval Workflow
# =============================================================================


async def example_complete_workflow():
    """A realistic approval workflow."""
    print("=== Part 5: Complete Approval Workflow ===\n")

    graph = StateGraph()

    async def create_request(inputs):
        return {
            "request_id": "REQ-001",
            "type": inputs.get("type", "change"),
            "description": inputs.get("description", ""),
            "status": "pending",
        }

    async def technical_review(inputs):
        approval = interrupt(
            {
                "step": "Technical Review",
                "request": inputs["request_id"],
                "description": inputs["description"],
                "question": "Is this technically feasible?",
            }
        )
        return {
            "tech_approved": approval == "approve",
            "tech_comments": "Reviewed by engineering",
        }

    async def manager_approval(inputs):
        if not inputs.get("tech_approved"):
            return {"status": "rejected", "reason": "Technical review failed"}

        approval = interrupt(
            {
                "step": "Manager Approval",
                "request": inputs["request_id"],
                "question": "Approve this change request?",
            }
        )
        return {
            "manager_approved": approval == "approve",
            "status": "approved" if approval == "approve" else "rejected",
        }

    async def finalize(inputs):
        status = inputs.get("status")
        return {
            "final_status": status,
            "message": f"Request {inputs['request_id']}: {status}",
        }

    graph.add_node("create", create_request)
    graph.add_node("tech", technical_review)
    graph.add_node("manager", manager_approval)
    graph.add_node("finalize", finalize)

    graph.add_edge(START, "create")
    graph.add_edge("create", "tech")
    graph.add_edge("tech", "manager")
    graph.add_edge("manager", "finalize")
    graph.add_edge("finalize", END)

    # Simulate the workflow
    print("Change Request Workflow")
    print("-" * 30)

    result = await graph.execute(
        {
            "type": "change",
            "description": "Update database schema",
        }
    )

    approvals = ["approve", "approve"]
    approval_idx = 0

    while result.is_interrupted and approval_idx < len(approvals):
        step = result.interrupt.interrupt.payload.get("step", "Unknown")
        question = result.interrupt.interrupt.payload.get("question", "")
        print(f"\n{step}: {question}")
        print(f"  -> {approvals[approval_idx]}")

        result = await graph.execute(
            Command(update=result.final_state, resume=approvals[approval_idx])
        )
        approval_idx += 1

    print(f"\nResult: {result.final_state.get('message')}")
    print()


# =============================================================================
# Main
# =============================================================================


async def main():
    """Run all tutorial parts."""
    print("=" * 60)
    print("Tutorial 09: Human-in-the-Loop")
    print("=" * 60)
    print()

    await example_basic_interrupt()
    await example_multi_step()
    await example_conditional_interrupt()
    await example_interrupt_before()
    await example_complete_workflow()

    print("=" * 60)
    print("Next: Tutorial 10 - Advanced Patterns")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())