Skip to content

EventBus Subscribers

The bus has three subscribe shapes, each suited to a different consumer:

  • bus.subscribe(run_id) — events for one dispatch, with history replay on connect then live events, terminated by a sentinel on stream close.
  • bus.subscribe_global() — every event from every run, no history replay. Good fit for a monitoring dashboard that spans concurrent runs.
  • bus._history.get(run_id, ()) — direct read of the per-run history deque (test helper; capped at 500 events × 200 runs LRU).

Capacity model::

publisher
   │
   ├──► subscriber queue A  (max_queue_size=1024)
   │       └─ slow? ─► drop event, increment bus._dropped_events
   │
   ├──► subscriber queue B
   │
   └──► global subscriber queues (capped count)
  • Per-run subscriber alongside a global subscriber on two concurrent dispatches.
  • bus.stats() snapshot — queue sizes, history depth, drop counter, retained-run count.

Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):

python examples/notebook_61_eventbus_subscribers.py

Offline:

LOCUS_MODEL_PROVIDER=mock python examples/notebook_61_eventbus_subscribers.py

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Notebook 56: EventBus subscriber patterns.

The bus has three subscribe shapes, each suited to a different consumer:

- bus.subscribe(run_id): events for one dispatch, with history replay
  on connect then live events, terminated by a sentinel on stream close.
- bus.subscribe_global(): every event from every run, no history
  replay. Good fit for a monitoring dashboard that spans concurrent runs.
- bus._history.get(run_id, ()): direct read of the per-run history
  deque (test helper; capped at 500 events x 200 runs LRU).

- Per-run subscriber alongside a global subscriber on two concurrent
  dispatches.
- bus.stats() snapshot — queue sizes, history depth, drop counter,
  retained-run count.

Run it
    # Default: OCI Generative AI auto-detected from ~/.oci/config
    python examples/notebook_61_eventbus_subscribers.py

    # Offline / no credentials:
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_61_eventbus_subscribers.py
"""

from __future__ import annotations

import asyncio

from config import get_model

from locus.agent import Agent
from locus.observability import get_event_bus, run_context


# Part 1: a global subscriber sees both dispatches; a per-run
# subscriber sees only its own.


async def part1_global_vs_per_run() -> None:
    print("\n--- Part 1: global vs per-run subscribers ---")

    bus = get_event_bus()
    global_kinds: list[str] = []
    run_a_kinds: list[str] = []

    async def global_sub() -> None:
        async for ev in bus.subscribe_global():
            global_kinds.append(f"{ev.run_id[:6]}/{ev.event_type}")
            if ev.event_type == "agent.terminate" and len(global_kinds) >= 4:
                return

    async def run_a_sub(rid: str) -> None:
        async for ev in bus.subscribe(rid):
            run_a_kinds.append(ev.event_type)
            if ev.event_type == "agent.terminate":
                return

    async def dispatch(rid: str, prompt: str) -> None:
        async with run_context(rid):
            agent = Agent(model=get_model(), max_iterations=2)
            await asyncio.to_thread(agent.run_sync, prompt)
            await bus.close_stream(rid)

    g_task = asyncio.create_task(global_sub())
    a_task = asyncio.create_task(run_a_sub("run-A"))
    await asyncio.sleep(0)

    await asyncio.gather(
        dispatch("run-A", "Reply: hi from A"),
        dispatch("run-B", "Reply: hi from B"),
    )
    # Both subscribers exit on their close-stream sentinels.
    await asyncio.wait_for(asyncio.gather(g_task, a_task), timeout=15.0)

    print(f"global saw {len(global_kinds)} events across both runs:")
    for k in global_kinds[:6]:
        print(f"  - {k}")
    if len(global_kinds) > 6:
        print(f"  ... +{len(global_kinds) - 6} more")

    print(f"run-A subscriber saw {len(run_a_kinds)} events (only its own run):")
    for k in run_a_kinds[:6]:
        print(f"  - {k}")


# Part 2: bus.stats() snapshot. Pipe these into a monitoring dashboard.


async def part2_stats() -> None:
    print("\n--- Part 2: bus.stats() ---")

    bus = get_event_bus()
    snapshot = bus.stats()
    for k, v in snapshot.items():
        print(f"  {k}: {v}")


async def main() -> None:
    await part1_global_vs_per_run()
    await part2_stats()


if __name__ == "__main__":
    asyncio.run(main())