Skip to content

Agent Streaming

agent.run(prompt) returns an async iterator of events. Watch the agent think, call tools, and terminate — live, in order. This is what lets you build progress bars, dashboards, audit logs, and SSE endpoints.

What you'll learn:

  • The event types: ThinkEvent, ToolStartEvent, ToolCompleteEvent, TerminateEvent, plus model chunk events.
  • Filtering with isinstance(event, EventType).
  • Building a live console UI from the stream.
  • Rolling event counts into per-run metrics.
  • Drawing a progress bar from ToolCompleteEvent.
  • A pointer to StructuredStream for incremental Pydantic parsing.

Run it:

.venv/bin/python examples/notebook_17_agent_streaming.py

The default provider is OCI Generative AI (canonical id: openai.gpt-4.1 or meta.llama-3.3-70b-instruct). For offline runs set LOCUS_MODEL_PROVIDER=mock; OpenAI, Anthropic and Ollama also work.

Prerequisite: notebook 09.

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 11: streaming events from an agent.

Every Locus agent run is a stream of events: thinking, tool start, tool
complete, terminate. ``agent.run(prompt)`` returns an async iterator
that yields them in order. This notebook walks through the full set,
then shows useful patterns built on top — a live console UI, event
filtering, metric collection, and a progress bar.

Key ideas:
- ``ThinkEvent``: the model produced a plan (and possibly tool calls).
- ``ToolStartEvent`` / ``ToolCompleteEvent``: a tool ran; the complete
  event carries the result and duration.
- ``TerminateEvent``: the loop ended; carries the final message, stop
  reason, and iteration count.
- You filter the stream by ``isinstance(event, EventType)``.
- ``StructuredStream`` (mentioned at the end) wraps the stream and
  yields partial Pydantic instances as JSON arrives.

Run it:
    .venv/bin/python examples/notebook_17_agent_streaming.py

The default provider is OCI Generative AI (canonical example:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). For an offline
run set ``LOCUS_MODEL_PROVIDER=mock``; OpenAI, Anthropic, and Ollama
work too.

Prerequisite: notebook 09.
"""

import ast
import asyncio
import operator as _op
from datetime import datetime

# Import shared config
from config import get_model, print_config

from locus.agent import Agent
from locus.core.events import (
    TerminateEvent,
    ThinkEvent,
    ToolCompleteEvent,
    ToolStartEvent,
)
from locus.tools import tool


# =============================================================================
# Part 1: every event type the agent emits
# =============================================================================


_SAFE_MATH_BIN_OPS = {
    ast.Add: _op.add,
    ast.Sub: _op.sub,
    ast.Mult: _op.mul,
    ast.Div: _op.truediv,
    ast.FloorDiv: _op.floordiv,
    ast.Mod: _op.mod,
    ast.Pow: _op.pow,
}
_SAFE_MATH_UNARY_OPS = {ast.USub: _op.neg, ast.UAdd: _op.pos}


def _safe_math_eval(expression: str) -> float:
    """Evaluate arithmetic via AST — no eval, no attribute access, no names."""
    tree = ast.parse(expression, mode="eval")

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _SAFE_MATH_BIN_OPS:
            return _SAFE_MATH_BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _SAFE_MATH_UNARY_OPS:
            return _SAFE_MATH_UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError("Unsupported expression")

    return _eval(tree)


@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression safely."""
    try:
        return str(_safe_math_eval(expression))
    except (ValueError, SyntaxError, ZeroDivisionError):
        return "Error: Invalid expression"


async def example_all_events():
    """Print every event with its distinguishing fields."""
    print("=== Part 1: Understanding Events ===\n")

    model = get_model(max_tokens=200)

    agent = Agent(
        model=model,
        tools=[calculate],
        system_prompt="You are a calculator. Always use the calculate tool for math.",
    )

    print("Running: 'What is 25 * 4?'\n")
    print("Events received:")

    async for event in agent.run("What is 25 * 4?"):
        print(f"\n  Event Type: {event.event_type}")
        print(f"  Timestamp:  {event.timestamp}")

        if isinstance(event, ThinkEvent):
            print(f"  Iteration:  {event.iteration}")
            print(f"  Tool Calls: {len(event.tool_calls)}")
            if event.reasoning:
                preview = (
                    event.reasoning[:80] + "..." if len(event.reasoning) > 80 else event.reasoning
                )
                print(f"  Reasoning:  {preview}")

        elif isinstance(event, ToolStartEvent):
            print(f"  Tool Name:  {event.tool_name}")
            print(f"  Arguments:  {event.arguments}")

        elif isinstance(event, ToolCompleteEvent):
            print(f"  Tool Name:  {event.tool_name}")
            print(f"  Result:     {event.result}")
            print(f"  Duration:   {event.duration_ms:.1f}ms")

        elif isinstance(event, TerminateEvent):
            print(f"  Reason:     {event.reason}")
            print(f"  Iterations: {event.iterations_used}")
            if event.final_message:
                print(f"  Answer:     {event.final_message}")

    print()


