Tutorial 10: Advanced Patterns¶
This tutorial covers:
- Command primitive for routing control
- Send for map-reduce patterns
- Subgraph composition
- Cross-thread Store
- Combining patterns
Prerequisites: Tutorial 09 (Human-in-the-Loop) Difficulty: Advanced
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 10: Advanced Patterns
This tutorial covers:
- Command primitive for routing control
- Send for map-reduce patterns
- Subgraph composition
- Cross-thread Store
- Combining patterns
Prerequisites: Tutorial 09 (Human-in-the-Loop)
Difficulty: Advanced
"""
import asyncio
import time
from config import get_model
from locus.agent import Agent
from locus.core import Command, broadcast, end, goto, scatter
from locus.memory import InMemoryStore
from locus.multiagent import END, START, StateGraph
def _llm_call(
prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
"""Helper: real model call with timing/token banner — used by every Part."""
agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
t0 = time.perf_counter()
res = agent.run_sync(prompt)
dt = time.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {res.metrics.prompt_tokens}→{res.metrics.completion_tokens} tokens]"
)
return res.message.strip()
# =============================================================================
# Part 1: Command Primitive
# =============================================================================
async def example_command_routing():
"""Use Command for dynamic routing decisions."""
print("=== Part 1: Command Routing ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why is Locus Command better than separate edges + state writes?')}"
)
graph = StateGraph()
async def classify(inputs):
request_type = inputs.get("type", "unknown")
# Command controls both state update AND routing
if request_type == "urgent":
return Command(
update={"priority": "high", "classified": True},
goto="fast_track",
)
elif request_type == "normal":
return Command(
update={"priority": "normal", "classified": True},
goto="standard",
)
else:
return Command(
update={"priority": "low", "classified": True},
goto="review",
)
async def fast_track(inputs):
return {"path": "fast_track", "sla": "1 hour"}
async def standard(inputs):
return {"path": "standard", "sla": "24 hours"}
async def review(inputs):
return {"path": "review", "sla": "48 hours"}
graph.add_node("classify", classify)
graph.add_node("fast_track", fast_track)
graph.add_node("standard", standard)
graph.add_node("review", review)
graph.add_edge(START, "classify")
# No need for explicit edges - Command handles routing
graph.add_edge("fast_track", END)
graph.add_edge("standard", END)
graph.add_edge("review", END)
for req_type in ["urgent", "normal", "unknown"]:
result = await graph.execute({"type": req_type})
print(
f"{req_type}: path={result.final_state.get('path')}, sla={result.final_state.get('sla')}"
)
print()
async def example_goto_helpers():
"""Use goto() and end() helper functions."""
print("=== Part 1b: goto/end Helpers ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when is goto() preferable to a Command literal?')}"
)
graph = StateGraph()
async def check_auth(inputs):
token = inputs.get("token", "")
if token == "valid": # noqa: S105 — tutorial literal, not a real secret
# goto() is shorthand for Command(goto=...)
return goto("authorized", authenticated=True)
return goto("denied", authenticated=False)
async def authorized(inputs):
# end() routes to END with final state update
return end(message="Welcome!", access="granted")
async def denied(inputs):
return end(message="Access denied", access="none")
graph.add_node("auth", check_auth)
graph.add_node("authorized", authorized)
graph.add_node("denied", denied)
graph.add_edge(START, "auth")
graph.add_edge("authorized", END)
graph.add_edge("denied", END)
for token in ["valid", "invalid"]:
result = await graph.execute({"token": token})
print(f"Token '{token}': {result.final_state.get('message')}")
print()
# =============================================================================
# Part 2: Send for Map-Reduce
# =============================================================================
async def example_scatter():
"""Fan-out processing with scatter()."""
print("=== Part 2: scatter() Pattern ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, give an SDK use-case for the scatter() fan-out helper.')}"
)
graph = StateGraph()
async def split_work(inputs):
items = inputs.get("items", [])
# scatter sends each item to a worker node
return scatter("process", items, key="item")
async def process(inputs):
item = inputs.get("item", "")
# Process each item
return {"processed": item.upper()}
async def collect(inputs):
# Collect all send results
results = []
for key, value in inputs.items():
if key.startswith("send_") and isinstance(value, dict):
results.append(value.get("processed"))
return {"results": results, "count": len(results)}
graph.add_node("split", split_work)
graph.add_node("process", process)
graph.add_node("collect", collect)
graph.add_edge(START, "split")
graph.add_edge("split", "collect")
graph.add_edge("collect", END)
result = await graph.execute({"items": ["apple", "banana", "cherry"]})
print(f"Processed {result.final_state.get('count')} items")
print(f"Results: {result.final_state.get('results')}")
print()
async def example_broadcast():
"""Send same data to multiple processors."""
print("=== Part 2b: broadcast() Pattern ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when is broadcast() better than scatter() in a graph?')}"
)
graph = StateGraph()
async def prepare(inputs):
text = inputs.get("text", "")
# Send same text to multiple analyzers
return broadcast(["sentiment", "keywords", "length"], {"text": text})
async def sentiment(inputs):
text = inputs.get("text", "").lower()
score = "positive" if "good" in text or "great" in text else "neutral"
return {"sentiment": score}
async def keywords(inputs):
words = inputs.get("text", "").split()
return {"keywords": words[:3]}
async def length(inputs):
return {"length": len(inputs.get("text", ""))}
async def combine(inputs):
analysis = {}
for key in ["sentiment", "keywords", "length"]:
if key in inputs:
analysis[key] = inputs[key]
return {"analysis": analysis}
graph.add_node("prepare", prepare)
graph.add_node("sentiment", sentiment)
graph.add_node("keywords", keywords)
graph.add_node("length", length)
graph.add_node("combine", combine)
graph.add_edge(START, "prepare")
graph.add_edge("prepare", "combine")
graph.add_edge("combine", END)
result = await graph.execute({"text": "This is a great example of text analysis"})
print(f"Analysis: {result.final_state.get('analysis')}")
print()
# =============================================================================
# Part 3: Subgraph Composition
# =============================================================================
async def example_subgraph():
"""Compose graphs from reusable subgraphs."""
print("=== Part 3: Subgraph Composition ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when should you factor a piece of graph logic out as a subgraph?')}"
)
# Create a reusable validation subgraph
validation_graph = StateGraph()
async def check_required(inputs):
data = inputs.get("data", {})
missing = [f for f in ["name", "email"] if f not in data]
return {"missing_fields": missing, "has_required": len(missing) == 0}
async def check_format(inputs):
data = inputs.get("data", {})
email = data.get("email", "")
return {"valid_email": "@" in email}
validation_graph.add_node("required", check_required)
validation_graph.add_node("format", check_format)
validation_graph.add_edge(START, "required")
validation_graph.add_edge("required", "format")
validation_graph.add_edge("format", END)
# Main graph uses the subgraph
main_graph = StateGraph()
async def prepare_data(inputs):
return {"data": inputs}
# Add subgraph as a node!
main_graph.add_node("prepare", prepare_data)
main_graph.add_node("validate", validation_graph) # Subgraph as node
async def process_result(inputs):
is_valid = inputs.get("has_required") and inputs.get("valid_email")
return {"status": "valid" if is_valid else "invalid"}
main_graph.add_node("result", process_result)
main_graph.add_edge(START, "prepare")
main_graph.add_edge("prepare", "validate")
main_graph.add_edge("validate", "result")
main_graph.add_edge("result", END)
# Test with valid data
result = await main_graph.execute({"name": "Alice", "email": "alice@example.com"})
print(f"Valid data: status = {result.final_state.get('status')}")
# Test with invalid data
result = await main_graph.execute({"name": "Bob"})
print(f"Missing email: status = {result.final_state.get('status')}")
print()
# =============================================================================
# Part 4: Cross-Thread Store
# =============================================================================
async def example_store():
"""Use Store for cross-conversation memory."""
print("=== Part 4: Cross-Thread Store ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, what kind of state belongs in InMemoryStore vs in graph state?')}"
)
store = InMemoryStore()
graph = StateGraph()
async def greet_user(inputs):
user_id = inputs.get("user_id")
# Check if we know this user
name = await store.get(("users", user_id), "name")
if name:
return {"greeting": f"Welcome back, {name}!", "known_user": True}
return {"greeting": "Hello! What's your name?", "known_user": False}
async def learn_name(inputs):
if not inputs.get("known_user"):
user_id = inputs.get("user_id")
name = inputs.get("provided_name", "Friend")
# Store for next time
await store.put(("users", user_id), "name", name)
return {"learned": True, "stored_name": name}
return {"learned": False}
graph.add_node("greet", greet_user)
graph.add_node("learn", learn_name)
graph.add_edge(START, "greet")
graph.add_edge("greet", "learn")
graph.add_edge("learn", END)
# First visit - don't know user
print("Session 1:")
result = await graph.execute({"user_id": "user123", "provided_name": "Alice"})
print(f" {result.final_state.get('greeting')}")
# Second visit - remember user
print("\nSession 2:")
result = await graph.execute({"user_id": "user123"})
print(f" {result.final_state.get('greeting')}")
print()
# =============================================================================
# Part 5: Combining Patterns
# =============================================================================
async def example_combined():
"""Combine multiple patterns in one workflow."""
print("=== Part 5: Combined Patterns ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why is combining Command + scatter + Store typical for multi-tenant order pipelines?')}"
)
store = InMemoryStore()
graph = StateGraph()
async def classify_order(inputs):
amount = inputs.get("amount", 0)
user_id = inputs.get("user_id")
# Check user's VIP status from store
is_vip = await store.get(("users", user_id), "vip") or False
if amount > 1000 or is_vip:
return Command(
update={"priority": "high", "vip": is_vip},
goto="priority_process",
)
return Command(
update={"priority": "normal", "vip": is_vip},
goto="standard_process",
)
async def priority_process(inputs):
# Send to multiple handlers in parallel
return scatter("handler", ["verify", "discount", "notify"], key="action")
async def standard_process(inputs):
return {"processed": True, "path": "standard"}
async def handler(inputs):
action = inputs.get("action", "")
return {f"{action}_done": True}
async def finalize(inputs):
# Store order in user's history
user_id = inputs.get("user_id")
await store.put(
("users", user_id, "orders"),
f"order_{inputs.get('amount')}",
{"amount": inputs.get("amount"), "priority": inputs.get("priority")},
)
return {"status": "complete", "priority": inputs.get("priority")}
graph.add_node("classify", classify_order)
graph.add_node("priority_process", priority_process)
graph.add_node("standard_process", standard_process)
graph.add_node("handler", handler)
graph.add_node("finalize", finalize)
graph.add_edge(START, "classify")
graph.add_edge("priority_process", "finalize")
graph.add_edge("standard_process", "finalize")
graph.add_edge("finalize", END)
# Set up a VIP user
await store.put(("users", "vip_user"), "vip", True) # noqa: FBT003 — store.put signature is (key, value, is_vip)
# Regular user, small order
result = await graph.execute({"user_id": "regular", "amount": 50})
print(f"Regular user, $50: {result.final_state.get('priority')} priority")
# Regular user, large order
result = await graph.execute({"user_id": "regular", "amount": 2000})
print(f"Regular user, $2000: {result.final_state.get('priority')} priority")
# VIP user, any order
result = await graph.execute({"user_id": "vip_user", "amount": 10})
print(f"VIP user, $10: {result.final_state.get('priority')} priority")
print()
# =============================================================================
# Part 6: LLM picks the Command target
# =============================================================================
async def example_command_with_llm():
"""An LLM classifies a customer request and the node returns a Command
with the chosen branch — combining real reasoning with structured
routing primitives."""
print("=== Part 6: Command + real LLM ===\n")
graph = StateGraph()
async def triage(inputs):
import time as _t
message = inputs.get("message", "")
agent = Agent(
model=get_model(max_tokens=10),
system_prompt=(
"You are a triage classifier. Output one of: refund, ship, escalate. "
"Reply with just that single word."
),
)
t0 = _t.perf_counter()
result = agent.run_sync(message)
dt = _t.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {result.metrics.prompt_tokens}→{result.metrics.completion_tokens} tokens]"
)
label = result.message.strip().lower()
if label not in {"refund", "ship", "escalate"}:
label = "escalate"
return Command(update={"label": label}, goto=label)
async def refund(_inputs):
return {"resolution": "refund queued"}
async def ship(_inputs):
return {"resolution": "shipping label generated"}
async def escalate(_inputs):
return {"resolution": "escalated to a human agent"}
graph.add_node("triage", triage)
graph.add_node("refund", refund)
graph.add_node("ship", ship)
graph.add_node("escalate", escalate)
graph.add_edge(START, "triage")
graph.add_edge("refund", END)
graph.add_edge("ship", END)
graph.add_edge("escalate", END)
samples = [
"Charge me back, the package never arrived.",
"When will my order #482 ship?",
"I want to speak with a manager about your data policy.",
]
for msg in samples:
result = await graph.execute({"message": msg})
print(f" '{msg[:40]}…' → {result.final_state.get('resolution')}")
print()
# =============================================================================
# Main
# =============================================================================
async def main():
"""Run all tutorial parts."""
print("=" * 60)
print("Tutorial 10: Advanced Patterns")
print("=" * 60)
print()
await example_command_routing()
await example_goto_helpers()
await example_scatter()
await example_broadcast()
await example_subgraph()
await example_store()
await example_combined()
await example_command_with_llm()
print("=" * 60)
print("Next: Tutorial 11 - Swarm Multi-Agent")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())