Skip to content

Tutorial 47: Procurement approval with tiered human gates

Real procurement workflows have a threshold-based escalation chain:

Request submitted
   │
   ▼
Justifier  (drafts business justification)
   │
   ▼
Vendor analyst  (validates vendor + pricing)
   │
   ▼
Tier router   ── < $1k     ──> auto-approve
              ── $1k-$10k  ──> manager approval (interrupt)
              ── $10k-$100k──> manager + finance approval (two interrupts)
              ── > $100k   ──> manager + finance + CFO approval (three interrupts)
   │
   ▼
PO generator  (emits structured PurchaseOrder)

Each approval gate is a separate interrupt() so a real reviewer can come back to it later. The workflow ends with a typed PurchaseOrder Pydantic model that can be filed straight into an ERP without parsing.

Differentiated Locus pieces:

  • The tier router is a plain conditional edge — no DSL, no policy file.
  • Each gate is its own node — easy to add a fourth tier, easy to re-order, easy to swap out the human gate for an automated rule.
  • output_schema=PurchaseOrder keeps the workflow's terminal artifact typed end-to-end.

Run::

python examples/tutorial_47_procurement_approval.py

Difficulty: Advanced Prerequisites: tutorial_45 (HITL multi-agent)

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Tutorial 47: Procurement approval with tiered human gates.

Real procurement workflows have a *threshold-based escalation chain*:

    Request submitted


    Justifier  (drafts business justification)


    Vendor analyst  (validates vendor + pricing)


    Tier router   ── < $1k     ──> auto-approve
                  ── $1k-$10k  ──> manager approval (interrupt)
                  ── $10k-$100k──> manager + finance approval (two interrupts)
                  ── > $100k   ──> manager + finance + CFO approval (three interrupts)


    PO generator  (emits structured PurchaseOrder)

Each approval gate is a separate ``interrupt()`` so a real reviewer
can come back to it later. The workflow ends with a typed
``PurchaseOrder`` Pydantic model that can be filed straight into
an ERP without parsing.

Differentiated Locus pieces:

* The tier router is a plain conditional edge — no DSL, no policy file.
* Each gate is *its own node* — easy to add a fourth tier, easy to
  re-order, easy to swap out the human gate for an automated rule.
* ``output_schema=PurchaseOrder`` keeps the workflow's terminal
  artifact typed end-to-end.

Run::

    python examples/tutorial_47_procurement_approval.py

Difficulty: Advanced
Prerequisites: tutorial_45 (HITL multi-agent)
"""

from __future__ import annotations

import asyncio
from typing import Any

from config import get_model
from pydantic import BaseModel, Field

from locus.agent import Agent, AgentConfig
from locus.core import Command, interrupt
from locus.core.events import TerminateEvent
from locus.multiagent.graph import END, START, StateGraph


# ---------------------------------------------------------------------------
# Data shapes
# ---------------------------------------------------------------------------


class PurchaseOrder(BaseModel):
    """Final structured artifact filed into ERP."""

    request_id: str
    vendor: str
    item: str
    amount_usd: float
    business_justification: str
    vendor_assessment: str
    approvals: list[str] = Field(description="ordered list of approver titles")
    approved_at: str
    status: str = Field(description="approved | denied")


# ---------------------------------------------------------------------------
# Specialists
# ---------------------------------------------------------------------------


PROMPTS = {
    "justify": (
        "You are a procurement analyst. Given an item and use-case, write a "
        "two-sentence business justification."
    ),
    "vendor": (
        "You are a vendor-risk analyst. Given a vendor name and an item, write "
        "a one-paragraph assessment covering: financial stability, data-handling "
        "posture (if applicable), and pricing reasonableness for this category."
    ),
}


def _make_agent(role: str, model: Any) -> Agent:
    return Agent(
        config=AgentConfig(
            agent_id=f"proc-{role}",
            model=model,
            system_prompt=PROMPTS[role],
            max_iterations=2,
            max_tokens=300,
        )
    )


async def _run(agent: Agent, prompt: str) -> str:
    final = ""
    async for event in agent.run(prompt):
        if isinstance(event, TerminateEvent):
            final = event.final_message or ""
    return final.strip()


# ---------------------------------------------------------------------------
# Nodes
# ---------------------------------------------------------------------------


async def justify(state: dict[str, Any]) -> dict[str, Any]:
    agent = _make_agent("justify", state["__model__"])
    text = await _run(agent, f"Item: {state['item']}\nUse-case: {state['use_case']}")
    return {"justification": text}


async def assess_vendor(state: dict[str, Any]) -> dict[str, Any]:
    agent = _make_agent("vendor", state["__model__"])
    text = await _run(agent, f"Vendor: {state['vendor']}\nItem: {state['item']}")
    return {"vendor_assessment": text}


def tier_router(state: dict[str, Any]) -> str:
    """Route by amount — each tier picks up the prior tier's approvals."""
    amt = float(state.get("amount_usd", 0.0))
    if amt < 1_000:
        return "auto"
    if amt < 10_000:
        return "manager"
    if amt < 100_000:
        return "finance"
    return "cfo"


