Skip to content

Events

Agent event stream

Typed, frozen Pydantic events yielded by agent.run(...). Consume with async for event in agent.run(...) or pipe into a hook.

LocusEvent

Bases: BaseModel

Base class for all Locus events.

ThinkEvent

Bases: LocusEvent

Agent produced reasoning and/or tool calls.

ToolStartEvent

Bases: LocusEvent

Tool execution started.

ToolCompleteEvent

Bases: LocusEvent

Tool execution completed.

success property

success: bool

Whether the tool execution succeeded.

ReflectEvent

Bases: LocusEvent

Reflexion evaluation completed.

GroundingEvent

Bases: LocusEvent

Grounding evaluation completed.

TerminateEvent

Bases: LocusEvent

Agent execution terminated.

ModelChunkEvent

Bases: LocusEvent

Streaming chunk from model.

ModelCompleteEvent

Bases: LocusEvent

Model completion finished.

InterruptEvent

Bases: LocusEvent

Agent paused for user input.

When a tool calls interrupt() (e.g., ask_user), the agent yields this event and pauses. The caller should present the question to the user and call agent.resume(response) to continue.

In-process SSE bus

The EventBus publishes StreamEvents for every meaningful step from every framework layer. Opt-in via run_context() — zero cost when unused.

See Observability for usage patterns and SSE event catalogue for the full wire-format reference (40+ event_type strings across 9 prefixes).

EventBus

EventBus(*, max_queue_size: int = _DEFAULT_QUEUE_SIZE, history_per_run: int = _DEFAULT_HISTORY_PER_RUN, max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED)

In-process pub/sub bus.

Thread-safe under asyncio (a single asyncio.Lock guards all queue mutations). Not safe across processes — for multi-worker deployments, swap the implementation for one backed by Redis / NATS / similar.

Source code in src/locus/observability/event_bus.py
def __init__(
    self,
    *,
    max_queue_size: int = _DEFAULT_QUEUE_SIZE,
    history_per_run: int = _DEFAULT_HISTORY_PER_RUN,
    max_runs_retained: int = _DEFAULT_MAX_RUNS_RETAINED,
) -> None:
    self._max_queue_size = max_queue_size
    self._history_per_run = history_per_run
    self._max_runs_retained = max_runs_retained

    self._lock = asyncio.Lock()

    # Per-run subscribers: each subscriber owns one bounded queue.
    # Sentinel ``None`` on the queue terminates the subscriber loop.
    self._queues: dict[str, list[asyncio.Queue[StreamEvent | None]]] = defaultdict(list)

    # Global subscribers fire on every event regardless of run_id.
    # Capped to prevent unbounded growth in monitoring deployments.
    self._global_subscribers: list[asyncio.Queue[StreamEvent | None]] = []
    self._max_global_subscribers = 50

    # History buffer for late-joining subscribers — they get the
    # last N events for the run on connect, then live events.
    self._history: dict[str, deque[StreamEvent]] = {}

    # Runs that have already had ``close_stream`` called. A late
    # subscriber for a closed run gets the history + immediate
    # sentinel, so the iterator terminates cleanly instead of
    # hanging on a queue no one publishes to.
    self._closed_runs: set[str] = set()

    # Drop counter for tests + diagnostics.
    self._dropped_events = 0

    # Optional reference to the asyncio loop the bus runs on. Set
    # lazily by ``emit()`` / ``emit_sync()`` on first use so worker
    # threads (the ``@tool`` decorator's executor, ``asyncio.to_thread``
    # callers) can reach back to the right loop via
    # ``run_coroutine_threadsafe``. Never set unless someone actually
    # publishes — preserves the SDK-without-SSE invariant.
    self._owner_loop: asyncio.AbstractEventLoop | None = None

publish async

publish(event: StreamEvent) -> None

Deliver event to every subscriber on its run + global.

Non-blocking on slow consumers: each queue.put is wrapped in a wait_for with :data:_PUBLISH_TIMEOUT_SECONDS. If a subscriber's queue is saturated past the timeout, the event is dropped for that subscriber (counted in :attr:_dropped_events) and the publish continues for everyone else.

Source code in src/locus/observability/event_bus.py
async def publish(self, event: StreamEvent) -> None:
    """Deliver ``event`` to every subscriber on its run + global.

    Non-blocking on slow consumers: each queue.put is wrapped in a
    ``wait_for`` with :data:`_PUBLISH_TIMEOUT_SECONDS`. If a
    subscriber's queue is saturated past the timeout, the event is
    dropped for that subscriber (counted in
    :attr:`_dropped_events`) and the publish continues for
    everyone else.
    """
    if not event.run_id:
        logger.error(
            "EventBus: dropping event with empty run_id (event_type=%s)",
            event.event_type,
        )
        return

    async with self._lock:
        history = self._history.setdefault(event.run_id, deque(maxlen=self._history_per_run))
        history.append(event)
        self._evict_old_runs_locked()

        run_subscribers = list(self._queues.get(event.run_id, ()))
        global_subscribers = list(self._global_subscribers)

    for queue in run_subscribers:
        await self._deliver(queue, event)
    for queue in global_subscribers:
        await self._deliver(queue, event)