# =============================================================================
# Part 2: a live console UI driven by the event stream
# =============================================================================


async def example_console_ui():
    """Render a tiny progress UI as the agent thinks and calls tools."""
    print("=== Part 2: Console UI ===\n")

    model = get_model(max_tokens=200)

    @tool
    def search_database(query: str) -> str:
        """Search the internal database."""
        # Sleep so the UI has visible work to show.
        import time

        time.sleep(0.5)
        return f"Found 3 results for '{query}'"

    @tool
    def analyze_results(data: str) -> str:
        """Analyze search results."""
        return f"Analysis complete: {data} contains useful information"

    agent = Agent(
        model=model,
        tools=[search_database, analyze_results],
        system_prompt="You are a research assistant. Search and analyze data.",
    )

    print("Query: Find information about Python and analyze it\n")

    async for event in agent.run("Find information about Python and analyze it"):
        if isinstance(event, ThinkEvent):
            print("[Thinking...]")
            if event.tool_calls:
                for tc in event.tool_calls:
                    print(f"  Planning to call: {tc.name}")

        elif isinstance(event, ToolStartEvent):
            print(f"[Running] {event.tool_name}...", end=" ", flush=True)

        elif isinstance(event, ToolCompleteEvent):
            if event.error:
                print(f"ERROR: {event.error}")
            else:
                print(f"Done ({event.duration_ms:.0f}ms)")

        elif isinstance(event, TerminateEvent):
            print(f"\n[Complete] {event.reason}")
            if event.final_message:
                print(f"\nAnswer: {event.final_message}")

    print()


# =============================================================================
# Part 3: pick out just the events you care about
# =============================================================================


async def example_event_filtering():
    """Build an audit log by keeping only ToolStart/ToolComplete events."""
    print("=== Part 3: Event Filtering ===\n")

    model = get_model(max_tokens=200)

    agent = Agent(
        model=model,
        tools=[calculate],
        system_prompt="Use the calculate tool for math.",
    )

    tool_log = []

    async for event in agent.run("Calculate 10 + 20 + 30"):
        if isinstance(event, ToolStartEvent):
            tool_log.append(
                {
                    "action": "start",
                    "tool": event.tool_name,
                    "args": event.arguments,
                    "time": datetime.now().isoformat(),
                }
            )

        elif isinstance(event, ToolCompleteEvent):
            tool_log.append(
                {
                    "action": "complete",
                    "tool": event.tool_name,
                    "result": event.result,
                    "duration_ms": event.duration_ms,
                }
            )

    print("Tool execution log:")
    for entry in tool_log:
        print(f"  {entry}")
    print()


# =============================================================================
# Part 4: roll up metrics from the event stream
# =============================================================================


async def example_collect_metrics():
    """Count events and sum tool durations to get per-run telemetry."""
    print("=== Part 4: Collecting Metrics ===\n")

    model = get_model(max_tokens=200)

    @tool
    def step_one(data: str) -> str:
        """First processing step."""
        return f"Step 1 processed: {data}"

    @tool
    def step_two(data: str) -> str:
        """Second processing step."""
        return f"Step 2 processed: {data}"

    agent = Agent(
        model=model,
        tools=[step_one, step_two],
        system_prompt="Process data through step_one then step_two.",
    )

    metrics = {
        "think_events": 0,
        "tool_starts": 0,
        "tool_completes": 0,
        "total_tool_time_ms": 0,
        "iterations": 0,
    }

    start_time = datetime.now()

    async for event in agent.run("Process 'hello world' through both steps"):
        if isinstance(event, ThinkEvent):
            metrics["think_events"] += 1
        elif isinstance(event, ToolStartEvent):
            metrics["tool_starts"] += 1
        elif isinstance(event, ToolCompleteEvent):
            metrics["tool_completes"] += 1
            metrics["total_tool_time_ms"] += event.duration_ms or 0
        elif isinstance(event, TerminateEvent):
            metrics["iterations"] = event.iterations_used

    elapsed = (datetime.now() - start_time).total_seconds() * 1000

    print("Execution Metrics:")
    print(f"  Think events:    {metrics['think_events']}")
    print(f"  Tool starts:     {metrics['tool_starts']}")
    print(f"  Tool completes:  {metrics['tool_completes']}")
    print(f"  Tool time:       {metrics['total_tool_time_ms']:.1f}ms")
    print(f"  Iterations:      {metrics['iterations']}")
    print(f"  Total time:      {elapsed:.1f}ms")
    print()