async def approve_manager(state: dict[str, Any]) -> dict[str, Any]:
    decision = interrupt(
        {
            "type": "approval",
            "tier": "manager",
            "question": (
                f"Manager approval needed for ${state['amount_usd']:,.2f} "
                f"({state['vendor']}{state['item']}). Approve?"
            ),
            "options": ["yes", "no"],
        }
    )
    return _record_decision(state, "Manager", decision)


async def approve_finance(state: dict[str, Any]) -> dict[str, Any]:
    decision = interrupt(
        {
            "type": "approval",
            "tier": "finance",
            "question": (
                f"Finance approval needed for ${state['amount_usd']:,.2f} "
                f"({state['vendor']}). Approve?"
            ),
            "options": ["yes", "no"],
        }
    )
    return _record_decision(state, "Finance Director", decision)


async def approve_cfo(state: dict[str, Any]) -> dict[str, Any]:
    decision = interrupt(
        {
            "type": "approval",
            "tier": "cfo",
            "question": (
                f"CFO approval needed for ${state['amount_usd']:,.2f} ({state['vendor']}). Approve?"
            ),
            "options": ["yes", "no"],
        }
    )
    return _record_decision(state, "CFO", decision)


def _record_decision(state: dict[str, Any], role: str, decision: str) -> dict[str, Any]:
    approvals: list[str] = list(state.get("approvals", []))
    if decision == "yes":
        approvals.append(role)
        return {"approvals": approvals, "status": "pending"}
    return {"approvals": approvals, "status": "denied"}


def gate_after_manager(state: dict[str, Any]) -> str:
    """If denied at any tier, jump straight to the PO node (which marks denied)."""
    if state.get("status") == "denied":
        return "emit_po"
    amt = float(state.get("amount_usd", 0.0))
    if amt >= 10_000:
        return "approve_finance"
    return "emit_po"


def gate_after_finance(state: dict[str, Any]) -> str:
    if state.get("status") == "denied":
        return "emit_po"
    amt = float(state.get("amount_usd", 0.0))
    if amt >= 100_000:
        return "approve_cfo"
    return "emit_po"


async def auto_approve(state: dict[str, Any]) -> dict[str, Any]:
    return {"approvals": ["AUTO (under threshold)"], "status": "pending"}


async def emit_po(state: dict[str, Any]) -> dict[str, Any]:
    """Build the PO via ``Agent.output_schema=PurchaseOrder``.

    A real ERP integration generates the PO record from the workflow's
    accumulated state. Routing through an Agent with ``output_schema``
    means the artifact is a typed Pydantic model — the workflow can
    POST it directly to the ERP without parsing.
    """
    import asyncio as _asyncio

    final_status = "approved" if state.get("status") != "denied" else "denied"
    agent = Agent(
        config=AgentConfig(
            agent_id="po-emitter",
            model=state["__model__"],
            system_prompt=(
                "You are a procurement-ops officer producing a PurchaseOrder. "
                "Use the supplied fields verbatim. Don't invent vendors or amounts."
            ),
            output_schema=PurchaseOrder,
            max_iterations=2,
            max_tokens=300,
        )
    )
    prompt = (
        f"Request: {state.get('request_id')}\n"
        f"Vendor: {state['vendor']}\n"
        f"Item: {state['item']}\n"
        f"Amount: {float(state['amount_usd'])}\n"
        f"Status: {final_status}\n"
        f"Approvals: {state.get('approvals', [])}\n"
        f"Justification: {state.get('justification', '')[:200]}\n"
        f"Vendor assessment: {state.get('vendor_assessment', '')[:200]}\n\n"
        "Emit the PurchaseOrder."
    )
    last_exc: BaseException | None = None
    result = None
    for attempt in range(3):
        try:
            result = await _asyncio.to_thread(agent.run_sync, prompt)
            break
        except Exception as exc:  # noqa: BLE001 — retry transient OCI flakiness
            last_exc = exc
            await _asyncio.sleep(0.5 * (attempt + 1))
    if result is None:
        raise RuntimeError(
            f"PO emitter failed after 3 attempts. Last error: {last_exc!r}"
        ) from last_exc
    po = result.parsed
    if po is None:
        raise RuntimeError(
            "PO emitter returned no parsed PurchaseOrder. The configured model "
            "could not honor the JSON schema. Use a stronger model "
            "(e.g. openai.gpt-4o, openai.gpt-5, anthropic.claude-3-5-sonnet) "
            f"for tutorial 47. Raw output: {result.message!r}"
        )
    return {"purchase_order": po}


