Skip to content

SSE Streaming

Server-Sent Events is the simplest way to push live agent updates from a Python backend to a browser. Locus ships two handlers that turn the agent event stream into SSE wire format: a buffered SSEHandler for tests and a queue-based AsyncSSEHandler for production streaming.

What you'll learn:

  • The SSE wire format and SSEMessage.
  • Buffered vs. async handlers — when to use each.
  • The right HTTP response headers (create_sse_response_headers()).
  • Shaping the wire payload with a custom serializer.
  • A drop-in FastAPI StreamingResponse example.
  • Production checklist: IDs, heartbeats, reconnection, error events.

Run it:

.venv/bin/python examples/notebook_19_sse_streaming.py

This notebook exercises only the SSE plumbing — no LLM call is made, so no provider configuration is required. Set LOCUS_MODEL_PROVIDER=mock if you want a uniform offline setup across notebooks.

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 14: stream agent events to a browser with Server-Sent Events.

SSE is the simplest way to push live updates from a Python backend to a
web client — one HTTP response, one event per line. Locus ships two
handlers that turn the agent event stream into SSE wire format: a
buffered ``SSEHandler`` and a queue-based ``AsyncSSEHandler`` for true
streaming.

Key ideas:
- An ``SSEMessage`` is a small dataclass with ``event``, ``data``, and
  ``id`` fields, plus ``.format()`` to produce the wire bytes.
- ``SSEHandler`` collects messages in memory — good for short runs and
  tests.
- ``AsyncSSEHandler.stream()`` is an async iterator you can hand
  straight to FastAPI / Starlette ``StreamingResponse``.
- ``create_sse_response_headers()`` returns the correct
  ``text/event-stream`` headers and disables proxy buffering.
- Pass ``custom_serializer=...`` to either handler to shape what the
  client sees.

Run it:
    .venv/bin/python examples/notebook_19_sse_streaming.py

This notebook does not call an LLM — it only exercises the SSE
plumbing, so no provider configuration is needed. The mock provider is
still a fine default if you set ``LOCUS_MODEL_PROVIDER=mock``.
"""

import asyncio
from datetime import UTC, datetime

from locus.core.events import (
    LocusEvent,
    ThinkEvent,
    ToolCompleteEvent,
    ToolStartEvent,
)
from locus.streaming.sse import (
    AsyncSSEHandler,
    SSEHandler,
    SSEMessage,
    create_sse_response_headers,
)


async def main():
    print("=" * 60)
    print("Notebook 14: SSE Streaming")
    print("=" * 60)

    # =========================================================================
    # Part 1: the SSE wire format
    # =========================================================================
    print("\n=== Part 1: SSE Message Format ===\n")

    message = SSEMessage(
        event="thinking",
        data='{"content": "Analyzing the request..."}',
        id="1",
    )

    print("SSE Message components:")
    print(f"  event: {message.event}")
    print(f"  data: {message.data}")
    print(f"  id: {message.id}")

    wire_format = message.format()
    print("\nWire format:")
    print("-" * 30)
    print(wire_format)
    print("-" * 30)

    # =========================================================================
    # Part 2: building messages by hand
    # =========================================================================
    print("\n=== Part 2: Creating SSE Messages ===\n")

    messages = [
        SSEMessage(event="start", data='{"session_id": "abc123"}'),
        SSEMessage(event="chunk", data="Hello"),
        SSEMessage(event="chunk", data=" World!"),
        SSEMessage(event="done", data='{"status": "complete"}'),
    ]

    print("Message sequence:")
    for msg in messages:
        print(f"  [{msg.event}] {msg.data}")

    # Multi-line payloads are valid; format() emits one data: line per line.
    multiline_msg = SSEMessage(
        event="code",
        data="def hello():\n    print('Hello!')\n    return True",
    )
    print("\nMulti-line message format:")
    print(multiline_msg.format())

    # =========================================================================
    # Part 3: buffered handler — collect everything, then flush
    # =========================================================================
    print("\n=== Part 3: SSE Handler (Buffered) ===\n")

    handler = SSEHandler(
        include_timestamp=True,
        include_id=True,
        id_prefix="evt_",
    )

    print("Handler config:")
    print(f"  Include timestamp: {handler.include_timestamp}")
    print(f"  Include ID: {handler.include_id}")
    print(f"  ID prefix: {handler.id_prefix}")

    # Stand-in events; in a real app these come from agent.run(...).
    events = [
        ThinkEvent(iteration=1, reasoning="Analyzing user request"),
        ToolStartEvent(tool_name="search", tool_call_id="call_001", arguments={"query": "test"}),
        ToolCompleteEvent(tool_name="search", tool_call_id="call_001", result="Found 5 results"),
    ]

    for event in events:
        await handler.on_event(event)

    await handler.on_complete()

    print(f"\nBuffered messages: {len(handler.get_messages())}")
    print(f"Is complete: {handler.is_complete}")

    for msg in handler.get_messages():
        print(f"  [{msg.event}] id={msg.id}")

    # =========================================================================
    # Part 4: format the buffer to wire bytes
    # =========================================================================
    print("\n=== Part 4: Formatted Output ===\n")

    full_output = handler.format_all()
    print("Full SSE output (first 500 chars):")
    print("-" * 40)
    print(full_output[:500] + "..." if len(full_output) > 500 else full_output)
    print("-" * 40)

    # pop_messages drains and returns — get_messages copies and keeps.
    handler.clear()
    await handler.on_event(ThinkEvent(iteration=1, reasoning="New thought"))
    popped = handler.pop_messages()
    remaining = handler.get_messages()
    print(f"\nAfter pop: got {len(popped)}, remaining {len(remaining)}")

    # =========================================================================
    # Part 5: report errors to the client
    # =========================================================================
    print("\n=== Part 5: Error Handling ===\n")

    handler.clear()

    await handler.on_event(ThinkEvent(iteration=1, reasoning="Starting..."))
    await handler.on_error(ValueError("Something went wrong"))

    print(f"Has error: {handler.has_error}")
    print(f"Is complete: {handler.is_complete}")

    for msg in handler.get_messages():
        print(f"  [{msg.event}] {msg.data[:50]}...")

    # =========================================================================
    # Part 6: async handler — queue-based, true streaming
    # =========================================================================
    print("\n=== Part 6: Async SSE Handler ===\n")

    # AsyncSSEHandler backs the stream with an asyncio.Queue, so producer
    # and consumer can run concurrently — the pattern web frameworks need.
    async_handler = AsyncSSEHandler(
        include_timestamp=True,
        include_id=True,
    )

    async def produce_events():
        await async_handler.on_event(ThinkEvent(iteration=1, reasoning="Processing..."))
        await asyncio.sleep(0.1)
        await async_handler.on_event(
            ToolStartEvent(tool_name="analyze", tool_call_id="call_002", arguments={})
        )
        await asyncio.sleep(0.1)
        await async_handler.on_complete()

    async def consume_events():
        count = 0
        async for sse_text in async_handler.stream():
            count += 1
            # A real app would yield sse_text from a StreamingResponse here.
        return count

    producer = asyncio.create_task(produce_events())
    count = await consume_events()
    await producer

    print(f"Streamed {count} SSE messages")

    # =========================================================================
    # Part 7: the right HTTP response headers
    # =========================================================================
    print("\n=== Part 7: HTTP Response Headers ===\n")

    headers = create_sse_response_headers()

    print("SSE Response Headers:")
    for name, value in headers.items():
        print(f"  {name}: {value}")

    # =========================================================================
    # Part 8: shape the wire payload with a custom serializer
    # =========================================================================
    print("\n=== Part 8: Custom Serialization ===\n")

    def custom_serializer(event: LocusEvent) -> dict:
        """Trim the payload to type, timestamp, and one content field."""
        return {
            "type": event.event_type,
            "time": datetime.now(UTC).isoformat(),
            "data": getattr(event, "reasoning", None) or getattr(event, "result", None),
        }

    custom_handler = SSEHandler(custom_serializer=custom_serializer)

    await custom_handler.on_event(ThinkEvent(iteration=1, reasoning="Custom serialization"))
    msg = custom_handler.get_messages()[0]

    print("Custom serialized event:")
    print(f"  {msg.data}")

    # =========================================================================
    # Part 9: drop it into FastAPI
    # =========================================================================
    print("\n=== Part 9: Web Framework Integration ===\n")

    print("FastAPI Example:")
    print("-" * 40)
    print("""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from locus.streaming.sse import AsyncSSEHandler, create_sse_response_headers

