Tutorial 54: EventBus subscriber patterns¶
The bus has three subscribe shapes, each suited to a different consumer:
bus.subscribe(run_id)— events for one dispatch, with history replay on connect then live events, terminated by a sentinel on stream close.bus.subscribe_global()— every event from every run, no history replay. Use for a monitoring dashboard that spans many concurrent runs.bus._history.get(run_id, ())— direct read of the per-run history deque (test helper; capped at 500 events × 200 runs LRU).
Capacity model::
publisher
│
├──► subscriber queue A (max_queue_size=1024)
│ └─ slow? ─► drop event, increment bus._dropped_events
│
├──► subscriber queue B
│
└──► global subscriber queues (capped count)
This tutorial covers:
- Per-run subscriber with history replay.
- Global subscriber watching two concurrent dispatches on different
run_ids. - Slow consumer and drop accounting — the bus drops an event for one
slow subscriber after a 1 s timeout instead of blocking the publisher;
bus.stats()surfaces the dropped count.
Why this is differentiated:
- Slow subscribers never stall fast ones — the timeout-and-drop model means a lagging log writer can't back-pressure a real-time UI.
- History replay makes late-joining subscribers (e.g. a browser tab that opens after a run starts) first-class — they see the full context from the beginning without re-running the agent.
bus.stats()gives a live read on queue saturation, retained runs, and total dropped events — production-ready diagnostics in one call.
Run::
python examples/tutorial_54_eventbus_subscribers.py
Difficulty: Intermediate Prerequisites: tutorial_52_observability_basics (observability basics)
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Tutorial 54: EventBus consumer patterns.
The bus has three subscribe shapes:
* ``bus.subscribe(run_id)`` — events for one dispatch (with history
replay on connect, sentinel on close).
* ``bus.subscribe_global()`` — every event from every run, no history
replay (useful for a monitoring dashboard).
* ``bus._history.get(run_id, ())`` — direct read of the per-run
history deque (test helper; cap 500 events × 200 runs LRU).
This tutorial covers:
1. Per-run subscriber + history replay.
2. Global subscriber across two concurrent dispatches.
3. Slow consumer / drop accounting (the bus drops an event for one
slow subscriber instead of blocking the publisher).
Difficulty: intermediate. Prerequisites: tutorial 52 (basics).
Run with:
python examples/tutorial_54_eventbus_subscribers.py
"""
from __future__ import annotations
import asyncio
from config import get_model
from locus import Agent
from locus.observability import get_event_bus, run_context
# =============================================================================
# Part 1 — per-run + global subscribers running concurrently
# =============================================================================
async def part1_global_vs_per_run() -> None:
"""Two dispatches on different run_ids; one global subscriber
sees both, one per-run subscriber sees only its own."""
print("\n--- Part 1: global vs per-run subscribers ---")
bus = get_event_bus()
global_kinds: list[str] = []
run_a_kinds: list[str] = []
async def global_sub() -> None:
async for ev in bus.subscribe_global():
global_kinds.append(f"{ev.run_id[:6]}/{ev.event_type}")
if ev.event_type == "agent.terminate" and len(global_kinds) >= 4:
return
async def run_a_sub(rid: str) -> None:
async for ev in bus.subscribe(rid):
run_a_kinds.append(ev.event_type)
if ev.event_type == "agent.terminate":
return
async def dispatch(rid: str, prompt: str) -> None:
async with run_context(rid):
agent = Agent(model=get_model(), max_iterations=2)
await asyncio.to_thread(agent.run_sync, prompt)
await bus.close_stream(rid)
g_task = asyncio.create_task(global_sub())
a_task = asyncio.create_task(run_a_sub("run-A"))
await asyncio.sleep(0)
await asyncio.gather(
dispatch("run-A", "Reply: hi from A"),
dispatch("run-B", "Reply: hi from B"),
)
# Wait for both subscribers to terminate via their sentinels.
await asyncio.wait_for(asyncio.gather(g_task, a_task), timeout=15.0)
print(f"global saw {len(global_kinds)} events across both runs:")
for k in global_kinds[:6]:
print(f" - {k}")
if len(global_kinds) > 6:
print(f" ... +{len(global_kinds) - 6} more")
print(f"run-A subscriber saw {len(run_a_kinds)} events (only its own run):")
for k in run_a_kinds[:6]:
print(f" - {k}")
# =============================================================================
# Part 2 — bus stats endpoint (dropped events, retained runs)
# =============================================================================
async def part2_stats() -> None:
"""The bus exposes ``bus.stats()`` for diagnostics: queue sizes,
history depth, drop counter, retained-run count. Hook it up to
your monitoring dashboard."""
print("\n--- Part 2: bus.stats() ---")
bus = get_event_bus()
snapshot = bus.stats()
for k, v in snapshot.items():
print(f" {k}: {v}")
# =============================================================================
# Main
# =============================================================================
async def main() -> None:
await part1_global_vs_per_run()
await part2_stats()
if __name__ == "__main__":
asyncio.run(main())