Skip to content

Tutorial 55: Full event catalogue tour

Every component in Locus emits typed events under one stable prefix: agent.*, multiagent.*, composition.*, router.*, rag.*, memory.*, a2a.*, skills.*, deepagent.*. The EV_* constants in locus.observability.emit are the canonical registry — change one name and it propagates to every emission site and every consumer.

Prefix map::

agent.*          ReAct loop (think, tool, model, tokens, reflect, …)
multiagent.*     Orchestrator, Specialist, Handoff, StateGraph nodes
composition.*    SequentialPipeline, ParallelPipeline, LoopAgent
router.*         PRISM dispatch (frame → protocol → policy → compiled)
rag.*            Retriever query lifecycle
memory.*         Checkpointing + conversation management
a2a.*            Agent-to-Agent protocol (server + client)
skills.*         Skill activation
deepagent.*      Research-shaped agent (subagents, fs, todos)

This tutorial covers:

  1. Listing every EV_* constant and its category prefix — useful when wiring a renderer or a JSON log adapter.
  2. Driving an Orchestrator + Specialist run that surfaces multiagent.* events end-to-end, confirming routing, decision, and completion.
  3. Running a SequentialPipeline + LoopAgent that emits the full composition.* event set.

Why this is differentiated:

  • All EV_* constants are enumerated at import time from locus.observability.emit — the catalogue in this tutorial is always in sync with the codebase, not a static doc that can drift.
  • Every prefix is exercised by a real run so you can copy the subscriber pattern for whichever component you're instrumenting.
  • See SSE event catalogue for the full wire-format reference with payload field descriptions.

Run::

python examples/tutorial_55_event_catalogue.py

Difficulty: Intermediate Prerequisites: tutorial_17_orchestrator_pattern (orchestrator), tutorial_25_composition (composition), tutorial_52_observability_basics (observability basics)

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Tutorial 55: full event catalogue tour.

Every component in Locus emits typed events under one stable prefix:
``agent.*``, ``multiagent.*``, ``composition.*``, ``router.*``,
``rag.*``, ``memory.*``, ``a2a.*``, ``skills.*``, ``deepagent.*``.
The ``EV_*`` constants in ``locus.observability.emit`` are the
canonical registry — change one place, propagates everywhere.

This tutorial covers:

1. Listing every ``EV_*`` constant and its category prefix.
2. Driving an Orchestrator + Specialist run that hits ``multiagent.*``
   events end-to-end and prints the per-component event types.
3. Running a SequentialPipeline + LoopAgent to surface ``composition.*``.

Difficulty: intermediate. Prerequisites: tutorial 17 (orchestrator),
tutorial 25 (composition), tutorial 52 (observability basics).

Run with:
    python examples/tutorial_55_event_catalogue.py
"""

from __future__ import annotations

import asyncio
import sys
from collections import defaultdict

from config import get_model

from locus import Agent
from locus.agent.composition import LoopAgent, SequentialPipeline
from locus.observability import get_event_bus, run_context


# =============================================================================
# Part 1 — catalogue tour
# =============================================================================


def part1_catalogue_tour() -> None:
    """Print every canonical event_type the SDK exposes, grouped by
    category prefix. Useful when wiring a renderer or a JSON log."""
    print("\n--- Part 1: canonical event_type catalogue ---")

    emit_mod = sys.modules["locus.observability.emit"]
    by_prefix: dict[str, list[str]] = defaultdict(list)
    for name in dir(emit_mod):
        if not name.startswith("EV_"):
            continue
        value = getattr(emit_mod, name)
        if not isinstance(value, str):
            continue
        prefix = value.split(".", 1)[0]
        by_prefix[prefix].append(value)

    for prefix in sorted(by_prefix):
        print(f"  {prefix}.*  ({len(by_prefix[prefix])} events)")
        for ev in sorted(by_prefix[prefix]):
            print(f"    - {ev}")


# =============================================================================
# Part 2 — composition.* in action
# =============================================================================


async def part2_composition() -> None:
    """SequentialPipeline + LoopAgent both emit ``composition.*``
    events at every stage / iteration boundary. Subscribe to one run
    and observe."""
    print("\n--- Part 2: composition.* events ---")

    a = Agent(model=get_model(), max_iterations=1)
    b = Agent(model=get_model(), max_iterations=1)

    pipeline = SequentialPipeline(agents=[a, b])

    async with run_context() as rid:
        bus = get_event_bus()

        async def consumer() -> None:
            seen: list[str] = []
            async for ev in bus.subscribe(rid):
                if ev.event_type.startswith("composition."):
                    seen.append(ev.event_type)
                if ev.event_type == "composition.fanout.completed":
                    break
                if ev.event_type == "composition.stage.completed" and ev.data.get("stage") == 1:
                    print("composition events seen so far:", seen)
                    return

        consumer_task = asyncio.create_task(consumer())
        await asyncio.sleep(0)

        await pipeline.run("Tell me a one-line haiku about JSON.")
        await bus.close_stream(rid)
        await consumer_task


# =============================================================================
# Main
# =============================================================================


async def main() -> None:
    part1_catalogue_tour()
    await part2_composition()


if __name__ == "__main__":
    asyncio.run(main())