Observability
Centralised in-process event bus plus SSE telemetry. EventBus is a
process-wide singleton that takes StreamEvent instances, each scoped to
a run id, from a single emission point and fans them out to multiple
consumers (web SSE, CLI tail, JSON logs).
Publishers (router, agent loop hooks, custom user code) call emit()
or bus.publish(); subscribers consume via bus.subscribe(run_id).
The workbench's SSE endpoint at /api/events/{run_id} is the public
HTTP wrapper around EventBus.subscribe.
For the agent's typed LocusEvent stream (what
async for event in agent.run(...) yields), see Events.
This page covers the lower-level observability bus that
LocusEvents are mirrored onto for cross-process consumption.
Event bus
EventBus
EventBus(*, max_queue_size: int = _DEFAULT_QUEUE_SIZE, history_per_run: int = _DEFAULT_HISTORY_PER_RUN, max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED)
In-process pub/sub bus.
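Safe for concurrent use by coroutines on one event loop: a single
asyncio.Lock guards the subscriber registries and history buffers.
It is not thread-safe (worker threads must hand publishes back to the
bus's loop, as emit_sync does) and not safe across processes; for
multi-worker deployments, swap the implementation for one backed by
Redis, NATS, or similar.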
Source code in src/locus/observability/event_bus.py
```python
def __init__(
    self,
    *,
    max_queue_size: int = _DEFAULT_QUEUE_SIZE,
    history_per_run: int = _DEFAULT_HISTORY_PER_RUN,
    max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED,
) -> None:
    self._max_queue_size = max_queue_size
    self._history_per_run = history_per_run
    self._max_runs_retained = max_runs_retained
    self._lock = asyncio.Lock()
    # Per-run subscribers: each subscriber owns one bounded queue.
    # Sentinel ``None`` on the queue terminates the subscriber loop.
    self._queues: dict[str, list[asyncio.Queue[StreamEvent | None]]] = defaultdict(list)
    # Global subscribers fire on every event regardless of run_id.
    # Capped to prevent unbounded growth in monitoring deployments.
    self._global_subscribers: list[asyncio.Queue[StreamEvent | None]] = []
    self._max_global_subscribers = 50
    # History buffer for late-joining subscribers — they get the
    # last N events for the run on connect, then live events.
    self._history: dict[str, deque[StreamEvent]] = {}
    # Runs that have already had ``close_stream`` called. A late
    # subscriber for a closed run gets the history + immediate
    # sentinel, so the iterator terminates cleanly instead of
    # hanging on a queue no one publishes to.
    self._closed_runs: set[str] = set()
    # Drop counter for tests + diagnostics.
    self._dropped_events = 0
    # Optional reference to the asyncio loop the bus runs on. Set
    # lazily by ``emit()`` / ``emit_sync()`` on first use so worker
    # threads (the ``@tool`` decorator's executor, ``asyncio.to_thread``
    # callers) can reach back to the right loop via
    # ``run_coroutine_threadsafe``. Never set unless someone actually
    # publishes — preserves the SDK-without-SSE invariant.
    self._owner_loop: asyncio.AbstractEventLoop | None = None
```
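The history buffer initialised above keeps at most history_per_run events per run and at most max_runs_retained runs. The helper that enforces the run cap, _evict_old_runs_locked, is not shown on this page, so the stdlib-only sketch below assumes it evicts the oldest run first (dict insertion order). HistorySketch is a hypothetical name; the sketch models the bookkeeping only and does not import locus.

```python
from collections import deque


class HistorySketch:
    """Per-run bounded history, modelled on EventBus's _history dict.

    Assumption: once more than max_runs_retained runs are held, the run
    that was first seen earliest is evicted (dicts keep insertion order).
    """

    def __init__(self, history_per_run: int, max_runs_retained: int) -> None:
        self.history_per_run = history_per_run
        self.max_runs_retained = max_runs_retained
        self.history: dict[str, deque] = {}

    def record(self, run_id: str, event: str) -> None:
        # deque(maxlen=N) silently discards its oldest entry once full.
        buf = self.history.setdefault(run_id, deque(maxlen=self.history_per_run))
        buf.append(event)
        # Evict whole runs, oldest first, while over the retention cap.
        while len(self.history) > self.max_runs_retained:
            oldest = next(iter(self.history))
            del self.history[oldest]


h = HistorySketch(history_per_run=2, max_runs_retained=2)
for ev in ("a1", "a2", "a3"):
    h.record("run-a", ev)
h.record("run-b", "b1")
h.record("run-c", "c1")  # third run pushes out run-a
print(list(h.history))  # ['run-b', 'run-c']
```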
publish
async
publish(event: StreamEvent) -> None
Deliver event to every subscriber on its run + global.
A slow consumer cannot stall publish indefinitely: each queue.put is
wrapped in wait_for with a timeout of _PUBLISH_TIMEOUT_SECONDS. If a
subscriber's queue is still full when the timeout expires, the event is
dropped for that subscriber (counted in _dropped_events) and delivery
continues for everyone else. Deliveries are awaited one at a time, so
each saturated subscriber can add up to one timeout to the call.
Source code in src/locus/observability/event_bus.py
```python
async def publish(self, event: StreamEvent) -> None:
    """Deliver ``event`` to every subscriber on its run + global.

    Non-blocking on slow consumers: each queue.put is wrapped in a
    ``wait_for`` with :data:`_PUBLISH_TIMEOUT_SECONDS`. If a
    subscriber's queue is saturated past the timeout, the event is
    dropped for that subscriber (counted in
    :attr:`_dropped_events`) and the publish continues for
    everyone else.
    """
    if not event.run_id:
        logger.error(
            "EventBus: dropping event with empty run_id (event_type=%s)",
            event.event_type,
        )
        return
    async with self._lock:
        history = self._history.setdefault(event.run_id, deque(maxlen=self._history_per_run))
        history.append(event)
        self._evict_old_runs_locked()
        run_subscribers = list(self._queues.get(event.run_id, ()))
        global_subscribers = list(self._global_subscribers)
    for queue in run_subscribers:
        await self._deliver(queue, event)
    for queue in global_subscribers:
        await self._deliver(queue, event)
```
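publish delegates each put to _deliver, whose body is not shown here. Going by the docstring, the stdlib-only sketch below models it as asyncio.wait_for around queue.put, counting a drop on timeout. DeliverySketch and the 0.05 s value of PUBLISH_TIMEOUT_SECONDS are placeholders; the real _PUBLISH_TIMEOUT_SECONDS value is not documented on this page.

```python
import asyncio

PUBLISH_TIMEOUT_SECONDS = 0.05  # placeholder; the real constant's value is not given here


class DeliverySketch:
    def __init__(self) -> None:
        self.dropped_events = 0

    async def deliver(self, queue: asyncio.Queue, event: object) -> None:
        # Wait briefly for room; a saturated subscriber loses this event
        # instead of stalling delivery to everyone else indefinitely.
        try:
            await asyncio.wait_for(queue.put(event), timeout=PUBLISH_TIMEOUT_SECONDS)
        except asyncio.TimeoutError:
            self.dropped_events += 1


async def demo() -> tuple:
    sketch = DeliverySketch()
    slow: asyncio.Queue = asyncio.Queue(maxsize=1)  # nobody drains this one
    fast: asyncio.Queue = asyncio.Queue(maxsize=10)
    for i in range(3):
        for q in (slow, fast):
            await sketch.deliver(q, i)
    # (items held by slow, items held by fast, events dropped)
    return slow.qsize(), fast.qsize(), sketch.dropped_events


result = asyncio.run(demo())
print(result)  # (1, 3, 2)
```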
subscribe
async
subscribe(run_id: str) -> AsyncIterator[StreamEvent]
Subscribe to a single run's events.
On connect, every event currently in history for the run is
replayed in order, then live events follow. The iterator
terminates when close_stream is called for the run.
Source code in src/locus/observability/event_bus.py
```python
async def subscribe(self, run_id: str) -> AsyncIterator[StreamEvent]:
    """Subscribe to a single run's events.

    On connect, every event currently in history for the run is
    replayed in order, then live events follow. The iterator
    terminates when :meth:`close_stream` is called for the run.
    """
    queue = await self._register(run_id)
    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        await self._unregister(run_id, queue)
```
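The sketch below is a stdlib-only model (MiniBus is a hypothetical name, not the locus class) of the replay-then-live contract: history is queued first, live events follow, and a None sentinel from close_stream ends the async for loop. It assumes _register, which this page does not show, pre-loads the new queue with the run's history.

```python
import asyncio
from collections import deque
from typing import AsyncIterator


class MiniBus:
    def __init__(self) -> None:
        self.history: dict[str, deque] = {}
        self.queues: dict[str, list] = {}

    async def publish(self, run_id: str, event: str) -> None:
        self.history.setdefault(run_id, deque(maxlen=100)).append(event)
        for q in self.queues.get(run_id, []):
            await q.put(event)

    async def close_stream(self, run_id: str) -> None:
        for q in self.queues.pop(run_id, []):
            await q.put(None)  # sentinel ends each subscriber loop

    async def subscribe(self, run_id: str) -> AsyncIterator[str]:
        queue: asyncio.Queue = asyncio.Queue()
        for past in self.history.get(run_id, ()):  # replay first (assumed _register behaviour)
            queue.put_nowait(past)
        self.queues.setdefault(run_id, []).append(queue)
        try:
            while True:
                event = await queue.get()
                if event is None:
                    return
                yield event
        finally:
            # Unregister even if the consumer stopped iterating early.
            if queue in self.queues.get(run_id, []):
                self.queues[run_id].remove(queue)


async def demo() -> list:
    bus = MiniBus()
    await bus.publish("r1", "early")  # published before anyone subscribed

    async def consume() -> list:
        return [e async for e in bus.subscribe("r1")]

    task = asyncio.create_task(consume())
    await asyncio.sleep(0)  # let the subscriber register
    await bus.publish("r1", "live")
    await bus.close_stream("r1")
    return await task


result = asyncio.run(demo())
print(result)  # ['early', 'live']
```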
subscribe_global
async
subscribe_global() -> AsyncIterator[StreamEvent]
Subscribe to every event on the bus.
Useful for monitoring dashboards. Does not replay history —
only live events. Capped at _max_global_subscribers
concurrent subscribers; an over-cap subscriber gets an iterator that
ends immediately without yielding anything (a warning is logged).
Source code in src/locus/observability/event_bus.py
```python
async def subscribe_global(self) -> AsyncIterator[StreamEvent]:
    """Subscribe to every event on the bus.

    Useful for monitoring dashboards. Does not replay history —
    only live events. Capped at :attr:`_max_global_subscribers`
    concurrent subscribers; over-cap connections receive an
    immediate sentinel.
    """
    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=self._max_queue_size)
    async with self._lock:
        if len(self._global_subscribers) >= self._max_global_subscribers:
            logger.warning(
                "EventBus: global subscriber cap reached (%d); rejecting new subscriber",
                self._max_global_subscribers,
            )
            return
        self._global_subscribers.append(queue)
    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        async with self._lock:
            try:
                self._global_subscribers.remove(queue)
            except ValueError:
                pass
```
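Because subscribe_global returns before its loop when the cap is reached, an over-cap caller's async for loop simply finishes with nothing yielded. The hypothetical GlobalSubs class below models that with a cap of 1 instead of the real 50; it is a stdlib-only sketch, not the locus implementation.

```python
import asyncio
from typing import AsyncIterator


class GlobalSubs:
    def __init__(self, cap: int) -> None:
        self.cap = cap
        self.subscribers: list = []

    async def subscribe_global(self) -> AsyncIterator[str]:
        queue: asyncio.Queue = asyncio.Queue()
        if len(self.subscribers) >= self.cap:
            return  # over cap: the iterator ends before yielding anything
        self.subscribers.append(queue)
        try:
            while (event := await queue.get()) is not None:
                yield event
        finally:
            self.subscribers.remove(queue)

    async def close_global(self) -> None:
        for q in list(self.subscribers):
            q.put_nowait(None)  # sentinel terminates each global subscriber


async def collect(it: AsyncIterator[str]) -> list:
    return [e async for e in it]


async def demo() -> tuple:
    bus = GlobalSubs(cap=1)
    first = asyncio.create_task(collect(bus.subscribe_global()))
    await asyncio.sleep(0)  # first subscriber registers and waits
    rejected = await collect(bus.subscribe_global())  # cap reached: empty
    for q in bus.subscribers:
        q.put_nowait("tick")
    await bus.close_global()
    return await first, rejected


result = asyncio.run(demo())
print(result)  # (['tick'], [])
```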
close_stream
async
close_stream(run_id: str) -> None
Signal end-of-stream to every active subscriber of run_id.
Idempotent. Keeps the history buffer — a late subscriber
joining within the retention window still sees the run's
events plus an immediate sentinel terminating the iterator.
Use drop_history to forcibly forget a run.
Source code in src/locus/observability/event_bus.py
```python
async def close_stream(self, run_id: str) -> None:
    """Signal end-of-stream to every active subscriber of ``run_id``.

    Idempotent. **Keeps** the history buffer — a late subscriber
    joining within the retention window still sees the run's
    events plus an immediate sentinel terminating the iterator.
    Use :meth:`drop_history` to forcibly forget a run.
    """
    async with self._lock:
        queues = list(self._queues.get(run_id, ()))
        self._queues.pop(run_id, None)
        self._closed_runs.add(run_id)
    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            # Drop the oldest queued event to make room for the
            # sentinel — close must always reach the consumer.
            try:
                queue.get_nowait()
                queue.put_nowait(None)
            except (asyncio.QueueEmpty, asyncio.QueueFull):
                self._dropped_events += 1
```
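close_stream must deliver the sentinel even to a subscriber whose queue is full, so it sacrifices the oldest pending event. The function below isolates that step in a stdlib-only sketch (close_queue is a hypothetical name).

```python
import asyncio


def close_queue(queue: asyncio.Queue) -> bool:
    """Put the None sentinel even if the queue is full; True on success."""
    try:
        queue.put_nowait(None)
    except asyncio.QueueFull:
        try:
            queue.get_nowait()  # sacrifice the oldest pending event
            queue.put_nowait(None)
        except (asyncio.QueueEmpty, asyncio.QueueFull):
            return False
    return True


async def demo() -> tuple:
    q: asyncio.Queue = asyncio.Queue(maxsize=2)
    q.put_nowait("e1")
    q.put_nowait("e2")  # queue is now full
    ok = close_queue(q)
    drained = []
    while not q.empty():
        drained.append(q.get_nowait())
    return ok, drained


result = asyncio.run(demo())
print(result)  # (True, ['e2', None])
```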
drop_history
async
drop_history(run_id: str) -> None
Forget a run's history buffer + closed-state.
Late subscribers after this call see nothing for run_id.
Useful for explicit cleanup; not called automatically.
Source code in src/locus/observability/event_bus.py
```python
async def drop_history(self, run_id: str) -> None:
    """Forget a run's history buffer + closed-state.

    Late subscribers after this call see nothing for ``run_id``.
    Useful for explicit cleanup; not called automatically.
    """
    async with self._lock:
        self._history.pop(run_id, None)
        self._closed_runs.discard(run_id)
```
close_global
async
close_global() -> None
Terminate every global subscriber. Used at shutdown.
Source code in src/locus/observability/event_bus.py
```python
async def close_global(self) -> None:
    """Terminate every global subscriber. Used at shutdown."""
    async with self._lock:
        queues = list(self._global_subscribers)
        self._global_subscribers.clear()
    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            self._dropped_events += 1
```
stats
stats() -> dict[str, Any]
Snapshot of subscriber counts + drop totals.
Cheap to call. Read-only — does not lock; treat as approximate
when called concurrently with publish.
Source code in src/locus/observability/event_bus.py
```python
def stats(self) -> dict[str, Any]:
    """Snapshot of subscriber counts + drop totals.

    Cheap to call. Read-only — does not lock; treat as approximate
    when called concurrently with publish.
    """
    return {
        "active_runs": len(self._queues),
        "global_subscribers": len(self._global_subscribers),
        "history_runs": len(self._history),
        "dropped_events_total": self._dropped_events,
        "queue_depths": {
            run_id: [q.qsize() for q in queues] for run_id, queues in self._queues.items()
        },
    }
```
StreamEvent
dataclass
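StreamEvent(run_id: str, event_type: str, data: dict[str, Any], timestamp: datetime = <current UTC time>, event_id: str = <new uuid4 string>)
Single event flowing through the bus. When omitted, timestamp and event_id are generated per instance, from datetime.now(UTC) and str(uuid4()) respectively.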
Fields are intentionally narrow — every consumer (CLI, SSE, JSON
log) sees the same shape, and data is the open extension
point.
to_dict
to_dict() -> dict[str, Any]
JSON-serialisable shape used by the SSE wire format.
Source code in src/locus/observability/event_bus.py
```python
def to_dict(self) -> dict[str, Any]:
    """JSON-serialisable shape used by the SSE wire format."""
    return {
        "event_id": self.event_id,
        "run_id": self.run_id,
        "event_type": self.event_type,
        "timestamp": self.timestamp.isoformat(),
        "data": self.data,
    }
```
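StreamEvent fills in timestamp and event_id per instance when they are omitted. The sketch below models that with dataclasses.field(default_factory=...), an assumption about how the real class is declared, and mirrors to_dict. to_sse_frame and the tool.start / tool.end event types are hypothetical: this page does not document the workbench's exact SSE framing or event-type names.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from uuid import uuid4


@dataclass
class StreamEventSketch:
    run_id: str
    event_type: str
    data: dict[str, Any]
    # Assumed default_factory: each instance gets its own time and id.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    event_id: str = field(default_factory=lambda: str(uuid4()))

    def to_dict(self) -> dict[str, Any]:
        return {
            "event_id": self.event_id,
            "run_id": self.run_id,
            "event_type": self.event_type,
            "timestamp": self.timestamp.isoformat(),
            "data": self.data,
        }


def to_sse_frame(event: StreamEventSketch) -> str:
    """Hypothetical SSE framing: id and event fields, then one JSON data line."""
    payload = json.dumps(event.to_dict())
    return f"id: {event.event_id}\nevent: {event.event_type}\ndata: {payload}\n\n"


a = StreamEventSketch("run-1", "tool.start", {"tool": "search"})
b = StreamEventSketch("run-1", "tool.end", {"tool": "search"})
print(a.event_id != b.event_id)  # True
print(to_sse_frame(a).splitlines()[1])  # event: tool.start
```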
get_event_bus
get_event_bus() -> EventBus
Return the process-wide EventBus, creating it lazily.
Source code in src/locus/observability/event_bus.py
```python
def get_event_bus() -> EventBus:
    """Return the process-wide :class:`EventBus`, creating it lazily."""
    global _event_bus
    if _event_bus is None:
        _event_bus = EventBus()
    return _event_bus
```
reset_event_bus
reset_event_bus() -> None
Reset the singleton — tests only.
Calling this in production discards every active subscription
silently. The function is here purely so each pytest test can
start with a clean bus.
Source code in src/locus/observability/event_bus.py
```python
def reset_event_bus() -> None:
    """Reset the singleton — tests only.

    Calling this in production discards every active subscription
    silently. The function is here purely so each pytest test can
    start with a clean bus.
    """
    global _event_bus
    _event_bus = None
```
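get_event_bus and reset_event_bus implement a lazily created module-level singleton. The stdlib-only sketch below (BusSketch, get_bus and reset_bus are hypothetical names) shows the pattern and why a test suite resets it: after the reset, callers get a brand-new instance with no subscribers or history.

```python
from typing import Optional


class BusSketch:
    """Hypothetical stand-in for EventBus."""


_bus: Optional[BusSketch] = None


def get_bus() -> BusSketch:
    # Create the process-wide instance on first use only.
    global _bus
    if _bus is None:
        _bus = BusSketch()
    return _bus


def reset_bus() -> None:
    # Test-only: forget the instance so the next get_bus() builds a new one.
    global _bus
    _bus = None


first = get_bus()
print(get_bus() is first)  # True
reset_bus()
print(get_bus() is first)  # False
```

In a pytest suite this reset would typically run before each test, which matches the purpose stated in reset_event_bus's docstring.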
Publishing
emit
async
emit(event_type: str, /, **data: Any) -> None
Publish a StreamEvent if a run_id is bound in the current
context; no-op otherwise.
Use from any async def instrumentation site. Awaits the bus
publish so events appear in the order they were emitted on the
consumer side.
The bus singleton is imported lazily here so simply importing
this module doesn't construct it.
Source code in src/locus/observability/emit.py
```python
async def emit(event_type: str, /, **data: Any) -> None:
    """Publish a :class:`StreamEvent` if a run_id is in the current
    context. No-op otherwise.

    Use from any ``async def`` instrumentation site. Awaits the bus
    publish so events appear in the order they were emitted on the
    consumer side.

    The bus singleton is imported lazily here so simply importing
    this module doesn't construct it.
    """
    rid = current_run_id()
    if rid is None:
        return
    # Lazy import — keeps the bus singleton from being constructed
    # until the first real emission.
    from locus.observability.event_bus import StreamEvent, get_event_bus  # noqa: PLC0415

    bus = get_event_bus()
    # Cache the running loop on the bus so worker-thread ``emit_sync``
    # callers (sync ``@tool`` functions hopping through the executor)
    # can hop publishes back into the right event loop.
    if getattr(bus, "_owner_loop", None) is None:
        try:
            bus._owner_loop = asyncio.get_running_loop()  # noqa: SLF001 — bus owns this attr
        except RuntimeError:
            pass
    try:
        await bus.publish(
            StreamEvent(run_id=rid, event_type=event_type, data=data),
        )
    except Exception:  # noqa: BLE001 — telemetry must never break the SDK
        logger.debug("emit failed for %s", event_type, exc_info=True)
```
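emit is gated on a contextvar, so code that never binds a run id pays only one ContextVar.get per call. The stdlib-only sketch below models the gate and the swallow-all-errors policy; publish is a hypothetical stand-in that records events in a list, and the real function logs failures at debug level rather than discarding them silently.

```python
import asyncio
import contextvars
from typing import Any, Optional

_run_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("run_id", default=None)
published: list = []


async def publish(run_id: str, event_type: str, data: dict) -> None:
    # Hypothetical stand-in for bus.publish: record instead of fanning out.
    published.append((run_id, event_type, data))


async def emit(event_type: str, /, **data: Any) -> None:
    rid = _run_id_var.get()
    if rid is None:
        return  # no run bound: cheap no-op
    try:
        await publish(rid, event_type, data)
    except Exception:
        pass  # telemetry must never break the caller (the real emit logs at debug level)


async def demo() -> None:
    await emit("ignored", n=0)  # no run id bound yet: dropped
    token = _run_id_var.set("run-42")
    try:
        await emit("step", n=1)
    finally:
        _run_id_var.reset(token)


asyncio.run(demo())
print(published)  # [('run-42', 'step', {'n': 1})]
```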
emit_sync
emit_sync(event_type: str, /, **data: Any) -> None
Sync-call equivalent of emit.
Schedules a publish on the running event loop. Two cases are handled:
- Called from inside the loop's thread: uses loop.create_task and keeps a strong reference to the task in _BACKGROUND_TASKS until it finishes.
- Called from a worker thread (asyncio.to_thread or the @tool decorator's executor): uses run_coroutine_threadsafe to hop back to the owner loop captured by run_context. The contextvar copy that the decorator passes into the executor preserves run_id (and the owner loop) across the thread boundary.
If neither path resolves a running loop, the event is dropped: telemetry must never spin up a fresh loop and fight asyncio singletons.
Source code in src/locus/observability/emit.py
```python
def emit_sync(event_type: str, /, **data: Any) -> None:
    """Sync-call equivalent of :func:`emit`.

    Schedules a publish on the running event loop. Two cases handled:

    * Called from inside the loop's thread — uses ``loop.create_task``
      and pins the task on ``_BACKGROUND_TASKS``.
    * Called from a worker thread (``asyncio.to_thread`` / the @tool
      decorator's executor) — uses ``run_coroutine_threadsafe`` to
      hop back to the bus's event loop. The contextvar copy that the
      decorator passes into the executor preserves ``run_id`` across
      the thread boundary.

    If neither path resolves a loop, the event is dropped — telemetry
    must never spin up a fresh loop and fight asyncio singletons.
    """
    rid = current_run_id()
    if rid is None:
        return
    from locus.observability.event_bus import StreamEvent, get_event_bus  # noqa: PLC0415

    bus = get_event_bus()
    event = StreamEvent(run_id=rid, event_type=event_type, data=data)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in *this* thread. We may still be inside a
        # worker thread spawned by ``asyncio.to_thread`` /
        # ``loop.run_in_executor`` — in which case the run_context's
        # owner loop is reachable via the contextvar populated on
        # ``run_context`` entry (and copied into the worker thread by
        # the ``@tool`` decorator's ``ctx.run()``).
        owner_loop = current_owner_loop()
        if owner_loop is None or not owner_loop.is_running():
            return
        try:
            asyncio.run_coroutine_threadsafe(bus.publish(event), owner_loop)
        except Exception:  # noqa: BLE001 — telemetry must never break the SDK
            logger.debug("emit_sync threadsafe schedule failed", exc_info=True)
        return
    coro = bus.publish(event)
    task = loop.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
```
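The stdlib-only sketch below models both emit_sync paths. It assumes the owner loop is reachable through a contextvar, as run_context's _owner_loop_var suggests, and it uses asyncio.to_thread, which copies contextvars into the worker thread, in place of the @tool decorator's executor. publish, sync_tool and the event names are hypothetical.

```python
import asyncio
import contextvars
from typing import Optional

# Stand-ins for the run-id and owner-loop contextvars bound by run_context.
_run_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("run_id", default=None)
_owner_loop_var: contextvars.ContextVar[Optional[asyncio.AbstractEventLoop]] = (
    contextvars.ContextVar("owner_loop", default=None)
)
received: list = []
_background_tasks: set = set()


async def publish(run_id: str, event_type: str) -> None:
    # Hypothetical bus.publish replacement: just record the event.
    received.append((run_id, event_type))


def emit_sync(event_type: str) -> None:
    rid = _run_id_var.get()
    if rid is None:
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Worker thread: no loop here, so hop back to the owner loop.
        owner = _owner_loop_var.get()
        if owner is None or not owner.is_running():
            return  # never start a fresh loop just for telemetry
        asyncio.run_coroutine_threadsafe(publish(rid, event_type), owner)
        return
    # Loop thread: schedule a task and keep a strong reference until done.
    task = loop.create_task(publish(rid, event_type))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def sync_tool() -> None:
    emit_sync("from_worker")  # runs in a worker thread


async def demo() -> list:
    _run_id_var.set("run-7")
    _owner_loop_var.set(asyncio.get_running_loop())
    emit_sync("from_loop")
    await asyncio.to_thread(sync_tool)  # to_thread copies the current context
    await asyncio.sleep(0.05)  # give both scheduled publishes time to run
    return sorted(received)


result = asyncio.run(demo())
print(result)  # [('run-7', 'from_loop'), ('run-7', 'from_worker')]
```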
Run context
Scope events to a single cognitive dispatch via run_context(), an
async context manager that binds the run id for the current task /
coroutine and restores the previous value on exit.
run_context
async
run_context(run_id: str | None = None) -> AsyncIterator[str]
Bind a run_id to the current asyncio context.
Yields the bound id (generates a fresh uuid4 hex if run_id is
None) and restores the previous contextvar value on exit.
Example:
```python
from locus.observability import run_context, get_event_bus

async with run_context() as rid:
    # any emission inside this block tags events with `rid`
    await my_pipeline.run(task)
# downstream subscribers can now `bus.subscribe(rid)`
```
Note:
The context manager does not call EventBus.close_stream; callers
decide when to close based on their own lifecycle (the router does
it at end of dispatch; notebook subprocesses do it on exit).
Source code in src/locus/observability/context.py
```python
@asynccontextmanager
async def run_context(run_id: str | None = None) -> AsyncIterator[str]:
    """Bind a ``run_id`` to the current asyncio context.

    Yields the bound id (generates a fresh uuid4 hex if ``run_id`` is
    ``None``) and restores the previous contextvar value on exit.

    Example::

        from locus.observability import run_context, get_event_bus

        async with run_context() as rid:
            # any emission inside this block tags events with `rid`
            await my_pipeline.run(task)
        # downstream subscribers can now `bus.subscribe(rid)`

    Note:
        The context manager does **not** call
        :meth:`EventBus.close_stream` — callers decide when to close
        based on their own lifecycle (the router does it at end of
        dispatch; notebook subprocesses do it on exit).
    """
    rid = run_id or uuid4().hex
    token = _run_id_var.set(rid)
    # Capture the running loop so worker-thread emit_sync calls can
    # hop publishes back via run_coroutine_threadsafe. Cheap — one
    # ``ContextVar.set`` per dispatch, even if no one ever subscribes.
    loop_token = None
    try:
        loop_token = _owner_loop_var.set(_asyncio.get_running_loop())
    except RuntimeError:
        # Caller is using ``run_context`` from sync code — no loop to
        # capture, but ``emit_sync`` will simply drop in that case.
        pass
    try:
        yield rid
    finally:
        if loop_token is not None:
            _owner_loop_var.reset(loop_token)
        _run_id_var.reset(token)
```
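Stripped down to the run-id contextvar (the owner-loop capture is left out), run_context has two consequences that the stdlib-only sketch below checks: nested contexts restore the outer run id on exit, and tasks created inside the block see the bound id because asyncio copies the current context into each new task. run_context_sketch and read_run_id are hypothetical names.

```python
import asyncio
import contextvars
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from uuid import uuid4

_run_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("run_id", default=None)


@asynccontextmanager
async def run_context_sketch(run_id: Optional[str] = None) -> AsyncIterator[str]:
    # Simplified: binds only the run id, not the owner loop.
    rid = run_id or uuid4().hex
    token = _run_id_var.set(rid)
    try:
        yield rid
    finally:
        _run_id_var.reset(token)  # restore whatever was bound before


async def read_run_id() -> Optional[str]:
    return _run_id_var.get()


async def demo() -> list:
    seen = [_run_id_var.get()]
    async with run_context_sketch("outer"):
        seen.append(_run_id_var.get())
        async with run_context_sketch("inner"):
            seen.append(_run_id_var.get())
            # A task created here runs with a copy of the current context.
            seen.append(await asyncio.create_task(read_run_id()))
        seen.append(_run_id_var.get())
    seen.append(_run_id_var.get())
    return seen


result = asyncio.run(demo())
print(result)  # [None, 'outer', 'inner', 'inner', 'outer', None]
```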
current_run_id
current_run_id() -> str | None
Return the run_id active on the current context, or None.
Every emission helper in locus.observability.emit calls this
and bails when it returns None. SDK users who never enter a
run context pay nothing.
Source code in src/locus/observability/context.py
```python
def current_run_id() -> str | None:
    """Return the run_id active on the current context, or ``None``.

    Every emission helper in :mod:`locus.observability.emit` calls this
    and bails when it returns ``None``. SDK users who never enter a
    run context pay nothing.
    """
    return _run_id_var.get()
```
set_run_id
set_run_id(run_id: str | None) -> object
Set the run_id on the current context, returning the reset token.
Lower-level than run_context; exposed so the workbench
bootstrap can pin the contextvar from an env variable inside the
notebook subprocess. Most code should use the context manager.
Source code in src/locus/observability/context.py
```python
def set_run_id(run_id: str | None) -> object:
    """Set the run_id on the current context, returning the reset token.

    Lower-level than :func:`run_context` — exposed so the workbench
    bootstrap can pin the contextvar from an env variable inside the
    notebook subprocess. Most code should use the context manager.
    """
    return _run_id_var.set(run_id)
```
reset_run_id
reset_run_id(token: object) -> None
Restore the contextvar to whatever it was before
set_run_id. Mirrors the standard ContextVar.reset.
Source code in src/locus/observability/context.py
```python
def reset_run_id(token: object) -> None:
    """Restore the contextvar to whatever it was before
    :func:`set_run_id`. Mirrors the standard ``ContextVar.reset``."""
    _run_id_var.reset(token)  # type: ignore[arg-type]
```
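set_run_id and reset_run_id follow the ContextVar token protocol. The sketch below shows the bootstrap pattern the docstring describes, pinning a run id read from the environment. EXAMPLE_RUN_ID is a hypothetical variable name, since this page does not say which variable the workbench uses, and both functions are stdlib-only stand-ins for the locus ones.

```python
import contextvars
import os
from typing import Optional

_run_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar("run_id", default=None)


def set_run_id(run_id: Optional[str]) -> contextvars.Token:
    # Returns the token needed to undo this binding.
    return _run_id_var.set(run_id)


def reset_run_id(token: contextvars.Token) -> None:
    _run_id_var.reset(token)


ENV_NAME = "EXAMPLE_RUN_ID"  # hypothetical; the real variable name is not documented here
os.environ[ENV_NAME] = "run-from-parent"  # normally set by the parent process

token = set_run_id(os.environ.get(ENV_NAME))
try:
    bound = _run_id_var.get()
finally:
    reset_run_id(token)
after = _run_id_var.get()
print(bound, after)  # run-from-parent None
```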
Hook bridge
EventBusHook mirrors the agent's LocusEvent stream onto the bus
without changing existing hook code.
EventBusHook
EventBusHook(run_id: str)
Bases: HookProvider
Republishes every agent-lifecycle hook onto the event bus.
The hook never mutates events — it's pure-observation. Tools and
model calls retain whatever cancellation / retry / replacement
behaviour the surrounding code dictates.
Source code in src/locus/observability/bus_hook.py
```python
def __init__(self, run_id: str) -> None:
    if not run_id:
        raise ValueError("EventBusHook requires a non-empty run_id")
    self._run_id = run_id
    self._bus = get_event_bus()
```
name
property
Hook provider name for identification.
register_hooks
register_hooks() -> dict[str, bool]
Return which hooks this provider implements.
Returns:
dict[str, bool]: a dictionary mapping hook names to whether they are implemented. Useful for optimization: the registry can skip calling unimplemented hooks.
Source code in src/locus/hooks/provider.py
```python
def register_hooks(self) -> dict[str, bool]:
    """Return which hooks this provider implements.

    Returns:
        Dictionary mapping hook names to whether they are implemented.
        Useful for optimization - registry can skip calling unimplemented hooks.
    """
    return {
        "on_before_invocation": True,
        "on_after_invocation": True,
        "on_before_tool_call": True,
        "on_after_tool_call": True,
        "on_iteration_start": True,
        "on_iteration_end": True,
        "on_before_model_call": True,
        "on_after_model_call": True,
    }
```
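register_hooks lets a registry skip hooks that a provider does not implement. The registry itself is not documented on this page, so RegistrySketch and HookProviderSketch below are hypothetical, including the keyword-only payload signature of the hook methods; the sketch only shows how the returned dict can drive dispatch. (EventBusHook marks every hook True; the sketch sets one to False to show the skip.)

```python
from typing import Any


class HookProviderSketch:
    """Hypothetical provider; real hook method signatures are not shown on this page."""

    def __init__(self) -> None:
        self.calls: list = []

    def register_hooks(self) -> dict:
        return {"on_before_tool_call": True, "on_after_tool_call": False}

    def on_before_tool_call(self, **payload: Any) -> None:
        self.calls.append("on_before_tool_call")

    def on_after_tool_call(self, **payload: Any) -> None:
        self.calls.append("on_after_tool_call")


class RegistrySketch:
    """Hypothetical registry that consults register_hooks() once per provider."""

    def __init__(self) -> None:
        self._handlers: dict = {}

    def add(self, provider: HookProviderSketch) -> None:
        for hook_name, implemented in provider.register_hooks().items():
            if implemented:  # skip hooks the provider says it does not implement
                self._handlers.setdefault(hook_name, []).append(getattr(provider, hook_name))

    def fire(self, hook_name: str, **payload: Any) -> None:
        for handler in self._handlers.get(hook_name, []):
            handler(**payload)


provider = HookProviderSketch()
registry = RegistrySketch()
registry.add(provider)
registry.fire("on_before_tool_call", tool="search")
registry.fire("on_after_tool_call", tool="search")  # skipped: marked False
print(provider.calls)  # ['on_before_tool_call']
```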