Observability

Centralised in-process event bus + SSE telemetry. The EventBus is a singleton that fans StreamEvent instances scoped to a run id out to multiple consumers (Web SSE, CLI tail, JSON logs) from a single emission point.

Publishers (router, agent loop hooks, custom user code) call emit() or bus.publish(); subscribers consume via bus.subscribe(run_id). The workbench's SSE endpoint at /api/events/{run_id} is the public HTTP wrapper around EventBus.subscribe.
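The fan-out described above can be pictured with a small stdlib-only model. This is a sketch under stated assumptions, not the library's implementation: `MiniBus` and `attach` are hypothetical names, and the queues are unbounded to keep it short.

```python
import asyncio
from collections import defaultdict


class MiniBus:
    """Hypothetical stand-in: one queue per subscriber, keyed by run id."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def attach(self, run_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[run_id].append(queue)
        return queue

    async def publish(self, run_id: str, payload: dict) -> None:
        # Single emission point: every subscriber of the run gets the event.
        for queue in self._queues.get(run_id, ()):
            await queue.put(payload)


async def demo() -> list[dict]:
    bus = MiniBus()
    web, cli = bus.attach("run-1"), bus.attach("run-1")
    other = bus.attach("run-2")
    await bus.publish("run-1", {"event_type": "tool.start"})
    assert other.empty()  # events stay scoped to their run id
    return [web.get_nowait(), cli.get_nowait()]


received = asyncio.run(demo())
```

Both `run-1` consumers receive the event from one `publish` call, while the `run-2` queue stays empty.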

For the agent's typed LocusEvent stream (what async for event in agent.run(...) yields), see Events. This page covers the lower-level observability bus that LocusEvents are mirrored onto for cross-process consumption.

Event bus

EventBus

EventBus(*, max_queue_size: int = _DEFAULT_QUEUE_SIZE, history_per_run: int = _DEFAULT_HISTORY_PER_RUN, max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED)

In-process pub/sub bus.

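Safe for concurrent use by coroutines on a single event loop (a single asyncio.Lock guards all queue mutations). An asyncio.Lock is not thread-safe, so calls from other threads must be routed onto that loop. Not safe across processes: for multi-worker deployments, swap the implementation for one backed by Redis / NATS / similar.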

Source code in src/locus/observability/event_bus.py
def __init__(
    self,
    *,
    max_queue_size: int = _DEFAULT_QUEUE_SIZE,
    history_per_run: int = _DEFAULT_HISTORY_PER_RUN,
    max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED,
) -> None:
    self._max_queue_size = max_queue_size
    self._history_per_run = history_per_run
    self._max_runs_retained = max_runs_retained

    self._lock = asyncio.Lock()

    # Per-run subscribers: each subscriber owns one bounded queue.
    # Sentinel ``None`` on the queue terminates the subscriber loop.
    self._queues: dict[str, list[asyncio.Queue[StreamEvent | None]]] = defaultdict(list)

    # Global subscribers fire on every event regardless of run_id.
    # Capped to prevent unbounded growth in monitoring deployments.
    self._global_subscribers: list[asyncio.Queue[StreamEvent | None]] = []
    self._max_global_subscribers = 50

    # History buffer for late-joining subscribers — they get the
    # last N events for the run on connect, then live events.
    self._history: dict[str, deque[StreamEvent]] = {}

    # Runs that have already had ``close_stream`` called. A late
    # subscriber for a closed run gets the history + immediate
    # sentinel, so the iterator terminates cleanly instead of
    # hanging on a queue no one publishes to.
    self._closed_runs: set[str] = set()

    # Drop counter for tests + diagnostics.
    self._dropped_events = 0

    # Optional reference to the asyncio loop the bus runs on. Set
    # lazily by ``emit()`` / ``emit_sync()`` on first use so worker
    # threads (the ``@tool`` decorator's executor, ``asyncio.to_thread``
    # callers) can reach back to the right loop via
    # ``run_coroutine_threadsafe``. Never set unless someone actually
    # publishes — preserves the SDK-without-SSE invariant.
    self._owner_loop: asyncio.AbstractEventLoop | None = None

publish async

publish(event: StreamEvent) -> None

Deliver event to every subscriber on its run + global.

Bounded wait on slow consumers: each queue.put is wrapped in a wait_for with _PUBLISH_TIMEOUT_SECONDS. If a subscriber's queue stays full past the timeout, the event is dropped for that subscriber (counted in _dropped_events) and the publish continues for everyone else. Queues are served one after another, so a saturated subscriber can delay later ones by up to that timeout.
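A minimal sketch of the timeout-and-drop delivery, assuming an illustrative timeout value and a hypothetical `deliver` helper (the real `_deliver` is not shown on this page):

```python
import asyncio

PUBLISH_TIMEOUT_SECONDS = 0.05  # assumed value, for illustration only


async def deliver(queue: asyncio.Queue, event: object, drops: list[int]) -> bool:
    """Try to enqueue ``event``; give up and count a drop after the timeout."""
    try:
        await asyncio.wait_for(queue.put(event), timeout=PUBLISH_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        drops[0] += 1
        return False
    return True


async def demo() -> tuple[bool, bool, int]:
    drops = [0]
    slow: asyncio.Queue = asyncio.Queue(maxsize=1)
    fast: asyncio.Queue = asyncio.Queue(maxsize=1)
    slow.put_nowait("stale")  # the slow consumer never drained its queue
    slow_ok = await deliver(slow, "evt", drops)
    fast_ok = await deliver(fast, "evt", drops)
    return slow_ok, fast_ok, drops[0]


result = asyncio.run(demo())
```

The saturated queue costs one timeout and one counted drop; the healthy subscriber still receives the event.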

Source code in src/locus/observability/event_bus.py
async def publish(self, event: StreamEvent) -> None:
    """Deliver ``event`` to every subscriber on its run + global.

    Non-blocking on slow consumers: each queue.put is wrapped in a
    ``wait_for`` with :data:`_PUBLISH_TIMEOUT_SECONDS`. If a
    subscriber's queue is saturated past the timeout, the event is
    dropped for that subscriber (counted in
    :attr:`_dropped_events`) and the publish continues for
    everyone else.
    """
    if not event.run_id:
        logger.error(
            "EventBus: dropping event with empty run_id (event_type=%s)",
            event.event_type,
        )
        return

    async with self._lock:
        history = self._history.setdefault(event.run_id, deque(maxlen=self._history_per_run))
        history.append(event)
        self._evict_old_runs_locked()

        run_subscribers = list(self._queues.get(event.run_id, ()))
        global_subscribers = list(self._global_subscribers)

    for queue in run_subscribers:
        await self._deliver(queue, event)
    for queue in global_subscribers:
        await self._deliver(queue, event)

subscribe async

subscribe(run_id: str) -> AsyncIterator[StreamEvent]

Subscribe to a single run's events.

On connect, every event currently in history for the run is replayed in order, then live events follow. The iterator terminates when close_stream is called for the run.
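The replay-then-live behaviour can be sketched as follows. How `_register` seeds a new queue is not shown on this page, so the `register` helper here is an assumption, and the queue is unbounded for brevity.

```python
import asyncio
from collections import deque
from collections.abc import AsyncIterator


def register(history: deque, closed: bool) -> asyncio.Queue:
    """Seed a new subscriber queue with the run's buffered events."""
    queue: asyncio.Queue = asyncio.Queue()  # unbounded in this sketch
    for past in history:
        queue.put_nowait(past)
    if closed:
        queue.put_nowait(None)  # run already closed: end right after replay
    return queue


async def consume(queue: asyncio.Queue) -> AsyncIterator[str]:
    while True:
        event = await queue.get()
        if event is None:  # sentinel ends the iteration
            return
        yield event


async def demo() -> list[str]:
    history = deque(["started", "tool.call"], maxlen=2)
    queue = register(history, closed=False)
    queue.put_nowait("finished")  # a live event published after connect
    queue.put_nowait(None)        # what closing the stream would enqueue
    return [event async for event in consume(queue)]


seen = asyncio.run(demo())
```

A late joiner sees the buffered events first, then the live one, and the loop ends on the sentinel.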

Source code in src/locus/observability/event_bus.py
async def subscribe(self, run_id: str) -> AsyncIterator[StreamEvent]:
    """Subscribe to a single run's events.

    On connect, every event currently in history for the run is
    replayed in order, then live events follow. The iterator
    terminates when :meth:`close_stream` is called for the run.
    """
    queue = await self._register(run_id)
    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        await self._unregister(run_id, queue)

subscribe_global async

subscribe_global() -> AsyncIterator[StreamEvent]

Subscribe to every event on the bus.

Useful for monitoring dashboards. Does not replay history; only live events are delivered. Capped at _max_global_subscribers concurrent subscribers: an over-cap subscriber gets an iterator that ends immediately without yielding, and a warning is logged.
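From the caller's side, a rejected global subscriber looks like an empty stream. The sketch below models that with a cap of 2; `subscribe_all` and `MAX_GLOBAL` are hypothetical names, not the library's API.

```python
import asyncio
from collections.abc import AsyncIterator

MAX_GLOBAL = 2  # illustrative cap for this sketch
_subscribers: list[asyncio.Queue] = []


async def subscribe_all() -> AsyncIterator[str]:
    if len(_subscribers) >= MAX_GLOBAL:
        return  # over cap: the iterator yields nothing
    queue: asyncio.Queue = asyncio.Queue()
    _subscribers.append(queue)
    try:
        while (event := await queue.get()) is not None:
            yield event
    finally:
        _subscribers.remove(queue)


async def collect() -> list[str]:
    return [event async for event in subscribe_all()]


async def demo() -> tuple[list[str], int]:
    first = asyncio.create_task(collect())
    second = asyncio.create_task(collect())
    await asyncio.sleep(0)      # let both tasks register their queues
    rejected = await collect()  # third subscriber is over the cap
    for queue in list(_subscribers):
        queue.put_nowait(None)  # shut the two live subscribers down
    await asyncio.gather(first, second)
    return rejected, len(_subscribers)


outcome = asyncio.run(demo())
```

A dashboard that needs to tell "rejected" apart from "no events yet" would have to check for this immediate end of iteration.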

Source code in src/locus/observability/event_bus.py
async def subscribe_global(self) -> AsyncIterator[StreamEvent]:
    """Subscribe to every event on the bus.

    Useful for monitoring dashboards. Does not replay history —
    only live events. Capped at :attr:`_max_global_subscribers`
    concurrent subscribers; over-cap connections receive an
    immediate sentinel.
    """
    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=self._max_queue_size)

    async with self._lock:
        if len(self._global_subscribers) >= self._max_global_subscribers:
            logger.warning(
                "EventBus: global subscriber cap reached (%d); rejecting new subscriber",
                self._max_global_subscribers,
            )
            return
        self._global_subscribers.append(queue)

    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        async with self._lock:
            try:
                self._global_subscribers.remove(queue)
            except ValueError:
                pass

close_stream async

close_stream(run_id: str) -> None

Signal end-of-stream to every active subscriber of run_id.

Idempotent. Keeps the history buffer: a late subscriber joining within the retention window still sees the run's events plus an immediate sentinel terminating the iterator. Use drop_history to forcibly forget a run.
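The close path has to reach consumers even when their queue is full. A minimal sketch of that idea, using a hypothetical `push_sentinel` helper that evicts the oldest queued event to make room:

```python
import asyncio


def push_sentinel(queue: asyncio.Queue) -> None:
    """Enqueue the ``None`` end-of-stream marker, evicting one event if full."""
    try:
        queue.put_nowait(None)
    except asyncio.QueueFull:
        queue.get_nowait()      # drop the oldest queued event
        queue.put_nowait(None)  # the sentinel now fits


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    queue.put_nowait("a")
    queue.put_nowait("b")
    push_sentinel(queue)
    return [queue.get_nowait(), queue.get_nowait()]


drained = asyncio.run(demo())
```

The trade-off is explicit: one buffered event is lost so that the subscriber is guaranteed to terminate.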

Source code in src/locus/observability/event_bus.py
async def close_stream(self, run_id: str) -> None:
    """Signal end-of-stream to every active subscriber of ``run_id``.

    Idempotent. **Keeps** the history buffer — a late subscriber
    joining within the retention window still sees the run's
    events plus an immediate sentinel terminating the iterator.
    Use :meth:`drop_history` to forcibly forget a run.
    """
    async with self._lock:
        queues = list(self._queues.get(run_id, ()))
        self._queues.pop(run_id, None)
        self._closed_runs.add(run_id)

    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            # Drop the oldest queued event to make room for the
            # sentinel — close must always reach the consumer.
            try:
                queue.get_nowait()
                queue.put_nowait(None)
            except (asyncio.QueueEmpty, asyncio.QueueFull):
                self._dropped_events += 1

drop_history async

drop_history(run_id: str) -> None

Forget a run's history buffer + closed-state.

Late subscribers after this call see nothing for run_id. Useful for explicit cleanup; not called automatically.

Source code in src/locus/observability/event_bus.py
async def drop_history(self, run_id: str) -> None:
    """Forget a run's history buffer + closed-state.

    Late subscribers after this call see nothing for ``run_id``.
    Useful for explicit cleanup; not called automatically.
    """
    async with self._lock:
        self._history.pop(run_id, None)
        self._closed_runs.discard(run_id)

close_global async

close_global() -> None

Terminate every global subscriber. Used at shutdown.

Source code in src/locus/observability/event_bus.py
async def close_global(self) -> None:
    """Terminate every global subscriber. Used at shutdown."""
    async with self._lock:
        queues = list(self._global_subscribers)
        self._global_subscribers.clear()
    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            self._dropped_events += 1

stats

stats() -> dict[str, Any]

Snapshot of subscriber counts + drop totals.

Cheap to call. Read-only — does not lock; treat as approximate when called concurrently with publish.

Source code in src/locus/observability/event_bus.py
def stats(self) -> dict[str, Any]:
    """Snapshot of subscriber counts + drop totals.

    Cheap to call. Read-only — does not lock; treat as approximate
    when called concurrently with publish.
    """
    return {
        "active_runs": len(self._queues),
        "global_subscribers": len(self._global_subscribers),
        "history_runs": len(self._history),
        "dropped_events_total": self._dropped_events,
        "queue_depths": {
            run_id: [q.qsize() for q in queues] for run_id, queues in self._queues.items()
        },
    }

StreamEvent dataclass

StreamEvent(run_id: str, event_type: str, data: dict[str, Any], timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)), event_id: str = field(default_factory=lambda: str(uuid4())))

Single event flowing through the bus.

Fields are intentionally narrow — every consumer (CLI, SSE, JSON log) sees the same shape, and data is the open extension point.

to_dict

to_dict() -> dict[str, Any]

JSON-serialisable shape used by the SSE wire format.
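To show how the to_dict shape could travel over SSE, here is a sketch with a stand-in dataclass. The `Event` class and `sse_frame` helper are hypothetical, and the frame follows generic Server-Sent Events framing; the workbench's exact framing is not shown on this page.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4


@dataclass
class Event:  # hypothetical stand-in mirroring the documented fields
    run_id: str
    event_type: str
    data: dict[str, Any]
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    event_id: str = field(default_factory=lambda: str(uuid4()))

    def to_dict(self) -> dict[str, Any]:
        return {
            "event_id": self.event_id,
            "run_id": self.run_id,
            "event_type": self.event_type,
            "timestamp": self.timestamp.isoformat(),
            "data": self.data,
        }


def sse_frame(event: Event) -> str:
    # Generic SSE framing: id/event fields, one data line, then a blank line.
    body = json.dumps(event.to_dict())
    return f"id: {event.event_id}\nevent: {event.event_type}\ndata: {body}\n\n"


event = Event(run_id="run-1", event_type="tool.start", data={"name": "search"})
frame = sse_frame(event)
data_line = next(line for line in frame.splitlines() if line.startswith("data: "))
decoded = json.loads(data_line.removeprefix("data: "))
```

Because the timestamp is serialised with isoformat(), the payload stays plain JSON as long as `data` itself holds only JSON-serialisable values.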

Source code in src/locus/observability/event_bus.py
def to_dict(self) -> dict[str, Any]:
    """JSON-serialisable shape used by the SSE wire format."""
    return {
        "event_id": self.event_id,
        "run_id": self.run_id,
        "event_type": self.event_type,
        "timestamp": self.timestamp.isoformat(),
        "data": self.data,
    }

get_event_bus

get_event_bus() -> EventBus

Return the process-wide EventBus, creating it lazily.

Source code in src/locus/observability/event_bus.py
def get_event_bus() -> EventBus:
    """Return the process-wide :class:`EventBus`, creating it lazily."""
    global _event_bus
    if _event_bus is None:
        _event_bus = EventBus()
    return _event_bus

reset_event_bus

reset_event_bus() -> None

Reset the singleton — tests only.

Calling this in production discards every active subscription silently. The function is here purely so each pytest test can start with a clean bus.

Source code in src/locus/observability/event_bus.py
def reset_event_bus() -> None:
    """Reset the singleton — tests only.

    Calling this in production discards every active subscription
    silently. The function is here purely so each pytest test can
    start with a clean bus.
    """
    global _event_bus
    _event_bus = None

Publishing

emit async

emit(event_type: str, /, **data: Any) -> None

Publish a StreamEvent if a run_id is in the current context. No-op otherwise.

Use from any async def instrumentation site. The bus publish is awaited, so consumers see events in the order they were emitted.

The bus singleton is imported lazily here so simply importing this module doesn't construct it.

Source code in src/locus/observability/emit.py
async def emit(event_type: str, /, **data: Any) -> None:
    """Publish a :class:`StreamEvent` if a run_id is in the current
    context. No-op otherwise.

    Use from any ``async def`` instrumentation site. Awaits the bus
    publish so events appear in the order they were emitted on the
    consumer side.

    The bus singleton is imported lazily here so simply importing
    this module doesn't construct it.
    """
    rid = current_run_id()
    if rid is None:
        return
    # Lazy import — keeps the bus singleton from being constructed
    # until the first real emission.
    from locus.observability.event_bus import StreamEvent, get_event_bus  # noqa: PLC0415

    bus = get_event_bus()
    # Cache the running loop on the bus so worker-thread ``emit_sync``
    # callers (sync ``@tool`` functions hopping through the executor)
    # can hop publishes back into the right event loop.
    if getattr(bus, "_owner_loop", None) is None:
        try:
            bus._owner_loop = asyncio.get_running_loop()  # noqa: SLF001 — bus owns this attr
        except RuntimeError:
            pass
    try:
        await bus.publish(
            StreamEvent(run_id=rid, event_type=event_type, data=data),
        )
    except Exception:  # noqa: BLE001 — telemetry must never break the SDK
        logger.debug("emit failed for %s", event_type, exc_info=True)

emit_sync

emit_sync(event_type: str, /, **data: Any) -> None

Sync-call equivalent of emit.

Schedules a publish on the running event loop. Two cases handled:

  • Called from inside the loop's thread: uses loop.create_task and pins the task on _BACKGROUND_TASKS so it is not garbage-collected before it finishes.
  • Called from a worker thread (asyncio.to_thread / the @tool decorator's executor): uses run_coroutine_threadsafe to hop back to the owner loop captured by run_context. The contextvar copy that the decorator passes into the executor preserves run_id and that loop reference across the thread boundary.

If neither path resolves a loop, the event is dropped — telemetry must never spin up a fresh loop and fight asyncio singletons.

Source code in src/locus/observability/emit.py
def emit_sync(event_type: str, /, **data: Any) -> None:
    """Sync-call equivalent of :func:`emit`.

    Schedules a publish on the running event loop. Two cases handled:

    * Called from inside the loop's thread — uses ``loop.create_task``
      and pins the task on ``_BACKGROUND_TASKS``.
    * Called from a worker thread (``asyncio.to_thread`` / the @tool
      decorator's executor) — uses ``run_coroutine_threadsafe`` to
      hop back to the bus's event loop. The contextvar copy that the
      decorator passes into the executor preserves ``run_id`` across
      the thread boundary.

    If neither path resolves a loop, the event is dropped — telemetry
    must never spin up a fresh loop and fight asyncio singletons.
    """
    rid = current_run_id()
    if rid is None:
        return
    from locus.observability.event_bus import StreamEvent, get_event_bus  # noqa: PLC0415

    bus = get_event_bus()
    event = StreamEvent(run_id=rid, event_type=event_type, data=data)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in *this* thread. We may still be inside a
        # worker thread spawned by ``asyncio.to_thread`` /
        # ``loop.run_in_executor`` — in which case the run_context's
        # owner loop is reachable via the contextvar populated on
        # ``run_context`` entry (and copied into the worker thread by
        # the ``@tool`` decorator's ``ctx.run()``).
        owner_loop = current_owner_loop()
        if owner_loop is None or not owner_loop.is_running():
            return
        try:
            asyncio.run_coroutine_threadsafe(bus.publish(event), owner_loop)
        except Exception:  # noqa: BLE001 — telemetry must never break the SDK
            logger.debug("emit_sync threadsafe schedule failed", exc_info=True)
        return

    coro = bus.publish(event)
    task = loop.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)

Run context

Scope events to a single cognitive dispatch via run_context() — a context manager that sets and unsets the run id for the current task / coroutine.

run_context async

run_context(run_id: str | None = None) -> AsyncIterator[str]

Bind a run_id to the current asyncio context.

Yields the bound id (a fresh uuid4 hex is generated if run_id is None or empty) and restores the previous contextvar value on exit.

Example:

from locus.observability import run_context, get_event_bus

async with run_context() as rid:
    # any emission inside this block tags events with `rid`
    await my_pipeline.run(task)
    # downstream subscribers can now `bus.subscribe(rid)`

Note

The context manager does not call EventBus.close_stream; callers decide when to close based on their own lifecycle (the router does it at end of dispatch; notebook subprocesses do it on exit).

Source code in src/locus/observability/context.py
@asynccontextmanager
async def run_context(run_id: str | None = None) -> AsyncIterator[str]:
    """Bind a ``run_id`` to the current asyncio context.

    Yields the bound id (generates a fresh uuid4 hex if ``run_id`` is
    ``None``) and restores the previous contextvar value on exit.

    Example::

        from locus.observability import run_context, get_event_bus

        async with run_context() as rid:
            # any emission inside this block tags events with `rid`
            await my_pipeline.run(task)
            # downstream subscribers can now `bus.subscribe(rid)`

    Note:
        The context manager does **not** call
        :meth:`EventBus.close_stream` — callers decide when to close
        based on their own lifecycle (the router does it at end of
        dispatch; notebook subprocesses do it on exit).
    """
    rid = run_id or uuid4().hex
    token = _run_id_var.set(rid)
    # Capture the running loop so worker-thread emit_sync calls can
    # hop publishes back via run_coroutine_threadsafe. Cheap — one
    # ``ContextVar.set`` per dispatch, even if no one ever subscribes.
    loop_token = None
    try:
        loop_token = _owner_loop_var.set(_asyncio.get_running_loop())
    except RuntimeError:
        # Caller is using ``run_context`` from sync code — no loop to
        # capture, but ``emit_sync`` will simply drop in that case.
        pass
    try:
        yield rid
    finally:
        if loop_token is not None:
            _owner_loop_var.reset(loop_token)
        _run_id_var.reset(token)

current_run_id

current_run_id() -> str | None

Return the run_id active on the current context, or None.

Every emission helper in locus.observability.emit calls this and bails when it returns None. SDK users who never enter a run context pay nothing.

Source code in src/locus/observability/context.py
def current_run_id() -> str | None:
    """Return the run_id active on the current context, or ``None``.

    Every emission helper in :mod:`locus.observability.emit` calls this
    and bails when it returns ``None``. SDK users who never enter a
    run context pay nothing.
    """
    return _run_id_var.get()

set_run_id

set_run_id(run_id: str | None) -> object

Set the run_id on the current context, returning the reset token.

Lower-level than run_context; it is exposed so the workbench bootstrap can pin the contextvar from an env variable inside the notebook subprocess. Most code should use the context manager.

Source code in src/locus/observability/context.py
def set_run_id(run_id: str | None) -> object:
    """Set the run_id on the current context, returning the reset token.

    Lower-level than :func:`run_context` — exposed so the workbench
    bootstrap can pin the contextvar from an env variable inside the
    notebook subprocess. Most code should use the context manager.
    """
    return _run_id_var.set(run_id)

reset_run_id

reset_run_id(token: object) -> None

Restore the contextvar to whatever it was before set_run_id. Mirrors the standard ContextVar.reset.
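The token pairing follows the standard contextvars contract, sketched here with stand-in helpers (`set_id` and `reset_id` are hypothetical names that wrap a local ContextVar):

```python
from contextvars import ContextVar, Token

_run_id: ContextVar = ContextVar("run_id", default=None)


def set_id(run_id):
    return _run_id.set(run_id)  # the token remembers the previous value


def reset_id(token: Token) -> None:
    _run_id.reset(token)


outer = set_id("run-a")
inner = set_id("run-b")
reset_id(inner)
after_inner = _run_id.get()  # back to the value before the second set
reset_id(outer)
after_outer = _run_id.get()  # back to the default

try:
    reset_id(outer)  # a token can be used only once
    reused = "accepted"
except RuntimeError:
    reused = "rejected"
```

Tokens are single-use and should be reset in reverse order of the sets, which is the bookkeeping the context manager handles automatically.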

Source code in src/locus/observability/context.py
def reset_run_id(token: object) -> None:
    """Restore the contextvar to whatever it was before
    :func:`set_run_id`. Mirrors the standard ``ContextVar.reset``."""
    _run_id_var.reset(token)  # type: ignore[arg-type]

Hook bridge

EventBusHook mirrors the agent's LocusEvent stream onto the bus without changing existing hook code.

EventBusHook

EventBusHook(run_id: str)

Bases: HookProvider

Republishes every agent-lifecycle hook onto the event bus.

The hook never mutates events; it is purely observational. Tools and model calls retain whatever cancellation / retry / replacement behaviour the surrounding code dictates.

Source code in src/locus/observability/bus_hook.py
def __init__(self, run_id: str) -> None:
    if not run_id:
        raise ValueError("EventBusHook requires a non-empty run_id")
    self._run_id = run_id
    self._bus = get_event_bus()

name property

name: str

Hook provider name for identification.

register_hooks

register_hooks() -> dict[str, bool]

Return which hooks this provider implements.

Returns:

Type              Description
dict[str, bool]   Dictionary mapping hook names to whether they are implemented. Useful for optimization: the registry can skip calling unimplemented hooks.
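How a registry might use this mapping to skip unimplemented hooks can be sketched as below. The `Provider` class and `dispatch` function are hypothetical; the real registry is not shown on this page.

```python
from typing import Any


class Provider:  # hypothetical provider implementing only one hook
    def register_hooks(self) -> dict[str, bool]:
        return {"on_before_tool_call": True, "on_after_tool_call": False}

    def on_before_tool_call(self, name: str) -> str:
        return f"before:{name}"


def dispatch(providers: list[Any], hook: str, *args: Any) -> list[Any]:
    """Call ``hook`` only on providers that declare it as implemented."""
    results = []
    for provider in providers:
        if provider.register_hooks().get(hook, False):
            results.append(getattr(provider, hook)(*args))
    return results


called = dispatch([Provider()], "on_before_tool_call", "search")
skipped = dispatch([Provider()], "on_after_tool_call", "search")
```

A provider that reports every hook as True, as in the listing for this method, is simply called at every lifecycle point.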

Source code in src/locus/hooks/provider.py
def register_hooks(self) -> dict[str, bool]:
    """Return which hooks this provider implements.

    Returns:
        Dictionary mapping hook names to whether they are implemented.
        Useful for optimization - registry can skip calling unimplemented hooks.
    """
    return {
        "on_before_invocation": True,
        "on_after_invocation": True,
        "on_before_tool_call": True,
        "on_after_tool_call": True,
        "on_iteration_start": True,
        "on_iteration_end": True,
        "on_before_model_call": True,
        "on_after_model_call": True,
    }