app = FastAPI()

@app.get("/stream")
async def stream_events():
    handler = AsyncSSEHandler()

    async def generate():
        # Start agent in background
        task = asyncio.create_task(run_agent(handler))

        # Stream events
        async for sse_text in handler.stream():
            yield sse_text

        await task

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers=create_sse_response_headers(),
    )

async def run_agent(handler):
    # Your agent logic
    await handler.on_event(ThinkEvent(iteration=1, reasoning="Working..."))
    await handler.on_complete()
""")
    print("-" * 40)

    # =========================================================================
    # Part 10: every Locus event type the handlers know how to render
    # =========================================================================
    print("\n=== Part 10: Supported Event Types ===\n")

    supported_events = [
        # Loop events
        ("think", "Agent thinking/reasoning"),
        ("tool_start", "Tool execution started"),
        ("tool_complete", "Tool execution completed"),
        ("reflect", "Self-reflection result"),
        ("grounding", "Grounding evaluation"),
        ("terminate", "Agent terminated"),
        # Model events
        ("model_chunk", "Streaming model output"),
        ("model_complete", "Model generation complete"),
        # Multi-agent events
        ("specialist_start", "Specialist started"),
        ("specialist_complete", "Specialist completed"),
        ("orchestrator_decision", "Orchestrator routing decision"),
        # Hook events
        ("before_invocation", "Before agent invocation"),
        ("after_invocation", "After agent invocation"),
    ]

    print("Event types for SSE streaming:")
    for event_type, description in supported_events:
        print(f"  {event_type}: {description}")

    # =========================================================================
    # Part 11: production checklist
    # =========================================================================
    print("\n=== Part 11: Best Practices ===\n")

    print("1. Always set proper SSE headers")
    print("2. Include event IDs so clients can reconnect with Last-Event-ID")
    print("3. Send a 'done' event when the agent terminates")
    print("4. Send error events on failure — never leave the stream hanging")
    print("5. Use AsyncSSEHandler for real streaming, not the buffered one")
    print("6. Keep individual event payloads small (< 65KB)")
    print("7. Implement client-side reconnection")
    print("8. Send periodic heartbeats during long tool calls")

    heartbeat = SSEMessage(event="heartbeat", data='{"status": "alive"}')
    print(f"\nHeartbeat message:\n{heartbeat.format()}")

    print("\n" + "=" * 60)
    print("Notebook 14 complete.")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())