Tutorial 04: Agent Streaming & Events¶
This tutorial covers:
- Understanding Locus events
- Streaming agent execution
- Building a real-time console UI
- Custom event handlers
Prerequisites: Tutorial 03 (Agent Memory) Difficulty: Intermediate
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 04: Agent Streaming & Events
This tutorial covers:
- Understanding Locus events
- Streaming agent execution
- Building a real-time console UI
- Custom event handlers
Prerequisites: Tutorial 03 (Agent Memory)
Difficulty: Intermediate
"""
import ast
import asyncio
import operator as _op
from datetime import datetime
# Import shared config
from config import get_model, print_config
from locus.agent import Agent
from locus.core.events import (
TerminateEvent,
ThinkEvent,
ToolCompleteEvent,
ToolStartEvent,
)
from locus.tools import tool
# =============================================================================
# Part 1: Understanding Events
# =============================================================================
_SAFE_MATH_BIN_OPS = {
ast.Add: _op.add,
ast.Sub: _op.sub,
ast.Mult: _op.mul,
ast.Div: _op.truediv,
ast.FloorDiv: _op.floordiv,
ast.Mod: _op.mod,
ast.Pow: _op.pow,
}
_SAFE_MATH_UNARY_OPS = {ast.USub: _op.neg, ast.UAdd: _op.pos}
def _safe_math_eval(expression: str) -> float:
"""AST-based evaluator for arithmetic expressions. No names, calls, or attribute access."""
tree = ast.parse(expression, mode="eval")
def _eval(node: ast.AST) -> float:
if isinstance(node, ast.Expression):
return _eval(node.body)
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.BinOp) and type(node.op) in _SAFE_MATH_BIN_OPS:
return _SAFE_MATH_BIN_OPS[type(node.op)](_eval(node.left), _eval(node.right))
if isinstance(node, ast.UnaryOp) and type(node.op) in _SAFE_MATH_UNARY_OPS:
return _SAFE_MATH_UNARY_OPS[type(node.op)](_eval(node.operand))
raise ValueError("Unsupported expression")
return _eval(tree)
@tool
def calculate(expression: str) -> str:
"""Evaluate a mathematical expression safely."""
try:
return str(_safe_math_eval(expression))
except (ValueError, SyntaxError, ZeroDivisionError):
return "Error: Invalid expression"
async def example_all_events():
"""See all event types during execution."""
print("=== Part 1: Understanding Events ===\n")
model = get_model(max_tokens=200)
agent = Agent(
model=model,
tools=[calculate],
system_prompt="You are a calculator. Always use the calculate tool for math.",
)
print("Running: 'What is 25 * 4?'\n")
print("Events received:")
async for event in agent.run("What is 25 * 4?"):
print(f"\n Event Type: {event.event_type}")
print(f" Timestamp: {event.timestamp}")
if isinstance(event, ThinkEvent):
print(f" Iteration: {event.iteration}")
print(f" Tool Calls: {len(event.tool_calls)}")
if event.reasoning:
preview = (
event.reasoning[:80] + "..." if len(event.reasoning) > 80 else event.reasoning
)
print(f" Reasoning: {preview}")
elif isinstance(event, ToolStartEvent):
print(f" Tool Name: {event.tool_name}")
print(f" Arguments: {event.arguments}")
elif isinstance(event, ToolCompleteEvent):
print(f" Tool Name: {event.tool_name}")
print(f" Result: {event.result}")
print(f" Duration: {event.duration_ms:.1f}ms")
elif isinstance(event, TerminateEvent):
print(f" Reason: {event.reason}")
print(f" Iterations: {event.iterations_used}")
if event.final_message:
print(f" Answer: {event.final_message}")
print()
# =============================================================================
# Part 2: Building a Console UI
# =============================================================================
async def example_console_ui():
"""Build a real-time console interface."""
print("=== Part 2: Console UI ===\n")
model = get_model(max_tokens=200)
@tool
def search_database(query: str) -> str:
"""Search the internal database."""
# Simulate slow search
import time
time.sleep(0.5)
return f"Found 3 results for '{query}'"
@tool
def analyze_results(data: str) -> str:
"""Analyze search results."""
return f"Analysis complete: {data} contains useful information"
agent = Agent(
model=model,
tools=[search_database, analyze_results],
system_prompt="You are a research assistant. Search and analyze data.",
)
print("Query: Find information about Python and analyze it\n")
async for event in agent.run("Find information about Python and analyze it"):
if isinstance(event, ThinkEvent):
print("[Thinking...]")
if event.tool_calls:
for tc in event.tool_calls:
print(f" Planning to call: {tc.name}")
elif isinstance(event, ToolStartEvent):
print(f"[Running] {event.tool_name}...", end=" ", flush=True)
elif isinstance(event, ToolCompleteEvent):
if event.error:
print(f"ERROR: {event.error}")
else:
print(f"Done ({event.duration_ms:.0f}ms)")
elif isinstance(event, TerminateEvent):
print(f"\n[Complete] {event.reason}")
if event.final_message:
print(f"\nAnswer: {event.final_message}")
print()
# =============================================================================
# Part 3: Event Filtering
# =============================================================================
async def example_event_filtering():
"""Filter for specific event types."""
print("=== Part 3: Event Filtering ===\n")
model = get_model(max_tokens=200)
agent = Agent(
model=model,
tools=[calculate],
system_prompt="Use the calculate tool for math.",
)
# Only process tool events
tool_log = []
async for event in agent.run("Calculate 10 + 20 + 30"):
if isinstance(event, ToolStartEvent):
tool_log.append(
{
"action": "start",
"tool": event.tool_name,
"args": event.arguments,
"time": datetime.now().isoformat(),
}
)
elif isinstance(event, ToolCompleteEvent):
tool_log.append(
{
"action": "complete",
"tool": event.tool_name,
"result": event.result,
"duration_ms": event.duration_ms,
}
)
print("Tool execution log:")
for entry in tool_log:
print(f" {entry}")
print()
# =============================================================================
# Part 4: Collecting Metrics
# =============================================================================
async def example_collect_metrics():
"""Collect performance metrics during execution."""
print("=== Part 4: Collecting Metrics ===\n")
model = get_model(max_tokens=200)
@tool
def step_one(data: str) -> str:
"""First processing step."""
return f"Step 1 processed: {data}"
@tool
def step_two(data: str) -> str:
"""Second processing step."""
return f"Step 2 processed: {data}"
agent = Agent(
model=model,
tools=[step_one, step_two],
system_prompt="Process data through step_one then step_two.",
)
# Metrics collectors
metrics = {
"think_events": 0,
"tool_starts": 0,
"tool_completes": 0,
"total_tool_time_ms": 0,
"iterations": 0,
}
start_time = datetime.now()
async for event in agent.run("Process 'hello world' through both steps"):
if isinstance(event, ThinkEvent):
metrics["think_events"] += 1
elif isinstance(event, ToolStartEvent):
metrics["tool_starts"] += 1
elif isinstance(event, ToolCompleteEvent):
metrics["tool_completes"] += 1
metrics["total_tool_time_ms"] += event.duration_ms or 0
elif isinstance(event, TerminateEvent):
metrics["iterations"] = event.iterations_used
elapsed = (datetime.now() - start_time).total_seconds() * 1000
print("Execution Metrics:")
print(f" Think events: {metrics['think_events']}")
print(f" Tool starts: {metrics['tool_starts']}")
print(f" Tool completes: {metrics['tool_completes']}")
print(f" Tool time: {metrics['total_tool_time_ms']:.1f}ms")
print(f" Iterations: {metrics['iterations']}")
print(f" Total time: {elapsed:.1f}ms")
print()
# =============================================================================
# Part 5: Progress Tracking
# =============================================================================
async def example_progress_tracking():
"""Show progress during long operations."""
print("=== Part 5: Progress Tracking ===\n")
model = get_model(max_tokens=300)
# Real, deterministic tool implementations — no canned strings.
# fetch_data simulates network latency but returns a real summary
# of the requested source. process_data does an actual hash so the
# result is reproducible from the input.
sources = {
"users": [
{"id": 1, "name": "Alice", "role": "engineer"},
{"id": 2, "name": "Bob", "role": "designer"},
{"id": 3, "name": "Charlie", "role": "researcher"},
],
"orders": [
{"id": 101, "user_id": 1, "total": 49.99},
{"id": 102, "user_id": 2, "total": 19.99},
],
}
@tool
def fetch_data(source: str) -> str:
"""Fetch records from a named in-memory source (users, orders, ...)."""
import json
import time
time.sleep(0.3)
rows = sources.get(source.lower())
if rows is None:
return f"unknown source {source!r}; try one of: {sorted(sources)}"
return f"{len(rows)} record(s) from {source}: {json.dumps(rows)}"
@tool
def process_data(data: str) -> str:
"""Process fetched data — counts records and emits a checksum."""
import hashlib
import time
time.sleep(0.2)
digest = hashlib.sha256(data.encode()).hexdigest()[:12]
return f"processed {len(data)} bytes, sha256:{digest}"
@tool
def store_results(results: str) -> str:
"""Store processed results."""
import time
time.sleep(0.1)
return "Results stored successfully"
agent = Agent(
model=model,
tools=[fetch_data, process_data, store_results],
system_prompt="Fetch data from 'api', process it, and store the results.",
)
steps_done = 0
total_steps = 3 # Expected number of tool calls
print("Processing...")
async for event in agent.run("Fetch, process, and store data from the API"):
if isinstance(event, ToolCompleteEvent):
steps_done += 1
progress = (steps_done / total_steps) * 100
bar = "#" * int(progress / 10) + "-" * (10 - int(progress / 10))
print(f" [{bar}] {progress:.0f}% - {event.tool_name} complete")
elif isinstance(event, TerminateEvent):
print(f"\nDone! Final message: {event.final_message}")
print()
# =============================================================================
# Main
# =============================================================================
def main():
"""Run all tutorial parts."""
print("=" * 60)
print("Tutorial 04: Agent Streaming & Events")
print("=" * 60)
print()
print_config()
print()
asyncio.run(example_all_events())
asyncio.run(example_console_ui())
asyncio.run(example_event_filtering())
asyncio.run(example_collect_metrics())
asyncio.run(example_progress_tracking())
# =========================================================================
# See also: structured streaming
# =========================================================================
print("=== See also: StructuredStream ===\n")
print(
"When you want incremental Pydantic instances during streaming\n"
"(not raw chunks), wrap any event iterator with StructuredStream:\n"
)
print(
""" from locus.streaming import StructuredStream
stream = StructuredStream(agent.run("Top 3 vendors."), schema=VendorList)
async for partial in stream:
ui.render(partial) # may have 0, 1, 2, then 3 vendors
final: VendorList | None = stream.final
"""
)
print(
"Each ModelChunkEvent is appended to a buffer; locus auto-closes\n"
"any unbalanced braces / brackets / strings, runs the result\n"
"through schema.model_validate, and yields the parsed instance\n"
"if it succeeds.\n"
)
print("=" * 60)
print("Next: Tutorial 05 - Agent Hooks")
print("=" * 60)
if __name__ == "__main__":
main()