# =============================================================================
# Part 5: a progress bar from ToolCompleteEvent
# =============================================================================


async def example_progress_tracking():
    """Draw a textual progress bar as each tool completes."""
    print("=== Part 5: Progress Tracking ===\n")

    model = get_model(max_tokens=300)

    # Deterministic tool bodies: fetch_data returns a real summary of an
    # in-memory source, process_data hashes its input. No canned strings.
    sources = {
        "users": [
            {"id": 1, "name": "Alice", "role": "engineer"},
            {"id": 2, "name": "Bob", "role": "designer"},
            {"id": 3, "name": "Charlie", "role": "researcher"},
        ],
        "orders": [
            {"id": 101, "user_id": 1, "total": 49.99},
            {"id": 102, "user_id": 2, "total": 19.99},
        ],
    }

    @tool
    def fetch_data(source: str) -> str:
        """Fetch records from a named in-memory source (users, orders, ...)."""
        import json
        import time

        time.sleep(0.3)
        rows = sources.get(source.lower())
        if rows is None:
            return f"unknown source {source!r}; try one of: {sorted(sources)}"
        return f"{len(rows)} record(s) from {source}: {json.dumps(rows)}"

    @tool
    def process_data(data: str) -> str:
        """Process fetched data — counts records and emits a checksum."""
        import hashlib
        import time

        time.sleep(0.2)
        digest = hashlib.sha256(data.encode()).hexdigest()[:12]
        return f"processed {len(data)} bytes, sha256:{digest}"

    @tool
    def store_results(results: str) -> str:
        """Store processed results."""
        import time

        time.sleep(0.1)
        return "Results stored successfully"

    agent = Agent(
        model=model,
        tools=[fetch_data, process_data, store_results],
        system_prompt="Fetch data from 'api', process it, and store the results.",
    )

    steps_done = 0
    total_steps = 3  # we expect three tool calls: fetch, process, store

    print("Processing...")

    async for event in agent.run("Fetch, process, and store data from the API"):
        if isinstance(event, ToolCompleteEvent):
            steps_done += 1
            progress = (steps_done / total_steps) * 100
            bar = "#" * int(progress / 10) + "-" * (10 - int(progress / 10))
            print(f"  [{bar}] {progress:.0f}% - {event.tool_name} complete")

        elif isinstance(event, TerminateEvent):
            print(f"\nDone! Final message: {event.final_message}")

    print()


# =============================================================================
# Main
# =============================================================================


def main():
    """Run all notebook parts."""
    print("=" * 60)
    print("Notebook 11: Agent Streaming & Events")
    print("=" * 60)
    print()

    print_config()
    print()

    asyncio.run(example_all_events())
    asyncio.run(example_console_ui())
    asyncio.run(example_event_filtering())
    asyncio.run(example_collect_metrics())
    asyncio.run(example_progress_tracking())

    # =========================================================================
    # See also: structured streaming
    # =========================================================================
    print("=== See also: StructuredStream ===\n")
    print(
        "Want incremental Pydantic instances instead of raw chunks?\n"
        "Wrap any agent stream with StructuredStream:\n"
    )
    print(
        """    from locus.streaming import StructuredStream

    stream = StructuredStream(agent.run("Top 3 vendors."), schema=VendorList)
    async for partial in stream:
        ui.render(partial)               # may have 0, 1, 2, then 3 vendors
    final: VendorList | None = stream.final
"""
    )
    print(
        "Locus buffers each ModelChunkEvent, closes any unbalanced JSON\n"
        "braces / brackets / strings, runs the result through\n"
        "schema.model_validate, and yields the parsed instance if it\n"
        "validates.\n"
    )

    print("=" * 60)
    print("Next: Notebook 12 — Agent Hooks")
    print("=" * 60)


if __name__ == "__main__":
    main()