Tutorial 52: Observability basics — opt-in SSE telemetry

Locus ships an in-process pub/sub EventBus that publishes typed StreamEvents for every meaningful step of execution: agent thinking, tool calls, model completions, token usage, multi-agent fan-outs, checkpoints — all under one canonical event_type per component.

Telemetry is opt-in. SDK users who never enter a run_context pay one ContextVar.get() per emission site — no bus, no events, no allocations.

Pipeline::

with run_context() as rid:        ← activates emission; generates run_id
     │
     │  agent.run_sync(…)
     │      │
     │      ├─ agent.think         ← one per ReAct iteration
     │      ├─ agent.tool.started  ┐ span_id ties the pair
     │      ├─ agent.tool.completed┘
     │      ├─ agent.tokens.used   ← per model call (cost meter)
     │      └─ agent.terminate
     │
     └─ bus.subscribe(rid)         ← history replay + live stream
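
As the diagram notes, a shared span_id ties each agent.tool.started event to its agent.tool.completed partner. The sketch below pairs them to compute per-tool durations. `DemoEvent` is a hypothetical stand-in for a StreamEvent: the `timestamp` field and the exact attribute layout are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DemoEvent:
    """Hypothetical stand-in for a StreamEvent; field names are assumptions."""
    event_type: str
    span_id: Optional[str] = None
    timestamp: float = 0.0


def tool_durations(events):
    """Match started/completed events by span_id and return elapsed time per span."""
    started = {}
    durations = {}
    for ev in events:
        if ev.event_type == "agent.tool.started":
            started[ev.span_id] = ev.timestamp
        elif ev.event_type == "agent.tool.completed" and ev.span_id in started:
            durations[ev.span_id] = ev.timestamp - started.pop(ev.span_id)
    return durations


events = [
    DemoEvent("agent.think", timestamp=0.0),
    DemoEvent("agent.tool.started", span_id="s1", timestamp=1.0),
    DemoEvent("agent.tool.completed", span_id="s1", timestamp=3.5),
    DemoEvent("agent.tokens.used", timestamp=4.0),
    DemoEvent("agent.terminate", timestamp=5.0),
]
durations = tool_durations(events)
```

A completed event whose span_id was never seen as started is ignored here; a stricter consumer might log it instead.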

This tutorial covers:

  1. Running an Agent with no telemetry (the SDK-default path).
  2. Wrapping the same call in run_context() and subscribing to the bus to see the full inner cognition.
  3. The shape of the canonical events: agent.think, agent.tool.started/completed, agent.tokens.used, agent.terminate.
  4. Reading the per-run history buffer after the fact (replay semantics for late subscribers).
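
Once the event types from a run have been collected (as Part 2 of the source does in its `seen` list), a quick summary helps check the shape of the stream. The sequence below is illustrative only; a real run may emit a different mix of events.

```python
from collections import Counter

# Illustrative event_type sequence; not captured from a real run.
seen = [
    "agent.think",
    "agent.tool.started",
    "agent.tool.completed",
    "agent.tokens.used",
    "agent.think",
    "agent.tokens.used",
    "agent.terminate",
]

counts = Counter(seen)
# Every started tool call should have a matching completed event.
balanced = counts["agent.tool.started"] == counts["agent.tool.completed"]
# The terminate event is expected to close the stream.
ended_cleanly = bool(seen) and seen[-1] == "agent.terminate"

for event_type, n in sorted(counts.items()):
    print(f"{event_type:<22} {n}")
print("tool spans balanced:", balanced)
print("terminated last:", ended_cleanly)
```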

Why this is differentiated:

  • Zero cost when unused — the bus singleton is never instantiated unless a run_context is active, so production agents carry no telemetry overhead until you opt in.
  • History replay: bus.subscribe(run_id) replays the run's buffered history (up to the last 500 events) before switching to live mode, so a subscriber that joins mid-run doesn't miss the beginning.
  • One import, one context manager — no hooks to register, no config to thread through the agent constructor.
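
The replay-then-live behaviour in the list above can be sketched with a bounded deque per run plus one queue per subscriber. `DemoBus` is a toy written for this page, not Locus's EventBus; only the 500-event cap comes from the tutorial, and the method bodies are assumptions about one reasonable way to get these semantics.

```python
import asyncio
from collections import defaultdict, deque

HISTORY_CAP = 500  # per-run cap stated in the tutorial


class DemoBus:
    """Toy bus: bounded per-run history plus live fan-out. Not Locus's EventBus."""

    _CLOSED = object()  # sentinel that ends a live subscription

    def __init__(self):
        self._history = defaultdict(lambda: deque(maxlen=HISTORY_CAP))
        self._queues = defaultdict(list)
        self._closed = set()

    def publish(self, run_id, event):
        self._history[run_id].append(event)  # oldest entries fall off at the cap
        for queue in self._queues[run_id]:
            queue.put_nowait(event)

    def close_stream(self, run_id):
        self._closed.add(run_id)
        for queue in self._queues[run_id]:
            queue.put_nowait(self._CLOSED)

    async def subscribe(self, run_id):
        # Register and snapshot with no await in between, so an event is
        # delivered either from the snapshot or from the queue, never both.
        queue = asyncio.Queue()
        self._queues[run_id].append(queue)
        snapshot = list(self._history[run_id])
        already_closed = run_id in self._closed
        try:
            for event in snapshot:
                yield event
            if already_closed:
                return  # late subscriber: replay only
            while True:
                event = await queue.get()
                if event is self._CLOSED:
                    return
                yield event
        finally:
            self._queues[run_id].remove(queue)


async def demo():
    bus = DemoBus()
    for i in range(3):
        bus.publish("run-1", f"early-{i}")

    received = []

    async def consumer():
        async for event in bus.subscribe("run-1"):
            received.append(event)

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)  # let the consumer register and replay history
    bus.publish("run-1", "live-0")
    bus.close_stream("run-1")
    await task

    # A subscriber that joins after the stream closed gets the replay only.
    late = [event async for event in bus.subscribe("run-1")]
    return received, late


received, late = asyncio.run(demo())
```

Using `deque(maxlen=...)` keeps memory bounded without explicit trimming, at the cost of silently dropping the oldest events once a run exceeds the cap.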

Run::

python examples/tutorial_52_observability_basics.py

Difficulty: Beginner

Prerequisites: tutorial_01_basic_agent (basic Agent)

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Tutorial 52: observability basics — opt-in SSE telemetry.

Locus ships an in-process pub/sub ``EventBus`` that publishes typed
``StreamEvent``s for every meaningful step of execution: agent
thinking, tool calls, model completions, token usage, multi-agent
fan-outs, checkpoints — all under one canonical event_type per
component.

Telemetry is **opt-in**. SDK users who never enter a ``run_context``
pay one ``ContextVar.get()`` per emission site — no bus, no events,
no allocations.

This tutorial covers:

1. Running an Agent with no telemetry (the SDK-default path).
2. Wrapping the same call in ``run_context()`` and subscribing to
   the bus to see the full inner cognition.
3. The shape of the canonical events: ``agent.think``,
   ``agent.tool.started/completed``, ``agent.tokens.used``,
   ``agent.terminate``.
4. Reading the per-run history buffer after the fact (replay
   semantics for late subscribers).

Difficulty: beginner. Prerequisites: tutorial 01 (basic Agent).

Run with:
    python examples/tutorial_52_observability_basics.py
"""

from __future__ import annotations

import asyncio

from config import get_model

from locus import Agent
from locus.observability import get_event_bus, run_context


# =============================================================================
# Part 1 — running without telemetry (the no-op path)
# =============================================================================


async def part1_no_telemetry() -> None:
    """No ``run_context``: every emit short-circuits before allocating
    anything. The bus singleton is never instantiated."""
    print("\n--- Part 1: no run_context — bus stays uninstantiated ---")

    agent = Agent(model=get_model(), max_iterations=2)
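    # run_sync is blocking; run it in a worker thread (as Parts 2 and 3 do)
    # so it never executes on the running event loop's thread.
    result = await asyncio.to_thread(agent.run_sync, "In one sentence, what is locus?")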
    print("agent reply:", result.message[:120])

    # Probe the bus internals to prove the singleton was never built.
    from locus.observability import event_bus as _bus_mod

    assert _bus_mod._event_bus is None, (
        "running an Agent without run_context must not construct the bus"
    )
    print("bus singleton is None — zero allocations spent on telemetry")


# =============================================================================
# Part 2 — opt in via run_context, subscribe, see everything
# =============================================================================


async def part2_subscribe() -> None:
    """Same Agent.run, this time inside a ``run_context``. We subscribe
    in parallel and print every event_type that lands."""
    print("\n--- Part 2: run_context active — subscribe to the bus ---")

    agent = Agent(model=get_model(), max_iterations=2)
    seen: list[str] = []

    async with run_context() as rid:
        bus = get_event_bus()

        async def consumer() -> None:
            async for ev in bus.subscribe(rid):
                seen.append(ev.event_type)
                if ev.event_type == "agent.terminate":
                    return

        consumer_task = asyncio.create_task(consumer())
        await asyncio.sleep(0)  # let the subscriber register

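        # Drive the agent in a worker thread so the event loop stays free
        # for the consumer. asyncio.to_thread copies the current context,
        # so the contextvar set by run_context() is visible there and every
        # yielded LocusEvent is bridged to the bus by the @_bus_bridge
        # decorator on Agent.run.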
        result = await asyncio.to_thread(agent.run_sync, "Reply with the single word: hello")
        print("agent reply:", result.message[:120])

        await asyncio.wait_for(consumer_task, timeout=10.0)
        # Closing the stream ends any other subscribers cleanly.
        await bus.close_stream(rid)

    print(f"events seen ({len(seen)}):")
    for e in seen:
        print(f"  - {e}")


# =============================================================================
# Part 3 — late subscribers + history replay
# =============================================================================


async def part3_history_replay() -> None:
    """Subscribe AFTER the run finished. The bus's per-run history
    deque (cap 500 events × 200 retained runs) replays everything to
    the late subscriber, then closes cleanly."""
    print("\n--- Part 3: late subscriber — history replay ---")

    agent = Agent(model=get_model(), max_iterations=2)

    async with run_context() as rid:
        bus = get_event_bus()
        # Run first; subscribe second.
        await asyncio.to_thread(agent.run_sync, "Reply: ok")
        await bus.close_stream(rid)

        replayed: list[str] = []
        async for ev in bus.subscribe(rid):
            replayed.append(ev.event_type)

    print(f"replayed {len(replayed)} events from history (after the run finished)")
    for e in replayed[:10]:
        print(f"  - {e}")
    if len(replayed) > 10:
        print(f"  ... +{len(replayed) - 10} more")


# =============================================================================
# Main
# =============================================================================


async def main() -> None:
    await part1_no_telemetry()
    await part2_subscribe()
    await part3_history_replay()


if __name__ == "__main__":
    asyncio.run(main())