Skip to content

Event Catalogue

Every component in Locus emits typed events under one stable prefix: agent.*, multiagent.*, composition.*, router.*, rag.*, memory.*, a2a.*, skills.*, deepagent.*. The EV_* constants in locus.observability.emit are the canonical registry — change one name and it propagates to every emission site and every consumer.

Prefix map::

agent.*          ReAct loop (think, tool, model, tokens, reflect, …)
multiagent.*     Orchestrator, Specialist, Handoff, StateGraph nodes
composition.*    SequentialPipeline, ParallelPipeline, LoopAgent
router.*         PRISM dispatch (frame → protocol → policy → compiled)
rag.*            Retriever query lifecycle
memory.*         Checkpointing + conversation management
a2a.*            Agent-to-Agent protocol (server + client)
skills.*         Skill activation
deepagent.*      Research-shaped agent (subagents, fs, todos)
  • List every EV_* constant and its category prefix (always in sync with the codebase because it's read at import time).
  • Drive a SequentialPipeline + LoopAgent that surfaces composition.* events end-to-end.

Run it (OCI Generative AI is the default; auto-detected from ~/.oci/config):

python examples/notebook_62_event_catalogue.py

Offline:

LOCUS_MODEL_PROVIDER=mock python examples/notebook_62_event_catalogue.py

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Notebook 57: Full event catalogue tour.

Every component in Locus emits typed events under one stable prefix:
agent.*, multiagent.*, composition.*, router.*, rag.*, memory.*, a2a.*,
skills.*, deepagent.*. The EV_* constants in locus.observability.emit
are the canonical registry — change one place, propagates everywhere.

- List every EV_* constant and its category prefix (always in sync with
  the codebase because it's read at import time).
- Drive a SequentialPipeline + LoopAgent that surfaces composition.*
  events end-to-end.

Run it
    # Default: OCI Generative AI auto-detected from ~/.oci/config
    python examples/notebook_62_event_catalogue.py

    # Offline / no credentials:
    LOCUS_MODEL_PROVIDER=mock python examples/notebook_62_event_catalogue.py
"""

from __future__ import annotations

import asyncio
import sys
from collections import defaultdict

from config import get_model

from locus.agent import Agent
from locus.agent.composition import LoopAgent, SequentialPipeline
from locus.observability import get_event_bus, run_context


# Part 1: enumerate the EV_* constants at runtime so the catalogue
# never drifts from the codebase.


def part1_catalogue_tour() -> None:
    print("\n--- Part 1: canonical event_type catalogue ---")

    emit_mod = sys.modules["locus.observability.emit"]
    by_prefix: dict[str, list[str]] = defaultdict(list)
    for name in dir(emit_mod):
        if not name.startswith("EV_"):
            continue
        value = getattr(emit_mod, name)
        if not isinstance(value, str):
            continue
        prefix = value.split(".", 1)[0]
        by_prefix[prefix].append(value)

    for prefix in sorted(by_prefix):
        print(f"  {prefix}.*  ({len(by_prefix[prefix])} events)")
        for ev in sorted(by_prefix[prefix]):
            print(f"    - {ev}")


# Part 2: SequentialPipeline and LoopAgent emit composition.* events
# at every stage / iteration boundary.


async def part2_composition() -> None:
    print("\n--- Part 2: composition.* events ---")

    a = Agent(model=get_model(), max_iterations=1)
    b = Agent(model=get_model(), max_iterations=1)

    pipeline = SequentialPipeline(agents=[a, b])

    async with run_context() as rid:
        bus = get_event_bus()

        async def consumer() -> None:
            seen: list[str] = []
            async for ev in bus.subscribe(rid):
                if ev.event_type.startswith("composition."):
                    seen.append(ev.event_type)
                if ev.event_type == "composition.fanout.completed":
                    break
                if ev.event_type == "composition.stage.completed" and ev.data.get("stage") == 1:
                    print("composition events seen so far:", seen)
                    return

        consumer_task = asyncio.create_task(consumer())
        await asyncio.sleep(0)

        await pipeline.run("Tell me a one-line haiku about JSON.")
        await bus.close_stream(rid)
        await consumer_task


async def main() -> None:
    part1_catalogue_tour()
    await part2_composition()


if __name__ == "__main__":
    asyncio.run(main())