Map-Reduce Code Review¶
Three source files, three reviewer roles (security, performance, style) = nine reviewer agents running in parallel, then one synthesizer collapses everything into a single Markdown report.
This notebook covers:
Send(node, payload, metadata)— first-class graph primitive. The splitter returns a list of Sends; the executor fans them out concurrently. No queues, no manualasyncio.gather.- Each reviewer is a distinct
Agentwith a role-specific system prompt. The graph orchestrates them, not a hand-rolled loop. - The synthesizer reads each Send's output back from merged state and renders the final Markdown report.
- The whole pipeline is one
StateGraph.executecall — streaming, cancellation, checkpointing, and GSAR judgment all attach for free.
Prerequisites¶
- Notebook 17 (basic graph).
- Notebook 25 (Swarm) for the dynamic-claim counterpoint.
Run¶
The default provider is OCI Generative AI. With ~/.oci/config
present the reviewers talk to a live OCI model; canonical picks are
openai.gpt-4.1 or meta.llama-3.3-70b-instruct. Set
LOCUS_MODEL_PROVIDER=mock for offline runs.
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Notebook 31: map-reduce code review — scatter-gather with the Send primitive.
Three source files, three reviewer roles (security, performance, style)
= nine reviewer agents running in parallel, then one synthesizer
collapses everything into a single report.
- ``Send(node, payload, metadata)`` is a first-class graph primitive.
The splitter node returns a list of Sends; the executor fans them out
and runs them concurrently. No queues, no manual ``asyncio.gather``.
- Each reviewer is a distinct ``Agent`` with its own role-specific
system prompt — the graph orchestrates them, not a hand-rolled loop.
- The synthesizer reads each Send's output back from merged state and
emits a single Markdown report.
- The whole pipeline is one ``StateGraph.execute`` call. Streaming,
cancellation, checkpointing, and GSAR judgment attach for free.
Run it:
.venv/bin/python examples/notebook_36_map_reduce_code_review.py
The default provider is OCI Generative AI. With ``~/.oci/config``
present the reviewers talk to a live OCI model (canonical pick:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). Set
``LOCUS_MODEL_PROVIDER=mock`` for offline runs.
Prerequisites:
- Notebook 17 (basic graph).
- Notebook 25 (Swarm) for the dynamic-claim counterpoint.
"""
from __future__ import annotations
import asyncio
from typing import Any
from config import get_model
from locus.agent import Agent, AgentConfig
from locus.core.send import Send
from locus.multiagent.graph import END, START, StateGraph
# ----------------------------------------------------------------------------
# Three independent files to review in parallel
# ----------------------------------------------------------------------------
SAMPLE_FILES = {
"auth.py": """
def login(username, password):
if password == "admin123":
return {"role": "admin", "token": username}
return {"role": "user", "token": username}
""",
"billing.py": """
def calculate_total(items, tax_rate):
total = 0
for item in items:
total = total + item.price
return total * (1 + tax_rate)
""",
"search.py": """
def search(query, db):
sql = f"SELECT * FROM products WHERE name LIKE '%{query}%'"
return db.execute(sql).fetchall()
""",
}
# ----------------------------------------------------------------------------
# Reviewer roles — each runs as its own Agent with a role-specific prompt
# ----------------------------------------------------------------------------
REVIEWER_ROLES = {
"security": (
"You are a senior application-security reviewer. Read the code and "
"list every concrete security issue you can prove with the snippet "
"(SQL injection, hardcoded credentials, missing auth, unvalidated "
"input, etc.). Be specific. Do not invent issues. End with a single "
"line: SEVERITY=<low|medium|high|critical>."
),
"performance": (
"You are a performance-engineering reviewer. List concrete inefficiencies "
"in the code (N+1 patterns, accidental quadratic loops, redundant work, "
"missing batching, blocking I/O, etc.). End with: SEVERITY=<...>."
),
"style": (
"You are a Python style/idiom reviewer focused on readability, naming, "
"and modern Python idioms. Be terse. End with: SEVERITY=<...>."
),
}
def _make_reviewer(role: str, model: Any) -> Agent:
return Agent(
config=AgentConfig(
agent_id=f"reviewer-{role}",
model=model,
system_prompt=REVIEWER_ROLES[role],
# One model call is enough for static review of a short snippet.
max_iterations=2,
max_tokens=400,
)
)
# ----------------------------------------------------------------------------
# Graph nodes
# ----------------------------------------------------------------------------
async def split_files(state: dict[str, Any]) -> list[Send]:
"""Fan out: one Send per (file, role) — 3 × 3 = 9 reviewers, all concurrent."""
files: dict[str, str] = state["files"]
roles = list(REVIEWER_ROLES)
return [
Send(
node="review_one",
payload={"file_name": fname, "code": code, "role": role},
metadata={"file": fname, "role": role},
)
for fname in files
for code, _ in [(files[fname], None)]
for role in roles
]
async def review_one(state: dict[str, Any]) -> dict[str, Any]:
"""One reviewer Agent against one (file, role).
Uses ``async for event in agent.run(...)`` instead of ``run_sync()``
so the 9 instances run truly in parallel inside the graph's
``asyncio.gather`` — ``run_sync`` would serialise them on a shared
thread-pool worker.
"""
from locus.core.events import TerminateEvent
role: str = state["role"]
file_name: str = state["file_name"]
code: str = state["code"]
model = state["__model__"]
agent = _make_reviewer(role, model)
prompt = (
f"File: {file_name}\n"
f"Role: {role}\n\n"
f"```python\n{code}\n```\n\n"
f"Review the code as the {role} reviewer."
)
final_msg: str = ""
iterations = 0
async for event in agent.run(prompt):
if isinstance(event, TerminateEvent):
final_msg = event.final_message or ""
iterations = event.iterations_used
return {
"review": {
"file": file_name,
"role": role,
"comments": final_msg.strip(),
"iterations": iterations,
}
}
async def synthesize(state: dict[str, Any]) -> dict[str, Any]:
"""Reduce: walk the merged state, collect every ``review`` payload, render."""
reviews = [v["review"] for v in state.values() if isinstance(v, dict) and "review" in v]
by_file: dict[str, list[dict[str, Any]]] = {}
for r in reviews:
by_file.setdefault(r["file"], []).append(r)
lines = ["# Code review report", ""]
for fname in sorted(by_file):
lines.append(f"## {fname}")
for r in sorted(by_file[fname], key=lambda x: x["role"]):
lines.append(f"### {r['role']}")
lines.append(r["comments"])
lines.append("")
return {"report": "\n".join(lines), "review_count": len(reviews)}
# ----------------------------------------------------------------------------
# Build the graph
# ----------------------------------------------------------------------------
def build_review_graph(model: Any) -> StateGraph:
"""Wire the three nodes: split → review_one (parallel) → synthesize → END.
The model is threaded through state under ``__model__`` rather than
captured by closure so the graph stays picklable for checkpointing.
"""
graph = StateGraph(name="code-review-crew")
graph.add_node("split", split_files)
graph.add_node("review_one", review_one)
graph.add_node("synthesize", synthesize)
graph.add_edge(START, "split")
# No explicit edge "split → review_one" — the Sends from ``split``
# carry their own routing. Once every Send finishes, control returns
# to ``split``'s adjacency, which points at ``synthesize``.
graph.add_edge("split", "synthesize")
graph.add_edge("synthesize", END)
return graph
# ----------------------------------------------------------------------------
# Driver
# ----------------------------------------------------------------------------
async def main() -> None:
print("Notebook 31: map-reduce code review — scatter-gather with Send")
print("=" * 60)
model = get_model()
graph = build_review_graph(model)
initial = {"files": SAMPLE_FILES, "__model__": model}
print(
f"\nFanning out {len(SAMPLE_FILES)} files × {len(REVIEWER_ROLES)} roles "
f"= {len(SAMPLE_FILES) * len(REVIEWER_ROLES)} reviewer agents in parallel...\n"
)
result = await graph.execute(initial)
print(
f"Graph completed in {result.duration_ms:.0f} ms across "
f"{result.iterations} graph iteration(s)"
)
print(f"Reviews collected: {result.final_state.get('review_count', 0)}")
print()
print(result.final_state.get("report", "(no report)"))
if __name__ == "__main__":
asyncio.run(main())