subscribe async

subscribe(run_id: str) -> AsyncIterator[StreamEvent]

Subscribe to a single run's events.

On connect, every event currently in history for the run is replayed in order, then live events follow. The iterator terminates when :meth:close_stream is called for the run.

Source code in src/locus/observability/event_bus.py
async def subscribe(self, run_id: str) -> AsyncIterator[StreamEvent]:
    """Subscribe to a single run's events.

    On connect, every event currently in history for the run is
    replayed in order, then live events follow. The iterator
    terminates when :meth:`close_stream` is called for the run.
    """
    queue = await self._register(run_id)
    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        await self._unregister(run_id, queue)

subscribe_global async

subscribe_global() -> AsyncIterator[StreamEvent]

Subscribe to every event on the bus.

Useful for monitoring dashboards. Does not replay history — only live events. Capped at :attr:_max_global_subscribers concurrent subscribers; over-cap connections receive an immediate sentinel.

Source code in src/locus/observability/event_bus.py
async def subscribe_global(self) -> AsyncIterator[StreamEvent]:
    """Subscribe to every event on the bus.

    Useful for monitoring dashboards. Does not replay history —
    only live events. Capped at :attr:`_max_global_subscribers`
    concurrent subscribers; over-cap connections receive an
    immediate sentinel.
    """
    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=self._max_queue_size)

    async with self._lock:
        if len(self._global_subscribers) >= self._max_global_subscribers:
            logger.warning(
                "EventBus: global subscriber cap reached (%d); rejecting new subscriber",
                self._max_global_subscribers,
            )
            return
        self._global_subscribers.append(queue)

    try:
        while True:
            event = await queue.get()
            if event is None:
                return
            yield event
    finally:
        async with self._lock:
            try:
                self._global_subscribers.remove(queue)
            except ValueError:
                pass

close_stream async

close_stream(run_id: str) -> None

Signal end-of-stream to every active subscriber of run_id.

Idempotent. Keeps the history buffer — a late subscriber joining within the retention window still sees the run's events plus an immediate sentinel terminating the iterator. Use :meth:drop_history to forcibly forget a run.

Source code in src/locus/observability/event_bus.py
async def close_stream(self, run_id: str) -> None:
    """Signal end-of-stream to every active subscriber of ``run_id``.

    Idempotent. **Keeps** the history buffer — a late subscriber
    joining within the retention window still sees the run's
    events plus an immediate sentinel terminating the iterator.
    Use :meth:`drop_history` to forcibly forget a run.
    """
    async with self._lock:
        queues = list(self._queues.get(run_id, ()))
        self._queues.pop(run_id, None)
        self._closed_runs.add(run_id)

    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            # Drop the oldest queued event to make room for the
            # sentinel — close must always reach the consumer.
            try:
                queue.get_nowait()
                queue.put_nowait(None)
            except (asyncio.QueueEmpty, asyncio.QueueFull):
                self._dropped_events += 1

drop_history async

drop_history(run_id: str) -> None

Forget a run's history buffer + closed-state.

Late subscribers after this call see nothing for run_id. Useful for explicit cleanup; not called automatically.

Source code in src/locus/observability/event_bus.py
async def drop_history(self, run_id: str) -> None:
    """Forget a run's history buffer + closed-state.

    Late subscribers after this call see nothing for ``run_id``.
    Useful for explicit cleanup; not called automatically.
    """
    async with self._lock:
        self._history.pop(run_id, None)
        self._closed_runs.discard(run_id)

close_global async

close_global() -> None

Terminate every global subscriber. Used at shutdown.

Source code in src/locus/observability/event_bus.py
async def close_global(self) -> None:
    """Terminate every global subscriber. Used at shutdown."""
    async with self._lock:
        queues = list(self._global_subscribers)
        self._global_subscribers.clear()
    for queue in queues:
        try:
            queue.put_nowait(None)
        except asyncio.QueueFull:
            self._dropped_events += 1

stats

stats() -> dict[str, Any]

Snapshot of subscriber counts + drop totals.

Cheap to call. Read-only — does not lock; treat as approximate when called concurrently with publish.

Source code in src/locus/observability/event_bus.py
def stats(self) -> dict[str, Any]:
    """Snapshot of subscriber counts + drop totals.

    Cheap to call. Read-only — does not lock; treat as approximate
    when called concurrently with publish.
    """
    return {
        "active_runs": len(self._queues),
        "global_subscribers": len(self._global_subscribers),
        "history_runs": len(self._history),
        "dropped_events_total": self._dropped_events,
        "queue_depths": {
            run_id: [q.qsize() for q in queues] for run_id, queues in self._queues.items()
        },
    }

