Agent Yield Bridge¶
Every Agent.run is decorated with @_bus_bridge so the nine typed
events it yields get republished on the bus as agent.* events when a
run_context is open. No hook registration, no config flag — the
bridge is always there; it only fires when telemetry is active.
Event mapping::
LocusEvent (inner stream) → bus event_type
─────────────────────────────────────────────────
ThinkEvent → agent.think
ToolStartEvent → agent.tool.started ┐ share span_id
ToolCompleteEvent → agent.tool.completed ┘
ReflectEvent → agent.reflect
GroundingEvent → agent.grounding
ModelChunkEvent → agent.model.chunk (streaming)
ModelCompleteEvent → agent.model.completed
+ agent.tokens.used (extra event)
InterruptEvent → agent.interrupt
TerminateEvent → agent.terminate
- How nine yielded
LocusEventtypes map toagent.*bus events. - Tool-call telemetry with
span_idpairing —agent.tool.startedandagent.tool.completedshare an id so consumers can compute durations without subtracting timestamps. - Token usage from
result.metrics— the canonical source for cost meters and budget enforcers.
Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):
python examples/notebook_60_agent_yield_bridge.py
Offline:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_60_agent_yield_bridge.py
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Notebook 55: Agent yield bridge and token usage.
Every Agent.run is decorated with @_bus_bridge so the nine typed events
it yields (ThinkEvent, ToolStartEvent, ToolCompleteEvent, ReflectEvent,
GroundingEvent, ModelChunkEvent, ModelCompleteEvent, InterruptEvent,
TerminateEvent) get republished on the bus as agent.* events when a
run_context is open. ModelCompleteEvent additionally fires
agent.tokens.used so cost dashboards subscribe without parsing the
completion payload.
- How nine yielded LocusEvent types map to agent.* bus events.
- Tool-call telemetry with span_id pairing (agent.tool.started and
agent.tool.completed share an id).
- Token usage from result.metrics — the canonical source for cost
meters and budget enforcers.
Run it
# Default: OCI Generative AI auto-detected from ~/.oci/config
python examples/notebook_60_agent_yield_bridge.py
# Offline / no credentials:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_60_agent_yield_bridge.py
"""
from __future__ import annotations
import asyncio
from config import get_model
from locus.agent import Agent
from locus.observability import get_event_bus, run_context
from locus.tools import tool
@tool
def add_numbers(a: int, b: int) -> int:
"""Return the sum of two integers."""
return a + b
@tool
def multiply_numbers(a: int, b: int) -> int:
"""Return the product of two integers."""
return a * b
# Part 1: full agent.* lifecycle on one tool-using run. Print every
# event with its span_id so you can see start/complete pairing.
async def part1_full_lifecycle() -> None:
print("\n--- Part 1: full agent.* lifecycle ---")
agent = Agent(
model=get_model(),
tools=[add_numbers, multiply_numbers],
max_iterations=4,
system_prompt=(
"You answer with one tool call at a time. After all tool calls, give the final answer."
),
)
async with run_context() as rid:
bus = get_event_bus()
async def consumer() -> None:
async for ev in bus.subscribe(rid):
span = ev.data.get("span_id", "")
tag = f" span={span[:8]}" if span else ""
if ev.event_type.startswith("agent."):
print(f" {ev.event_type}{tag}")
if ev.event_type == "agent.terminate":
return
consumer_task = asyncio.create_task(consumer())
await asyncio.sleep(0)
result = None
async for event in agent.run("Compute (3 + 4) and then (5 * 7), and tell me both."):
from locus.core.events import TerminateEvent
if isinstance(event, TerminateEvent):
result = event
print(f"agent reply: {result.final_message[:160] if result else '(no reply)'}")
await asyncio.wait_for(consumer_task, timeout=20.0)
await bus.close_stream(rid)
# Part 2: token usage as a cost meter. result.metrics is authoritative;
# agent.tokens.used SSE events are for streaming consumers that want
# per-call deltas instead of the final total.
async def part2_token_meter() -> None:
print("\n--- Part 2: token meter via result.metrics ---")
running_prompt = running_completion = running_total = 0
# Multi-run session: accumulate token totals across calls.
prompts = [
"In one sentence: what is JSON?",
"In one sentence: what is a REST API?",
]
for prompt in prompts:
agent = Agent(model=get_model(), max_iterations=2)
result = agent.run_sync(prompt)
m = result.metrics
running_prompt += m.prompt_tokens
running_completion += m.completion_tokens
running_total += m.total_tokens
print(
f" run: prompt={m.prompt_tokens:4d} "
f"completion={m.completion_tokens:3d} "
f"total={m.total_tokens:4d} | '{prompt[:40]}'"
)
print(
f" ─── session total: prompt={running_prompt} "
f"completion={running_completion} total={running_total}"
)
async def main() -> None:
await part1_full_lifecycle()
await part2_token_meter()
if __name__ == "__main__":
asyncio.run(main())