# ---------------------------------------------------------------------------
# Graph
# ---------------------------------------------------------------------------


def build_procurement_graph() -> StateGraph:
    g = StateGraph(name="procurement-approval")
    g.add_node("justify", justify)
    g.add_node("assess_vendor", assess_vendor)
    g.add_node("auto_approve", auto_approve)
    g.add_node("approve_manager", approve_manager)
    g.add_node("approve_finance", approve_finance)
    g.add_node("approve_cfo", approve_cfo)
    g.add_node("emit_po", emit_po)

    g.add_edge(START, "justify")
    g.add_edge("justify", "assess_vendor")
    g.add_conditional_edges(
        "assess_vendor",
        tier_router,
        targets={
            "auto": "auto_approve",
            "manager": "approve_manager",
            "finance": "approve_manager",
            "cfo": "approve_manager",
        },
    )
    g.add_edge("auto_approve", "emit_po")
    g.add_conditional_edges(
        "approve_manager",
        gate_after_manager,
        targets={"approve_finance": "approve_finance", "emit_po": "emit_po"},
    )
    g.add_conditional_edges(
        "approve_finance",
        gate_after_finance,
        targets={"approve_cfo": "approve_cfo", "emit_po": "emit_po"},
    )
    g.add_edge("approve_cfo", "emit_po")
    g.add_edge("emit_po", END)
    return g


# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------


def _print_po(po: PurchaseOrder | None) -> None:
    print("\nPurchase order:")
    print("-" * 60)
    if po is None:
        print("(missing)")
        return
    print(f"  Request:        {po.request_id}")
    print(f"  Status:         {po.status}")
    print(f"  Vendor:         {po.vendor}")
    print(f"  Item:           {po.item}")
    print(f"  Amount:         ${po.amount_usd:,.2f}")
    print(f"  Approvals:      " + (" → ".join(po.approvals) if po.approvals else "(none)"))
    print(f"  Justification:  {po.business_justification[:120]}")


async def _drive(graph: StateGraph, initial: dict[str, Any], answers: list[str]) -> Any:
    """Run the graph, auto-resuming any interrupts with the given answers."""
    result = await graph.execute(initial)
    answer_idx = 0
    while result.interrupt:
        answer = answers[answer_idx] if answer_idx < len(answers) else "yes"
        answer_idx += 1
        payload = result.interrupt.interrupt.payload
        print(f"  ⏸  [{payload.get('tier', '?')}] {payload.get('question')}")
        print(f"  ▶  Reviewer responds: {answer!r}")
        result = await graph.execute(
            Command(resume=answer, update={**result.final_state, "__model__": initial["__model__"]})
        )
    return result


async def main() -> None:
    print("Tutorial 47: Procurement approval with tiered human gates")
    print("=" * 60)

    model = get_model()
    graph = build_procurement_graph()

    scenarios = [
        ("REQ-1001", "Acme Corp", "USB hubs (10x)", 280.00, "office equipment refresh"),
        ("REQ-1002", "DataDog", "APM annual subscription", 9_500.00, "production observability"),
        ("REQ-1003", "Salesforce", "Sales Cloud (50 seats, 1 yr)", 75_000.00, "GTM ramp"),
        ("REQ-1004", "Oracle", "Exadata cloud — 12-month commit", 480_000.00, "DB platform"),
    ]

    for req_id, vendor, item, amt, use_case in scenarios:
        print(f"\n--- {req_id}: ${amt:,.2f}{vendor}{item} ---")
        initial = {
            "request_id": req_id,
            "vendor": vendor,
            "item": item,
            "amount_usd": amt,
            "use_case": use_case,
            "__model__": model,
        }
        result = await _drive(graph, initial, ["yes", "yes", "yes"])
        _print_po(result.final_state.get("purchase_order"))


if __name__ == "__main__":
    asyncio.run(main())