StreamEvent dataclass

StreamEvent(run_id: str, event_type: str, data: dict[str, Any], timestamp: datetime = (lambda: datetime.now(UTC))(), event_id: str = (lambda: str(uuid4()))())

Single event flowing through the bus.

Fields are intentionally narrow — every consumer (CLI, SSE, JSON log) sees the same shape, and data is the open extension point.

to_dict

to_dict() -> dict[str, Any]

JSON-serialisable shape used by the SSE wire format.

Source code in src/locus/observability/event_bus.py
def to_dict(self) -> dict[str, Any]:
    """JSON-serialisable shape used by the SSE wire format."""
    return {
        "event_id": self.event_id,
        "run_id": self.run_id,
        "event_type": self.event_type,
        "timestamp": self.timestamp.isoformat(),
        "data": self.data,
    }

run_context async

run_context(run_id: str | None = None) -> AsyncIterator[str]

Bind a run_id to the current asyncio context.

Yields the bound id (generates a fresh uuid4 hex if run_id is None) and restores the previous contextvar value on exit.

Example::

from locus.observability import run_context, get_event_bus

async with run_context() as rid:
    # any emission inside this block tags events with `rid`
    await my_pipeline.run(task)
    # downstream subscribers can now `bus.subscribe(rid)`
Note

The context manager does not call :meth:EventBus.close_stream — callers decide when to close based on their own lifecycle (the router does it at end of dispatch; tutorial subprocesses do it on exit).

Source code in src/locus/observability/context.py
@asynccontextmanager
async def run_context(run_id: str | None = None) -> AsyncIterator[str]:
    """Bind a ``run_id`` to the current asyncio context.

    Yields the bound id (generates a fresh uuid4 hex if ``run_id`` is
    ``None``) and restores the previous contextvar value on exit.

    Example::

        from locus.observability import run_context, get_event_bus

        async with run_context() as rid:
            # any emission inside this block tags events with `rid`
            await my_pipeline.run(task)
            # downstream subscribers can now `bus.subscribe(rid)`

    Note:
        The context manager does **not** call
        :meth:`EventBus.close_stream` — callers decide when to close
        based on their own lifecycle (the router does it at end of
        dispatch; tutorial subprocesses do it on exit).
    """
    rid = run_id or uuid4().hex
    token = _run_id_var.set(rid)
    # Capture the running loop so worker-thread emit_sync calls can
    # hop publishes back via run_coroutine_threadsafe. Cheap — one
    # ``ContextVar.set`` per dispatch, even if no one ever subscribes.
    loop_token = None
    try:
        loop_token = _owner_loop_var.set(_asyncio.get_running_loop())
    except RuntimeError:
        # Caller is using ``run_context`` from sync code — no loop to
        # capture, but ``emit_sync`` will simply drop in that case.
        pass
    try:
        yield rid
    finally:
        if loop_token is not None:
            _owner_loop_var.reset(loop_token)
        _run_id_var.reset(token)

current_run_id

current_run_id() -> str | None

Return the run_id active on the current context, or None.

Every emission helper in :mod:locus.observability.emit calls this and bails when it returns None. SDK users who never enter a run context pay nothing.

Source code in src/locus/observability/context.py
def current_run_id() -> str | None:
    """Return the run_id active on the current context, or ``None``.

    Every emission helper in :mod:`locus.observability.emit` calls this
    and bails when it returns ``None``. SDK users who never enter a
    run context pay nothing.
    """
    return _run_id_var.get()

EventBusHook

EventBusHook(run_id: str)

Bases: HookProvider

Republishes every agent-lifecycle hook onto the event bus.

The hook never mutates events — it's pure-observation. Tools and model calls retain whatever cancellation / retry / replacement behaviour the surrounding code dictates.

Source code in src/locus/observability/bus_hook.py
def __init__(self, run_id: str) -> None:
    if not run_id:
        raise ValueError("EventBusHook requires a non-empty run_id")
    self._run_id = run_id
    self._bus = get_event_bus()

name property

name: str

Hook provider name for identification.

register_hooks

register_hooks() -> dict[str, bool]

Return which hooks this provider implements.

Returns:

Type Description
dict[str, bool]

Dictionary mapping hook names to whether they are implemented.

dict[str, bool]

Useful for optimization - registry can skip calling unimplemented hooks.

Source code in src/locus/hooks/provider.py
def register_hooks(self) -> dict[str, bool]:
    """Return which hooks this provider implements.

    Returns:
        Dictionary mapping hook names to whether they are implemented.
        Useful for optimization - registry can skip calling unimplemented hooks.
    """
    return {
        "on_before_invocation": True,
        "on_after_invocation": True,
        "on_before_tool_call": True,
        "on_after_tool_call": True,
        "on_iteration_start": True,
        "on_iteration_end": True,
        "on_before_model_call": True,
        "on_after_model_call": True,
    }