Composition Patterns¶
Chain agents, run them in parallel, or loop one until it's satisfied.
When the work decomposes cleanly into agent-shaped pieces, you don't
need a full StateGraph. The three pipeline classes here are
batteries-included composition primitives that take a list of Agent
instances and orchestrate them for you.
What you'll see:
SequentialPipeline— each agent's output becomes the next agent's input.ParallelPipeline— run agents concurrently, then merge their results.LoopAgent— run one agent repeatedly until a stop condition fires.- One-liner helpers:
sequential(),parallel(),loop().
Runs on the same OCI GenAI default as the rest of the notebooks:
LOCUS_MODEL_ID=openai.gpt-4.1 python examples/notebook_27_composition.py
# or, fully offline:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_27_composition.py
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Chain agents, run them in parallel, or loop one until it's satisfied.
When the work decomposes cleanly into agent-shaped pieces, you don't
need a full StateGraph. The three pipeline classes here are batteries-
included composition primitives that take a list of Agent instances
and orchestrate them for you.
- SequentialPipeline — each agent's output becomes the next agent's input.
- ParallelPipeline — run agents concurrently, then merge their results.
- LoopAgent — run one agent repeatedly until a stop condition fires.
- Helpers sequential() / parallel() / loop() exist for one-liners.
Run it:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_27_composition.py
The default provider is OCI GenAI — auto-detected from ~/.oci/config.
Set LOCUS_MODEL_PROVIDER=mock for offline runs. Pick an OCI model with
LOCUS_MODEL_ID=openai.gpt-4.1 (or meta.llama-3.3-70b-instruct, etc.).
"""
import asyncio
from config import get_model
from locus.agent import (
Agent,
AgentConfig,
LoopAgent,
ParallelPipeline,
SequentialPipeline,
)
# =============================================================================
# Part 1: Sequential — researcher then writer
# =============================================================================
async def example_sequential():
"""Researcher gathers facts; writer turns them into prose."""
print("=== Part 1: Sequential — researcher then writer ===\n")
model = get_model()
researcher = Agent(
config=AgentConfig(
system_prompt="You are a researcher. Provide 3 key facts about the topic.",
max_iterations=3,
model=model,
)
)
writer = Agent(
config=AgentConfig(
system_prompt="You are a writer. Take the research and write a short paragraph.",
max_iterations=3,
model=model,
)
)
pipeline = SequentialPipeline(agents=[researcher, writer])
result = await pipeline.run("Benefits of regular exercise")
print(f"Stage 1 (Researcher): {result.outputs[0][:100]}...")
print(f"Stage 2 (Writer): {result.outputs[1][:100]}...")
print(f"Duration: {result.duration_ms:.0f}ms")
# =============================================================================
# Part 2: Parallel — pros vs cons in one call
# =============================================================================
async def example_parallel():
"""Two agents form independent perspectives; the pipeline merges them."""
print("\n=== Part 2: Parallel — pros vs cons in one call ===\n")
model = get_model()
pros = Agent(
config=AgentConfig(
system_prompt="List 2 pros of the topic. Be concise.",
max_iterations=3,
model=model,
)
)
cons = Agent(
config=AgentConfig(
system_prompt="List 2 cons of the topic. Be concise.",
max_iterations=3,
model=model,
)
)
pipeline = ParallelPipeline(agents=[pros, cons])
result = await pipeline.run("Remote work for engineers")
print(f"Pros: {result.outputs[0][:100]}...")
print(f"Cons: {result.outputs[1][:100]}...")
print(f"Merged: {result.final_output[:150]}...")
# =============================================================================
# Part 3: Loop — iterate until APPROVED or max_loops
# =============================================================================
async def example_loop():
"""LoopAgent re-runs the same agent, feeding back the previous output."""
print("\n=== Part 3: Loop — iterate until APPROVED or max_loops ===\n")
model = get_model()
improver = Agent(
config=AgentConfig(
system_prompt=(
"You improve text quality. When the text is good enough, "
"include the word APPROVED at the end."
),
max_iterations=3,
model=model,
)
)
loop = LoopAgent(
agent=improver,
condition=lambda output: "APPROVED" in output.upper(),
max_loops=3,
loop_prompt="Improve this text. Say APPROVED when done:\n{previous_output}",
)
result = await loop.run("The quick brown fox jumps over the lazy dog.")
print(f"Iterations: {len(result.outputs)}")
print(f"Final: {result.final_output[:100]}...")
if __name__ == "__main__":
asyncio.run(example_sequential())
asyncio.run(example_parallel())
asyncio.run(example_loop())