Events
Agent event stream
Typed, frozen Pydantic events yielded by agent.run(...). Consume
with async for event in agent.run(...) or pipe into a hook.
LocusEvent
Bases: BaseModel
Base class for all Locus events.
ThinkEvent
Bases: LocusEvent
Agent produced reasoning and/or tool calls.
Bases: LocusEvent
Tool execution completed.
Whether the tool execution succeeded.
ReflectEvent
Bases: LocusEvent
Reflexion evaluation completed.
GroundingEvent
Bases: LocusEvent
Grounding evaluation completed.
TerminateEvent
Bases: LocusEvent
Agent execution terminated.
ModelChunkEvent
Bases: LocusEvent
Streaming chunk from model.
ModelCompleteEvent
Bases: LocusEvent
Model completion finished.
InterruptEvent
Bases: LocusEvent
Agent paused for user input.
When a tool calls interrupt() (e.g., ask_user), the agent yields this
event and pauses. The caller should present the question to the user
and call agent.resume(response) to continue.
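The pause/resume contract can be sketched end-to-end with stand-in types. This is a minimal sketch: `ToyAgent` and the `question`/`result` fields are illustrative stand-ins, not the Locus API or the real event schema (the actual events are frozen Pydantic models).

```python
import asyncio
from dataclasses import dataclass

# Stub event types standing in for the frozen Pydantic models above;
# the field names are illustrative assumptions, not the real schema.
@dataclass
class InterruptEvent:
    question: str

@dataclass
class TerminateEvent:
    result: str

class ToyAgent:
    """Mimics the pause/resume contract: yield an InterruptEvent,
    wait until resume() supplies a response, then finish."""

    def __init__(self) -> None:
        self._response: str | None = None

    async def run(self, task: str):
        yield InterruptEvent(question=f"Confirm: {task}?")
        while self._response is None:       # paused until resume()
            await asyncio.sleep(0)
        yield TerminateEvent(result=f"done ({self._response})")

    def resume(self, response: str) -> None:
        self._response = response

async def main() -> str:
    agent = ToyAgent()
    final = ""
    async for event in agent.run("delete 3 files"):
        if isinstance(event, InterruptEvent):
            # Present event.question to the user, then continue the run.
            agent.resume("yes")
        elif isinstance(event, TerminateEvent):
            final = event.result
    return final

print(asyncio.run(main()))
```

The caller's loop stays a plain `async for`; the interrupt is just another event type to branch on.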
In-process SSE bus
The EventBus publishes StreamEvents for every meaningful step from
every framework layer. Opt-in via run_context() — zero cost when
unused.
See Observability for usage patterns and
SSE event catalogue for the full wire-format
reference (40+ event_type strings across 9 prefixes).
EventBus
EventBus(*, max_queue_size: int = _DEFAULT_QUEUE_SIZE, history_per_run: int = _DEFAULT_HISTORY_PER_RUN, max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED)
In-process pub/sub bus.
Thread-safe under asyncio (a single asyncio.Lock guards all
queue mutations). Not safe across processes — for multi-worker
deployments, swap the implementation for one backed by Redis /
NATS / similar.
Source code in src/locus/observability/event_bus.py
def __init__(
    self,
    *,
    max_queue_size: int = _DEFAULT_QUEUE_SIZE,
    history_per_run: int = _DEFAULT_HISTORY_PER_RUN,
    max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED,
) -> None:
    self._max_queue_size = max_queue_size
    self._history_per_run = history_per_run
    self._max_runs_retained = max_runs_retained
    self._lock = asyncio.Lock()
    # Per-run subscribers: each subscriber owns one bounded queue.
    # Sentinel ``None`` on the queue terminates the subscriber loop.
    self._queues: dict[str, list[asyncio.Queue[StreamEvent | None]]] = defaultdict(list)
    # Global subscribers fire on every event regardless of run_id.
    # Capped to prevent unbounded growth in monitoring deployments.
    self._global_subscribers: list[asyncio.Queue[StreamEvent | None]] = []
    self._max_global_subscribers = 50
    # History buffer for late-joining subscribers — they get the
    # last N events for the run on connect, then live events.
    self._history: dict[str, deque[StreamEvent]] = {}
    # Runs that have already had ``close_stream`` called. A late
    # subscriber for a closed run gets the history + immediate
    # sentinel, so the iterator terminates cleanly instead of
    # hanging on a queue no one publishes to.
    self._closed_runs: set[str] = set()
    # Drop counter for tests + diagnostics.
    self._dropped_events = 0
    # Optional reference to the asyncio loop the bus runs on. Set
    # lazily by ``emit()`` / ``emit_sync()`` on first use so worker
    # threads (the ``@tool`` decorator's executor, ``asyncio.to_thread``
    # callers) can reach back to the right loop via
    # ``run_coroutine_threadsafe``. Never set unless someone actually
    # publishes — preserves the SDK-without-SSE invariant.
    self._owner_loop: asyncio.AbstractEventLoop | None = None
publish
async
publish(event: StreamEvent) -> None
Deliver event to every subscriber on its run + global.
Non-blocking on slow consumers: each queue.put is wrapped in a
wait_for with _PUBLISH_TIMEOUT_SECONDS. If a subscriber's queue
is saturated past the timeout, the event is dropped for that
subscriber (counted in _dropped_events) and the publish
continues for everyone else.
Source code in src/locus/observability/event_bus.py
async def publish(self, event: StreamEvent) -> None:
    """Deliver ``event`` to every subscriber on its run + global.

    Non-blocking on slow consumers: each queue.put is wrapped in a
    ``wait_for`` with :data:`_PUBLISH_TIMEOUT_SECONDS`. If a
    subscriber's queue is saturated past the timeout, the event is
    dropped for that subscriber (counted in
    :attr:`_dropped_events`) and the publish continues for
    everyone else.
    """
    if not event.run_id:
        logger.error(
            "EventBus: dropping event with empty run_id (event_type=%s)",
            event.event_type,
        )
        return
    async with self._lock:
        history = self._history.setdefault(event.run_id, deque(maxlen=self._history_per_run))
        history.append(event)
        self._evict_old_runs_locked()
        run_subscribers = list(self._queues.get(event.run_id, ()))
        global_subscribers = list(self._global_subscribers)
    for queue in run_subscribers:
        await self._deliver(queue, event)
    for queue in global_subscribers:
        await self._deliver(queue, event)
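The drop-on-timeout delivery can be exercised in isolation with plain asyncio. A sketch under stated assumptions: the 0.05 s timeout is an arbitrary stand-in for `_PUBLISH_TIMEOUT_SECONDS`, and `deliver` is a simplified analogue of the bus's internal delivery, not its actual `_deliver`.

```python
import asyncio

PUBLISH_TIMEOUT = 0.05  # stand-in for _PUBLISH_TIMEOUT_SECONDS

async def deliver(queue: asyncio.Queue, event: str, dropped: list[str]) -> None:
    # Bounded put: a saturated subscriber costs the publisher at most
    # one timeout, never an indefinite stall.
    try:
        await asyncio.wait_for(queue.put(event), timeout=PUBLISH_TIMEOUT)
    except asyncio.TimeoutError:
        dropped.append(event)  # counted, and the publish continues

async def main() -> list[str]:
    slow = asyncio.Queue(maxsize=1)  # subscriber that never drains
    dropped: list[str] = []
    for i in range(3):
        await deliver(slow, f"event-{i}", dropped)
    return dropped

print(asyncio.run(main()))
```

With `maxsize=1` and no consumer, the first event lands and every later one is dropped after the timeout, so a stalled dashboard cannot back-pressure the agent loop.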
subscribe
async
subscribe(run_id: str) -> AsyncIterator[StreamEvent]
Subscribe to a single run's events.
On connect, every event currently in history for the run is
replayed in order, then live events follow. The iterator
terminates when close_stream is called for the run.
Source code in src/locus/observability/event_bus.py
async def subscribe(self, run_id: str) -> AsyncIterator[StreamEvent]:
    """Subscribe to a single run's events.

    On connect, every event currently in history for the run is
    replayed in order, then live events follow. The iterator
    terminates when :meth:`close_stream` is called for the run.
    """
    queue = await self._register(run_id)
    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        await self._unregister(run_id, queue)
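The replay-then-live contract reduces to a small pattern: drain a history buffer, then read a queue until the `None` sentinel. A self-contained sketch of that pattern (not the bus's actual `_register`/`_unregister` plumbing):

```python
import asyncio
from collections import deque
from typing import AsyncIterator

async def subscribe(history: deque, queue: asyncio.Queue) -> AsyncIterator[str]:
    # Replay buffered history first, then stream live events until
    # the None sentinel (close_stream's signal) ends the iterator.
    for event in history:
        yield event
    while True:
        event = await queue.get()
        if event is None:
            return
        yield event

async def main() -> list[str]:
    history = deque(["e1", "e2"], maxlen=256)  # bounded, like history_per_run
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("e3")   # live event
    queue.put_nowait(None)   # sentinel: stream closed
    return [e async for e in subscribe(history, queue)]

print(asyncio.run(main()))
```

A late subscriber sees the buffered events in order, then whatever arrives live, then a clean end of iteration.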
subscribe_global
async
subscribe_global() -> AsyncIterator[StreamEvent]
Subscribe to every event on the bus.
Useful for monitoring dashboards. Does not replay history —
only live events. Capped at _max_global_subscribers
concurrent subscribers; over-cap connections receive an
immediate sentinel.
Source code in src/locus/observability/event_bus.py
async def subscribe_global(self) -> AsyncIterator[StreamEvent]:
    """Subscribe to every event on the bus.

    Useful for monitoring dashboards. Does not replay history —
    only live events. Capped at :attr:`_max_global_subscribers`
    concurrent subscribers; over-cap connections receive an
    immediate sentinel.
    """
    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=self._max_queue_size)
    async with self._lock:
        if len(self._global_subscribers) >= self._max_global_subscribers:
            logger.warning(
                "EventBus: global subscriber cap reached (%d); rejecting new subscriber",
                self._max_global_subscribers,
            )
            return
        self._global_subscribers.append(queue)
    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        async with self._lock:
            try:
                self._global_subscribers.remove(queue)
            except ValueError:
                pass
close_stream
async
close_stream(run_id: str) -> None
Signal end-of-stream to every active subscriber of run_id.
Idempotent. Keeps the history buffer — a late subscriber
joining within the retention window still sees the run's
events plus an immediate sentinel terminating the iterator.
Use drop_history to forcibly forget a run.
Source code in src/locus/observability/event_bus.py
async def close_stream(self, run_id: str) -> None:
    """Signal end-of-stream to every active subscriber of ``run_id``.

    Idempotent. **Keeps** the history buffer — a late subscriber
    joining within the retention window still sees the run's
    events plus an immediate sentinel terminating the iterator.
    Use :meth:`drop_history` to forcibly forget a run.
    """
    async with self._lock:
        queues = list(self._queues.get(run_id, ()))
        self._queues.pop(run_id, None)
        self._closed_runs.add(run_id)
    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            # Drop the oldest queued event to make room for the
            # sentinel — close must always reach the consumer.
            try:
                queue.get_nowait()
                queue.put_nowait(None)
            except (asyncio.QueueEmpty, asyncio.QueueFull):
                self._dropped_events += 1
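The close-must-arrive guarantee hinges on evicting the oldest queued event when a subscriber's queue is full. Extracted as a standalone sketch of that eviction step:

```python
import asyncio

def send_sentinel(queue: asyncio.Queue) -> None:
    # The None sentinel must reach the consumer even when the queue is
    # full: evict the oldest event to make room for it.
    try:
        queue.put_nowait(None)
    except asyncio.QueueFull:
        try:
            queue.get_nowait()
            queue.put_nowait(None)
        except (asyncio.QueueEmpty, asyncio.QueueFull):
            pass  # the real bus increments its drop counter here

q: asyncio.Queue = asyncio.Queue(maxsize=2)
q.put_nowait("a")
q.put_nowait("b")
send_sentinel(q)  # queue full: "a" is evicted so the sentinel lands
print([q.get_nowait() for _ in range(2)])
```

Trading one stale event for a guaranteed close keeps subscriber iterators from hanging forever on a finished run.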
drop_history
async
drop_history(run_id: str) -> None
Forget a run's history buffer + closed-state.
Late subscribers after this call see nothing for run_id.
Useful for explicit cleanup; not called automatically.
Source code in src/locus/observability/event_bus.py
async def drop_history(self, run_id: str) -> None:
    """Forget a run's history buffer + closed-state.

    Late subscribers after this call see nothing for ``run_id``.
    Useful for explicit cleanup; not called automatically.
    """
    async with self._lock:
        self._history.pop(run_id, None)
        self._closed_runs.discard(run_id)
close_global
async
close_global() -> None
Terminate every global subscriber. Used at shutdown.
Source code in src/locus/observability/event_bus.py
async def close_global(self) -> None:
    """Terminate every global subscriber. Used at shutdown."""
    async with self._lock:
        queues = list(self._global_subscribers)
        self._global_subscribers.clear()
    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            self._dropped_events += 1
stats
stats() -> dict[str, Any]
Snapshot of subscriber counts + drop totals.
Cheap to call. Read-only — does not lock; treat as approximate
when called concurrently with publish.
Source code in src/locus/observability/event_bus.py
def stats(self) -> dict[str, Any]:
    """Snapshot of subscriber counts + drop totals.

    Cheap to call. Read-only — does not lock; treat as approximate
    when called concurrently with publish.
    """
    return {
        "active_runs": len(self._queues),
        "global_subscribers": len(self._global_subscribers),
        "history_runs": len(self._history),
        "dropped_events_total": self._dropped_events,
        "queue_depths": {
            run_id: [q.qsize() for q in queues] for run_id, queues in self._queues.items()
        },
    }
StreamEvent
dataclass
StreamEvent(run_id: str, event_type: str, data: dict[str, Any], timestamp: datetime = datetime.now(UTC), event_id: str = str(uuid4()))
Single event flowing through the bus.
Fields are intentionally narrow — every consumer (CLI, SSE, JSON
log) sees the same shape, and data is the open extension
point.
to_dict
to_dict() -> dict[str, Any]
JSON-serialisable shape used by the SSE wire format.
Source code in src/locus/observability/event_bus.py
def to_dict(self) -> dict[str, Any]:
    """JSON-serialisable shape used by the SSE wire format."""
    return {
        "event_id": self.event_id,
        "run_id": self.run_id,
        "event_type": self.event_type,
        "timestamp": self.timestamp.isoformat(),
        "data": self.data,
    }
run_context
async
run_context(run_id: str | None = None) -> AsyncIterator[str]
Bind a run_id to the current asyncio context.
Yields the bound id (generates a fresh uuid4 hex if run_id is
None) and restores the previous contextvar value on exit.
Example::

    from locus.observability import run_context, get_event_bus

    async with run_context() as rid:
        # any emission inside this block tags events with `rid`
        await my_pipeline.run(task)
    # downstream subscribers can now `bus.subscribe(rid)`
Note
The context manager does not call EventBus.close_stream —
callers decide when to close based on their own lifecycle
(the router does it at end of dispatch; tutorial subprocesses
do it on exit).
Source code in src/locus/observability/context.py
@asynccontextmanager
async def run_context(run_id: str | None = None) -> AsyncIterator[str]:
    """Bind a ``run_id`` to the current asyncio context.

    Yields the bound id (generates a fresh uuid4 hex if ``run_id`` is
    ``None``) and restores the previous contextvar value on exit.

    Example::

        from locus.observability import run_context, get_event_bus

        async with run_context() as rid:
            # any emission inside this block tags events with `rid`
            await my_pipeline.run(task)
        # downstream subscribers can now `bus.subscribe(rid)`

    Note:
        The context manager does **not** call
        :meth:`EventBus.close_stream` — callers decide when to close
        based on their own lifecycle (the router does it at end of
        dispatch; tutorial subprocesses do it on exit).
    """
    rid = run_id or uuid4().hex
    token = _run_id_var.set(rid)
    # Capture the running loop so worker-thread emit_sync calls can
    # hop publishes back via run_coroutine_threadsafe. Cheap — one
    # ``ContextVar.set`` per dispatch, even if no one ever subscribes.
    loop_token = None
    try:
        loop_token = _owner_loop_var.set(_asyncio.get_running_loop())
    except RuntimeError:
        # Caller is using ``run_context`` from sync code — no loop to
        # capture, but ``emit_sync`` will simply drop in that case.
        pass
    try:
        yield rid
    finally:
        if loop_token is not None:
            _owner_loop_var.reset(loop_token)
        _run_id_var.reset(token)
current_run_id
current_run_id() -> str | None
Return the run_id active on the current context, or None.
Every emission helper in locus.observability.emit calls this
and bails when it returns None. SDK users who never enter a
run context pay nothing.
Source code in src/locus/observability/context.py
def current_run_id() -> str | None:
    """Return the run_id active on the current context, or ``None``.

    Every emission helper in :mod:`locus.observability.emit` calls this
    and bails when it returns ``None``. SDK users who never enter a
    run context pay nothing.
    """
    return _run_id_var.get()
EventBusHook
EventBusHook(run_id: str)
Bases: HookProvider
Republishes every agent-lifecycle hook onto the event bus.
The hook never mutates events — it's pure-observation. Tools and
model calls retain whatever cancellation / retry / replacement
behaviour the surrounding code dictates.
Source code in src/locus/observability/bus_hook.py
def __init__(self, run_id: str) -> None:
    if not run_id:
        raise ValueError("EventBusHook requires a non-empty run_id")
    self._run_id = run_id
    self._bus = get_event_bus()
name
property
Hook provider name for identification.
register_hooks
register_hooks() -> dict[str, bool]
Return which hooks this provider implements.
Returns:
dict[str, bool]: Dictionary mapping hook names to whether they are
implemented. Useful for optimization, since the registry can skip
calling unimplemented hooks.
Source code in src/locus/hooks/provider.py
def register_hooks(self) -> dict[str, bool]:
    """Return which hooks this provider implements.

    Returns:
        Dictionary mapping hook names to whether they are implemented.
        Useful for optimization - registry can skip calling unimplemented hooks.
    """
    return {
        "on_before_invocation": True,
        "on_after_invocation": True,
        "on_before_tool_call": True,
        "on_after_tool_call": True,
        "on_iteration_start": True,
        "on_iteration_end": True,
        "on_before_model_call": True,
        "on_after_model_call": True,
    }
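The optimization the docstring points at, skipping hooks a provider declares unimplemented, can be sketched with a hypothetical registry (`HookRegistry` and `PartialProvider` are illustrative, not Locus classes):

```python
from typing import Any

class HookRegistry:
    """Sketch of the skip optimization: consult register_hooks() and
    only dispatch hooks the provider declares implemented."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def dispatch(self, provider: Any, hook_name: str) -> None:
        implemented = provider.register_hooks()
        if not implemented.get(hook_name, False):
            return  # skip: provider declared this hook unimplemented
        getattr(provider, hook_name)()
        self.calls.append(hook_name)

class PartialProvider:
    def register_hooks(self) -> dict[str, bool]:
        return {"on_iteration_start": True, "on_iteration_end": False}

    def on_iteration_start(self) -> None:
        pass

reg = HookRegistry()
p = PartialProvider()
reg.dispatch(p, "on_iteration_start")
reg.dispatch(p, "on_iteration_end")  # declared False, never invoked
print(reg.calls)
```

Because the map is static per provider, a real registry could read it once at registration time instead of on every dispatch.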