Skip to content

Tutorial 08: State Reducers

This tutorial covers:

  • What are reducers and why use them
  • Built-in reducers (add_messages, merge_dict, etc.)
  • Creating custom reducers
  • Using reducers with Pydantic models

Prerequisites: Tutorial 07 (Conditional Routing) Difficulty: Intermediate-Advanced

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 08: State Reducers

This tutorial covers:
- What are reducers and why use them
- Built-in reducers (add_messages, merge_dict, etc.)
- Creating custom reducers
- Using reducers with Pydantic models

Prerequisites: Tutorial 07 (Conditional Routing)
Difficulty: Intermediate-Advanced
"""

import asyncio
import time
from typing import Annotated

from config import get_model
from pydantic import BaseModel

from locus.agent import Agent
from locus.core import (
    Message,
    add_messages,
    add_numbers,
    append_list,
    last_value,
    merge_dict,
)
from locus.core.reducers import reducer
from locus.multiagent import END, START, StateGraph


def _llm_call(
    prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 60
) -> str:
    """Helper: real model call with timing/token banner — used by every Part."""
    agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
    t0 = time.perf_counter()
    res = agent.run_sync(prompt)
    dt = time.perf_counter() - t0
    print(
        f"  [model call: {dt:.2f}s · {res.metrics.prompt_tokens}{res.metrics.completion_tokens} tokens]"
    )
    return res.message.strip()


# =============================================================================
# Part 1: Why Reducers?
# =============================================================================


async def example_without_reducers():
    """The problem: state gets overwritten."""
    print("=== Part 1: The Problem Without Reducers ===\n")
    note = _llm_call("In one sentence, explain why a graph that overwrites state can lose data.")
    print(f"AI note: {note}")

    graph = StateGraph()

    async def node_a(inputs):
        return {"items": ["apple"]}

    async def node_b(inputs):
        # This OVERWRITES items, losing "apple"!
        return {"items": ["banana"]}

    async def node_c(inputs):
        return {"items": ["cherry"]}

    graph.add_node("a", node_a)
    graph.add_node("b", node_b)
    graph.add_node("c", node_c)

    graph.add_edge(START, "a")
    graph.add_edge("a", "b")
    graph.add_edge("b", "c")
    graph.add_edge("c", END)

    result = await graph.execute({})
    print(f"Without reducers: items = {result.final_state.get('items')}")
    print("  (Only 'cherry' - we lost 'apple' and 'banana'!)")
    print()


async def example_with_reducers():
    """The solution: reducers compose state updates."""
    print("=== Part 1b: With Reducers ===\n")
    note = _llm_call("In one sentence, explain what a reducer does in a state graph.")
    print(f"AI note: {note}")

    # Define state with a reducer
    class AppState(BaseModel):
        items: Annotated[list, append_list] = []

    graph = StateGraph(state_schema=AppState)

    async def node_a(inputs):
        return {"items": ["apple"]}

    async def node_b(inputs):
        return {"items": ["banana"]}

    async def node_c(inputs):
        return {"items": ["cherry"]}

    graph.add_node("a", node_a)
    graph.add_node("b", node_b)
    graph.add_node("c", node_c)

    graph.add_edge(START, "a")
    graph.add_edge("a", "b")
    graph.add_edge("b", "c")
    graph.add_edge("c", END)

    result = await graph.execute({})
    print(f"With append_list reducer: items = {result.final_state.get('items')}")
    print("  (All three items preserved!)")
    print()


# =============================================================================
# Part 2: Built-in Reducers
# =============================================================================


async def example_builtin_reducers():
    """Demonstrate the built-in reducers."""
    print("=== Part 2: Built-in Reducers ===\n")
    note = _llm_call(
        "In one sentence, when would you use add_messages vs add_numbers in a Locus graph?"
    )
    print(f"AI note: {note}")

    # 1. add_messages - for conversation history
    class ChatState(BaseModel):
        messages: Annotated[list, add_messages] = []
        total_tokens: Annotated[int, add_numbers] = 0

    graph = StateGraph(state_schema=ChatState)

    async def user_turn(inputs):
        return {
            "messages": [Message.user("Hello!")],
            "total_tokens": 5,
        }

    async def assistant_turn(inputs):
        return {
            "messages": [Message.assistant("Hi there!")],
            "total_tokens": 8,
        }

    graph.add_node("user", user_turn)
    graph.add_node("assistant", assistant_turn)
    graph.add_edge(START, "user")
    graph.add_edge("user", "assistant")
    graph.add_edge("assistant", END)

    result = await graph.execute({})

    print("add_messages reducer:")
    messages = result.final_state.get("messages", [])
    for msg in messages:
        print(f"  [{msg.role.value}] {msg.content}")

    print("\nadd_numbers reducer:")
    print(f"  total_tokens = {result.final_state.get('total_tokens')}")
    print()


async def example_merge_dict():
    """The merge_dict reducer."""
    print("=== Part 2b: merge_dict Reducer ===\n")
    note = _llm_call("In one sentence, give an example use-case for merge_dict.")
    print(f"AI note: {note}")

    class ConfigState(BaseModel):
        config: Annotated[dict, merge_dict] = {}

    graph = StateGraph(state_schema=ConfigState)

    async def set_defaults(inputs):
        return {"config": {"debug": False, "timeout": 30, "retries": 3}}

    async def override_debug(inputs):
        return {"config": {"debug": True}}  # Only changes debug

    async def override_timeout(inputs):
        return {"config": {"timeout": 60}}  # Only changes timeout

    graph.add_node("defaults", set_defaults)
    graph.add_node("debug", override_debug)
    graph.add_node("timeout", override_timeout)

    graph.add_edge(START, "defaults")
    graph.add_edge("defaults", "debug")
    graph.add_edge("debug", "timeout")
    graph.add_edge("timeout", END)

    result = await graph.execute({})
    print(f"Final config: {result.final_state.get('config')}")
    print("  (All settings merged together)")
    print()


# =============================================================================
# Part 3: Custom Reducers
# =============================================================================


async def example_custom_reducer():
    """Create your own reducer."""
    print("=== Part 3: Custom Reducers ===\n")
    note = _llm_call("In one sentence, name two cases where a custom reducer beats add_messages.")
    print(f"AI note: {note}")

    # Custom reducer: keep maximum value
    @reducer
    def max_value(current: int, new: int) -> int:
        """Keep the larger of two values."""
        return max(current or 0, new or 0)

    # Custom reducer: accumulate unique items
    @reducer
    def unique_append(current: list, new: list) -> list:
        """Append only items not already in list."""
        result = list(current or [])
        for item in new or []:
            if item not in result:
                result.append(item)
        return result

    class GameState(BaseModel):
        high_score: Annotated[int, max_value] = 0
        achievements: Annotated[list, unique_append] = []

    graph = StateGraph(state_schema=GameState)

    async def level_1(inputs):
        return {"high_score": 100, "achievements": ["first_step"]}

    async def level_2(inputs):
        return {"high_score": 50, "achievements": ["first_step", "speedrun"]}

    async def level_3(inputs):
        return {"high_score": 200, "achievements": ["speedrun", "perfectionist"]}

    graph.add_node("level1", level_1)
    graph.add_node("level2", level_2)
    graph.add_node("level3", level_3)

    graph.add_edge(START, "level1")
    graph.add_edge("level1", "level2")
    graph.add_edge("level2", "level3")
    graph.add_edge("level3", END)

    result = await graph.execute({})
    print(f"High score (max): {result.final_state.get('high_score')}")
    print(f"Achievements (unique): {result.final_state.get('achievements')}")
    print()


# =============================================================================
# Part 4: last_value Reducer
# =============================================================================


async def example_last_value():
    """Use last_value for fields that should be overwritten."""
    print("=== Part 4: last_value Reducer ===\n")
    note = _llm_call(
        "In one sentence, what kind of field is a good fit for the last_value reducer?"
    )
    print(f"AI note: {note}")

    class ProcessState(BaseModel):
        # last_value is the default behavior - just take the latest
        status: Annotated[str, last_value] = "pending"
        # These accumulate
        log: Annotated[list, append_list] = []

    graph = StateGraph(state_schema=ProcessState)

    async def step1(inputs):
        return {"status": "processing", "log": ["Step 1 complete"]}

    async def step2(inputs):
        return {"status": "validating", "log": ["Step 2 complete"]}

    async def step3(inputs):
        return {"status": "done", "log": ["Step 3 complete"]}

    graph.add_node("step1", step1)
    graph.add_node("step2", step2)
    graph.add_node("step3", step3)

    graph.add_edge(START, "step1")
    graph.add_edge("step1", "step2")
    graph.add_edge("step2", "step3")
    graph.add_edge("step3", END)

    result = await graph.execute({})
    print(f"Status (last value): {result.final_state.get('status')}")
    print(f"Log (accumulated): {result.final_state.get('log')}")
    print()


# =============================================================================
# Part 5: Complex State with Multiple Reducers
# =============================================================================


async def example_complex_state():
    """Combine multiple reducers in a real scenario."""
    print("=== Part 5: Complex State ===\n")
    note = _llm_call(
        "In one sentence, explain why combining append_list, add_numbers, and "
        "merge_dict reducers is useful for an order-processing graph."
    )
    print(f"AI note: {note}")

    class OrderState(BaseModel):
        # Order items accumulate
        items: Annotated[list, append_list] = []
        # Total price adds up
        total: Annotated[float, add_numbers] = 0.0
        # Discounts merge (could have multiple types)
        discounts: Annotated[dict, merge_dict] = {}
        # Status is always the latest
        status: Annotated[str, last_value] = "new"
        # Messages accumulate for history
        messages: Annotated[list, add_messages] = []

    graph = StateGraph(state_schema=OrderState)

    async def add_item(inputs):
        return {
            "items": [{"name": "Laptop", "price": 999.99}],
            "total": 999.99,
            "status": "items_added",
            "messages": [Message.system("Item added: Laptop")],
        }

    async def add_another(inputs):
        return {
            "items": [{"name": "Mouse", "price": 49.99}],
            "total": 49.99,
            "status": "items_added",
            "messages": [Message.system("Item added: Mouse")],
        }

    async def apply_discount(inputs):
        discount_amount = inputs.get("total", 0) * 0.1
        return {
            "discounts": {"loyalty": discount_amount},
            "total": -discount_amount,  # Subtract discount
            "status": "discount_applied",
            "messages": [Message.system(f"10% loyalty discount: -${discount_amount:.2f}")],
        }

    async def finalize(inputs):
        return {
            "status": "finalized",
            "messages": [Message.system(f"Order total: ${inputs.get('total', 0):.2f}")],
        }

    graph.add_node("add_item", add_item)
    graph.add_node("add_another", add_another)
    graph.add_node("discount", apply_discount)
    graph.add_node("finalize", finalize)

    graph.add_edge(START, "add_item")
    graph.add_edge("add_item", "add_another")
    graph.add_edge("add_another", "discount")
    graph.add_edge("discount", "finalize")
    graph.add_edge("finalize", END)

    result = await graph.execute({})

    print("Final Order State:")
    print(f"  Items: {len(result.final_state.get('items', []))} items")
    print(f"  Total: ${result.final_state.get('total', 0):.2f}")
    print(f"  Discounts: {result.final_state.get('discounts')}")
    print(f"  Status: {result.final_state.get('status')}")
    print(f"  Messages: {len(result.final_state.get('messages', []))} entries")
    print()


# =============================================================================
# Part 6: Reducers with a real LLM in the loop
# =============================================================================


async def example_reducer_with_llm():
    """Multiple LLM-producing nodes append messages via add_messages."""
    print("=== Part 6: Reducers + real LLM ===\n")

    class ChatLog(BaseModel):
        messages: Annotated[list, add_messages] = []

    graph = StateGraph(state_schema=ChatLog)

    import time as _t

    async def headline(_inputs):
        agent = Agent(
            model=get_model(max_tokens=40),
            system_prompt="You write punchy one-line product headlines.",
        )
        t0 = _t.perf_counter()
        result = agent.run_sync("Write a headline for an SDK that orchestrates AI agents.")
        dt = _t.perf_counter() - t0
        print(
            f"  [model call (headline): {dt:.2f}s · {result.metrics.prompt_tokens}{result.metrics.completion_tokens} tokens]"
        )
        return {"messages": [Message.assistant(f"[headline] {result.message.strip()}")]}

    async def tagline(_inputs):
        agent = Agent(
            model=get_model(max_tokens=40),
            system_prompt="You write 6-word taglines.",
        )
        t0 = _t.perf_counter()
        result = agent.run_sync("Tagline for a multi-agent reasoning SDK.")
        dt = _t.perf_counter() - t0
        print(
            f"  [model call (tagline):  {dt:.2f}s · {result.metrics.prompt_tokens}{result.metrics.completion_tokens} tokens]"
        )
        return {"messages": [Message.assistant(f"[tagline] {result.message.strip()}")]}

    graph.add_node("headline", headline)
    graph.add_node("tagline", tagline)
    graph.add_edge(START, "headline")
    graph.add_edge("headline", "tagline")
    graph.add_edge("tagline", END)

    result = await graph.execute({})
    for msg in result.final_state.get("messages", []):
        print(f"  {msg.content}")
    print()


# =============================================================================
# Main
# =============================================================================


async def main():
    """Run all tutorial parts."""
    print("=" * 60)
    print("Tutorial 08: State Reducers")
    print("=" * 60)
    print()

    await example_without_reducers()
    await example_with_reducers()
    await example_builtin_reducers()
    await example_merge_dict()
    await example_custom_reducer()
    await example_last_value()
    await example_complex_state()
    await example_reducer_with_llm()

    print("=" * 60)
    print("Next: Tutorial 09 - Human-in-the-Loop")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())