
Streaming

Handlers that consume the LocusEvent stream from agent.run(...) and emit it somewhere useful — a terminal, an HTTP SSE response, a log buffer, a downstream collector. All handlers share the StreamHandler Protocol so they can be composed via CompositeHandler and filtered via FilteringHandler.
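
The page does not show how events reach a handler, so the following is only a minimal sketch of one way to drive the three callbacks from an event iterator. `pump`, `Recorder`, and `fake_run` are hypothetical names introduced for illustration; `fake_run` stands in for agent.run(...) and yields plain strings rather than LocusEvent objects.

```python
import asyncio
from typing import Any, AsyncIterator


class Recorder:
    """Hypothetical handler that records every callback it receives."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    async def on_event(self, event: Any) -> None:
        self.calls.append(("event", event))

    async def on_complete(self) -> None:
        self.calls.append(("complete",))

    async def on_error(self, error: Exception) -> None:
        self.calls.append(("error", str(error)))


async def fake_run(fail: bool = False) -> AsyncIterator[str]:
    """Stand-in for agent.run(...); an assumption, not the Locus API."""
    yield "think"
    yield "tool_start"
    if fail:
        raise RuntimeError("model unavailable")


async def pump(events: AsyncIterator[Any], handler: Any) -> None:
    """Forward each event, then call on_error on failure or on_complete on success."""
    try:
        async for event in events:
            await handler.on_event(event)
    except Exception as error:
        await handler.on_error(error)
        raise  # the caller still sees the original failure
    else:
        await handler.on_complete()


async def main() -> tuple[list, list]:
    ok, bad = Recorder(), Recorder()
    await pump(fake_run(), ok)
    try:
        await pump(fake_run(fail=True), bad)
    except RuntimeError:
        pass  # pump re-raises after notifying the handler
    return ok.calls, bad.calls


ok_calls, bad_calls = asyncio.run(main())
print(ok_calls)
print(bad_calls)
```

Re-raising after on_error is a design choice of this sketch; a driver could equally swallow the exception once the handler has been notified.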

Base contracts

StreamHandler

Bases: Protocol

Protocol for stream event handlers.

Implementations receive events as they occur during agent execution and can process them for display, logging, or forwarding.

on_event async

on_event(event: LocusEvent) -> None

Handle a streaming event.

Parameters:

Name Type Description Default
event LocusEvent

The event to process

required
Source code in src/locus/streaming/handler.py
async def on_event(self, event: LocusEvent) -> None:
    """Handle a streaming event.

    Args:
        event: The event to process
    """
    ...

on_complete async

on_complete() -> None

Called when streaming is complete.

Use for cleanup, final output, etc.

Source code in src/locus/streaming/handler.py
async def on_complete(self) -> None:
    """Called when streaming is complete.

    Use for cleanup, final output, etc.
    """
    ...

on_error async

on_error(error: Exception) -> None

Handle a streaming error.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/handler.py
async def on_error(self, error: Exception) -> None:
    """Handle a streaming error.

    Args:
        error: The error that occurred
    """
    ...
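
Because StreamHandler is a Protocol, an object satisfies it by having the three async methods; inheritance is not required. The sketch below is self-contained under stated assumptions: `StreamHandler` and `Event` are local stand-ins for the Locus types so the block runs without the library, and `runtime_checkable` is added here only to make the `isinstance` check possible (the page does not say whether the real Protocol is runtime-checkable).

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class Event:
    """Stand-in for LocusEvent (assumption: only event_type is needed here)."""

    event_type: str


@runtime_checkable
class StreamHandler(Protocol):
    """Local mirror of the three-method contract documented above."""

    async def on_event(self, event: Event) -> None: ...

    async def on_complete(self) -> None: ...

    async def on_error(self, error: Exception) -> None: ...


class ListHandler:
    """Hypothetical handler; note that it has no base class."""

    def __init__(self) -> None:
        self.log: list[str] = []

    async def on_event(self, event: Event) -> None:
        self.log.append(f"event:{event.event_type}")

    async def on_complete(self) -> None:
        self.log.append("complete")

    async def on_error(self, error: Exception) -> None:
        self.log.append(f"error:{error}")


async def drive(handler: StreamHandler) -> None:
    """Feed one event, then signal completion."""
    await handler.on_event(Event("think"))
    await handler.on_complete()


handler = ListHandler()
asyncio.run(drive(handler))
print(isinstance(handler, StreamHandler), handler.log)
```

The `isinstance` check only verifies that the method names exist; it does not check signatures or that the methods are coroutines.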

BaseStreamHandler

Bases: ABC

Abstract base class for stream handlers.

Provides default implementations for on_complete and on_error, only requiring subclasses to implement on_event.
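
As a rough illustration of that contract, the sketch below uses a local stand-in for BaseStreamHandler (an assumption, so the block runs without Locus) and two hypothetical subclasses: one that overrides only on_event, and one that forgets to and therefore cannot be instantiated.

```python
import asyncio
from abc import ABC, abstractmethod


class BaseStreamHandler(ABC):
    """Local stand-in mirroring the base class described above."""

    @abstractmethod
    async def on_event(self, event: object) -> None: ...

    async def on_complete(self) -> None:
        """No-op default."""

    async def on_error(self, error: Exception) -> None:
        """No-op default."""


class EventCounter(BaseStreamHandler):
    """Hypothetical subclass that overrides only the abstract method."""

    def __init__(self) -> None:
        self.count = 0

    async def on_event(self, event: object) -> None:
        self.count += 1


class Incomplete(BaseStreamHandler):
    """Omits on_event, so the class stays abstract."""


async def main() -> int:
    counter = EventCounter()
    for event in ("a", "b", "c"):
        await counter.on_event(event)
    await counter.on_complete()  # inherited no-op
    await counter.on_error(ValueError("ignored"))  # inherited no-op
    return counter.count


total = asyncio.run(main())

try:
    Incomplete()
    instantiated = True
except TypeError:
    # ABC refuses to instantiate a class with an unimplemented abstract method.
    instantiated = False

print(total, instantiated)
```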

on_event abstractmethod async

on_event(event: LocusEvent) -> None

Handle a streaming event.

Parameters:

Name Type Description Default
event LocusEvent

The event to process

required
Source code in src/locus/streaming/handler.py
@abstractmethod
async def on_event(self, event: LocusEvent) -> None:
    """Handle a streaming event.

    Args:
        event: The event to process
    """
    ...

on_complete async

on_complete() -> None

Called when streaming is complete.

Default implementation does nothing. Override to add cleanup or final output.

Source code in src/locus/streaming/handler.py
async def on_complete(self) -> None:
    """Called when streaming is complete.

    Default implementation does nothing.
    Override to add cleanup or final output.
    """

on_error async

on_error(error: Exception) -> None

Handle a streaming error.

Default implementation does nothing. Override to add error handling.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/handler.py
async def on_error(self, error: Exception) -> None:
    """Handle a streaming error.

    Default implementation does nothing.
    Override to add error handling.

    Args:
        error: The error that occurred
    """

Composition

CompositeHandler

CompositeHandler(handlers: list[StreamHandler] | None = None)

Bases: BaseStreamHandler

Handler that delegates to multiple child handlers.

Useful for sending events to multiple destinations simultaneously.

Example

handler = CompositeHandler(
    [
        ConsoleHandler(),
        SSEHandler(),
        LoggingHandler(),
    ]
)

Initialize with optional list of handlers.

Parameters:

Name Type Description Default
handlers list[StreamHandler] | None

List of handlers to delegate to

None
Source code in src/locus/streaming/handler.py
def __init__(self, handlers: list[StreamHandler] | None = None):
    """Initialize with optional list of handlers.

    Args:
        handlers: List of handlers to delegate to
    """
    self.handlers: list[StreamHandler] = list(handlers) if handlers else []

add_handler

add_handler(handler: StreamHandler) -> None

Add a handler to the composite.

Parameters:

Name Type Description Default
handler StreamHandler

Handler to add

required
Source code in src/locus/streaming/handler.py
def add_handler(self, handler: StreamHandler) -> None:
    """Add a handler to the composite.

    Args:
        handler: Handler to add
    """
    self.handlers.append(handler)

remove_handler

remove_handler(handler: StreamHandler) -> None

Remove a handler from the composite.

Parameters:

Name Type Description Default
handler StreamHandler

Handler to remove

required
Source code in src/locus/streaming/handler.py
def remove_handler(self, handler: StreamHandler) -> None:
    """Remove a handler from the composite.

    Args:
        handler: Handler to remove
    """
    self.handlers.remove(handler)

on_event async

on_event(event: LocusEvent) -> None

Delegate event to all handlers.

Parameters:

Name Type Description Default
event LocusEvent

The event to process

required
Source code in src/locus/streaming/handler.py
async def on_event(self, event: LocusEvent) -> None:
    """Delegate event to all handlers.

    Args:
        event: The event to process
    """
    for handler in self.handlers:
        await handler.on_event(event)

on_complete async

on_complete() -> None

Delegate completion to all handlers.

Source code in src/locus/streaming/handler.py
async def on_complete(self) -> None:
    """Delegate completion to all handlers."""
    for handler in self.handlers:
        await handler.on_complete()

on_error async

on_error(error: Exception) -> None

Delegate error to all handlers.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/handler.py
async def on_error(self, error: Exception) -> None:
    """Delegate error to all handlers.

    Args:
        error: The error that occurred
    """
    for handler in self.handlers:
        await handler.on_error(error)

FilteringHandler

FilteringHandler(delegate: StreamHandler, event_types: set[str] | None = None, exclude_types: set[str] | None = None, filter_fn: Any | None = None)

Bases: BaseStreamHandler

Handler that filters events before delegating.

Example

handler = FilteringHandler(
    delegate=ConsoleHandler(),
    event_types={"think", "tool_complete"},
)

Initialize the filtering handler.

Parameters:

- delegate (StreamHandler, required): Handler to delegate matching events to
- event_types (set[str] | None, default None): If set, only these event types are forwarded
- exclude_types (set[str] | None, default None): If set, these event types are excluded
- filter_fn (Any | None, default None): Optional custom filter function
Source code in src/locus/streaming/handler.py
def __init__(
    self,
    delegate: StreamHandler,
    event_types: set[str] | None = None,
    exclude_types: set[str] | None = None,
    filter_fn: Any | None = None,  # Callable[[LocusEvent], bool]
):
    """Initialize the filtering handler.

    Args:
        delegate: Handler to delegate matching events to
        event_types: If set, only these event types are forwarded
        exclude_types: If set, these event types are excluded
        filter_fn: Optional custom filter function
    """
    self.delegate = delegate
    self.event_types = event_types
    self.exclude_types = exclude_types or set()
    self.filter_fn = filter_fn

on_event async

on_event(event: LocusEvent) -> None

Forward event if it passes filters.

Parameters:

Name Type Description Default
event LocusEvent

The event to filter and possibly forward

required
Source code in src/locus/streaming/handler.py
async def on_event(self, event: LocusEvent) -> None:
    """Forward event if it passes filters.

    Args:
        event: The event to filter and possibly forward
    """
    if self._should_forward(event):
        await self.delegate.on_event(event)

on_complete async

on_complete() -> None

Delegate completion.

Source code in src/locus/streaming/handler.py
async def on_complete(self) -> None:
    """Delegate completion."""
    await self.delegate.on_complete()

on_error async

on_error(error: Exception) -> None

Delegate error.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/handler.py
async def on_error(self, error: Exception) -> None:
    """Delegate error.

    Args:
        error: The error that occurred
    """
    await self.delegate.on_error(error)
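
on_event calls `_should_forward`, but its body is not shown on this page. The sketch below is one plausible reading of the parameter descriptions (allow-list first, then deny-list, then the custom predicate); it is an assumption, not the library's actual logic. `should_forward` and `Event` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Event:
    """Stand-in for LocusEvent (assumption: filtering keys on event_type)."""

    event_type: str


def should_forward(
    event: Event,
    event_types: Optional[set] = None,
    exclude_types: Optional[set] = None,
    filter_fn: Optional[Callable[[Event], bool]] = None,
) -> bool:
    """Return True only if the event passes every configured filter."""
    # Allow-list: when given, anything outside it is dropped.
    if event_types is not None and event.event_type not in event_types:
        return False
    # Deny-list: listed types are dropped even if they are allow-listed.
    if exclude_types and event.event_type in exclude_types:
        return False
    # Custom predicate gets the final say.
    if filter_fn is not None and not filter_fn(event):
        return False
    return True


think = Event("think")
tool = Event("tool_complete")

print(should_forward(think))
print(should_forward(tool, event_types={"think"}))
print(should_forward(think, event_types={"think"}, exclude_types={"think"}))
print(should_forward(think, filter_fn=lambda e: e.event_type.startswith("tool")))
```

Under this reading, an event type that appears in both sets is dropped; the real precedence may differ.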

BufferingHandler

BufferingHandler(max_size: int | None = None)

Bases: BaseStreamHandler

Handler that buffers events for later processing.

Useful for testing or when events need to be processed in batches.

Example

handler = BufferingHandler()
await handler.on_event(event1)
await handler.on_event(event2)
events = handler.get_events()  # [event1, event2]

Initialize the buffer.

Parameters:

Name Type Description Default
max_size int | None

Maximum number of events to buffer (None for unlimited)

None
Source code in src/locus/streaming/handler.py
def __init__(self, max_size: int | None = None):
    """Initialize the buffer.

    Args:
        max_size: Maximum number of events to buffer (None for unlimited)
    """
    self.max_size = max_size
    self._events: list[LocusEvent] = []
    self._errors: list[Exception] = []
    self._complete: bool = False

is_complete property

is_complete: bool

Check if streaming completed.

on_event async

on_event(event: LocusEvent) -> None

Buffer an event.

Parameters:

Name Type Description Default
event LocusEvent

The event to buffer

required
Source code in src/locus/streaming/handler.py
async def on_event(self, event: LocusEvent) -> None:
    """Buffer an event.

    Args:
        event: The event to buffer
    """
    if self.max_size is not None and len(self._events) >= self.max_size:
        self._events.pop(0)
    self._events.append(event)

on_complete async

on_complete() -> None

Mark streaming as complete.

Source code in src/locus/streaming/handler.py
async def on_complete(self) -> None:
    """Mark streaming as complete."""
    self._complete = True

on_error async

on_error(error: Exception) -> None

Record an error.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/handler.py
async def on_error(self, error: Exception) -> None:
    """Record an error.

    Args:
        error: The error that occurred
    """
    self._errors.append(error)

get_events

get_events() -> list[LocusEvent]

Get all buffered events.

Returns:

Type Description
list[LocusEvent]

List of buffered events

Source code in src/locus/streaming/handler.py
def get_events(self) -> list[LocusEvent]:
    """Get all buffered events.

    Returns:
        List of buffered events
    """
    return list(self._events)

get_errors

get_errors() -> list[Exception]

Get all recorded errors.

Returns:

Type Description
list[Exception]

List of recorded errors

Source code in src/locus/streaming/handler.py
def get_errors(self) -> list[Exception]:
    """Get all recorded errors.

    Returns:
        List of recorded errors
    """
    return list(self._errors)

clear

clear() -> None

Clear all buffered events and errors.

Source code in src/locus/streaming/handler.py
def clear(self) -> None:
    """Clear all buffered events and errors."""
    self._events.clear()
    self._errors.clear()
    self._complete = False
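
When max_size is set, on_event drops the oldest event before appending, so the buffer always holds the most recent events. The sketch below replays that logic over a plain list; `buffer_events` is a hypothetical helper, and the `collections.deque(maxlen=...)` line shows a standard-library structure with the same keep-newest behavior for comparison.

```python
from collections import deque
from typing import Optional


def buffer_events(events: list, max_size: Optional[int] = None) -> list:
    """Replay the on_event eviction logic shown above over a plain list."""
    buffered: list = []
    for event in events:
        if max_size is not None and len(buffered) >= max_size:
            buffered.pop(0)  # evict the oldest entry
        buffered.append(event)
    return buffered


incoming = ["e1", "e2", "e3"]

kept = buffer_events(incoming, max_size=2)
unbounded = buffer_events(incoming)
ring = list(deque(incoming, maxlen=2))  # same result, O(1) eviction

print(kept, unbounded, ring)
```

`list.pop(0)` shifts every remaining element, which is fine for small test buffers; for large bounded buffers a deque avoids that cost.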

Console output

ConsoleHandler

ConsoleHandler(output: IO[str] | None = None, show_reasoning: bool = True, show_tool_args: bool = False, show_tool_results: bool = True, show_timestamps: bool = False, show_progress: bool = True, use_color: bool = True, use_emoji: bool = True, max_result_length: int = 500, indent: str = '  ')

Bases: BaseStreamHandler

Stream handler that outputs to console with rich formatting.

Provides visual feedback during agent execution, including:

- Progress indicators
- Tool call visualization
- Reasoning display
- Color-coded output (when the terminal supports it)

Example

handler = ConsoleHandler(show_reasoning=True)
await handler.on_event(think_event)

Initialize the console handler.

Parameters:

- output (IO[str] | None, default None): Output stream (defaults to sys.stdout)
- show_reasoning (bool, default True): Whether to show agent reasoning
- show_tool_args (bool, default False): Whether to show tool arguments
- show_tool_results (bool, default True): Whether to show tool results
- show_timestamps (bool, default False): Whether to show timestamps
- show_progress (bool, default True): Whether to show progress indicators
- use_color (bool, default True): Whether to use ANSI colors (applied only when the output supports color)
- use_emoji (bool, default True): Whether to use emoji symbols
- max_result_length (int, default 500): Maximum length for tool results
- indent (str, default '  '): Indentation string (two spaces)
Source code in src/locus/streaming/console.py
def __init__(
    self,
    output: IO[str] | None = None,
    show_reasoning: bool = True,
    show_tool_args: bool = False,
    show_tool_results: bool = True,
    show_timestamps: bool = False,
    show_progress: bool = True,
    use_color: bool = True,
    use_emoji: bool = True,
    max_result_length: int = 500,
    indent: str = "  ",
):
    """Initialize the console handler.

    Args:
        output: Output stream (defaults to sys.stdout)
        show_reasoning: Whether to show agent reasoning
        show_tool_args: Whether to show tool arguments
        show_tool_results: Whether to show tool results
        show_timestamps: Whether to show timestamps
        show_progress: Whether to show progress indicators
        use_color: Whether to use ANSI colors
        use_emoji: Whether to use emoji symbols
        max_result_length: Maximum length for tool results
        indent: Indentation string
    """
    self.output = output or sys.stdout
    self.show_reasoning = show_reasoning
    self.show_tool_args = show_tool_args
    self.show_tool_results = show_tool_results
    self.show_timestamps = show_timestamps
    self.show_progress = show_progress
    self.use_color = use_color and self._supports_color()
    self.use_emoji = use_emoji
    self.max_result_length = max_result_length
    self.indent = indent

    self._iteration = 0
    self._tool_count = 0
    self._active_tools: dict[str, str] = {}

on_event async

on_event(event: LocusEvent) -> None

Handle a streaming event.

Parameters:

Name Type Description Default
event LocusEvent

The event to process

required
Source code in src/locus/streaming/console.py
async def on_event(self, event: LocusEvent) -> None:
    """Handle a streaming event.

    Args:
        event: The event to process
    """
    handler = getattr(self, f"_handle_{event.event_type}", None)
    if handler:
        handler(event)
    else:
        self._handle_unknown(event)
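
The dispatch above looks up a method named `_handle_<event_type>` and falls back to `_handle_unknown` when none exists. The sketch below isolates that pattern in a hypothetical `Dispatcher` class; the `_handle_*` bodies are invented for illustration and `Event` is a stand-in for LocusEvent.

```python
from dataclasses import dataclass


@dataclass
class Event:
    """Stand-in for LocusEvent (assumption: only event_type matters here)."""

    event_type: str


class Dispatcher:
    """Hypothetical class reusing the getattr lookup from on_event above."""

    def __init__(self) -> None:
        self.lines: list[str] = []

    def on_event(self, event: Event) -> None:
        # Build the method name from the event type; None if it is missing.
        handler = getattr(self, f"_handle_{event.event_type}", None)
        if handler:
            handler(event)
        else:
            self._handle_unknown(event)

    def _handle_think(self, event: Event) -> None:
        self.lines.append("think handled")

    def _handle_tool_start(self, event: Event) -> None:
        self.lines.append("tool_start handled")

    def _handle_unknown(self, event: Event) -> None:
        self.lines.append(f"no handler for {event.event_type}")


dispatcher = Dispatcher()
for name in ("think", "tool_start", "custom_metric"):
    dispatcher.on_event(Event(name))

print(dispatcher.lines)
```

With this pattern, supporting a new event type means adding one method; no central if/elif chain needs editing.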

on_complete async

on_complete() -> None

Called when streaming is complete.

Source code in src/locus/streaming/console.py
async def on_complete(self) -> None:
    """Called when streaming is complete."""
    self._write("")  # Empty line
    self._write(
        f"{self._symbol('terminate')} "
        f"{self._color('Execution complete', 'green')} "
        f"({self._tool_count} tool calls)"
    )

on_error async

on_error(error: Exception) -> None

Handle a streaming error.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/console.py
async def on_error(self, error: Exception) -> None:
    """Handle a streaming error.

    Args:
        error: The error that occurred
    """
    self._write(f"{self._symbol('error')} {self._color('Error:', 'red')} {error!s}")

MinimalConsoleHandler

MinimalConsoleHandler(output: IO[str] | None = None)

Bases: BaseStreamHandler

Minimal console handler showing only essential output.

Shows tool calls, tool errors, and the iteration count at termination, hiding reasoning and details.

Initialize minimal handler.

Parameters:

Name Type Description Default
output IO[str] | None

Output stream (defaults to sys.stdout)

None
Source code in src/locus/streaming/console.py
def __init__(self, output: IO[str] | None = None):
    """Initialize minimal handler.

    Args:
        output: Output stream (defaults to sys.stdout)
    """
    self.output = output or sys.stdout
    self._result: str | None = None

on_event async

on_event(event: LocusEvent) -> None

Handle events minimally.

Source code in src/locus/streaming/console.py
async def on_event(self, event: LocusEvent) -> None:
    """Handle events minimally."""
    if isinstance(event, ToolStartEvent):
        self.output.write(f"• {event.tool_name}\n")
        self.output.flush()
    elif isinstance(event, ToolCompleteEvent) and event.error:
        self.output.write(f"  Error: {event.error}\n")
        self.output.flush()
    elif isinstance(event, TerminateEvent):
        self.output.write(f"\nCompleted in {event.iterations_used} iterations\n")
        self.output.flush()

on_complete async

on_complete() -> None

Handle completion.

Source code in src/locus/streaming/console.py
async def on_complete(self) -> None:
    """Handle completion."""

on_error async

on_error(error: Exception) -> None

Handle error.

Source code in src/locus/streaming/console.py
async def on_error(self, error: Exception) -> None:
    """Handle error."""
    self.output.write(f"Error: {error}\n")
    self.output.flush()

Server-Sent Events (SSE)

Wire-format helpers for HTTP SSE responses. Use SSEHandler with sync frameworks, AsyncSSEHandler with FastAPI / Starlette.

SSEHandler

SSEHandler(include_timestamp: bool = True, include_id: bool = True, id_prefix: str = '', custom_serializer: Any | None = None)

Bases: BaseStreamHandler

Stream handler that formats events as Server-Sent Events.

Supports all 37+ event types from the Locus event system, formatting them as SSE messages for HTTP streaming.

Example

handler = SSEHandler()
await handler.on_event(think_event)
async for message in handler.messages():
    yield message.format()

The handler can be used in two modes:

1. Pull mode: Collect messages via messages() async generator
2. Push mode: Provide a callback for immediate message handling

Initialize the SSE handler.

Parameters:

- include_timestamp (bool, default True): Whether to include timestamp in data
- include_id (bool, default True): Whether to include event IDs
- id_prefix (str, default ''): Prefix for event IDs
- custom_serializer (Any | None, default None): Optional custom event serializer
Source code in src/locus/streaming/sse.py
def __init__(
    self,
    include_timestamp: bool = True,
    include_id: bool = True,
    id_prefix: str = "",
    custom_serializer: Any | None = None,  # Callable[[LocusEvent], dict]
):
    """Initialize the SSE handler.

    Args:
        include_timestamp: Whether to include timestamp in data
        include_id: Whether to include event IDs
        id_prefix: Prefix for event IDs
        custom_serializer: Optional custom event serializer
    """
    self.include_timestamp = include_timestamp
    self.include_id = include_id
    self.id_prefix = id_prefix
    self.custom_serializer = custom_serializer

    self._messages: list[SSEMessage] = []
    self._event_counter = 0
    self._complete = False
    self._error: Exception | None = None

is_complete property

is_complete: bool

Check if streaming is complete.

has_error property

has_error: bool

Check if an error occurred.

on_event async

on_event(event: LocusEvent) -> None

Handle a streaming event.

Converts the event to an SSE message and buffers it.

Parameters:

Name Type Description Default
event LocusEvent

The event to process

required
Source code in src/locus/streaming/sse.py
async def on_event(self, event: LocusEvent) -> None:
    """Handle a streaming event.

    Converts the event to an SSE message and buffers it.

    Args:
        event: The event to process
    """
    data = self._serialize_event(event)
    message = self._create_message(event.event_type, data)
    self._messages.append(message)

on_complete async

on_complete() -> None

Handle stream completion.

Adds a special "done" event to signal completion.

Source code in src/locus/streaming/sse.py
async def on_complete(self) -> None:
    """Handle stream completion.

    Adds a special "done" event to signal completion.
    """
    self._complete = True
    message = self._create_message(
        "done",
        {"status": "complete", "total_events": self._event_counter},
    )
    self._messages.append(message)

on_error async

on_error(error: Exception) -> None

Handle a streaming error.

Adds an error event and marks stream as complete.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/sse.py
async def on_error(self, error: Exception) -> None:
    """Handle a streaming error.

    Adds an error event and marks stream as complete.

    Args:
        error: The error that occurred
    """
    self._error = error
    self._complete = True
    message = self._create_message("error", _build_error_payload(error))
    self._messages.append(message)

get_messages

get_messages() -> list[SSEMessage]

Get all buffered messages.

Returns:

Type Description
list[SSEMessage]

List of SSE messages

Source code in src/locus/streaming/sse.py
def get_messages(self) -> list[SSEMessage]:
    """Get all buffered messages.

    Returns:
        List of SSE messages
    """
    return list(self._messages)

pop_messages

pop_messages() -> list[SSEMessage]

Get and clear all buffered messages.

Returns:

Type Description
list[SSEMessage]

List of SSE messages (buffer is cleared)

Source code in src/locus/streaming/sse.py
def pop_messages(self) -> list[SSEMessage]:
    """Get and clear all buffered messages.

    Returns:
        List of SSE messages (buffer is cleared)
    """
    messages = self._messages
    self._messages = []
    return messages

format_all

format_all() -> str

Format all buffered messages as SSE.

Returns:

Type Description
str

Complete SSE-formatted string

Source code in src/locus/streaming/sse.py
def format_all(self) -> str:
    """Format all buffered messages as SSE.

    Returns:
        Complete SSE-formatted string
    """
    return "".join(msg.format() for msg in self._messages)

clear

clear() -> None

Clear all buffered messages and reset state.

Source code in src/locus/streaming/sse.py
def clear(self) -> None:
    """Clear all buffered messages and reset state."""
    self._messages.clear()
    self._event_counter = 0
    self._complete = False
    self._error = None
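
The three read methods above differ in whether they touch the buffer: get_messages returns a copy, format_all joins without clearing, and pop_messages hands over the list and starts a fresh one. The sketch below reproduces just those three bodies in a hypothetical `MessageBuffer` stand-in that stores pre-formatted strings instead of SSEMessage objects; `add` is an invented helper.

```python
class MessageBuffer:
    """Stand-in reproducing only the buffer methods shown above."""

    def __init__(self) -> None:
        self._messages: list[str] = []

    def add(self, wire_text: str) -> None:
        """Hypothetical helper; the real handler appends in on_event."""
        self._messages.append(wire_text)

    def get_messages(self) -> list[str]:
        return list(self._messages)  # copy, buffer untouched

    def pop_messages(self) -> list[str]:
        messages = self._messages
        self._messages = []  # buffer is now empty
        return messages

    def format_all(self) -> str:
        return "".join(self._messages)  # buffer untouched


buf = MessageBuffer()
buf.add("data: 1\n\n")
buf.add("data: 2\n\n")

snapshot = buf.get_messages()
wire = buf.format_all()
drained = buf.pop_messages()
remaining = buf.get_messages()

print(snapshot, repr(wire), drained, remaining)
```

For incremental delivery, draining with pop_messages avoids resending messages that format_all would repeat on every call.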

AsyncSSEHandler

AsyncSSEHandler(include_timestamp: bool = True, include_id: bool = True, id_prefix: str = '')

Bases: BaseStreamHandler

Async SSE handler with async generator output.

Provides an async generator interface for streaming SSE messages, suitable for use with async web frameworks.

Example

handler = AsyncSSEHandler()

# In a background task, events are sent to handler

async for message in handler.stream():
    await response.write(message)

Initialize the async SSE handler.

Parameters:

- include_timestamp (bool, default True): Whether to include timestamp in data
- include_id (bool, default True): Whether to include event IDs
- id_prefix (str, default ''): Prefix for event IDs
Source code in src/locus/streaming/sse.py
def __init__(
    self,
    include_timestamp: bool = True,
    include_id: bool = True,
    id_prefix: str = "",
):
    """Initialize the async SSE handler.

    Args:
        include_timestamp: Whether to include timestamp in data
        include_id: Whether to include event IDs
        id_prefix: Prefix for event IDs
    """
    self.include_timestamp = include_timestamp
    self.include_id = include_id
    self.id_prefix = id_prefix

    self._event_counter = 0
    self._complete = False
    self._error: Exception | None = None

    # Use asyncio.Queue for async message passing
    import asyncio

    self._queue: asyncio.Queue[SSEMessage | None] = asyncio.Queue()

is_complete property

is_complete: bool

Check if streaming is complete.

on_event async

on_event(event: LocusEvent) -> None

Handle a streaming event.

Parameters:

Name Type Description Default
event LocusEvent

The event to process

required
Source code in src/locus/streaming/sse.py
async def on_event(self, event: LocusEvent) -> None:
    """Handle a streaming event.

    Args:
        event: The event to process
    """
    data = self._serialize_event(event)
    message = self._create_message(event.event_type, data)
    await self._queue.put(message)

on_complete async

on_complete() -> None

Handle stream completion.

Source code in src/locus/streaming/sse.py
async def on_complete(self) -> None:
    """Handle stream completion."""
    self._complete = True
    message = self._create_message(
        "done",
        {"status": "complete", "total_events": self._event_counter},
    )
    await self._queue.put(message)
    await self._queue.put(None)  # Signal end

on_error async

on_error(error: Exception) -> None

Handle a streaming error.

Parameters:

Name Type Description Default
error Exception

The error that occurred

required
Source code in src/locus/streaming/sse.py
async def on_error(self, error: Exception) -> None:
    """Handle a streaming error.

    Args:
        error: The error that occurred
    """
    self._error = error
    self._complete = True
    message = self._create_message("error", _build_error_payload(error))
    await self._queue.put(message)
    await self._queue.put(None)  # Signal end

stream async

stream() -> AsyncIterator[str]

Stream SSE-formatted messages.

Yields:

Type Description
AsyncIterator[str]

SSE-formatted strings ready for HTTP response

Source code in src/locus/streaming/sse.py
async def stream(self) -> AsyncIterator[str]:
    """Stream SSE-formatted messages.

    Yields:
        SSE-formatted strings ready for HTTP response
    """
    while True:
        message = await self._queue.get()
        if message is None:
            break
        yield message.format()

stream_messages async

stream_messages() -> AsyncIterator[SSEMessage]

Stream SSE message objects.

Yields:

Type Description
AsyncIterator[SSEMessage]

SSEMessage objects

Source code in src/locus/streaming/sse.py
async def stream_messages(self) -> AsyncIterator[SSEMessage]:
    """Stream SSE message objects.

    Yields:
        SSEMessage objects
    """
    while True:
        message = await self._queue.get()
        if message is None:
            break
        yield message
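
Both generators above rely on the same pattern: a producer puts messages on an asyncio.Queue and finishes with `None`, and the consumer loops until it reads that sentinel. The sketch below shows the pattern on its own with plain strings; `produce`, `stream`, and `main` are hypothetical names, not Locus APIs.

```python
import asyncio
from typing import AsyncIterator, Optional


async def produce(queue: "asyncio.Queue[Optional[str]]") -> None:
    """Plays the role of on_event/on_complete: enqueue, then signal the end."""
    for chunk in ("event: think\ndata: {}\n\n", "event: done\ndata: {}\n\n"):
        await queue.put(chunk)
    await queue.put(None)  # sentinel: no more messages


async def stream(queue: "asyncio.Queue[Optional[str]]") -> AsyncIterator[str]:
    """Same loop shape as stream() above: stop at the first None."""
    while True:
        message = await queue.get()
        if message is None:
            break
        yield message


async def main() -> list:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    producer = asyncio.create_task(produce(queue))  # runs concurrently
    received = [message async for message in stream(queue)]
    await producer
    return received


received = asyncio.run(main())
print(received)
```

Each queue item is delivered to exactly one consumer, so stream() and stream_messages() should not be iterated at the same time on one handler. The queue is created without a maxsize, so a slow client does not slow the producer down; messages simply accumulate.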

SSEMessage

Bases: BaseModel

A Server-Sent Event message.

Format follows the SSE specification:

- event: Event type (optional)
- data: Event data (required)
- id: Event ID (optional)
- retry: Reconnection time in ms (optional)

format

format() -> str

Format as SSE wire protocol.

Returns:

Type Description
str

SSE-formatted string ready for transmission

Source code in src/locus/streaming/sse.py
def format(self) -> str:
    """Format as SSE wire protocol.

    Returns:
        SSE-formatted string ready for transmission
    """
    lines: list[str] = []

    if self.event is not None:
        lines.append(f"event: {self.event}")

    if self.id is not None:
        lines.append(f"id: {self.id}")

    if self.retry is not None:
        lines.append(f"retry: {self.retry}")

    # Data can be multi-line, each line needs "data: " prefix
    for line in self.data.split("\n"):
        lines.append(f"data: {line}")

    # End with empty line
    lines.append("")
    lines.append("")

    return "\n".join(lines)
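
To make the wire format concrete, the sketch below copies the format() logic into a hypothetical `Message` dataclass (the real SSEMessage is a Pydantic model; the field types here are assumptions) and formats a payload whose data spans two lines.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    """Stand-in for SSEMessage with the four fields listed above."""

    data: str
    event: Optional[str] = None
    id: Optional[str] = None
    retry: Optional[int] = None

    def format(self) -> str:
        lines: list[str] = []
        if self.event is not None:
            lines.append(f"event: {self.event}")
        if self.id is not None:
            lines.append(f"id: {self.id}")
        if self.retry is not None:
            lines.append(f"retry: {self.retry}")
        # Every line of the payload gets its own "data: " prefix.
        for line in self.data.split("\n"):
            lines.append(f"data: {line}")
        # Two empty entries produce the blank line that ends the message.
        lines.append("")
        lines.append("")
        return "\n".join(lines)


multi = Message(data="first line\nsecond line", event="think", id="evt-1").format()
bare = Message(data="ping").format()

print(repr(multi))
print(repr(bare))
```

A payload produced by `json.dumps` without `indent` contains no raw newlines, so JSON data normally occupies a single `data:` line.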

create_sse_response_headers

create_sse_response_headers() -> dict[str, str]

Create standard SSE HTTP response headers.

Returns:

Type Description
dict[str, str]

Dictionary of HTTP headers for SSE response

Source code in src/locus/streaming/sse.py
def create_sse_response_headers() -> dict[str, str]:
    """Create standard SSE HTTP response headers.

    Returns:
        Dictionary of HTTP headers for SSE response
    """
    return {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # Disable nginx buffering
    }

Structured streaming

Stream the agent's structured output (AgentConfig.output_schema) as incremental JSON tokens so the UI can render partial fields as they arrive.

StructuredStream

StructuredStream(events: AsyncIterator[LocusEvent], schema: type[T], *, emit_unchanged: bool = False)

Bases: Generic[T]

Async iterator yielding partial Pydantic instances from an agent run.

Build a structured stream wrapper.

Parameters:

- events (AsyncIterator[LocusEvent], required): The event iterator from Agent.run(...).
- schema (type[T], required): The Pydantic schema to coerce the streamed JSON into.
- emit_unchanged (bool, default False): When False (default), a partial is yielded only if the parsed model differs from the previous emit; when True, every new chunk that successfully parses is yielded. The default reduces churn for UIs that rerender on each yield.
Source code in src/locus/streaming/structured.py
def __init__(
    self,
    events: AsyncIterator[LocusEvent],
    schema: type[T],
    *,
    emit_unchanged: bool = False,
) -> None:
    """Build a structured stream wrapper.

    Args:
        events: The event iterator from ``Agent.run(...)``.
        schema: The Pydantic schema to coerce the streamed JSON into.
        emit_unchanged: When ``False`` (default) we only yield a partial
            if the parsed model differs from the previous emit; when
            ``True`` every new chunk that successfully parses is yielded.
            The default reduces churn for UIs that rerender on each yield.
    """
    self._events = events
    self._schema = schema
    self._emit_unchanged = emit_unchanged
    self._buffer = ""
    self._final: T | None = None
    self._final_message: str | None = None
    self._terminate_reason: str | None = None
    self._last_emit: T | None = None

final property

final: T | None

The final fully-validated instance (or None if parsing failed).

terminate_reason property

terminate_reason: str | None

The TerminateEvent.reason that ended the stream, if any.
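
The iteration logic of StructuredStream is not shown on this page. The sketch below illustrates only the emit_unchanged rule as the parameter description states it: a snapshot is yielded when it differs from the previous emit, or always when emit_unchanged is True. `Report` and `emit_partials` are hypothetical, a dataclass stands in for the Pydantic schema, and a synchronous generator is used for brevity where the real class is an async iterator.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass
class Report:
    """Hypothetical schema; real code would use a Pydantic model."""

    title: str = ""
    score: int = 0


def emit_partials(
    parsed: Iterable[Report], emit_unchanged: bool = False
) -> Iterator[Report]:
    """Yield parsed snapshots, skipping repeats unless emit_unchanged is set."""
    last_emit: Optional[Report] = None
    for snapshot in parsed:
        if emit_unchanged or snapshot != last_emit:
            last_emit = snapshot
            yield snapshot


# Three successive parses; the second adds nothing new.
parses = [Report("Q3"), Report("Q3"), Report("Q3", 7)]

deduped = list(emit_partials(parses))
everything = list(emit_partials(parses, emit_unchanged=True))

print(len(deduped), len(everything))
```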

stream_structured

stream_structured(events: AsyncIterator[LocusEvent], schema: type[T], *, emit_unchanged: bool = False) -> StructuredStream[T]

Convenience factory mirroring StructuredStream.

Equivalent to StructuredStream(events, schema, emit_unchanged=emit_unchanged) but reads as a verb in user code: async for snap in stream_structured(agent.run(...), Foo): ....

Source code in src/locus/streaming/structured.py
def stream_structured(
    events: AsyncIterator[LocusEvent],
    schema: type[T],
    *,
    emit_unchanged: bool = False,
) -> StructuredStream[T]:
    """Convenience factory mirroring :class:`StructuredStream`.

    Equivalent to ``StructuredStream(events, schema)`` but reads as a verb in
    user code: ``async for snap in stream_structured(agent.run(...), Foo): ...``.
    """
    return StructuredStream(events, schema, emit_unchanged=emit_unchanged)