Skip to content

Tutorial 25: Agent Composition — Sequential, Parallel, and Loop Pipelines

This tutorial covers:

  • SequentialPipeline: chain agents in order; each agent's output feeds the next
  • ParallelPipeline: run agents concurrently, merge results
  • LoopAgent: iterate until a condition is met
  • Convenience functions: sequential(), parallel(), loop()

Prerequisites:

  • Configure model via environment variables (see examples/.env.example)

Difficulty: Intermediate

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 25: Agent Composition — Sequential, Parallel, and Loop Pipelines

This tutorial covers:
- SequentialPipeline: chain agents in order; each agent's output feeds the next
- ParallelPipeline: run agents concurrently, merge results
- LoopAgent: iterate until a condition is met
- Convenience functions: sequential(), parallel(), loop()

Prerequisites:
- Configure model via environment variables (see examples/.env.example)

Difficulty: Intermediate
"""

import asyncio

from config import get_model

from locus.agent import (
    Agent,
    AgentConfig,
    LoopAgent,
    ParallelPipeline,
    SequentialPipeline,
)


# =============================================================================
# Part 1: Sequential Pipeline — Researcher → Writer
# =============================================================================


async def example_sequential():
    """Run a two-stage researcher -> writer SequentialPipeline and print a summary.

    Builds two agents that share one model, runs them through a
    ``SequentialPipeline`` on a fixed topic, then prints the first 100
    characters of each stage's output and the reported duration.
    """
    # One system prompt per stage, in execution order: researcher, then writer.
    stage_prompts = (
        "You are a researcher. Provide 3 key facts about the topic.",
        "You are a writer. Take the research and write a short paragraph.",
    )

    print("=== Part 1: Sequential Pipeline ===\n")

    llm = get_model()

    # Both stages use the same model and the same iteration cap.
    stages = [
        Agent(
            config=AgentConfig(
                system_prompt=prompt,
                max_iterations=3,
                model=llm,
            )
        )
        for prompt in stage_prompts
    ]

    chain = SequentialPipeline(agents=stages)
    outcome = await chain.run("Benefits of regular exercise")

    # outputs is indexed by stage position: 0 = researcher, 1 = writer.
    research_preview = outcome.outputs[0][:100]
    print(f"Stage 1 (Researcher): {research_preview}...")

    draft_preview = outcome.outputs[1][:100]
    print(f"Stage 2 (Writer): {draft_preview}...")

    elapsed_ms = outcome.duration_ms
    print(f"Duration: {elapsed_ms:.0f}ms")


# =============================================================================
# Part 2: Parallel Pipeline — Multiple perspectives
# =============================================================================


async def example_parallel():
    """Run a pros agent and a cons agent through a ParallelPipeline and print results.

    Builds two agents that share one model, runs them through a
    ``ParallelPipeline`` on a fixed topic, then prints the first 100
    characters of each agent's output and the first 150 characters of the
    pipeline's ``final_output``.
    """
    # One system prompt per perspective; list order fixes the output indices.
    perspective_prompts = (
        "List 2 pros of the topic. Be concise.",
        "List 2 cons of the topic. Be concise.",
    )

    print("\n=== Part 2: Parallel Pipeline ===\n")

    llm = get_model()

    # Both perspectives use the same model and the same iteration cap.
    perspectives = [
        Agent(
            config=AgentConfig(
                system_prompt=prompt,
                max_iterations=3,
                model=llm,
            )
        )
        for prompt in perspective_prompts
    ]

    fan_out = ParallelPipeline(agents=perspectives)
    outcome = await fan_out.run("Remote work for engineers")

    # outputs is indexed by agent position: 0 = pros, 1 = cons.
    pros_preview = outcome.outputs[0][:100]
    print(f"Pros: {pros_preview}...")

    cons_preview = outcome.outputs[1][:100]
    print(f"Cons: {cons_preview}...")

    merged_preview = outcome.final_output[:150]
    print(f"Merged: {merged_preview}...")


# =============================================================================
# Part 3: Loop Agent — Iterate until done
# =============================================================================


async def example_loop():
    """Run a text-improving agent inside a LoopAgent and print the outcome.

    Wraps a single agent in a ``LoopAgent`` configured with a stop
    predicate, ``max_loops=3``, and a prompt template containing a
    ``{previous_output}`` placeholder. Prints how many outputs were
    collected and the first 100 characters of ``final_output``.
    """

    def _is_approved(output):
        # Case-insensitive check for the approval marker in the agent's text.
        return "APPROVED" in output.upper()

    print("\n=== Part 3: Loop Agent ===\n")

    llm = get_model()

    improver_config = AgentConfig(
        system_prompt=(
            "You improve text quality. When the text is good enough, "
            "include the word APPROVED at the end."
        ),
        max_iterations=3,
        model=llm,
    )
    improver = Agent(config=improver_config)

    refinement_loop = LoopAgent(
        agent=improver,
        condition=_is_approved,
        max_loops=3,
        loop_prompt="Improve this text. Say APPROVED when done:\n{previous_output}",
    )

    outcome = await refinement_loop.run("The quick brown fox jumps over the lazy dog.")

    iteration_count = len(outcome.outputs)
    print(f"Iterations: {iteration_count}")

    final_preview = outcome.final_output[:100]
    print(f"Final: {final_preview}...")


# =============================================================================
# Run all examples
# =============================================================================


if __name__ == "__main__":
    # Each example gets its own asyncio.run() call, in tutorial part order.
    for example in (example_sequential, example_parallel, example_loop):
        asyncio.run(example())