Tutorial 21: SSE Streaming¶
This tutorial demonstrates Server-Sent Events (SSE) streaming for real-time web applications.
Topics covered:
- SSE message format
- SSEHandler for buffered output
- AsyncSSEHandler for streaming
- Event serialization
- Integration with web frameworks
Run with: python examples/tutorial_21_sse_streaming.py
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 21: SSE Streaming
This tutorial demonstrates Server-Sent Events (SSE) streaming
for real-time web applications.
Topics covered:
1. SSE message format
2. SSEHandler for buffered output
3. AsyncSSEHandler for streaming
4. Event serialization
5. Integration with web frameworks
Run with:
python examples/tutorial_21_sse_streaming.py
"""
import asyncio
from datetime import UTC, datetime
from locus.core.events import (
LocusEvent,
ThinkEvent,
ToolCompleteEvent,
ToolStartEvent,
)
from locus.streaming.sse import (
AsyncSSEHandler,
SSEHandler,
SSEMessage,
create_sse_response_headers,
)
async def main():
print("=" * 60)
print("Tutorial 21: SSE Streaming")
print("=" * 60)
# =========================================================================
# Part 1: SSE Message Format
# =========================================================================
print("\n=== Part 1: SSE Message Format ===\n")
# SSE messages follow a specific wire format
message = SSEMessage(
event="thinking",
data='{"content": "Analyzing the request..."}',
id="1",
)
print("SSE Message components:")
print(f" event: {message.event}")
print(f" data: {message.data}")
print(f" id: {message.id}")
# Format for HTTP transmission
wire_format = message.format()
print("\nWire format:")
print("-" * 30)
print(wire_format)
print("-" * 30)
# =========================================================================
# Part 2: Creating SSE Messages
# =========================================================================
print("\n=== Part 2: Creating SSE Messages ===\n")
# Different types of SSE messages
messages = [
SSEMessage(event="start", data='{"session_id": "abc123"}'),
SSEMessage(event="chunk", data="Hello"),
SSEMessage(event="chunk", data=" World!"),
SSEMessage(event="done", data='{"status": "complete"}'),
]
print("Message sequence:")
for msg in messages:
print(f" [{msg.event}] {msg.data}")
# Multi-line data
multiline_msg = SSEMessage(
event="code",
data="def hello():\n print('Hello!')\n return True",
)
print("\nMulti-line message format:")
print(multiline_msg.format())
# =========================================================================
# Part 3: SSE Handler (Buffered)
# =========================================================================
print("\n=== Part 3: SSE Handler (Buffered) ===\n")
# Create handler for collecting events
handler = SSEHandler(
include_timestamp=True,
include_id=True,
id_prefix="evt_",
)
print("Handler config:")
print(f" Include timestamp: {handler.include_timestamp}")
print(f" Include ID: {handler.include_id}")
print(f" ID prefix: {handler.id_prefix}")
# Simulate events
events = [
ThinkEvent(iteration=1, reasoning="Analyzing user request"),
ToolStartEvent(tool_name="search", tool_call_id="call_001", arguments={"query": "test"}),
ToolCompleteEvent(tool_name="search", tool_call_id="call_001", result="Found 5 results"),
]
for event in events:
await handler.on_event(event)
# Mark complete
await handler.on_complete()
print(f"\nBuffered messages: {len(handler.get_messages())}")
print(f"Is complete: {handler.is_complete}")
# Get all messages
for msg in handler.get_messages():
print(f" [{msg.event}] id={msg.id}")
# =========================================================================
# Part 4: Formatted Output
# =========================================================================
print("\n=== Part 4: Formatted Output ===\n")
# Get all formatted output
full_output = handler.format_all()
print("Full SSE output (first 500 chars):")
print("-" * 40)
print(full_output[:500] + "..." if len(full_output) > 500 else full_output)
print("-" * 40)
# Pop messages (get and clear)
handler.clear()
await handler.on_event(ThinkEvent(iteration=1, reasoning="New thought"))
popped = handler.pop_messages()
remaining = handler.get_messages()
print(f"\nAfter pop: got {len(popped)}, remaining {len(remaining)}")
# =========================================================================
# Part 5: Error Handling
# =========================================================================
print("\n=== Part 5: Error Handling ===\n")
handler.clear()
# Simulate an error
await handler.on_event(ThinkEvent(iteration=1, reasoning="Starting..."))
await handler.on_error(ValueError("Something went wrong"))
print(f"Has error: {handler.has_error}")
print(f"Is complete: {handler.is_complete}")
for msg in handler.get_messages():
print(f" [{msg.event}] {msg.data[:50]}...")
# =========================================================================
# Part 6: Async SSE Handler
# =========================================================================
print("\n=== Part 6: Async SSE Handler ===\n")
# AsyncSSEHandler uses a queue for streaming
async_handler = AsyncSSEHandler(
include_timestamp=True,
include_id=True,
)
# Simulate producer
async def produce_events():
"""Simulate event production."""
await async_handler.on_event(ThinkEvent(iteration=1, reasoning="Processing..."))
await asyncio.sleep(0.1)
await async_handler.on_event(
ToolStartEvent(tool_name="analyze", tool_call_id="call_002", arguments={})
)
await asyncio.sleep(0.1)
await async_handler.on_complete()
# Simulate consumer
async def consume_events():
"""Consume and print events."""
count = 0
async for sse_text in async_handler.stream():
count += 1
# Just count in demo, real app would send to client
return count
# Run both
producer = asyncio.create_task(produce_events())
count = await consume_events()
await producer
print(f"Streamed {count} SSE messages")
# =========================================================================
# Part 7: HTTP Response Headers
# =========================================================================
print("\n=== Part 7: HTTP Response Headers ===\n")
headers = create_sse_response_headers()
print("SSE Response Headers:")
for name, value in headers.items():
print(f" {name}: {value}")
# =========================================================================
# Part 8: Custom Event Serialization
# =========================================================================
print("\n=== Part 8: Custom Serialization ===\n")
def custom_serializer(event: LocusEvent) -> dict:
"""Custom event serializer with minimal data."""
return {
"type": event.event_type,
"time": datetime.now(UTC).isoformat(),
# Add only essential fields
"data": getattr(event, "reasoning", None) or getattr(event, "result", None),
}
custom_handler = SSEHandler(custom_serializer=custom_serializer)
await custom_handler.on_event(ThinkEvent(iteration=1, reasoning="Custom serialization"))
msg = custom_handler.get_messages()[0]
print("Custom serialized event:")
print(f" {msg.data}")
# =========================================================================
# Part 9: Web Framework Integration
# =========================================================================
print("\n=== Part 9: Web Framework Integration ===\n")
print("FastAPI Example:")
print("-" * 40)
print("""
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from locus.streaming.sse import AsyncSSEHandler, create_sse_response_headers
app = FastAPI()
@app.get("/stream")
async def stream_events():
handler = AsyncSSEHandler()
async def generate():
# Start agent in background
task = asyncio.create_task(run_agent(handler))
# Stream events
async for sse_text in handler.stream():
yield sse_text
await task
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers=create_sse_response_headers(),
)
async def run_agent(handler):
# Your agent logic
await handler.on_event(ThinkEvent(iteration=1, reasoning="Working..."))
await handler.on_complete()
""")
print("-" * 40)
# =========================================================================
# Part 10: Supported Event Types
# =========================================================================
print("\n=== Part 10: Supported Event Types ===\n")
supported_events = [
# Loop events
("think", "Agent thinking/reasoning"),
("tool_start", "Tool execution started"),
("tool_complete", "Tool execution completed"),
("reflect", "Self-reflection result"),
("grounding", "Grounding evaluation"),
("terminate", "Agent terminated"),
# Model events
("model_chunk", "Streaming model output"),
("model_complete", "Model generation complete"),
# Multi-agent events
("specialist_start", "Specialist started"),
("specialist_complete", "Specialist completed"),
("orchestrator_decision", "Orchestrator routing decision"),
# Hook events
("before_invocation", "Before agent invocation"),
("after_invocation", "After agent invocation"),
]
print("Event types for SSE streaming:")
for event_type, description in supported_events:
print(f" {event_type}: {description}")
# =========================================================================
# Part 11: Best Practices
# =========================================================================
print("\n=== Part 11: Best Practices ===\n")
print("1. Always set proper SSE headers")
print("2. Include event IDs for client reconnection")
print("3. Send 'done' event on completion")
print("4. Handle errors gracefully with error events")
print("5. Use async handler for true streaming")
print("6. Keep event data small (< 65KB)")
print("7. Implement client-side reconnection logic")
print("8. Add heartbeat events for long-running ops")
# Heartbeat example
heartbeat = SSEMessage(event="heartbeat", data='{"status": "alive"}')
print(f"\nHeartbeat message:\n{heartbeat.format()}")
# =========================================================================
print("\n" + "=" * 60)
print("Congratulations! You've completed tutorials 13-21.")
print("=" * 60)
print()
print("New tutorials covered:")
print(" 13: Structured Output")
print(" 14: Reasoning Patterns")
print(" 15: Playbooks")
print(" 16: Agent Handoff")
print(" 17: Orchestrator Pattern")
print(" 18: Specialist Agents")
print(" 19: Guardrails & Security")
print(" 20: Checkpoint Backends")
print(" 21: SSE Streaming")
if __name__ == "__main__":
asyncio.run(main())