Skip to content

Incident Response

Models the loop a real on-call engineer runs when a page fires::

Page fires
  │
  └──> Triage  ──>  scatter to 3 parallel investigators
                      ├── log analyst
                      ├── metric analyst
                      └── trace analyst
                      ▼
               Synthesizer (root-cause hypothesis)
                      │
                      ▼
        Severity gate ─── critical? ──> page humans (interrupt)
                      │                     │
                      │                  approve mitigation? yes/no
                      │                     │
                      ▼                     ▼
                   Mitigator <──────────────┘
                      │
                      ▼
                   Postmortem (structured)
  • Send: fan out to 3 investigator Agents in parallel.
  • add_conditional_edges: severity-based routing decides auto-mitigate vs escalate to a human.
  • interrupt(): critical severity pauses for explicit human approval before any mitigation runs.
  • output_schema=Postmortem: the final report is a typed Pydantic instance, ready to file into a runbook database.

Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):

python examples/notebook_63_incident_response.py

Offline:

LOCUS_MODEL_PROVIDER=mock python examples/notebook_63_incident_response.py

Pin a strong-enough model for the structured postmortem schema:

LOCUS_MODEL_ID=openai.gpt-4.1 python examples/notebook_63_incident_response.py

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Notebook 58: Incident-response runbook (SRE workflow).

Models the loop a real on-call engineer runs when a page fires::

    Page fires

      └──> Triage  ──>  scatter to 3 parallel investigators
                          ├── log analyst
                          ├── metric analyst
                          └── trace analyst

                   Synthesizer (root-cause hypothesis)


            Severity gate ─── critical? ──> page humans (interrupt)
                          │                     │
                          │                  approve mitigation? yes/no
                          │                     │
                          ▼                     ▼
                       Mitigator <──────────────┘


                       Postmortem (structured)

- Send: fan out to 3 investigator Agents in parallel.
- add_conditional_edges: severity-based routing decides auto-mitigate
  vs escalate to a human.
- interrupt(): critical severity pauses for explicit human approval
  before any mitigation runs.
- output_schema=Postmortem: the final report is a typed Pydantic
  instance, ready to file into a runbook database.

Run it
    # Default: OCI Generative AI auto-detected from ~/.oci/config
    python examples/notebook_63_incident_response.py

    # Offline / no credentials:
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_63_incident_response.py

    # Pin a strong-enough model for the structured postmortem schema:
    LOCUS_MODEL_ID=openai.gpt-4.1 python examples/notebook_63_incident_response.py
