Tutorial 06: Introduction to StateGraph

This tutorial covers:

  • What is a StateGraph and when to use it
  • Creating nodes and edges
  • Executing a simple graph
  • Understanding state flow

Prerequisites: Tutorial 05 (Agent Hooks)
Difficulty: Intermediate

When to use StateGraph vs Agent (a minimal sketch of each follows this list):

  • Agent: Single LLM with tools, ReAct loop, simple tasks
  • StateGraph: Complex workflows, multiple steps, conditional logic, human-in-the-loop, multi-agent coordination
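
As a rough illustration, here is each pattern side by side, using only the API that appears in the Source below (Agent, get_model, StateGraph, START, END). The node names and node functions are placeholders, and the await call must live inside an async function, as it does in the source:

# Agent: one model in a ReAct loop; hand it a prompt, get an answer back.
agent = Agent(model=get_model(max_tokens=80), system_prompt="You are helpful.")
answer = agent.run_sync("Summarize this ticket in one sentence.").message

# StateGraph: explicit nodes and edges, with a shared state dict flowing through.
graph = StateGraph()
graph.add_node("triage", triage)      # async def triage(inputs) -> dict (placeholder)
graph.add_node("respond", respond)    # async def respond(inputs) -> dict (placeholder)
graph.add_edge(START, "triage")
graph.add_edge("triage", "respond")
graph.add_edge("respond", END)
result = await graph.execute({"ticket": "..."})
print(result.final_state)

The rest of this tutorial builds up the StateGraph side of that comparison, one part at a time.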

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 06: Introduction to StateGraph

This tutorial covers:
- What is a StateGraph and when to use it
- Creating nodes and edges
- Executing a simple graph
- Understanding state flow

Prerequisites: Tutorial 05 (Agent Hooks)
Difficulty: Intermediate

When to use StateGraph vs Agent:
- Agent: Single LLM with tools, ReAct loop, simple tasks
- StateGraph: Complex workflows, multiple steps, conditional logic,
              human-in-the-loop, multi-agent coordination
"""

import asyncio
import time

from config import get_model

from locus.agent import Agent
from locus.multiagent import END, START, StateGraph


def _llm_call(
    prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
    """Helper: real model call with timing/token banner — used by every Part."""
    agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
    t0 = time.perf_counter()
    res = agent.run_sync(prompt)
    dt = time.perf_counter() - t0
    print(
        f"  [model call: {dt:.2f}s · {res.metrics.prompt_tokens}{res.metrics.completion_tokens} tokens]"
    )
    return res.message.strip()


# =============================================================================
# Part 1: Your First Graph
# =============================================================================


async def example_first_graph():
    """Create the simplest possible graph — node calls a real Agent."""
    print("=== Part 1: Your First Graph ===\n")

    graph = StateGraph()

    async def greet(inputs):
        name = inputs.get("name", "World")
        ai_line = _llm_call(
            f"Greet {name} warmly in one sentence.",
            system="You are a friendly assistant.",
        )
        return {"greeting": ai_line}

    # Add the node
    graph.add_node("greet", greet)

    # Connect: START -> greet -> END
    graph.add_edge(START, "greet")
    graph.add_edge("greet", END)

    # Execute with initial state
    result = await graph.execute({"name": "Alice"})

    print("Input:   name = 'Alice'")
    print(f"Output:  greeting = '{result.final_state.get('greeting')}'")
    print(f"Success: {result.success}")
    print()


# =============================================================================
# Part 2: Multiple Nodes in Sequence
# =============================================================================


async def example_sequence():
    """Chain multiple nodes together."""
    print("=== Part 2: Sequential Nodes ===\n")

    graph = StateGraph()

    # Step 1: Validate input
    async def validate(inputs):
        text = inputs.get("text", "")
        return {
            "text": text.strip(),
            "is_valid": len(text.strip()) > 0,
        }

    # Step 2: Transform text
    async def transform(inputs):
        text = inputs.get("text", "")
        return {
            "uppercase": text.upper(),
            "word_count": len(text.split()),
        }

    # Step 3: Create summary using a real Agent
    async def summarize(inputs):
        wc = inputs.get("word_count")
        ai = _llm_call(
            f"In one short sentence, comment on a text with {wc} words "
            f"that was {'valid' if inputs.get('is_valid') else 'invalid'}.",
        )
        return {
            "summary": f"{wc} words, valid={inputs.get('is_valid')}{ai}",
        }

    graph.add_node("validate", validate)
    graph.add_node("transform", transform)
    graph.add_node("summarize", summarize)

    # Chain: START -> validate -> transform -> summarize -> END
    graph.add_edge(START, "validate")
    graph.add_edge("validate", "transform")
    graph.add_edge("transform", "summarize")
    graph.add_edge("summarize", END)

    result = await graph.execute({"text": "  hello world  "})

    print("Input:     text = '  hello world  '")
    print(f"Validated: is_valid = {result.final_state.get('is_valid')}")
    print(f"Uppercase: {result.final_state.get('uppercase')}")
    print(f"Summary:   {result.final_state.get('summary')}")
    print()


# =============================================================================
# Part 3: Understanding State Flow
# =============================================================================


async def example_state_flow():
    """See how state accumulates through nodes."""
    print("=== Part 3: State Flow ===\n")

    graph = StateGraph()
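
    # Each node returns a dict of updates; the graph merges those keys into the
    # shared state, so downstream nodes can read what earlier nodes produced.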

    async def step_a(inputs):
        print(f"  Step A receives: {list(inputs.keys())}")
        return {"a_output": "from A", "value": 10}

    async def step_b(inputs):
        print(f"  Step B receives: {list(inputs.keys())}")
        value = inputs.get("value", 0)
        return {"b_output": "from B", "doubled": value * 2}

    async def step_c(inputs):
        print(f"  Step C receives: {list(inputs.keys())}")
        # Final node delegates to a real Agent.
        doubled = inputs.get("doubled", 0)
        ai = _llm_call(
            f"Briefly comment on a graph that doubled the value to {doubled}.",
        )
        return {"c_output": "from C", "final": doubled + 5, "ai_comment": ai}

    graph.add_node("step_a", step_a)
    graph.add_node("step_b", step_b)
    graph.add_node("step_c", step_c)

    graph.add_edge(START, "step_a")
    graph.add_edge("step_a", "step_b")
    graph.add_edge("step_b", "step_c")
    graph.add_edge("step_c", END)

    print("Executing graph...")
    result = await graph.execute({"initial_data": True})

    print("\nFinal state:")
    for key, value in result.final_state.items():
        if not key.startswith("_"):  # Skip internal keys
            print(f"  {key}: {value}")
    print()


# =============================================================================
# Part 4: Parallel Nodes
# =============================================================================


async def example_parallel():
    """Execute independent nodes in parallel."""
    print("=== Part 4: Parallel Nodes ===\n")

    graph = StateGraph()
    graph.config.parallel = True  # Enable parallel execution

    async def analyze_sentiment(inputs):
        text = inputs.get("text", "")
        label = _llm_call(
            f"Classify the sentiment of '{text}' as positive, negative, or "
            "neutral. Reply with one word.",
            system="Output one of: positive | negative | neutral. Nothing else.",
            max_tokens=10,
        )
        return {"sentiment": label.lower()}

    async def count_words(inputs):
        text = inputs.get("text", "")
        await asyncio.sleep(0.1)
        return {"word_count": len(text.split())}

    async def detect_language(inputs):
        # Simplified - always returns English
        await asyncio.sleep(0.1)
        return {"language": "en"}

    async def combine_results(inputs):
        return {
            "analysis": {
                "sentiment": inputs.get("sentiment"),
                "words": inputs.get("word_count"),
                "lang": inputs.get("language"),
            }
        }

    graph.add_node("sentiment", analyze_sentiment)
    graph.add_node("words", count_words)
    graph.add_node("language", detect_language)
    graph.add_node("combine", combine_results)

    # Fan-out: START -> [sentiment, words, language]
    graph.add_edge(START, "sentiment")
    graph.add_edge(START, "words")
    graph.add_edge(START, "language")

    # Fan-in: [sentiment, words, language] -> combine
    graph.add_edge("sentiment", "combine")
    graph.add_edge("words", "combine")
    graph.add_edge("language", "combine")

    graph.add_edge("combine", END)

    start = time.perf_counter()
    result = await graph.execute({"text": "This is a great example!"})
    elapsed = (time.perf_counter() - start) * 1000

    print("Input: 'This is a great example!'")
    print(f"Analysis: {result.final_state.get('analysis')}")
    print(f"Time: {elapsed:.0f}ms (parallel nodes run concurrently)")
    print()


# =============================================================================
# Part 5: Graph Results and Metadata
# =============================================================================


async def example_results():
    """Explore the GraphResult structure."""
    print("=== Part 5: Graph Results ===\n")

    graph = StateGraph()

    async def process(inputs):
        # Even Part 5's tiny graph drives a real LLM call.
        v = inputs.get("value", 0)
        comment = _llm_call(f"In one sentence, comment on doubling {v}.")
        return {"processed": True, "result": v * 2, "comment": comment}

    graph.add_node("process", process)
    graph.add_edge(START, "process")
    graph.add_edge("process", END)

    result = await graph.execute({"value": 21})

    print("GraphResult fields:")
    print(f"  .success         = {result.success}")
    print(f"  .graph_id        = {result.graph_id}")
    print(f"  .duration_ms     = {result.duration_ms:.1f}")
    print(f"  .iterations      = {result.iterations}")
    print(f"  .execution_order = {result.execution_order}")

    print("\n  .final_state:")
    for k, v in result.final_state.items():
        if not k.startswith("_"):
            print(f"    {k}: {v}")

    print("\n  .node_results:")
    for node_id, node_result in result.node_results.items():
        print(f"    {node_id}: status={node_result.status.value}")
    print()


# =============================================================================
# Part 6: Streaming and emit_custom
# =============================================================================


async def example_streaming():
    """Stream node-completion events in real time + emit custom progress."""
    print("=== Part 6: Real-time streaming + emit_custom ===\n")
    from locus.multiagent import StreamMode, emit_custom

    graph = StateGraph()

    async def step1(inputs):
        # CUSTOM events stream out while a real Agent is generating.
        await emit_custom({"phase": "starting", "value": inputs.get("x", 0)})
        ai = _llm_call(
            f"In one sentence, congratulate someone who reached {inputs.get('x', 0) * 2}.",
        )
        await emit_custom({"phase": "halfway"})
        return {"y": inputs.get("x", 0) * 2, "ai": ai}

    async def step2(inputs):
        ai = _llm_call(
            f"In one short sentence, narrate adding 10 to {inputs.get('y', 0)}.",
        )
        return {"z": inputs.get("y", 0) + 10, "ai": ai}

    graph.add_node("step1", step1)
    graph.add_node("step2", step2)
    graph.add_edge(START, "step1")
    graph.add_edge("step1", "step2")
    graph.add_edge("step2", END)

    print("Streaming UPDATES + CUSTOM events as they arrive:")
    async for event in graph.stream({"x": 21}, mode=StreamMode.UPDATES):
        if event.mode == StreamMode.CUSTOM:
            print(f"  [CUSTOM]  {event.node_id}: {event.data}")
        else:
            print(f"  [UPDATE]  {event.node_id}: {event.data}")
    print()


# =============================================================================
# Part 7: Calling a real LLM from inside a graph node
# =============================================================================


async def example_graph_with_llm():
    """A graph node that delegates work to a real Agent."""
    print("=== Part 7: Graph + real LLM ===\n")

    graph = StateGraph()

    async def ai_summarize(inputs):
        topic = inputs.get("topic", "")
        agent = Agent(
            model=get_model(max_tokens=80),
            system_prompt="You write one-sentence factual summaries.",
        )
        t0 = time.perf_counter()
        result = agent.run_sync(f"Summarize the topic '{topic}' in one sentence.")
        dt = time.perf_counter() - t0
        print(
            f"  [model call: {dt:.2f}s · {result.metrics.prompt_tokens}→{result.metrics.completion_tokens} tokens]"
        )
        return {"summary": result.message}

    graph.add_node("summarize", ai_summarize)
    graph.add_edge(START, "summarize")
    graph.add_edge("summarize", END)

    result = await graph.execute({"topic": "Oracle Cloud Generative AI"})
    print(f"AI summary: {result.final_state.get('summary')}")
    print()


# =============================================================================
# Main
# =============================================================================


async def main():
    """Run all tutorial parts."""
    print("=" * 60)
    print("Tutorial 06: Introduction to StateGraph")
    print("=" * 60)
    print()

    await example_first_graph()
    await example_sequence()
    await example_state_flow()
    await example_parallel()
    await example_results()
    await example_streaming()
    await example_graph_with_llm()

    print("=" * 60)
    print("Next: Tutorial 07 - Conditional Routing")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())