Observability Basics
Locus ships an in-process pub/sub EventBus that publishes typed
StreamEvents for every meaningful step of execution: agent thinking,
tool calls, model completions, token usage, multi-agent fan-outs,
checkpoints — all under one canonical event_type per component.
Telemetry is opt-in. Code that never enters a run_context pays one
ContextVar.get() per emission site — no bus, no events, no
allocations.
Pipeline::

    with run_context() as rid:          ← activates emission; generates run_id
    │
    │   agent.run_sync(…)
    │   │
    │   ├─ agent.think                  ← one per ReAct iteration
    │   ├─ agent.tool.started   ┐       span_id ties the pair
    │   ├─ agent.tool.completed ┘
    │   ├─ agent.tokens.used            ← per model call (cost meter)
    │   └─ agent.terminate
    │
    └─ bus.subscribe(rid)               ← history replay + live stream
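Because span_id ties each agent.tool.started event to its agent.tool.completed partner, a consumer can measure tool latency even when calls overlap. A minimal sketch, assuming every event exposes an event_type, a span_id and a timestamp. Event and tool_durations are hypothetical names; the real Locus StreamEvent type may use different field names.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Event:
    # Hypothetical shape, not the Locus StreamEvent type.
    event_type: str
    span_id: Optional[str]
    ts: float  # seconds; a monotonic clock is assumed


def tool_durations(events: List[Event]) -> Dict[str, Optional[float]]:
    """Map each tool span_id to its duration, or None if it never completed."""
    started: Dict[str, float] = {}
    durations: Dict[str, Optional[float]] = {}
    for ev in events:
        if ev.span_id is None:
            continue  # agent.think, agent.terminate, etc. carry no tool span
        if ev.event_type == "agent.tool.started":
            started[ev.span_id] = ev.ts
            durations[ev.span_id] = None
        elif ev.event_type == "agent.tool.completed" and ev.span_id in started:
            durations[ev.span_id] = ev.ts - started.pop(ev.span_id)
    return durations


events = [
    Event("agent.think", None, 0.0),
    Event("agent.tool.started", "s1", 1.0),
    Event("agent.tool.started", "s2", 1.5),
    Event("agent.tool.completed", "s1", 3.0),
    Event("agent.terminate", None, 4.0),
]
print(tool_durations(events))  # {'s1': 2.0, 's2': None}
```

Matching is keyed on span_id rather than arrival order, so the interleaved s1/s2 calls resolve correctly. A span with no completed event is reported as None: it is either still running or its completion was never emitted.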
- Run an Agent with no telemetry (the SDK-default path).
- Wrap the same call in run_context() and subscribe to the bus.
- The canonical events: agent.think, agent.tool.started/completed,
  agent.tokens.used, agent.terminate.
- Read the per-run history buffer after the fact (replay semantics for
  late subscribers).
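The replay semantics in the last bullet can be modelled with a bounded deque per run. This is a hypothetical ReplayBus, not Locus's EventBus. Plain event-type strings stand in for full StreamEvents. The 500-event and 200-run caps come from the comment on Part 3 of the source, and evicting the oldest run first is an assumption the source does not state. A subscriber that arrives after close_stream() gets the buffered history and then stops, as in Part 3. An earlier subscriber gets the backlog followed by live events.

```python
import asyncio
from collections import OrderedDict, deque
from typing import AsyncIterator, Deque, Dict, List, Optional, Set

MAX_EVENTS_PER_RUN = 500  # caps quoted in the source's Part 3 comment
MAX_RETAINED_RUNS = 200


class ReplayBus:
    """Hypothetical bus: bounded per-run history plus live fan-out."""

    def __init__(self) -> None:
        self._history: "OrderedDict[str, Deque[str]]" = OrderedDict()
        self._queues: Dict[str, List["asyncio.Queue[Optional[str]]"]] = {}
        self._closed: Set[str] = set()

    def publish(self, run_id: str, event_type: str) -> None:
        history = self._history.get(run_id)
        if history is None:
            history = self._history[run_id] = deque(maxlen=MAX_EVENTS_PER_RUN)
            if len(self._history) > MAX_RETAINED_RUNS:
                evicted, _ = self._history.popitem(last=False)  # oldest run first
                self._closed.discard(evicted)
        history.append(event_type)  # the deque drops its oldest entry past the cap
        for queue in self._queues.get(run_id, []):
            queue.put_nowait(event_type)

    def close_stream(self, run_id: str) -> None:
        self._closed.add(run_id)
        for queue in self._queues.pop(run_id, []):
            queue.put_nowait(None)  # sentinel: end of the live stream

    async def subscribe(self, run_id: str) -> AsyncIterator[str]:
        # Snapshot the backlog and register the live queue with no await in
        # between, so nothing published meanwhile is missed or duplicated.
        backlog = list(self._history.get(run_id, ()))
        queue: Optional["asyncio.Queue[Optional[str]]"] = None
        if run_id not in self._closed:
            queue = asyncio.Queue()
            self._queues.setdefault(run_id, []).append(queue)
        for event_type in backlog:
            yield event_type  # replay from history
        if queue is None:
            return  # stream already closed: history only
        while (event_type := await queue.get()) is not None:
            yield event_type  # live events until close_stream()


async def demo() -> None:
    bus = ReplayBus()
    for event_type in ("agent.think", "agent.tokens.used", "agent.terminate"):
        bus.publish("run-1", event_type)
    bus.close_stream("run-1")
    # A subscriber that arrives after close still sees the whole run.
    print([ev async for ev in bus.subscribe("run-1")])


asyncio.run(demo())
```

In this sketch a subscriber only registers once its generator first runs, which is the same reason Part 2 of the source yields with asyncio.sleep(0) before starting the agent.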
Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):
python examples/notebook_59_observability_basics.py
Offline:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_59_observability_basics.py
Source
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Notebook 59: Observability basics (opt-in SSE telemetry).
Locus ships an in-process pub/sub EventBus that publishes typed
StreamEvents for every meaningful step of execution: agent thinking,
tool calls, model completions, token usage, multi-agent fan-outs,
checkpoints — all under one canonical event_type per component.
Telemetry is opt-in. Code that never enters a run_context pays one
ContextVar.get() per emission site — no bus, no events, no
allocations.

- Run an Agent with no telemetry: the SDK-default path.
- Wrap the same call in run_context() and subscribe to the bus.
- The canonical events: agent.think, agent.tool.started/completed,
  agent.tokens.used, agent.terminate.
- Read the per-run history buffer after the fact (replay semantics for
  late subscribers).

Run it::

    # Default: OCI Generative AI auto-detected from ~/.oci/config
    python examples/notebook_59_observability_basics.py

    # Offline / no credentials:
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_59_observability_basics.py
"""
from __future__ import annotations
import asyncio
from config import get_model
from locus.agent import Agent
from locus.observability import get_event_bus, run_context


# Part 1: no telemetry. The bus singleton never gets built.
async def part1_no_telemetry() -> None:
    """No run_context: every emit short-circuits before allocating anything."""
    print("\n--- Part 1: no run_context — bus stays uninstantiated ---")
    agent = Agent(model=get_model(), max_iterations=2)
    result = agent.run_sync("In one sentence, what is locus?")
    print("agent reply:", result.message[:120])

    # Probe the module-level singleton to prove it was never built.
    from locus.observability import event_bus as _bus_mod

    assert _bus_mod._event_bus is None, (
        "running an Agent without run_context must not construct the bus"
    )
    print("bus singleton is None — zero allocations spent on telemetry")


# Part 2: opt in by wrapping the call in run_context(); subscribe to the
# bus and watch every event_type land.
async def part2_subscribe() -> None:
    """Same Agent.run, this time inside a run_context."""
    print("\n--- Part 2: run_context active — subscribe to the bus ---")
    agent = Agent(model=get_model(), max_iterations=2)
    seen: list[str] = []
    async with run_context() as rid:
        bus = get_event_bus()

        async def consumer() -> None:
            async for ev in bus.subscribe(rid):
                seen.append(ev.event_type)
                if ev.event_type == "agent.terminate":
                    return

        consumer_task = asyncio.create_task(consumer())
        await asyncio.sleep(0)  # let the subscriber register
        # The contextvar set by run_context() means the @_bus_bridge
        # decorator on Agent.run forwards every yielded LocusEvent to
        # the bus automatically. asyncio.to_thread copies the current
        # contextvars into the worker thread, so run_sync still sees it.
        result = await asyncio.to_thread(agent.run_sync, "Reply with the single word: hello")
        print("agent reply:", result.message[:120])
        await asyncio.wait_for(consumer_task, timeout=10.0)
        # Closing the stream ends any other subscribers cleanly.
        await bus.close_stream(rid)
    print(f"events seen ({len(seen)}):")
    for e in seen:
        print(f" - {e}")


# Part 3: late subscribers see the whole run replayed from history
# (capped at 500 events x 200 retained runs).
async def part3_history_replay() -> None:
    """Subscribe after the run finished; the history deque replays."""
    print("\n--- Part 3: late subscriber — history replay ---")
    agent = Agent(model=get_model(), max_iterations=2)
    async with run_context() as rid:
        bus = get_event_bus()
        # Run first; close the stream; subscribe second.
        await asyncio.to_thread(agent.run_sync, "Reply: ok")
        await bus.close_stream(rid)
        replayed: list[str] = []
        async for ev in bus.subscribe(rid):
            replayed.append(ev.event_type)
    print(f"replayed {len(replayed)} events from history (after the run finished)")
    for e in replayed[:10]:
        print(f" - {e}")
    if len(replayed) > 10:
        print(f" ... +{len(replayed) - 10} more")


async def main() -> None:
    await part1_no_telemetry()
    await part2_subscribe()
    await part3_history_replay()


if __name__ == "__main__":
    asyncio.run(main())