Streaming
Handlers that consume the LocusEvent stream from agent.run(...)
and emit it somewhere useful — a terminal, an HTTP SSE response, a
log buffer, a downstream collector. All handlers share the
StreamHandler Protocol so they can be composed via
CompositeHandler and filtered via FilteringHandler.
Base contracts
StreamHandler
Bases: Protocol
Protocol for stream event handlers.
Implementations receive events as they occur during agent execution and can process them for display, logging, or forwarding.
on_event (async)
Handle a streaming event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to process | required |
on_complete (async)
on_error (async)
Handle a streaming error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
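Because StreamHandler is a Protocol, any object exposing the three async methods should satisfy it structurally, without inheriting from anything. The sketch below illustrates that idea under stated assumptions: it re-declares the protocol locally so it runs without the library installed, and `FakeEvent`, its `event_type` field, and `CountingHandler` are hypothetical names, not part of Locus.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class FakeEvent:
    """Stand-in for LocusEvent (hypothetical; the real class is not shown here)."""

    event_type: str


@runtime_checkable
class StreamHandler(Protocol):
    """Local mirror of the three-method contract documented above."""

    async def on_event(self, event: FakeEvent) -> None: ...
    async def on_complete(self) -> None: ...
    async def on_error(self, error: Exception) -> None: ...


class CountingHandler:
    """Counts events per type; satisfies the protocol without subclassing."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = {}
        self.completed = False
        self.errors: list[Exception] = []

    async def on_event(self, event: FakeEvent) -> None:
        self.counts[event.event_type] = self.counts.get(event.event_type, 0) + 1

    async def on_complete(self) -> None:
        self.completed = True

    async def on_error(self, error: Exception) -> None:
        self.errors.append(error)


async def demo() -> CountingHandler:
    """Feed three fake events to the handler, then signal completion."""
    handler = CountingHandler()
    for kind in ("think", "tool_complete", "think"):
        await handler.on_event(FakeEvent(kind))
    await handler.on_complete()
    return handler
```

Calling `asyncio.run(demo())` returns the handler with its counters filled in. In real code the events would come from agent.run(...) rather than from a hand-written loop.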
BaseStreamHandler
Bases: ABC
Abstract base class for stream handlers.
Provides default implementations for on_complete and on_error, only requiring subclasses to implement on_event.
on_event (abstractmethod, async)
Handle a streaming event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to process | required |
on_complete (async)
Called when streaming is complete.
Default implementation does nothing. Override to add cleanup or final output.
on_error (async)
Handle a streaming error.
Default implementation does nothing. Override to add error handling.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
Composition
CompositeHandler
Bases: BaseStreamHandler
Handler that delegates to multiple child handlers.
Useful for sending events to multiple destinations simultaneously.
Example

    handler = CompositeHandler(
        [
            ConsoleHandler(),
            SSEHandler(),
            LoggingHandler(),
        ]
    )

Initialize with optional list of handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `handlers` | `list[StreamHandler] \| None` | List of handlers to delegate to | `None` |
Source code in src/locus/streaming/handler.py
add_handler
Add a handler to the composite.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `handler` | `StreamHandler` | Handler to add | required |
remove_handler
Remove a handler from the composite.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `handler` | `StreamHandler` | Handler to remove | required |
on_event (async)
Delegate event to all handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to process | required |
on_complete (async)
on_error (async)
Delegate error to all handlers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
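The delegation described above can be pictured with a small stand-in. This is an illustrative sketch, not the library's implementation: `FanOut` and `Recorder` are hypothetical names, and awaiting the children one after another in list order is an assumption (the real CompositeHandler may dispatch differently, for example concurrently).

```python
import asyncio


class Recorder:
    """Hypothetical child handler that records everything it receives."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.seen: list[object] = []

    async def on_event(self, event: object) -> None:
        self.seen.append(event)

    async def on_complete(self) -> None:
        self.seen.append("<complete>")

    async def on_error(self, error: Exception) -> None:
        self.seen.append(error)


class FanOut:
    """Illustrative stand-in for CompositeHandler."""

    def __init__(self, handlers=None) -> None:
        # Copy the list so later mutation by the caller has no effect here.
        self._handlers = list(handlers or [])

    def add_handler(self, handler) -> None:
        self._handlers.append(handler)

    def remove_handler(self, handler) -> None:
        self._handlers.remove(handler)

    async def on_event(self, event) -> None:
        # Assumption: children are awaited sequentially, in insertion order.
        for handler in self._handlers:
            await handler.on_event(event)

    async def on_complete(self) -> None:
        for handler in self._handlers:
            await handler.on_complete()

    async def on_error(self, error: Exception) -> None:
        for handler in self._handlers:
            await handler.on_error(error)


async def demo():
    """Show that added handlers receive events and removed ones stop."""
    a, b = Recorder("a"), Recorder("b")
    fan = FanOut([a])
    fan.add_handler(b)
    await fan.on_event("e1")
    fan.remove_handler(a)
    await fan.on_event("e2")
    await fan.on_complete()
    return a.seen, b.seen
```

One design point the sketch leaves open is failure isolation: here an exception raised by one child would stop delivery to the children after it.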
FilteringHandler
FilteringHandler(delegate: StreamHandler, event_types: set[str] | None = None, exclude_types: set[str] | None = None, filter_fn: Any | None = None)
Bases: BaseStreamHandler
Handler that filters events before delegating.
Example

    handler = FilteringHandler(
        delegate=ConsoleHandler(),
        event_types={"think", "tool_complete"},
    )

Initialize the filtering handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `delegate` | `StreamHandler` | Handler to delegate matching events to | required |
| `event_types` | `set[str] \| None` | If set, only these event types are forwarded | `None` |
| `exclude_types` | `set[str] \| None` | If set, these event types are excluded | `None` |
| `filter_fn` | `Any \| None` | Optional custom filter function | `None` |
Source code in src/locus/streaming/handler.py
on_event (async)
Forward event if it passes filters.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to filter and possibly forward | required |
on_complete (async)
on_error (async)
Delegate error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
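One plausible reading of how the three filter parameters combine is sketched below. The order of the checks, the requirement that all of them pass, and the `event_type` attribute on the event are assumptions made for illustration; `FakeEvent` and `passes` are hypothetical names, and the library's actual logic may differ.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeEvent:
    """Stand-in for LocusEvent (hypothetical fields)."""

    event_type: str
    payload: str = ""


def passes(
    event: FakeEvent,
    event_types: Optional[set] = None,
    exclude_types: Optional[set] = None,
    filter_fn: Optional[Callable[[FakeEvent], bool]] = None,
) -> bool:
    """Return True if the event should be forwarded to the delegate."""
    # Allow-list: when given, the event type must be in it.
    if event_types is not None and event.event_type not in event_types:
        return False
    # Deny-list: when given, the event type must not be in it.
    if exclude_types is not None and event.event_type in exclude_types:
        return False
    # Custom predicate: when given, it must return a truthy value.
    if filter_fn is not None and not filter_fn(event):
        return False
    return True
```

With no filters configured, every event passes, which matches the `None` defaults in the parameter table.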
BufferingHandler
Bases: BaseStreamHandler
Handler that buffers events for later processing.
Useful for testing or when events need to be processed in batches.
Example

    handler = BufferingHandler()
    await handler.on_event(event1)
    await handler.on_event(event2)
    events = handler.get_events()  # [event1, event2]

Initialize the buffer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_size` | `int \| None` | Maximum number of events to buffer (None for unlimited) | `None` |
Source code in src/locus/streaming/handler.py
on_event (async)
Buffer an event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to buffer | required |
Source code in src/locus/streaming/handler.py
on_complete (async)
on_error (async)
Record an error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
get_events
get_errors
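The page does not say what happens once `max_size` is reached. The sketch below shows one common policy, dropping the oldest event, using `collections.deque`; this policy is an assumption, and `BoundedBuffer` is a hypothetical stand-in rather than the library's BufferingHandler.

```python
import asyncio
from collections import deque


class BoundedBuffer:
    """Illustrative buffer that keeps at most max_size events."""

    def __init__(self, max_size=None) -> None:
        # deque(maxlen=None) is unbounded; otherwise the oldest item is
        # discarded automatically when a new one is appended to a full deque.
        self._events = deque(maxlen=max_size)
        self._errors = []

    async def on_event(self, event) -> None:
        self._events.append(event)

    async def on_error(self, error: Exception) -> None:
        self._errors.append(error)

    def get_events(self) -> list:
        # Return a copy so callers cannot mutate the internal buffer.
        return list(self._events)

    def get_errors(self) -> list:
        return list(self._errors)


async def demo() -> BoundedBuffer:
    """Push three events into a buffer of size two, then record one error."""
    buf = BoundedBuffer(max_size=2)
    for i in range(3):
        await buf.on_event(f"event{i}")
    await buf.on_error(ValueError("boom"))
    return buf
```

Under this policy the first event is gone after the third arrives. A buffer that rejects new events when full would be an equally valid design, so check the source before relying on either behavior.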
Console output
ConsoleHandler
ConsoleHandler(output: IO[str] | None = None, show_reasoning: bool = True, show_tool_args: bool = False, show_tool_results: bool = True, show_timestamps: bool = False, show_progress: bool = True, use_color: bool = True, use_emoji: bool = True, max_result_length: int = 500, indent: str = ' ')
Bases: BaseStreamHandler
Stream handler that outputs to console with rich formatting.
Provides visual feedback during agent execution including:
- Progress indicators
- Tool call visualization
- Reasoning display
- Color-coded output (when terminal supports it)
Example

    handler = ConsoleHandler(show_reasoning=True)
    await handler.on_event(think_event)

Initialize the console handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `output` | `IO[str] \| None` | Output stream (defaults to sys.stdout) | `None` |
| `show_reasoning` | `bool` | Whether to show agent reasoning | `True` |
| `show_tool_args` | `bool` | Whether to show tool arguments | `False` |
| `show_tool_results` | `bool` | Whether to show tool results | `True` |
| `show_timestamps` | `bool` | Whether to show timestamps | `False` |
| `show_progress` | `bool` | Whether to show progress indicators | `True` |
| `use_color` | `bool` | Whether to use ANSI colors | `True` |
| `use_emoji` | `bool` | Whether to use emoji symbols | `True` |
| `max_result_length` | `int` | Maximum length for tool results | `500` |
| `indent` | `str` | Indentation string | `' '` |
Source code in src/locus/streaming/console.py
on_event (async)
Handle a streaming event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to process | required |
Source code in src/locus/streaming/console.py
on_complete (async)
Called when streaming is complete.
Source code in src/locus/streaming/console.py
on_error (async)
Handle a streaming error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
MinimalConsoleHandler
Bases: BaseStreamHandler
Minimal console handler showing only essential output.
Shows tool calls and final result, hiding reasoning and details.
Initialize minimal handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `output` | `IO[str] \| None` | Output stream (defaults to sys.stdout) | `None` |
Source code in src/locus/streaming/console.py
on_event (async)
Handle events minimally.
Source code in src/locus/streaming/console.py
on_complete (async)
Server-Sent Events (SSE)
Wire-format helpers for HTTP SSE responses. Use SSEHandler with sync
frameworks, AsyncSSEHandler with FastAPI / Starlette.
SSEHandler
SSEHandler(include_timestamp: bool = True, include_id: bool = True, id_prefix: str = '', custom_serializer: Any | None = None)
Bases: BaseStreamHandler
Stream handler that formats events as Server-Sent Events.
Supports all 37+ event types from the Locus event system, formatting them as SSE messages for HTTP streaming.
Example

    handler = SSEHandler()
    await handler.on_event(think_event)
    async for message in handler.messages():
        yield message.format()

The handler can be used in two modes:
1. Pull mode: Collect messages via messages() async generator
2. Push mode: Provide a callback for immediate message handling
Initialize the SSE handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `include_timestamp` | `bool` | Whether to include timestamp in data | `True` |
| `include_id` | `bool` | Whether to include event IDs | `True` |
| `id_prefix` | `str` | Prefix for event IDs | `''` |
| `custom_serializer` | `Any \| None` | Optional custom event serializer | `None` |
Source code in src/locus/streaming/sse.py
on_event (async)
Handle a streaming event.
Converts the event to an SSE message and buffers it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to process | required |
Source code in src/locus/streaming/sse.py
on_complete (async)
Handle stream completion.
Adds a special "done" event to signal completion.
Source code in src/locus/streaming/sse.py
on_error (async)
Handle a streaming error.
Adds an error event and marks stream as complete.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
Source code in src/locus/streaming/sse.py
get_messages
pop_messages
Get and clear all buffered messages.
Returns:
| Type | Description |
|---|---|
| `list[SSEMessage]` | List of SSE messages (buffer is cleared) |
format_all
Format all buffered messages as SSE.
Returns:
| Type | Description |
|---|---|
| `str` | Complete SSE-formatted string |
AsyncSSEHandler
Bases: BaseStreamHandler
Async SSE handler with async generator output.
Provides an async generator interface for streaming SSE messages, suitable for use with async web frameworks.
Example

    handler = AsyncSSEHandler()
    # In a background task, events are sent to handler
    async for message in handler.stream():
        await response.write(message)

Initialize the async SSE handler.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `include_timestamp` | `bool` | Whether to include timestamp in data | `True` |
| `include_id` | `bool` | Whether to include event IDs | `True` |
| `id_prefix` | `str` | Prefix for event IDs | `''` |
Source code in src/locus/streaming/sse.py
on_event (async)
Handle a streaming event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `event` | `LocusEvent` | The event to process | required |
Source code in src/locus/streaming/sse.py
on_complete (async)
Handle stream completion.
Source code in src/locus/streaming/sse.py
on_error (async)
Handle a streaming error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `error` | `Exception` | The error that occurred | required |
Source code in src/locus/streaming/sse.py
stream (async)
Stream SSE-formatted messages.
Yields:
| Type | Description |
|---|---|
| `AsyncIterator[str]` | SSE-formatted strings ready for HTTP response |
Source code in src/locus/streaming/sse.py
stream_messages (async)
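The producer/consumer shape described for AsyncSSEHandler (events pushed from a background task, strings pulled from stream()) is commonly built on `asyncio.Queue` with a sentinel marking completion. The sketch below shows that pattern only; `QueueStream` is a hypothetical stand-in whose on_event takes a plain string instead of a LocusEvent, and nothing here is confirmed to match the library's internals.

```python
import asyncio
from typing import AsyncIterator

# Unique sentinel object that marks the end of the stream.
_DONE = object()


class QueueStream:
    """Illustrative queue-backed handler with an async generator output."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def on_event(self, data: str) -> None:
        # Assumption: single-line payload, framed as one SSE "data" field.
        await self._queue.put(f"data: {data}\n\n")

    async def on_complete(self) -> None:
        await self._queue.put(_DONE)

    async def stream(self) -> AsyncIterator[str]:
        # Yield chunks until the sentinel arrives, then stop iterating.
        while True:
            item = await self._queue.get()
            if item is _DONE:
                return
            yield item


async def demo() -> list:
    """Run a producer task and drain the stream concurrently."""
    handler = QueueStream()

    async def produce() -> None:
        for text in ("one", "two"):
            await handler.on_event(text)
        await handler.on_complete()

    producer = asyncio.create_task(produce())
    chunks = [chunk async for chunk in handler.stream()]
    await producer
    return chunks
```

In a web framework, the async generator returned by stream() would typically be handed to a streaming response object, while the agent run feeds the handler from a separate task.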
SSEMessage
Bases: BaseModel
A Server-Sent Event message.
Format follows the SSE specification:
- event: Event type (optional)
- data: Event data (required)
- id: Event ID (optional)
- retry: Reconnection time in ms (optional)
format
Format as SSE wire protocol.
Returns:
| Type | Description |
|---|---|
| `str` | SSE-formatted string ready for transmission |
Source code in src/locus/streaming/sse.py
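For readers unfamiliar with the wire format, the sketch below serializes the four fields listed for SSEMessage according to the SSE specification: one `field: value` line per field, one `data:` line per line of payload, and a blank line terminating the message. `format_sse` is a hypothetical free function written for illustration; the field order and exact output of SSEMessage.format may differ.

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry: Optional[int] = None,
) -> str:
    """Serialize one message in the text/event-stream format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry is not None:
        # Reconnection delay in milliseconds.
        lines.append(f"retry: {retry}")
    # A payload containing newlines must be split into several data lines.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # The blank line after the last field dispatches the message.
    return "\n".join(lines) + "\n\n"
```

For example, `format_sse("a\nb", event="think", event_id="7")` produces an `event:` line, an `id:` line, two `data:` lines, and the terminating blank line.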
create_sse_response_headers
Create standard SSE HTTP response headers.
Returns:
| Type | Description |
|---|---|
| `dict[str, str]` | Dictionary of HTTP headers for SSE response |
Source code in src/locus/streaming/sse.py
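The page does not list which headers are returned. As a point of reference, the set below is what SSE endpoints conventionally send; it is an assumption about typical practice, not the confirmed return value of create_sse_response_headers, and `typical_sse_headers` is a hypothetical name.

```python
def typical_sse_headers() -> dict:
    """Headers commonly sent with a text/event-stream response."""
    return {
        # Required by the SSE specification for EventSource clients.
        "Content-Type": "text/event-stream",
        # Prevent caches from storing or replaying the stream.
        "Cache-Control": "no-cache",
        # Keep the connection open (relevant for HTTP/1.1).
        "Connection": "keep-alive",
        # Ask nginx-style reverse proxies not to buffer the response.
        "X-Accel-Buffering": "no",
    }
```

Only `Content-Type` is mandated by the specification; the others address caching and proxy buffering, which are the usual reasons an SSE stream appears to stall.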
Structured streaming
Stream the agent's structured output (AgentConfig.output_schema) as
incremental JSON tokens so the UI can render partial fields as they
arrive.
StructuredStream
StructuredStream(events: AsyncIterator[LocusEvent], schema: type[T], *, emit_unchanged: bool = False)
Bases: Generic[T]
Async iterator yielding partial Pydantic instances from an agent run.
Build a structured stream wrapper.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `events` | `AsyncIterator[LocusEvent]` | The event iterator from | required |
| `schema` | `type[T]` | The Pydantic schema to coerce the streamed JSON into. | required |
| `emit_unchanged` | `bool` | When | `False` |
Source code in src/locus/streaming/structured.py
stream_structured
stream_structured(events: AsyncIterator[LocusEvent], schema: type[T], *, emit_unchanged: bool = False) -> StructuredStream[T]
Convenience factory mirroring StructuredStream.
Equivalent to StructuredStream(events, schema) but reads as a verb in
user code: `async for snap in stream_structured(agent.run(...), Foo): ...`