"""

from __future__ import annotations

import asyncio
from typing import Any

from config import get_model
from pydantic import BaseModel, Field

from locus.agent import Agent, AgentConfig
from locus.core import Command, interrupt
from locus.core.events import TerminateEvent
from locus.core.send import Send
from locus.multiagent.graph import END, START, StateGraph


# Data shapes for the typed workflow state.


class Postmortem(BaseModel):
    """Structured incident postmortem the workflow emits."""

    incident_id: str
    severity: str = Field(description="info | warn | critical")
    root_cause_hypothesis: str
    contributing_factors: list[str]
    mitigation_applied: str
    follow_up_actions: list[str]


class InvestigatorReport(BaseModel):
    source: str  # "logs" | "metrics" | "traces"
    findings: list[str]
    confidence: float = Field(ge=0.0, le=1.0)


# Specialist prompts — one role per workflow node.


PROMPTS = {
    "triage": (
        "You are an SRE triage bot. Given an incident description, classify "
        "severity as one of: info, warn, critical. Reply with one word."
    ),
    "logs": (
        "You are a log-analysis bot. List 1–3 concrete error patterns or "
        "warning sequences a human should investigate. Be specific. Bullets only."
    ),
    "metrics": (
        "You are a metrics-analysis bot. List 1–3 specific anomalies (latency "
        "spikes, error rate, saturation, traffic) that match the symptom. Bullets only."
    ),
    "traces": (
        "You are a distributed-trace bot. Identify 1–3 specific code paths or "
        "downstream calls likely involved. Bullets only."
    ),
    "synthesize": (
        "You are an incident commander. Given investigator reports, name the "
        "single most likely root-cause hypothesis. One sentence."
    ),
    "mitigate": (
        "You are an automated remediator. Given a root-cause hypothesis, "
        "propose ONE specific mitigation step. One sentence."
    ),
}


def _make_agent(role: str, model: Any) -> Agent:
    return Agent(
        config=AgentConfig(
            agent_id=f"sre-{role}",
            model=model,
            system_prompt=PROMPTS[role],
            max_iterations=2,
            max_tokens=300,
        )
    )


async def _run(agent: Agent, prompt: str) -> str:
    final = ""
    async for event in agent.run(prompt):
        if isinstance(event, TerminateEvent):
            final = event.final_message or ""
    return final.strip()


# Graph nodes.


async def triage(state: dict[str, Any]) -> dict[str, Any]:
    """Classify severity by asking the triage agent."""
    symptom = state.get("symptom", "")
    agent = _make_agent("triage", state["__model__"])
    raw = await _run(agent, f"Incident: {symptom}")
    sev = (raw.lower().split() or ["warn"])[0].strip(".,!?:;")
    severity = sev if sev in {"info", "warn", "critical"} else "warn"
    print(f"  [triage] agent classified severity={severity!r} (raw={raw!r})")
    return {"severity": severity}


async def scatter_investigators(state: dict[str, Any]) -> list[Send]:
    """Fan out to three investigators in parallel."""
    return [
        Send(node="investigate", payload={"source": s}, metadata={"source": s})
        for s in ("logs", "metrics", "traces")
    ]


async def investigate(state: dict[str, Any]) -> dict[str, Any]:
    source = state["source"]
    agent = _make_agent(source, state["__model__"])
    findings_text = await _run(
        agent, f"Symptom: {state['symptom']}\n\nFrom {source}, what do you see?"
    )
    findings = [line.lstrip("- *•").strip() for line in findings_text.splitlines() if line.strip()]
    return {
        "report": InvestigatorReport(
            source=source,
            findings=findings or [findings_text or f"(no {source} signal)"],
            confidence=0.7,
        )
    }


async def synthesize(state: dict[str, Any]) -> dict[str, Any]:
    reports = [v["report"] for v in state.values() if isinstance(v, dict) and "report" in v]
    bullets = "\n".join(f"[{r.source}] " + "; ".join(r.findings) for r in reports)
    agent = _make_agent("synthesize", state["__model__"])
    hypothesis = await _run(
        agent, f"Symptom: {state['symptom']}\n\nInvestigator findings:\n{bullets}"
    )
    return {"hypothesis": hypothesis, "reports": reports}


def severity_gate(state: dict[str, Any]) -> str:
    return "page_human" if state.get("severity") == "critical" else "auto_mitigate"


async def page_human(state: dict[str, Any]) -> dict[str, Any]:
    """Critical incidents pause for human approval before applying mitigation."""
    response = interrupt(
        {
            "type": "approval",
            "question": (
                f"CRITICAL incident — apply auto-mitigation for: "
                f"{state.get('hypothesis', '(no hypothesis)')!r}?"
            ),
            "options": ["yes", "no"],
            "incident": state.get("symptom"),
        }
    )
    return {"human_approved": response == "yes"}


async def mitigate(state: dict[str, Any]) -> dict[str, Any]:
    if state.get("severity") == "critical" and not state.get("human_approved"):
        return {"mitigation": "(skipped — human declined)"}
    agent = _make_agent("mitigate", state["__model__"])
    mitigation = await _run(
        agent, f"Hypothesis: {state.get('hypothesis', '')}\n\nProposed mitigation?"
    )
    return {"mitigation": mitigation}


async def write_postmortem(state: dict[str, Any]) -> dict[str, Any]:
    """Emit the structured Postmortem via ``Agent.output_schema=Postmortem``.

    The agent reads the workflow's accumulated state (severity, hypothesis,
    investigator reports, mitigation) and emits a Pydantic ``Postmortem``.
    ``result.parsed`` must be populated — if the model can't honor the JSON
    schema we surface that as a hard error rather than fabricating a record.
    """
    import asyncio as _asyncio

    reports: list[InvestigatorReport] = state.get("reports", [])
    bullets = "\n".join(f"- [{r.source}] {'; '.join(r.findings)}" for r in reports)
    agent = Agent(
        config=AgentConfig(
            agent_id="postmortem-writer",
            model=state["__model__"],
            system_prompt=(
                "You are an SRE writing a postmortem. Produce a Postmortem "
                "object. Be terse and factual."
            ),
            output_schema=Postmortem,
            max_iterations=2,
            max_tokens=400,
        )
    )
    prompt = (
        f"Incident:   {state.get('incident_id', 'INC-0001')}\n"
        f"Severity:   {state.get('severity', 'warn')}\n"
        f"Symptom:    {state.get('symptom', '')}\n"
        f"Hypothesis: {state.get('hypothesis', '(unknown)')}\n"
        f"Findings:\n{bullets}\n"
        f"Mitigation: {state.get('mitigation', '(none)')}\n\n"
        "Emit the Postmortem now."
    )
    last_exc: BaseException | None = None
    result = None
    for attempt in range(3):
        try:
            result = await _asyncio.to_thread(agent.run_sync, prompt)
            break
        except Exception as exc:  # noqa: BLE001 — retry transient OCI flakiness
            last_exc = exc
            await _asyncio.sleep(0.5 * (attempt + 1))
    if result is None:
        raise RuntimeError(
            f"Postmortem writer failed after 3 attempts. Last error: {last_exc!r}"
        ) from last_exc
    pm = result.parsed
    if pm is None:
        raise RuntimeError(
            "Postmortem writer returned no parsed Postmortem. The configured "
            "model could not honor the JSON schema. Use a stronger model "
            "(e.g. openai.gpt-4o, openai.gpt-5, anthropic.claude-3-5-sonnet) "
            f"for notebook 57. Raw output: {result.message!r}"
        )
    return {"postmortem": pm}


# Build the runbook graph.


def build_runbook() -> StateGraph:
    g = StateGraph(name="incident-runbook")
    g.add_node("triage", triage)
    g.add_node("scatter", scatter_investigators)
    g.add_node("investigate", investigate)
    g.add_node("synthesize", synthesize)
    g.add_node("page_human", page_human)
    g.add_node("auto_mitigate", mitigate)
    g.add_node("postmortem", write_postmortem)

    g.add_edge(START, "triage")
    g.add_edge("triage", "scatter")
    g.add_edge("scatter", "synthesize")
    g.add_conditional_edges(
        "synthesize",
        severity_gate,
        targets={"page_human": "page_human", "auto_mitigate": "auto_mitigate"},
    )
    g.add_edge("page_human", "auto_mitigate")
    g.add_edge("auto_mitigate", "postmortem")
    g.add_edge("postmortem", END)
    return g


# Driver.


def _print_postmortem(pm: Postmortem | None) -> None:
    print("\nPostmortem:")
    print("-" * 60)
    if pm is None:
        print("(missing)")
        return
    print(f"  Incident:        {pm.incident_id}")
    print(f"  Severity:        {pm.severity}")
    print(f"  Root cause:      {pm.root_cause_hypothesis}")
    print(f"  Mitigation:      {pm.mitigation_applied}")
    print(
        f"  Contributing:    " + ("\n                   ".join(pm.contributing_factors) or "(none)")
    )
    print(f"  Follow-ups:      " + "\n                   ".join(pm.follow_up_actions))


async def main() -> None:
    print("Notebook 58: Incident response runbook")
    print("=" * 60)

    model = get_model()
    graph = build_runbook()

    # No keyword pre-filter — severity comes entirely from the triage
    # agent's classification, so the human-approval branch only fires
    # when the model itself calls the incident critical.
    initial = {
        "incident_id": "INC-2026-0517",
        "symptom": (
            "API p99 latency is 8x its baseline and approximately 12% of "
            "requests return 503. The regression started at 04:12 UTC and is "
            "ongoing. Customer-facing checkout is degraded for paying users "
            "across multiple regions; on-call has been paged."
        ),
        "__model__": model,
    }

    print(f"\nIncident: {initial['symptom']}\n")

    result = await graph.execute(initial)

    if result.interrupt:
        # Critical path — workflow paused for human approval. Resume
        # with the operator's decision.
        payload = result.interrupt.interrupt.payload
        print(f"  ⏸  PAGED: {payload.get('question')}")
        print("  ▶  Operator responds: 'yes'")
        result = await graph.execute(
            Command(resume="yes", update={**result.final_state, "__model__": model})
        )

    print(
        f"\nWorkflow finished in {result.duration_ms:.0f} ms across "
        f"{result.iterations} graph iterations"
    )
    _print_postmortem(result.final_state.get("postmortem"))


if __name__ == "__main__":
    asyncio.run(main())