Skip to content

Observability Basics

Locus ships an in-process pub/sub EventBus that publishes typed StreamEvents for every meaningful step of execution: agent thinking, tool calls, model completions, token usage, multi-agent fan-outs, checkpoints — all under one canonical event_type per component.

Telemetry is opt-in. Code that never enters a run_context pays one ContextVar.get() per emission site — no bus, no events, no allocations.

Pipeline::

with run_context() as rid:        ← activates emission; generates run_id
     │
     │  agent.run_sync(…)
     │      │
     │      ├─ agent.think         ← one per ReAct iteration
     │      ├─ agent.tool.started  ┐ span_id ties the pair
     │      ├─ agent.tool.completed┘
     │      ├─ agent.tokens.used   ← per model call (cost meter)
     │      └─ agent.terminate
     │
     └─ bus.subscribe(rid)         ← history replay + live stream
  • Run an Agent with no telemetry (the SDK-default path).
  • Wrap the same call in run_context() and subscribe to the bus.
  • The canonical events: agent.think, agent.tool.started/completed, agent.tokens.used, agent.terminate.
  • Read the per-run history buffer after the fact (replay semantics for late subscribers).

Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):

python examples/notebook_59_observability_basics.py

Offline:

LOCUS_MODEL_PROVIDER=mock python examples/notebook_59_observability_basics.py

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Notebook 59: Observability basics — opt-in SSE telemetry.

Locus ships an in-process pub/sub EventBus that publishes typed
StreamEvents for every meaningful step of execution: agent thinking,
tool calls, model completions, token usage, multi-agent fan-outs,
checkpoints — all under one canonical event_type per component.

Telemetry is opt-in. Code that never enters a run_context pays one
ContextVar.get() per emission site — no bus, no events, no
allocations.

- Run an Agent with no telemetry: the SDK-default path.
- Wrap the same call in run_context() and subscribe to the bus.
- The canonical events: agent.think, agent.tool.started/completed,
  agent.tokens.used, agent.terminate.
- Read the per-run history buffer after the fact (replay semantics for
  late subscribers).

Run it
    # Default: OCI Generative AI auto-detected from ~/.oci/config
    python examples/notebook_59_observability_basics.py

    # Offline / no credentials:
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_59_observability_basics.py
"""

from __future__ import annotations

import asyncio

from config import get_model

from locus.agent import Agent
from locus.observability import get_event_bus, run_context


# Part 1: no telemetry. The bus singleton never gets built.


async def part1_no_telemetry() -> None:
    """Run an Agent outside any run_context and check the bus was never built.

    The blocking ``agent.run_sync`` call is handed to a worker thread with
    ``asyncio.to_thread`` — the same way the other parts of this script call
    it — so the running event loop is not blocked and ``run_sync`` does not
    execute on a thread that already has a running loop.

    Raises:
        AssertionError: if the module-level bus singleton exists after the run.
    """
    print("\n--- Part 1: no run_context — bus stays uninstantiated ---")

    agent = Agent(model=get_model(), max_iterations=2)
    result = await asyncio.to_thread(agent.run_sync, "In one sentence, what is locus?")
    print("agent reply:", result.message[:120])

    # Probe the module-level singleton to prove it was never built.
    from locus.observability import event_bus as _bus_mod

    # Explicit raise instead of a bare `assert`: asserts are stripped under
    # `python -O`, which would let the success message below print unchecked.
    if _bus_mod._event_bus is not None:
        raise AssertionError(
            "running an Agent without run_context must not construct the bus"
        )
    print("bus singleton is None — zero allocations spent on telemetry")


# Part 2: opt in by wrapping the call in run_context(); subscribe to the
# bus and watch every event_type land.


async def part2_subscribe() -> None:
    """Run the Agent inside a run_context while a subscriber collects events.

    A background task iterates ``bus.subscribe(rid)`` and records every
    ``event_type`` until it sees ``agent.terminate``. The recorded names are
    printed once the context has exited.

    The subscriber task and the bus stream are released in a ``finally``
    block, so a failing agent call or a timeout does not leave a pending
    task or an open stream behind.

    Raises:
        asyncio.TimeoutError: if the subscriber has not finished within
            10 seconds of the agent call returning.
    """
    print("\n--- Part 2: run_context active — subscribe to the bus ---")

    agent = Agent(model=get_model(), max_iterations=2)
    seen: list[str] = []

    async with run_context() as rid:
        bus = get_event_bus()

        async def consumer() -> None:
            # Collect event types until the terminal event arrives.
            async for ev in bus.subscribe(rid):
                seen.append(ev.event_type)
                if ev.event_type == "agent.terminate":
                    return

        consumer_task = asyncio.create_task(consumer())
        try:
            await asyncio.sleep(0)  # let the subscriber register

            # The contextvar set by run_context() means the @_bus_bridge
            # decorator on Agent.run forwards every yielded LocusEvent to
            # the bus automatically. asyncio.to_thread copies the current
            # context into the worker thread, so the contextvar is visible
            # there too.
            result = await asyncio.to_thread(agent.run_sync, "Reply with the single word: hello")
            print("agent reply:", result.message[:120])

            await asyncio.wait_for(consumer_task, timeout=10.0)
        finally:
            # On the success path the task is already done and this is a
            # no-op; on failure it stops the subscriber from lingering.
            if not consumer_task.done():
                consumer_task.cancel()
            # Closing the stream ends any other subscribers cleanly.
            await bus.close_stream(rid)

    print(f"events seen ({len(seen)}):")
    for e in seen:
        print(f"  - {e}")


# Part 3: late subscribers see the whole run replayed from history
# (capped at 500 events x 200 retained runs).


async def part3_history_replay() -> None:
    """Subscribe only after the run has finished and print what is replayed.

    The agent call completes and the stream is closed before anything
    subscribes, so every event collected here was delivered after the fact.
    At most the first ten event types are listed; the remainder is
    summarised as a count.
    """
    print("\n--- Part 3: late subscriber — history replay ---")

    preview_limit = 10
    agent = Agent(model=get_model(), max_iterations=2)

    async with run_context() as rid:
        bus = get_event_bus()
        # Order matters: run, then close, and only then subscribe.
        await asyncio.to_thread(agent.run_sync, "Reply: ok")
        await bus.close_stream(rid)

        replayed = [ev.event_type async for ev in bus.subscribe(rid)]

    total = len(replayed)
    print(f"replayed {total} events from history (after the run finished)")
    for name in replayed[:preview_limit]:
        print(f"  - {name}")

    hidden = total - preview_limit
    if hidden > 0:
        print(f"  ... +{hidden} more")


async def main() -> None:
    """Run the three demo parts one after another, in order."""
    for part in (part1_no_telemetry, part2_subscribe, part3_history_replay):
        await part()


# Script entry point: drive the async main() on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())