Advanced Patterns
Five primitives that turn a StateGraph into a general-purpose
runtime. Reach for these once basic graphs stop being enough: dynamic
routing from inside a node, fan-out to many workers, reusable
subgraphs, cross-conversation key/value storage, and combining them
in one workflow.
What you'll see:
- `Command(update=..., goto=...)` — write state and pick the next node in one return value.
- `goto()` and `end()` — short helpers for common `Command` shapes.
- `scatter("worker", items, key=...)` — fan a list of items out to copies of a worker node.
- `broadcast(nodes, payload)` — fan one payload out to several different nodes.
- Subgraph-as-node — call one `StateGraph` from inside another.
- `InMemoryStore` — durable key/value space that outlives a single run.
Runs on the same OCI GenAI default as the rest of the notebooks:
LOCUS_MODEL_ID=openai.gpt-4.1 python examples/notebook_26_advanced_patterns.py
# or, fully offline:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_26_advanced_patterns.py
Source
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Five primitives that turn a StateGraph into a general-purpose runtime.
This notebook introduces the building blocks you reach for once basic
graphs stop being enough: dynamic routing from inside a node, fan-out
to many workers, reusable subgraphs, cross-conversation key/value
storage, and combining them in one workflow.
- Command(update=..., goto=...) — write state and pick the next node in one return value.
- goto() / end() — short helpers for common Command shapes.
- scatter() — fan a list of items out to copies of a worker node.
- broadcast() — fan one payload out to several different nodes.
- Subgraph-as-node — call one StateGraph from inside another.
- InMemoryStore — durable key/value space that outlives a single run.
Run it:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_26_advanced_patterns.py
The default provider is OCI GenAI — auto-detected from ~/.oci/config.
Set LOCUS_MODEL_PROVIDER=mock for offline runs. Pick an OCI model with
LOCUS_MODEL_ID=openai.gpt-4.1 (or meta.llama-3.3-70b-instruct, etc.).
"""
import asyncio
import time
from config import get_model
from locus.agent import Agent
from locus.core import Command, broadcast, end, goto, scatter
from locus.memory import InMemoryStore
from locus.multiagent import END, START, StateGraph
def _llm_call(
    prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
    """Send one prompt to a freshly built Agent and return the trimmed reply.

    A new Agent is created on every call from ``get_model(max_tokens=...)``
    and the given system prompt. The synchronous run is timed, and a one-line
    banner with the elapsed seconds and the prompt/completion token counts is
    printed before the reply text is returned. Shared helper for the examples
    in this file.
    """
    model = get_model(max_tokens=max_tokens)
    one_shot = Agent(model=model, system_prompt=system)

    started = time.perf_counter()
    reply = one_shot.run_sync(prompt)
    elapsed = time.perf_counter() - started

    tokens_in = reply.metrics.prompt_tokens
    tokens_out = reply.metrics.completion_tokens
    print(f" [model call: {elapsed:.2f}s · {tokens_in}→{tokens_out} tokens]")

    return reply.message.strip()
# =============================================================================
# Part 1: Command — state and routing in one return
# =============================================================================
async def example_command_routing():
    """Show a node choosing its own successor by returning a Command."""
    print("=== Part 1: Command — state and routing in one return ===\n")
    rationale = _llm_call(
        "In one sentence, why is Locus Command better than separate edges + state writes?"
    )
    print(f"AI rationale: {rationale}")

    graph = StateGraph()

    # request type -> (priority written to state, node to jump to)
    routes = {
        "urgent": ("high", "fast_track"),
        "normal": ("normal", "standard"),
    }
    # Anything that is neither urgent nor normal is sent for review.
    fallback = ("low", "review")

    async def classify(inputs):
        # Returning a Command both writes state and selects the next node,
        # so this single node replaces a conditional edge + a state writer.
        priority, target = routes.get(inputs.get("type", "unknown"), fallback)
        return Command(
            update={"priority": priority, "classified": True},
            goto=target,
        )

    async def fast_track(inputs):
        return {"path": "fast_track", "sla": "1 hour"}

    async def standard(inputs):
        return {"path": "standard", "sla": "24 hours"}

    async def review(inputs):
        return {"path": "review", "sla": "48 hours"}

    handlers = (
        ("classify", classify),
        ("fast_track", fast_track),
        ("standard", standard),
        ("review", review),
    )
    for node_name, node_fn in handlers:
        graph.add_node(node_name, node_fn)

    graph.add_edge(START, "classify")
    # No outgoing edges from classify — Command(goto=...) handles routing.
    for leaf in ("fast_track", "standard", "review"):
        graph.add_edge(leaf, END)

    for req_type in ("urgent", "normal", "unknown"):
        outcome = await graph.execute({"type": req_type})
        final = outcome.final_state
        print(f"{req_type}: path={final.get('path')}, sla={final.get('sla')}")
    print()
async def example_goto_helpers():
    """Demonstrate goto() and end(), the shorthand forms of common Commands."""
    print("=== Part 1b: goto() and end() ===\n")
    rationale = _llm_call("In one sentence, when is goto() preferable to a Command literal?")
    print(f"AI rationale: {rationale}")

    graph = StateGraph()

    async def check_auth(inputs):
        token_ok = inputs.get("token", "") == "valid"  # noqa: S105 — notebook literal, not a real secret
        # goto("name", k=v) == Command(goto="name", update={"k": v})
        destination = "authorized" if token_ok else "denied"
        return goto(destination, authenticated=token_ok)

    async def authorized(inputs):
        # end(k=v) == Command(goto=END, update={"k": v})
        return end(message="Welcome!", access="granted")

    async def denied(inputs):
        return end(message="Access denied", access="none")

    for node_name, node_fn in (
        ("auth", check_auth),
        ("authorized", authorized),
        ("denied", denied),
    ):
        graph.add_node(node_name, node_fn)

    graph.add_edge(START, "auth")
    for terminal in ("authorized", "denied"):
        graph.add_edge(terminal, END)

    for token in ("valid", "invalid"):
        outcome = await graph.execute({"token": token})
        print(f"Token '{token}': {outcome.final_state.get('message')}")
    print()
# =============================================================================
# Part 2: scatter — fan one list out to many worker copies
# =============================================================================
async def example_scatter():
    """Fan a list out with scatter("worker", items, key="x"), then gather the results."""
    print("=== Part 2: scatter() ===\n")
    rationale = _llm_call(
        "In one sentence, give an SDK use-case for the scatter() fan-out helper."
    )
    print(f"AI rationale: {rationale}")

    graph = StateGraph()

    async def split_work(inputs):
        # Hand every entry of "items" to the "process" node under the key "item".
        return scatter("process", inputs.get("items", []), key="item")

    async def process(inputs):
        return {"processed": inputs.get("item", "").upper()}

    async def collect(inputs):
        # Each scattered invocation lands its result under a send_* key.
        gathered = [
            payload.get("processed")
            for name, payload in inputs.items()
            if name.startswith("send_") and isinstance(payload, dict)
        ]
        return {"results": gathered, "count": len(gathered)}

    for node_name, node_fn in (
        ("split", split_work),
        ("process", process),
        ("collect", collect),
    ):
        graph.add_node(node_name, node_fn)

    for source, target in ((START, "split"), ("split", "collect"), ("collect", END)):
        graph.add_edge(source, target)

    outcome = await graph.execute({"items": ["apple", "banana", "cherry"]})
    final = outcome.final_state
    print(f"Processed {final.get('count')} items")
    print(f"Results: {final.get('results')}")
    print()
async def example_broadcast():
    """Send one payload to several different nodes with broadcast(nodes, payload)."""
    print("=== Part 2b: broadcast() ===\n")
    rationale = _llm_call(
        "In one sentence, when is broadcast() better than scatter() in a graph?"
    )
    print(f"AI rationale: {rationale}")

    graph = StateGraph()
    analyzers = ["sentiment", "keywords", "length"]

    async def prepare(inputs):
        # The same {"text": ...} payload goes to each analyzer node.
        return broadcast(["sentiment", "keywords", "length"], {"text": inputs.get("text", "")})

    async def sentiment(inputs):
        lowered = inputs.get("text", "").lower()
        upbeat = any(word in lowered for word in ("good", "great"))
        return {"sentiment": "positive" if upbeat else "neutral"}

    async def keywords(inputs):
        # Keep only the first three whitespace-separated words.
        return {"keywords": inputs.get("text", "").split()[:3]}

    async def length(inputs):
        return {"length": len(inputs.get("text", ""))}

    async def combine(inputs):
        # Copy over whichever analyzer outputs are present in the inputs.
        return {"analysis": {name: inputs[name] for name in analyzers if name in inputs}}

    for node_name, node_fn in (
        ("prepare", prepare),
        ("sentiment", sentiment),
        ("keywords", keywords),
        ("length", length),
        ("combine", combine),
    ):
        graph.add_node(node_name, node_fn)

    for source, target in ((START, "prepare"), ("prepare", "combine"), ("combine", END)):
        graph.add_edge(source, target)

    outcome = await graph.execute({"text": "This is a great example of text analysis"})
    print(f"Analysis: {outcome.final_state.get('analysis')}")
    print()
# =============================================================================
# Part 3: Subgraph as a node
# =============================================================================
async def example_subgraph():
    """Register a complete StateGraph as a single node of another graph."""
    print("=== Part 3: Subgraph as a node ===\n")
    rationale = _llm_call(
        "In one sentence, when should you factor a piece of graph logic out as a subgraph?"
    )
    print(f"AI rationale: {rationale}")

    # --- inner graph: required-field check followed by an email format check ---
    validation_graph = StateGraph()

    async def check_required(inputs):
        record = inputs.get("data", {})
        absent = [field for field in ("name", "email") if field not in record]
        return {"missing_fields": absent, "has_required": not absent}

    async def check_format(inputs):
        address = inputs.get("data", {}).get("email", "")
        return {"valid_email": "@" in address}

    validation_graph.add_node("required", check_required)
    validation_graph.add_node("format", check_format)
    for source, target in ((START, "required"), ("required", "format"), ("format", END)):
        validation_graph.add_edge(source, target)

    # --- outer graph: prepare -> validate (the subgraph) -> result ---
    main_graph = StateGraph()

    async def prepare_data(inputs):
        # Wrap the raw inputs under "data", the key the validation nodes read.
        return {"data": inputs}

    async def process_result(inputs):
        if inputs.get("has_required") and inputs.get("valid_email"):
            return {"status": "valid"}
        return {"status": "invalid"}

    main_graph.add_node("prepare", prepare_data)
    # The subgraph plugs in like any other node — its START/END become
    # entry/exit hooks inside the parent.
    main_graph.add_node("validate", validation_graph)
    main_graph.add_node("result", process_result)

    for source, target in (
        (START, "prepare"),
        ("prepare", "validate"),
        ("validate", "result"),
        ("result", END),
    ):
        main_graph.add_edge(source, target)

    scenarios = (
        ("Valid data", {"name": "Alice", "email": "alice@example.com"}),
        ("Missing email", {"name": "Bob"}),
    )
    for label, payload in scenarios:
        outcome = await main_graph.execute(payload)
        print(f"{label}: status = {outcome.final_state.get('status')}")
    print()
# =============================================================================
# Part 4: Store — memory that outlives one graph run
# =============================================================================
async def example_store():
    """Share one InMemoryStore across two graph runs so the second run recalls the first.

    Graph state is per-run; the store object is created once here and is read
    and written by the nodes on every execution.
    """
    print("=== Part 4: Store — memory that outlives one graph run ===\n")
    rationale = _llm_call(
        "In one sentence, what kind of state belongs in InMemoryStore vs in graph state?"
    )
    print(f"AI rationale: {rationale}")

    store = InMemoryStore()
    graph = StateGraph()

    async def greet_user(inputs):
        remembered = await store.get(("users", inputs.get("user_id")), "name")
        if not remembered:
            return {"greeting": "Hello! What's your name?", "known_user": False}
        return {"greeting": f"Welcome back, {remembered}!", "known_user": True}

    async def learn_name(inputs):
        # Nothing to learn when greet_user already recognised the user.
        if inputs.get("known_user"):
            return {"learned": False}
        namespace = ("users", inputs.get("user_id"))
        new_name = inputs.get("provided_name", "Friend")
        await store.put(namespace, "name", new_name)
        return {"learned": True, "stored_name": new_name}

    graph.add_node("greet", greet_user)
    graph.add_node("learn", learn_name)
    for source, target in ((START, "greet"), ("greet", "learn"), ("learn", END)):
        graph.add_edge(source, target)

    sessions = (
        ("Session 1:", {"user_id": "user123", "provided_name": "Alice"}),
        ("\nSession 2:", {"user_id": "user123"}),
    )
    for heading, payload in sessions:
        print(heading)
        outcome = await graph.execute(payload)
        print(f" {outcome.final_state.get('greeting')}")
    print()
# =============================================================================
# Part 5: All five primitives in one workflow
# =============================================================================
async def example_combined():
    """Run an order pipeline that combines Command routing, scatter, and a Store."""
    print("=== Part 5: All five primitives in one workflow ===\n")
    rationale = _llm_call(
        "In one sentence, why is combining Command + scatter + Store typical for multi-tenant order pipelines?"
    )
    print(f"AI rationale: {rationale}")

    store = InMemoryStore()
    graph = StateGraph()

    async def classify_order(inputs):
        amount = inputs.get("amount", 0)
        is_vip = await store.get(("users", inputs.get("user_id")), "vip") or False
        # Large orders and VIP customers both take the priority branch.
        if amount > 1000 or is_vip:
            priority, target = "high", "priority_process"
        else:
            priority, target = "normal", "standard_process"
        return Command(update={"priority": priority, "vip": is_vip}, goto=target)

    async def priority_process(inputs):
        return scatter("handler", ["verify", "discount", "notify"], key="action")

    async def standard_process(inputs):
        return {"processed": True, "path": "standard"}

    async def handler(inputs):
        # Emits a flag named after the action, e.g. {"verify_done": True}.
        return {f"{inputs.get('action', '')}_done": True}

    async def finalize(inputs):
        amount = inputs.get("amount")
        priority = inputs.get("priority")
        # Record the order under the user's "orders" namespace, keyed by amount.
        await store.put(
            ("users", inputs.get("user_id"), "orders"),
            f"order_{amount}",
            {"amount": amount, "priority": priority},
        )
        return {"status": "complete", "priority": priority}

    for node_name, node_fn in (
        ("classify", classify_order),
        ("priority_process", priority_process),
        ("standard_process", standard_process),
        ("handler", handler),
        ("finalize", finalize),
    ):
        graph.add_node(node_name, node_fn)

    for source, target in (
        (START, "classify"),
        ("priority_process", "finalize"),
        ("standard_process", "finalize"),
        ("finalize", END),
    ):
        graph.add_edge(source, target)

    # Seed the store so one user is already flagged as VIP before any order runs.
    await store.put(("users", "vip_user"), "vip", True)  # noqa: FBT003 — store.put signature is (namespace, key, value)

    scenarios = (
        ("Regular user, $50", {"user_id": "regular", "amount": 50}),
        ("Regular user, $2000", {"user_id": "regular", "amount": 2000}),
        ("VIP user, $10", {"user_id": "vip_user", "amount": 10}),
    )
    for label, order in scenarios:
        outcome = await graph.execute(order)
        print(f"{label}: {outcome.final_state.get('priority')} priority")
    print()
# =============================================================================
# Part 6: LLM-decided Command target
# =============================================================================
async def example_command_with_llm():
    """Route customer messages to a handler node chosen by an LLM classifier.

    The ``triage`` node asks the model for a one-word label, normalizes and
    clamps it to the known set, and returns ``Command(goto=label)`` so the
    label doubles as the name of the next node. Three sample messages are run
    through the graph and each resolution is printed.
    """
    print("=== Part 6: LLM-decided Command target ===\n")

    valid_labels = {"refund", "ship", "escalate"}
    preview_len = 40

    graph = StateGraph()

    async def triage(inputs):
        # Use the shared one-shot helper rather than rebuilding the Agent,
        # timer, and token banner here; it prints the same banner and returns
        # the stripped reply text.
        reply = _llm_call(
            inputs.get("message", ""),
            system=(
                "You are a triage classifier. Output one of: refund, ship, escalate. "
                "Reply with just that single word."
            ),
            max_tokens=10,
        )
        # Models often wrap the word in quotes or add a trailing period
        # ("Refund.", "'ship'"); peel that off so such replies still match.
        label = reply.lower().strip(" \t\r\n.,;:!?\"'`")
        # Clamp anything unexpected so goto= always lands on a real node.
        if label not in valid_labels:
            label = "escalate"
        return Command(update={"label": label}, goto=label)

    async def refund(_inputs):
        return {"resolution": "refund queued"}

    async def ship(_inputs):
        return {"resolution": "shipping label generated"}

    async def escalate(_inputs):
        return {"resolution": "escalated to a human agent"}

    graph.add_node("triage", triage)
    graph.add_node("refund", refund)
    graph.add_node("ship", ship)
    graph.add_node("escalate", escalate)

    graph.add_edge(START, "triage")
    graph.add_edge("refund", END)
    graph.add_edge("ship", END)
    graph.add_edge("escalate", END)

    samples = [
        "Charge me back, the package never arrived.",
        "When will my order #482 ship?",
        "I want to speak with a manager about your data policy.",
    ]
    for msg in samples:
        result = await graph.execute({"message": msg})
        # Only mark the preview with an ellipsis when text was actually cut off.
        preview = msg if len(msg) <= preview_len else f"{msg[:preview_len]}…"
        print(f" '{preview}' → {result.final_state.get('resolution')}")
    print()
# =============================================================================
# Main
# =============================================================================
async def main():
    """Run every example in order, framed by an opening and a closing banner."""
    rule = "=" * 60

    # NOTE(review): the banners say "Notebook 21" / "Notebook 22" while the run
    # commands in the module docstring name notebook_26 — confirm the numbering.
    print(rule)
    print("Notebook 21: Advanced patterns")
    print(rule)
    print()

    examples = (
        example_command_routing,
        example_goto_helpers,
        example_scatter,
        example_broadcast,
        example_subgraph,
        example_store,
        example_combined,
        example_command_with_llm,
    )
    # Examples run one after another; each awaits its own graph executions.
    for example in examples:
        await example()

    print(rule)
    print("Next: Notebook 22 — Composition")
    print(rule)
# Script entry point: drive the async main() coroutine with asyncio.run()
# only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())