Observability

What the agent did, how long each step took, and what it cost — two built-in hooks plus the standard OpenTelemetry stack cover every piece you need. No vendor lock-in: locus emits OTLP, you point it at whatever backend you run.

When to wire what

| Need | Add |
| --- | --- |
| Structured per-event lines for log aggregators (Loki, Splunk, OCI Logging) | StructuredLoggingHook |
| OTLP traces and metrics for dashboards (Grafana, Honeycomb, OCI APM) | TelemetryHook |
| Per-run token totals on every result | Nothing — AgentResult.metrics already has it |
| Per-run trace ID surfaced to the user (for support tickets) | TelemetryHook + log the active span's trace ID |

Getting started

Structured logs

import logging
from locus import Agent
from locus.hooks.builtin import StructuredLoggingHook

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search, summarise],
    hooks=[StructuredLoggingHook(level=logging.INFO)],
)

Every event in the run is emitted as a structured JSON line. Sample (ToolCompleteEvent):

{
  "ts": "2026-05-02T01:31:02Z",
  "thread_id": "th-001",
  "run_id": "run-9c14b1",
  "agent_id": "procurement",
  "event": "tool_complete",
  "tool": "search_vendors",
  "duration_ms": 412,
  "result_size": 2148
}

Pipe stdout to your log aggregator. locus doesn't own the transport — use stdlib logging, structlog, or opentelemetry-logs, whichever fits your stack.
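For the stdlib route, a JSON-lines formatter might look like the sketch below. This is illustrative, not part of locus: the class name and the list of structured fields are assumptions that mirror the sample event above.

```python
import json
import logging

class JsonLineFormatter(logging.Formatter):
    """Render each record as one JSON object per line, for Loki/Splunk-style ingestion."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "event": record.getMessage(),
        }
        # Carry through structured fields attached via `extra=...` (hypothetical field set).
        for key in ("thread_id", "run_id", "agent_id", "tool", "duration_ms"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonLineFormatter())
logger = logging.getLogger("locus")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("tool_complete", extra={"tool": "search_vendors", "duration_ms": 412})
```

Each record comes out as a single parseable JSON line, so the aggregator side needs no custom parsing rules.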

Traces and metrics over OTLP

from locus.hooks.builtin import TelemetryHook

agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search, summarise],
    hooks=[
        TelemetryHook(
            service_name="procurement-agent",
            record_arguments=False,    # set True to attach tool args to spans
            record_results=False,      # set True for results (watch PII)
        ),
    ],
)

Spans are emitted for every agent invocation, every ReAct iteration, every tool call, and every model call. Metrics include:

| Counter | What it counts |
| --- | --- |
| locus.invocations | Calls to agent.run(...) |
| locus.iterations | ReAct iterations across all runs |
| locus.tool_calls | Tool invocations |
| locus.tool_errors | Tool calls that raised |

| Histogram | What it measures |
| --- | --- |
| locus.invocation.duration | Wall-clock per agent.run(...) |
| locus.tool_call.duration | Wall-clock per tool body |

Configure the exporter the standard OpenTelemetry way — set OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_RESOURCE_ATTRIBUTES, etc. before constructing the agent. Anything OTLP works: Honeycomb, Tempo, Grafana Cloud, OCI APM.
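For example, pointing the exporter at a backend might look like this (the endpoint, header, and attribute values are placeholders; the variable names are the standard OpenTelemetry ones):

```shell
# Set before the agent is constructed, so the OTel SDK picks them up.
export OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"
export OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=YOUR_API_KEY"   # only if your backend needs auth
export OTEL_RESOURCE_ATTRIBUTES="service.name=procurement-agent,deployment.environment=prod"
```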

Install the optional extra:

pip install "locus-sdk[telemetry]"

Token cost — already on every result

result = agent.run_sync("Plan Q3 launch.")
print(f"prompt:     {result.metrics.prompt_tokens}")
print(f"completion: {result.metrics.completion_tokens}")
print(f"total:      {result.metrics.total_tokens}")
print(f"iterations: {result.metrics.iterations}")

Multiply by your provider's per-token rate to get a per-run cost. For dashboards, key on agent_id plus the same metrics the TelemetryHook already emits — no glue code needed.
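As a sketch of the cost arithmetic (the rates below are made-up placeholders, not real pricing):

```python
# Hypothetical per-million-token rates — substitute your provider's actual pricing.
PROMPT_RATE_PER_M = 2.50
COMPLETION_RATE_PER_M = 10.00

def run_cost_usd(prompt_tokens: int, completion_tokens: int) -> float:
    """Per-run cost in USD from the token counts on AgentResult.metrics."""
    return (prompt_tokens * PROMPT_RATE_PER_M
            + completion_tokens * COMPLETION_RATE_PER_M) / 1_000_000

print(f"${run_cost_usd(12_000, 3_000):.4f}")  # $0.0600
```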

PII and tool arguments

record_arguments=True and record_results=True are off by default because tool args and results often contain user input — emails, account numbers, free-text. Turn them on selectively, and only after you've verified your tracing backend has appropriate retention and access controls. For PII redaction inside the agent before anything leaves, see Safety.

Common gotchas

| Symptom | Likely cause |
| --- | --- |
| TelemetryHook raises ImportError | The OpenTelemetry SDK isn't installed. Run pip install "locus-sdk[telemetry]". |
| No spans show up in your backend | Exporter not configured. Set OTEL_EXPORTER_OTLP_ENDPOINT (and OTEL_EXPORTER_OTLP_HEADERS if your backend needs auth) before creating the agent. |
| Spans land but metrics don't | Some OTLP receivers reject metrics on the trace endpoint. Set OTEL_EXPORTER_OTLP_METRICS_ENDPOINT separately if needed. |
| Token totals are zero | The provider isn't returning usage in the response (older Ollama builds, some self-hosted endpoints). The locus loop can't make up the numbers. |
| Tool args land in your logs unintentionally | Either record_arguments=True is set or your structured logger is dumping the full event dict. Check both. |

In-process SSE (EventBus)

For workbench streaming, real-time dashboards, or any use case where you need to watch the full inner cognition of a run without standing up an OTLP stack, locus ships a zero-dependency in-process pub/sub bus.

How it works

Every emission site in the SDK reads current_run_id() from a ContextVar. When no run_context() is active, the emit returns immediately — no bus access, no allocation, just one ContextVar.get() per call site — and the bus singleton is never instantiated at all.

from locus.observability import run_context, get_event_bus

async with run_context() as rid:
    # Subscribe before or during a run — history replay delivers the last
    # 500 events on connect, then switches to live mode.
    async for event in get_event_bus().subscribe(rid):
        print(event.event_type, event.data)

The agent yield bridge

Agent.run is decorated with @_bus_bridge. When a run_context is active, every LocusEvent the agent yields is silently republished on the bus as a canonical agent.* event — no hook registration, no config flag:

| Inner event | Bus event_type |
| --- | --- |
| ThinkEvent | agent.think |
| ToolStartEvent | agent.tool.started (with span_id) |
| ToolCompleteEvent | agent.tool.completed (matching span_id) |
| ReflectEvent | agent.reflect |
| GroundingEvent | agent.grounding |
| ModelCompleteEvent | agent.model.completed + agent.tokens.used |
| TerminateEvent | agent.terminate |

The span_id shared by started/completed pairs lets consumers match each completion to its start, so durations come out right even when events from concurrent runs interleave — no timestamp arithmetic required.
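A consumer can pair events by span_id like this. The event shapes are assumptions (duration_ms on the completed event mirrors the StructuredLoggingHook sample above), not the exact wire format:

```python
def completed_tools(events):
    """Correlate agent.tool.started/completed pairs by span_id (assumed event shapes)."""
    starts = {}
    for ev in events:
        if ev["event_type"] == "agent.tool.started":
            starts[ev["span_id"]] = ev
        elif ev["event_type"] == "agent.tool.completed":
            start = starts.pop(ev["span_id"], None)
            if start is not None:
                yield start["tool"], ev["duration_ms"]

# Interleaved events from two concurrent tool calls still pair up correctly.
stream = [
    {"event_type": "agent.tool.started",   "span_id": "a", "tool": "search"},
    {"event_type": "agent.tool.started",   "span_id": "b", "tool": "summarise"},
    {"event_type": "agent.tool.completed", "span_id": "b", "duration_ms": 95},
    {"event_type": "agent.tool.completed", "span_id": "a", "duration_ms": 412},
]
print(list(completed_tools(stream)))  # [('summarise', 95), ('search', 412)]
```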

EventBusHook — when you can't use run_context

For non-async host code or agents you don't control at construction time:

from locus.observability import EventBusHook, get_event_bus
from locus import Agent

run_id = "my-run-1"
agent = Agent(
    model="oci:openai.gpt-5.5",
    tools=[search, summarise],
    hooks=[EventBusHook(run_id=run_id)],
)
result = agent.run_sync("Diagnose the checkout slowdown.")

# Read the history after the fact.
bus = get_event_bus()
for event in bus._history.get(run_id, []):
    print(event.event_type, event.data)

EventBusHook bridges every agent-lifecycle hook (on_before_invocation, on_iteration_start/end, on_before/after_tool_call, on_before/after_model_call) onto the bus. It never mutates events or breaks execution.

Subscribe shapes

| Method | Scope | History replay |
| --- | --- | --- |
| bus.subscribe(run_id) | One run | Yes — last 500 events, then live |
| bus.subscribe_global() | All runs | No — live only |
| bus._history.get(run_id, ()) | One run | Yes — direct deque read (tests) |

Capacity and drop accounting

The bus is bounded. When a subscriber's queue fills, the bus waits up to 1 s and then drops the event for that slow subscriber rather than blocking the publisher. Fast subscribers are unaffected.
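The non-blocking delivery discipline looks roughly like the sketch below, built on a plain asyncio.Queue. It is illustrative only (the real bus additionally waits up to 1 s before giving up; this sketch drops immediately):

```python
import asyncio

class BoundedSubscriber:
    """Drop-on-full delivery: a slow consumer loses events, the publisher never blocks."""

    def __init__(self, max_queue_size: int = 1024):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self.dropped = 0

    def publish(self, event) -> bool:
        try:
            self.queue.put_nowait(event)
            return True
        except asyncio.QueueFull:
            self.dropped += 1      # accounted in stats, never raised to the publisher
            return False

async def demo():
    sub = BoundedSubscriber(max_queue_size=2)
    for i in range(5):
        sub.publish(i)             # events 2–4 overflow and are dropped
    return sub.queue.qsize(), sub.dropped

print(asyncio.run(demo()))  # (2, 3)
```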

stats = bus.stats()
print(stats["dropped_events"])    # cumulative drops across all subscribers
print(stats["retained_runs"])     # number of runs with live history
print(stats["subscriber_count"])  # current active subscribers

Default limits (all configurable at EventBus(...) construction time):

| Parameter | Default |
| --- | --- |
| max_queue_size | 1024 events per subscriber |
| history_per_run | 500 events |
| max_runs_retained | 200 runs (LRU eviction) |
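The retention scheme — a bounded per-run buffer plus LRU eviction of whole runs — can be sketched with an OrderedDict and a deque (illustrative; not the actual EventBus code):

```python
from collections import OrderedDict, deque

class RunHistory:
    """Keep the last history_per_run events for up to max_runs_retained runs (LRU)."""

    def __init__(self, history_per_run: int = 500, max_runs_retained: int = 200):
        self.history_per_run = history_per_run
        self.max_runs_retained = max_runs_retained
        self._history: OrderedDict = OrderedDict()

    def record(self, run_id: str, event) -> None:
        if run_id not in self._history:
            self._history[run_id] = deque(maxlen=self.history_per_run)
        self._history.move_to_end(run_id)        # this run is now most-recently used
        self._history[run_id].append(event)
        while len(self._history) > self.max_runs_retained:
            self._history.popitem(last=False)    # evict the least-recently-used run

hist = RunHistory(history_per_run=3, max_runs_retained=2)
for i in range(5):
    hist.record("run-a", i)                      # deque keeps only the last 3 events
hist.record("run-b", 0)
hist.record("run-c", 0)                          # third run: run-a is evicted as LRU
print(list(hist._history))                       # ['run-b', 'run-c']
```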

Full event catalogue

See SSE event catalogue for the complete wire-format reference — all EV_* constants, payload fields, span discipline, and the cost table for every subscription scenario.


See also

  • Hooks — both observability hooks plug into the same lifecycle as guardrails / steering / retry.
  • Events — what gets emitted before any hook runs.
  • SSE event catalogue — full wire-format reference for every event_type.
  • Safety — PII redaction before logs leave the box.