Core primitives

The shared types every other module is built on. Most consumers don't import from locus.core directly — the re-exports at the package root (from locus import Message, etc.) are the supported entry points — but the typed protocols, error classes, and control-flow helpers below are the foundation underneath every API.

Configuration

LocusSettings

Bases: BaseSettings

Root settings for Locus SDK.

from_dict classmethod

from_dict(data: dict[str, Any]) -> LocusSettings

Create settings from a dictionary.

Source code in src/locus/core/config.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> LocusSettings:
    """Create settings from a dictionary."""
    return cls.model_validate(data)

to_dict

to_dict() -> dict[str, Any]

Export settings to dictionary.

Source code in src/locus/core/config.py
def to_dict(self) -> dict[str, Any]:
    """Export settings to dictionary."""
    return self.model_dump()

Protocols

The duck-typed contracts that providers, tools, and checkpointers implement. Use these for type hints when accepting "any model" / "any checkpointer" / "any tool" without binding to a concrete class.

ModelProtocol

Bases: Protocol

Protocol for LLM providers.

Implementations must support both completion and streaming.

complete async

complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse

Complete a chat request.

Parameters:

Name      Type                         Description                    Default
messages  list[Message]                Conversation history           required
tools     list[dict[str, Any]] | None  Tool schemas in OpenAI format  None
**kwargs  Any                          Provider-specific options      {}

Returns:

Type           Description
ModelResponse  Model response with message and metadata

Source code in src/locus/core/protocols.py
async def complete(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> ModelResponse:
    """
    Complete a chat request.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format
        **kwargs: Provider-specific options

    Returns:
        Model response with message and metadata
    """
    ...

stream

stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]

Stream a chat response.

Parameters:

Name      Type                         Description                    Default
messages  list[Message]                Conversation history           required
tools     list[dict[str, Any]] | None  Tool schemas in OpenAI format  None
**kwargs  Any                          Provider-specific options      {}

Yields:

Type                            Description
AsyncIterator[ModelChunkEvent]  Streaming chunks with content and/or tool calls

Source code in src/locus/core/protocols.py
def stream(
    self,
    messages: list[Message],
    tools: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
    """
    Stream a chat response.

    Args:
        messages: Conversation history
        tools: Tool schemas in OpenAI format
        **kwargs: Provider-specific options

    Yields:
        Streaming chunks with content and/or tool calls
    """
    ...
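
A minimal sketch of a class that satisfies this protocol structurally, with no base class. Message, ModelResponse, and ModelChunkEvent below are simplified stand-ins defined locally so the example runs without locus installed; the real types carry more fields. EchoModel and collect are hypothetical names. Note the asymmetry in the signatures above: complete is a coroutine and must be awaited, while stream is declared with a plain def returning an AsyncIterator, which is how an async generator is typed.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Any


# Simplified stand-ins for the locus types (assumption: real ones have more fields).
@dataclass
class Message:
    role: str
    content: str


@dataclass
class ModelResponse:
    message: Message


@dataclass
class ModelChunkEvent:
    content: str


class EchoModel:
    """Hypothetical provider that echoes the last message back."""

    async def complete(
        self,
        messages: list[Message],
        tools: list[dict[str, Any]] | None = None,
        **kwargs: Any,
    ) -> ModelResponse:
        return ModelResponse(Message("assistant", messages[-1].content))

    async def stream(
        self,
        messages: list[Message],
        tools: list[dict[str, Any]] | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[ModelChunkEvent]:
        # Async generator: calling stream() returns the iterator without awaiting.
        for word in messages[-1].content.split():
            yield ModelChunkEvent(word)


async def collect(model: EchoModel, messages: list[Message]) -> list[str]:
    # Consume the stream with `async for`, one chunk at a time.
    return [chunk.content async for chunk in model.stream(messages)]
```

A caller would await model.complete(...) for a full response and iterate model.stream(...) with async for when it wants incremental output.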

ToolProtocol

Bases: Protocol

Protocol for tools that can be called by agents.

Tools must expose a name, a description, a JSON Schema for their parameters, and an async execute method.

name property

name: str

Unique name of the tool.

description property

description: str

Description of what the tool does.

parameters property

parameters: dict[str, Any]

JSON Schema for tool parameters.

execute async

execute(**kwargs: Any) -> Any

Execute the tool with given arguments.

Parameters:

Name      Type  Description                                    Default
**kwargs  Any   Tool arguments matching the parameters schema  {}

Returns:

Type  Description
Any   Tool result (will be converted to string for LLM)

Source code in src/locus/core/protocols.py
async def execute(self, **kwargs: Any) -> Any:
    """
    Execute the tool with given arguments.

    Args:
        **kwargs: Tool arguments matching the parameters schema

    Returns:
        Tool result (will be converted to string for LLM)
    """
    ...
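
A minimal sketch of a tool that fits this shape. ToolLike is a local mirror of the protocol written for this example (it is not imported from locus), and AddTool is hypothetical. Plain class attributes are enough to satisfy the read-only properties.

```python
from __future__ import annotations

import asyncio
from typing import Any, Protocol, runtime_checkable


# Local mirror of the protocol's members so the sketch runs without locus.
@runtime_checkable
class ToolLike(Protocol):
    @property
    def name(self) -> str: ...
    @property
    def description(self) -> str: ...
    @property
    def parameters(self) -> dict[str, Any]: ...
    async def execute(self, **kwargs: Any) -> Any: ...


class AddTool:
    """Hypothetical tool that adds two integers."""

    name = "add"
    description = "Add two integers."
    # JSON Schema describing the keyword arguments execute() expects.
    parameters = {
        "type": "object",
        "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
        "required": ["a", "b"],
    }

    async def execute(self, **kwargs: Any) -> Any:
        return kwargs["a"] + kwargs["b"]
```

isinstance(AddTool(), ToolLike) only checks that the members exist; it does not validate the schema or the argument types.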

CheckpointerProtocol

Bases: Protocol

Protocol for state persistence.

Implementations handle saving and loading agent state. Extended methods are optional; which ones are available depends on the implementation's capabilities.

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state.

Parameters:

Name           Type                   Description                                    Default
state          AgentState             Current agent state                            required
thread_id      str                    Unique identifier for the conversation thread  required
checkpoint_id  str | None             Optional specific checkpoint ID                None
metadata       dict[str, Any] | None  Optional metadata for querying/filtering       None

Returns:

Type  Description
str   Checkpoint ID that can be used to restore

Source code in src/locus/core/protocols.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state.

    Args:
        state: Current agent state
        thread_id: Unique identifier for the conversation thread
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for querying/filtering

    Returns:
        Checkpoint ID that can be used to restore
    """
    ...

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state.

Parameters:

Name           Type        Description                                    Default
thread_id      str         Thread identifier                              required
checkpoint_id  str | None  Optional specific checkpoint (latest if None)  None

Returns:

Type               Description
AgentState | None  Restored state or None if not found

Source code in src/locus/core/protocols.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint (latest if None)

    Returns:
        Restored state or None if not found
    """
    ...

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Parameters:

Name       Type  Description               Default
thread_id  str   Thread identifier         required
limit      int   Maximum number to return  10

Returns:

Type       Description
list[str]  List of checkpoint IDs, newest first

Source code in src/locus/core/protocols.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints for a thread.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    ...
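
A minimal in-memory sketch of the three methods above. It is an illustration under stated assumptions, not a locus backend: a plain dict stands in for AgentState, the metadata argument is accepted but ignored, and the capabilities property is omitted because the shape of CheckpointerCapabilities is not shown here. InMemoryCheckpointer and demo are hypothetical names.

```python
from __future__ import annotations

import asyncio
import copy
import uuid
from typing import Any


class InMemoryCheckpointer:
    """Hypothetical checkpointer; a dict stands in for AgentState."""

    def __init__(self) -> None:
        # thread_id -> list of (checkpoint_id, state), oldest first
        self._threads: dict[str, list[tuple[str, dict[str, Any]]]] = {}

    async def save(self, state, thread_id, checkpoint_id=None, metadata=None) -> str:
        cid = checkpoint_id or uuid.uuid4().hex
        # Deep-copy so later mutation of the live state cannot alter the snapshot.
        self._threads.setdefault(thread_id, []).append((cid, copy.deepcopy(state)))
        return cid

    async def load(self, thread_id, checkpoint_id=None):
        entries = self._threads.get(thread_id, [])
        if not entries:
            return None
        if checkpoint_id is None:
            return copy.deepcopy(entries[-1][1])  # latest checkpoint
        for cid, state in entries:
            if cid == checkpoint_id:
                return copy.deepcopy(state)
        return None

    async def list_checkpoints(self, thread_id, limit=10) -> list[str]:
        ids = [cid for cid, _ in self._threads.get(thread_id, [])]
        return ids[::-1][:limit]  # newest first, capped at limit


async def demo() -> dict[str, Any]:
    cp = InMemoryCheckpointer()
    first = await cp.save({"step": 1}, thread_id="thread-a")
    second = await cp.save({"step": 2}, thread_id="thread-a")
    return {
        "latest": await cp.load("thread-a"),
        "first": await cp.load("thread-a", first),
        "newest_first": await cp.list_checkpoints("thread-a") == [second, first],
        "missing": await cp.load("no-such-thread"),
    }
```

Returning None for an unknown thread follows the load contract above; a backend could raise CheckpointNotFoundError for other failure modes, but that choice is not specified here.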

Messages

The wire format the agent loop emits to model providers and receives back.

Message

Bases: BaseModel

A message in the conversation.

system classmethod

system(content: str) -> Message

Create a system message.

Source code in src/locus/core/messages.py
@classmethod
def system(cls, content: str) -> Message:
    """Create a system message."""
    return cls(role=Role.SYSTEM, content=content)

user classmethod

user(content: str) -> Message

Create a user message.

Source code in src/locus/core/messages.py
@classmethod
def user(cls, content: str) -> Message:
    """Create a user message."""
    return cls(role=Role.USER, content=content)

assistant classmethod

assistant(content: str | None = None, tool_calls: list[ToolCall] | None = None) -> Message

Create an assistant message.

Source code in src/locus/core/messages.py
@classmethod
def assistant(
    cls,
    content: str | None = None,
    tool_calls: list[ToolCall] | None = None,
) -> Message:
    """Create an assistant message."""
    return cls(
        role=Role.ASSISTANT,
        content=content,
        tool_calls=tool_calls or [],
    )

tool classmethod

tool(result: ToolResult) -> Message

Create a tool result message.

Source code in src/locus/core/messages.py
@classmethod
def tool(cls, result: ToolResult) -> Message:
    """Create a tool result message."""
    return cls(
        role=Role.TOOL,
        content=result.content,
        tool_call_id=result.tool_call_id,
        name=result.name,
    )

to_openai_format

to_openai_format() -> dict[str, Any]

Convert to OpenAI API format.

Source code in src/locus/core/messages.py
def to_openai_format(self) -> dict[str, Any]:
    """Convert to OpenAI API format."""
    msg: dict[str, Any] = {"role": self.role.value}

    if self.content is not None:
        msg["content"] = self.content

    if self.tool_calls:
        msg["tool_calls"] = [tc.to_openai_format() for tc in self.tool_calls]

    if self.tool_call_id:
        msg["tool_call_id"] = self.tool_call_id

    if self.name:
        msg["name"] = self.name

    return msg

Role

Bases: StrEnum

Message role in conversation.

ToolCall

Bases: BaseModel

A tool call requested by the model.

to_openai_format

to_openai_format() -> dict[str, Any]

Convert to OpenAI API format.

Source code in src/locus/core/messages.py
def to_openai_format(self) -> dict[str, Any]:
    """Convert to OpenAI API format."""
    return {
        "id": self.id,
        "type": "function",
        "function": {
            "name": self.name,
            "arguments": json.dumps(self.arguments),
        },
    }
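
One detail of the format above is easy to miss: arguments is serialized with json.dumps, so it travels as a JSON string rather than a nested object. The sketch below rebuilds the same dictionary by hand with made-up sample values (the id, tool name, and arguments are illustrative only).

```python
import json

# Sample values for illustration; not taken from a real tool call.
arguments = {"city": "Oslo", "units": "metric"}

# Same layout as ToolCall.to_openai_format() in the source above.
wire = {
    "id": "call_1",
    "type": "function",
    "function": {"name": "get_weather", "arguments": json.dumps(arguments)},
}

# A consumer has to decode the string before it can read individual arguments.
decoded = json.loads(wire["function"]["arguments"])
```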

ToolResult

Bases: BaseModel

Result from a tool execution.

success property

success: bool

Whether the tool execution succeeded.

Errors

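LocusError is the common base: every other exception in this section is a subclass. The interrupt exceptions documented under Control flow (InterruptException and GraphInterrupted) derive from Exception directly, so a LocusError handler does not catch them. Catch LocusError when you want one handler for the whole SDK; catch the specific subclass when you need to recover differently for, e.g., a checkpoint miss vs. a model throttle.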

LocusError

LocusError(message: str, *, cause: BaseException | None = None)

Bases: Exception

Base class for every exception raised by Locus.

The kind class attribute is a short stable identifier suitable for structured logging and metrics. Subclasses override it.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

ConfigError

ConfigError(message: str, *, cause: BaseException | None = None)

Bases: LocusError

Configuration is invalid or missing a required value.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

ValidationError

ValidationError(message: str, *, cause: BaseException | None = None)

Bases: LocusError

Caller passed invalid or inconsistent input at a public API boundary.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

Model errors

ModelError

ModelError(message: str, *, cause: BaseException | None = None, reason: Any = None)

Bases: LocusError

Base for LLM provider failures.

Instances may optionally carry a locus.models.failover.FailoverReason that tells retry / credential-rotation / compaction layers what to do, independent of the exception class. See locus.models.failover.classify for the classifier that produces the reason from an arbitrary provider SDK exception.

Source code in src/locus/core/errors.py
def __init__(
    self,
    message: str,
    *,
    cause: BaseException | None = None,
    reason: Any = None,
) -> None:
    super().__init__(message, cause=cause)
    self.reason = reason

ModelAuthError

ModelAuthError(message: str, *, cause: BaseException | None = None, reason: Any = None)

Bases: ModelError

Authentication / authorization against the model provider failed.

Source code in src/locus/core/errors.py
def __init__(
    self,
    message: str,
    *,
    cause: BaseException | None = None,
    reason: Any = None,
) -> None:
    super().__init__(message, cause=cause)
    self.reason = reason

ModelResponseError

ModelResponseError(message: str, *, cause: BaseException | None = None, reason: Any = None)

Bases: ModelError

Provider returned an unusable response (malformed JSON, empty, refused).

Source code in src/locus/core/errors.py
def __init__(
    self,
    message: str,
    *,
    cause: BaseException | None = None,
    reason: Any = None,
) -> None:
    super().__init__(message, cause=cause)
    self.reason = reason

ModelThrottledError

ModelThrottledError(message: str, *, cause: BaseException | None = None, reason: Any = None)

Bases: ModelError

Provider is rate-limiting or refusing capacity.

Source code in src/locus/core/errors.py
def __init__(
    self,
    message: str,
    *,
    cause: BaseException | None = None,
    reason: Any = None,
) -> None:
    super().__init__(message, cause=cause)
    self.reason = reason
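
As an illustration of recovering from a throttle, here is a generic retry-with-exponential-backoff sketch. It is not the SDK's own retry layer (which, per the ModelError notes, works from a FailoverReason); ModelThrottledError below is a local stand-in, and complete_with_retry and make_flaky are hypothetical helpers.

```python
import asyncio


class ModelThrottledError(Exception):
    """Local stand-in for the locus exception of the same name."""


async def complete_with_retry(call, *, attempts: int = 3, base_delay: float = 0.01):
    """Await call(); on throttling, wait base_delay * 2**attempt and try again."""
    for attempt in range(attempts):
        try:
            return await call()
        except ModelThrottledError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the throttle to the caller
            await asyncio.sleep(base_delay * 2**attempt)


def make_flaky(failures: int):
    """Build a fake provider call that throttles `failures` times, then succeeds."""
    state = {"calls": 0}

    async def call() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ModelThrottledError("rate limited")
        return "ok"

    return call, state
```

Only the throttle is retried; authentication or response errors would pass straight through, since repeating the same request is unlikely to fix them.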

Tool errors

ToolError

ToolError(message: str, *, cause: BaseException | None = None)

Bases: LocusError

Base for tool-related failures (registration, schema, execution).

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

ToolExecutionError

ToolExecutionError(message: str, *, cause: BaseException | None = None)

Bases: ToolError

Tool raised an exception during execution.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

ToolNotFoundError

ToolNotFoundError(message: str, *, cause: BaseException | None = None)

Bases: ToolError

Requested tool is not registered with the agent.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

ToolValidationError

ToolValidationError(message: str, *, cause: BaseException | None = None)

Bases: ToolError

Tool arguments failed schema validation.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

Checkpoint errors

CheckpointError

CheckpointError(message: str, *, cause: BaseException | None = None)

Bases: LocusError

Base for checkpointer / storage-backend failures.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

CheckpointNotFoundError

CheckpointNotFoundError(message: str, *, cause: BaseException | None = None)

Bases: CheckpointError

Requested thread / checkpoint does not exist.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

CheckpointSerializationError

CheckpointSerializationError(message: str, *, cause: BaseException | None = None)

Bases: CheckpointError

Saving or loading failed during (de)serialization.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

RAG errors

RAGError

RAGError(message: str, *, cause: BaseException | None = None)

Bases: LocusError

Base for RAG-subsystem failures (embeddings, vector stores, retrieval).

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

EmbeddingError

EmbeddingError(message: str, *, cause: BaseException | None = None)

Bases: RAGError

Embedding-provider call failed.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

VectorStoreError

VectorStoreError(message: str, *, cause: BaseException | None = None)

Bases: RAGError

Vector-store read/write failed.

Source code in src/locus/core/errors.py
def __init__(self, message: str, *, cause: BaseException | None = None) -> None:
    super().__init__(message)
    if cause is not None:
        self.__cause__ = cause

Control flow

Command and friends are the return values graph / loop nodes use to steer execution: goto("node"), end(result=value), resume_with(payload). Note that end() accepts only keyword state updates, not a positional value. The agent loop also uses interrupt() for human-in-the-loop pauses.

Command

Command

Bases: BaseModel

Control flow command for graph execution.

A Command can:

1. Update state with new values
2. Direct execution to specific node(s)
3. Resume from an interrupt with a value

This enables complex control flow patterns like:

- Dynamic routing based on state
- Branching to multiple parallel nodes
- Returning from interrupts with data

Attributes:

Name    Type                    Description
update  dict[str, Any]          State updates to apply (merged with reducers if defined)
goto    str | list[str] | None  Target node(s) to execute next: a str for a single node ID, a list[str] for multiple nodes (parallel execution), or None to continue with normal graph flow
resume  Any                     Value to pass back when resuming from interrupt
graph   str | None              For subgraph commands, which graph to route to

Example - Simple routing

Command(goto="next_node")

Example - State update with routing

Command(update={"processed": True, "result": data}, goto="output_node")

Example - Parallel fan-out

Command(goto=["worker_1", "worker_2", "worker_3"])

Example - Resume from interrupt

Command(resume="approved")

has_update property

has_update: bool

Whether this command includes state updates.

has_goto property

has_goto: bool

Whether this command specifies routing.

has_resume property

has_resume: bool

Whether this command is resuming from interrupt.

is_parallel_goto property

is_parallel_goto: bool

Whether goto targets multiple nodes.

goto_nodes property

goto_nodes: list[str]

Get list of target nodes (normalizes single/list).

with_update

with_update(**kwargs: Any) -> Command

Return new Command with additional updates merged.

Source code in src/locus/core/command.py
def with_update(self, **kwargs: Any) -> Command:
    """Return new Command with additional updates merged."""
    new_update = {**self.update, **kwargs}
    return self.model_copy(update={"update": new_update})

with_goto

with_goto(target: str | list[str]) -> Command

Return new Command with different goto target.

Source code in src/locus/core/command.py
def with_goto(self, target: str | list[str]) -> Command:
    """Return new Command with different goto target."""
    return self.model_copy(update={"goto": target})
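
Both helpers return a new Command and leave the original untouched. The sketch below reproduces that behavior with a frozen dataclass as a stand-in for the pydantic model, so it runs without locus. The goto_nodes body is an assumption about how the normalization works (its source is not shown here).

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Any


@dataclass(frozen=True)
class Command:
    """Stand-in for locus's Command, reduced to the two fields used here."""

    update: dict[str, Any] = field(default_factory=dict)
    goto: str | list[str] | None = None

    @property
    def goto_nodes(self) -> list[str]:
        # Assumed normalization: None -> [], "a" -> ["a"], a list stays a list.
        if self.goto is None:
            return []
        return [self.goto] if isinstance(self.goto, str) else list(self.goto)

    def with_update(self, **kwargs: Any) -> Command:
        # Same merge as the source: new keys win over existing ones.
        return replace(self, update={**self.update, **kwargs})

    def with_goto(self, target: str | list[str]) -> Command:
        return replace(self, goto=target)


base = Command(update={"a": 1}, goto="review")
fanned = base.with_update(b=2).with_goto(["worker_1", "worker_2"])
```

Because each call returns a copy, the helpers chain naturally and a shared base command can be specialized per branch.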

End

Bases: Command

Special command indicating graph completion.

Use this to explicitly terminate graph execution.

Example

async def final_node(inputs):
    return End(update={"final_result": inputs["data"]})

Continue

Bases: Command

Special command indicating normal flow continuation.

Useful when you want to update state but continue with default routing logic.

Example

async def process_node(inputs):
    result = process(inputs)
    return Continue(update={"processed": result})

goto

goto(target: str | list[str], **updates: Any) -> Command

Create a Command that routes to target node(s).

Parameters:

Name       Type             Description                  Default
target     str | list[str]  Node ID or list of node IDs  required
**updates  Any              Optional state updates       {}

Returns:

Type     Description
Command  Command with goto and optional updates

Example

goto("next_node")
goto(["worker_1", "worker_2"], task_id=123)

Source code in src/locus/core/command.py
def goto(target: str | list[str], **updates: Any) -> Command:
    """
    Create a Command that routes to target node(s).

    Args:
        target: Node ID or list of node IDs
        **updates: Optional state updates

    Returns:
        Command with goto and optional updates

    Example:
        >>> goto("next_node")
        >>> goto(["worker_1", "worker_2"], task_id=123)
    """
    return Command(update=updates, goto=target)

end

end(**updates: Any) -> End

Create an End command to terminate graph.

Parameters:

Name       Type  Description          Default
**updates  Any   Final state updates  {}

Returns:

Type  Description
End   End command

Example

end(result="success", data=processed_data)

Source code in src/locus/core/command.py
def end(**updates: Any) -> End:
    """
    Create an End command to terminate graph.

    Args:
        **updates: Final state updates

    Returns:
        End command

    Example:
        >>> end(result="success", data=processed_data)
    """
    return End(update=updates)

resume_with

resume_with(value: Any, **updates: Any) -> Command

Create a Command to resume from interrupt.

Parameters:

Name       Type  Description                        Default
value      Any   Value to pass to interrupted node  required
**updates  Any   Optional state updates             {}

Returns:

Type     Description
Command  Command with resume value

Example

resume_with("approved")
resume_with({"action": "modify", "changes": data})

Source code in src/locus/core/command.py
def resume_with(value: Any, **updates: Any) -> Command:
    """
    Create a Command to resume from interrupt.

    Args:
        value: Value to pass to interrupted node
        **updates: Optional state updates

    Returns:
        Command with resume value

    Example:
        >>> resume_with("approved")
        >>> resume_with({"action": "modify", "changes": data})
    """
    return Command(update=updates, resume=value)

is_command

is_command(value: Any) -> bool

Check if a value is a Command instance.

Source code in src/locus/core/command.py
def is_command(value: Any) -> bool:
    """Check if a value is a Command instance."""
    return isinstance(value, Command)

normalize_node_output

normalize_node_output(output: Any) -> tuple[dict[str, Any], Command | None]

Normalize node output to (state_update, command).

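Nodes can return:

- dict: Treated as state update, no routing
- Command: Extract update and routing
- None: No update, no routing
- Pydantic BaseModel (other than Command): Dumped to a dict with model_dump() and treated as state update, no routing
- Other: Wrapped as {"result": value}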

Parameters:

Name    Type  Description                     Default
output  Any   Raw output from node execution  required

Returns:

Type                                   Description
tuple[dict[str, Any], Command | None]  Tuple of (state_update_dict, optional_command)

Source code in src/locus/core/command.py
def normalize_node_output(output: Any) -> tuple[dict[str, Any], Command | None]:
    """
    Normalize node output to (state_update, command).

    Nodes can return:
    - dict: Treated as state update, no routing
    - Command: Extract update and routing
    - None: No update, no routing
    - Other: Wrapped as {"result": value}

    Args:
        output: Raw output from node execution

    Returns:
        Tuple of (state_update_dict, optional_command)
    """
    if output is None:
        return {}, None

    if isinstance(output, Command):
        return dict(output.update), output

    if isinstance(output, dict):
        return output, None

    # Pydantic BaseModel → treat as state update dict (like LangGraph does)
    try:
        from pydantic import BaseModel as _BaseModel

        if isinstance(output, _BaseModel):
            return output.model_dump(mode="python", exclude_none=False), None
    except Exception:  # noqa: BLE001
        pass

    # Wrap other values
    return {"result": output}, None
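
The dispatch above can be exercised with a stdlib-only mirror. The sketch below uses a dataclass stand-in for Command and leaves out the pydantic branch so it runs without locus or pydantic; normalize is a hypothetical name for the reduced function.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Command:
    """Stand-in for locus's Command with only the fields needed here."""

    update: dict[str, Any] = field(default_factory=dict)
    goto: str | list[str] | None = None


def normalize(output: Any) -> tuple[dict[str, Any], Command | None]:
    """Reduced mirror of normalize_node_output (no pydantic branch)."""
    if output is None:
        return {}, None
    if isinstance(output, Command):
        # The update is copied; the command itself is passed through for routing.
        return dict(output.update), output
    if isinstance(output, dict):
        return output, None
    return {"result": output}, None


cmd = Command(update={"x": 1}, goto="next")
update, routed = normalize(cmd)
```

Two consequences follow from the source: a Command's update is copied before it is returned, while a plain dict is returned as the same object, so mutating the normalized update also mutates what the node returned.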

Interrupt (HITL)

interrupt

interrupt(payload: Any, **metadata: Any) -> Any

Pause graph execution and wait for human input.

When called, this function:

1. If resuming: Returns the resume value immediately
2. If not resuming: Raises InterruptException to pause execution

The graph executor catches the exception, saves state, and returns the interrupt to the caller. When the caller provides a response and resumes, this function returns that response.

Parameters:

Name        Type  Description                                                                                                                                                   Default
payload     Any   Data to present to the human: a str (simple message/question), a dict (structured data such as action details or options), or any serializable value  required
**metadata  Any   Additional context (not displayed, for tracking)                                                                                                              {}

Returns:

Type  Description
Any   The value passed when resuming (via Command(resume=...))

Raises:

Type                Description
InterruptException  When not resuming, to pause execution

Example - Simple approval

approval = interrupt("Approve this action?")
if approval == "yes":
    execute_action()

Example - Structured data

response = interrupt(
    {
        "type": "confirmation",
        "action": "delete_account",
        "account_id": "12345",
        "options": ["confirm", "cancel", "modify"],
    }
)

Source code in src/locus/core/interrupt.py
def interrupt(payload: Any, **metadata: Any) -> Any:
    """
    Pause graph execution and wait for human input.

    When called, this function:
    1. If resuming: Returns the resume value immediately
    2. If not resuming: Raises InterruptException to pause execution

    The graph executor catches the exception, saves state,
    and returns the interrupt to the caller. When the caller
    provides a response and resumes, this function returns
    that response.

    Args:
        payload: Data to present to the human. Can be:
            - str: Simple message/question
            - dict: Structured data (action details, options, etc.)
            - Any serializable value
        **metadata: Additional context (not displayed, for tracking)

    Returns:
        The value passed when resuming (via Command(resume=...))

    Raises:
        InterruptException: When not resuming, to pause execution

    Example - Simple approval:
        >>> approval = interrupt("Approve this action?")
        >>> if approval == "yes":
        ...     execute_action()

    Example - Structured data:
        >>> response = interrupt(
        ...     {
        ...         "type": "confirmation",
        ...         "action": "delete_account",
        ...         "account_id": "12345",
        ...         "options": ["confirm", "cancel", "modify"],
        ...     }
        ... )

    Example - With metadata:
        >>> result = interrupt(
        ...     {"question": "Select priority"}, urgency="high", deadline="2024-01-01"
        ... )
    """
    # Check if we're resuming from an interrupt
    if _is_resuming.get():
        resume_val = _resume_value.get()
        # Clear resume state after use
        _is_resuming.set(False)
        _resume_value.set(None)
        return resume_val

    # Not resuming - create and raise interrupt
    node_id = _current_node_id.get()
    graph_id = _current_graph_id.get()

    value = InterruptValue(
        payload=payload,
        node_id=node_id,
        graph_id=graph_id,
        metadata=metadata,
    )

    raise InterruptException(value)

InterruptException

InterruptException(value: InterruptValue)

Bases: Exception

Exception raised to pause graph execution for human input.

This is caught by the graph executor, which saves the current state and returns the interrupt value to the caller.

The caller can then present the interrupt to a human and resume execution with their response.

Source code in src/locus/core/interrupt.py
def __init__(self, value: InterruptValue):
    self.value = value
    super().__init__(f"Interrupt requested: {value.interrupt_id}")

InterruptValue

Bases: BaseModel

Value captured when an interrupt occurs.

Contains all information needed to display to the user and resume execution later.

Attributes:

Name          Type            Description
interrupt_id  str             Unique identifier for this interrupt
payload       Any             Data to present to the human (question, context, etc.)
node_id       str | None      ID of the node that raised the interrupt
graph_id      str | None      ID of the graph being executed
created_at    datetime        When the interrupt occurred
metadata      dict[str, Any]  Additional context for the interrupt

to_display

to_display() -> dict[str, Any]

Convert to a display-friendly format.

Source code in src/locus/core/interrupt.py
def to_display(self) -> dict[str, Any]:
    """Convert to a display-friendly format."""
    return {
        "interrupt_id": self.interrupt_id,
        "payload": self.payload,
        "node_id": self.node_id,
        "graph_id": self.graph_id,
        "created_at": self.created_at.isoformat(),
        "metadata": self.metadata,
    }

InterruptState

Bases: BaseModel

State saved when graph is interrupted.

This is stored in the checkpoint to enable resumption.

Attributes:

Name             Type            Description
interrupt        InterruptValue  The interrupt value that paused execution
node_id          str             Node to resume from
pending_nodes    list[str]       Nodes that were scheduled but not yet executed
partial_results  dict[str, Any]  Results from nodes completed before interrupt

GraphInterrupted

GraphInterrupted(interrupt_state: InterruptState, checkpoint_id: str | None = None)

Bases: Exception

Raised when graph execution is paused for human input.

Contains all information needed to resume execution.

Source code in src/locus/core/interrupt.py
def __init__(
    self,
    interrupt_state: InterruptState,
    checkpoint_id: str | None = None,
):
    self.interrupt_state = interrupt_state
    self.checkpoint_id = checkpoint_id
    super().__init__(
        f"Graph interrupted at node '{interrupt_state.node_id}': "
        f"{interrupt_state.interrupt.interrupt_id}"
    )
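A caller would typically catch this exception, show the payload to a person, and keep the checkpoint ID for resumption. The sketch below uses dataclass stand-ins that mirror the documented fields (the real classes are pydantic models), and `run_review_step` is a hypothetical function invented for illustration, not part of the library.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


# Stand-ins mirroring the documented fields; not the real pydantic models.
@dataclass
class InterruptValue:
    interrupt_id: str
    payload: Any
    node_id: Optional[str] = None
    graph_id: Optional[str] = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class InterruptState:
    interrupt: InterruptValue
    node_id: str
    pending_nodes: list[str] = field(default_factory=list)
    partial_results: dict[str, Any] = field(default_factory=dict)


class GraphInterrupted(Exception):
    # Same constructor body as the source listing above.
    def __init__(self, interrupt_state: InterruptState, checkpoint_id: Optional[str] = None):
        self.interrupt_state = interrupt_state
        self.checkpoint_id = checkpoint_id
        super().__init__(
            f"Graph interrupted at node '{interrupt_state.node_id}': "
            f"{interrupt_state.interrupt.interrupt_id}"
        )


def run_review_step() -> None:
    """Hypothetical graph step that pauses for human approval."""
    value = InterruptValue(interrupt_id="int-1", payload={"question": "Deploy?"}, node_id="review")
    raise GraphInterrupted(InterruptState(interrupt=value, node_id="review"), checkpoint_id="ckpt-1")


try:
    run_review_step()
except GraphInterrupted as exc:
    message = str(exc)                                    # human-readable summary
    resume_node = exc.interrupt_state.node_id             # where to resume
    question = exc.interrupt_state.interrupt.payload["question"]
    checkpoint = exc.checkpoint_id                        # may be None
```

The exception message is built from the node ID and interrupt ID only, so anything the user needs to see has to be read from `interrupt_state.interrupt.payload`.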

InterruptHandler

Base class for handling interrupts.

Subclass this to create custom interrupt handlers (e.g., CLI prompts, web callbacks, message queues).

Example

class CLIInterruptHandler(InterruptHandler):
    async def handle(self, interrupt: InterruptValue) -> Any:
        print(f"Interrupt: {interrupt.payload}")
        return input("Your response: ")

handle async

handle(interrupt: InterruptValue) -> Any

Handle an interrupt and return the response.

Parameters:

Name Type Description Default
interrupt InterruptValue

The interrupt value to handle

required

Returns:

Type Description
Any

Response value to pass back to the interrupted node

Source code in src/locus/core/interrupt.py
async def handle(self, interrupt: InterruptValue) -> Any:
    """
    Handle an interrupt and return the response.

    Args:
        interrupt: The interrupt value to handle

    Returns:
        Response value to pass back to the interrupted node
    """
    raise NotImplementedError("Subclasses must implement handle()")

can_handle async

can_handle(interrupt: InterruptValue) -> bool

Check if this handler can process the interrupt.

Source code in src/locus/core/interrupt.py
async def can_handle(self, interrupt: InterruptValue) -> bool:
    """Check if this handler can process the interrupt."""
    return True

AutoApproveHandler

AutoApproveHandler(response: Any = 'approved')

Bases: InterruptHandler

Interrupt handler that auto-approves everything.

Useful for testing and automated pipelines.

Source code in src/locus/core/interrupt.py
def __init__(self, response: Any = "approved"):
    self.response = response

handle async

handle(interrupt: InterruptValue) -> Any

Return configured response.

Source code in src/locus/core/interrupt.py
async def handle(self, interrupt: InterruptValue) -> Any:
    """Return configured response."""
    return self.response

can_handle async

can_handle(interrupt: InterruptValue) -> bool

Check if this handler can process the interrupt.

Source code in src/locus/core/interrupt.py
async def can_handle(self, interrupt: InterruptValue) -> bool:
    """Check if this handler can process the interrupt."""
    return True
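Because `can_handle` defaults to `True`, an `AutoApproveHandler` can serve as a catch-all behind more selective handlers. The sketch below shows one way that might be wired up. The handler classes are re-declared as stand-ins so the block runs on its own, and `BudgetHandler`, the `"kind"` metadata key, and the `dispatch` helper are hypothetical names that this page does not document.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class InterruptValue:  # stand-in with only the fields used here
    interrupt_id: str
    payload: Any
    metadata: dict[str, Any] = field(default_factory=dict)


class InterruptHandler:  # same method bodies as the source listings above
    async def handle(self, interrupt: InterruptValue) -> Any:
        raise NotImplementedError("Subclasses must implement handle()")

    async def can_handle(self, interrupt: InterruptValue) -> bool:
        return True


class AutoApproveHandler(InterruptHandler):
    def __init__(self, response: Any = "approved"):
        self.response = response

    async def handle(self, interrupt: InterruptValue) -> Any:
        return self.response


class BudgetHandler(InterruptHandler):
    """Hypothetical handler that only accepts 'budget' interrupts."""

    async def can_handle(self, interrupt: InterruptValue) -> bool:
        return interrupt.metadata.get("kind") == "budget"

    async def handle(self, interrupt: InterruptValue) -> Any:
        return "approved" if interrupt.payload["amount"] <= 100 else "rejected"


async def dispatch(handlers: list[InterruptHandler], interrupt: InterruptValue) -> Any:
    """Hypothetical helper: use the first handler whose can_handle() is true."""
    for handler in handlers:
        if await handler.can_handle(interrupt):
            return await handler.handle(interrupt)
    raise LookupError(f"No handler for interrupt {interrupt.interrupt_id}")


handlers = [BudgetHandler(), AutoApproveHandler(response="ok")]
budget = InterruptValue("int-1", {"amount": 250}, {"kind": "budget"})
other = InterruptValue("int-2", {"question": "Continue?"})

budget_response = asyncio.run(dispatch(handlers, budget))  # handled by BudgetHandler
other_response = asyncio.run(dispatch(handlers, other))    # falls through to AutoApproveHandler
```

Ordering matters in this arrangement: putting the auto-approver first would shadow every handler after it.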

Send (map-reduce / fan-out)

Send lets a single node dispatch multiple child invocations and gather their results. broadcast / scatter are convenience builders.

Send

Bases: BaseModel

Directive to send data to a specific node for parallel execution.

When a node returns a list of Send objects, the graph executor spawns parallel executions of the target nodes with the given payloads.

Attributes:

Name Type Description
node str

Target node ID to execute

payload dict[str, Any]

Data to pass to the target node

send_id str

Unique identifier for this send (for result tracking)

metadata dict[str, Any]

Additional context for the send operation

Example - Simple send

Send("worker", {"task": "process_data"})

Example - With tracking

Send(
    node="analyzer",
    payload={"data": chunk},
    metadata={"chunk_index": 0, "total_chunks": 10},
)

with_payload

with_payload(**kwargs: Any) -> Send

Return new Send with additional payload data.

Source code in src/locus/core/send.py
def with_payload(self, **kwargs: Any) -> Send:
    """Return new Send with additional payload data."""
    new_payload = {**self.payload, **kwargs}
    return self.model_copy(update={"payload": new_payload})

with_metadata

with_metadata(**kwargs: Any) -> Send

Return new Send with additional metadata.

Source code in src/locus/core/send.py
def with_metadata(self, **kwargs: Any) -> Send:
    """Return new Send with additional metadata."""
    new_metadata = {**self.metadata, **kwargs}
    return self.model_copy(update={"metadata": new_metadata})
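Both helpers return a copy and leave the original untouched, so they can be chained. The sketch below imitates that behaviour with a dataclass stand-in, using `dataclasses.replace` in place of pydantic's `model_copy`. The `send_id` default shown here is an assumption, since the page does not say how IDs are generated.

```python
from dataclasses import dataclass, field, replace
from typing import Any
from uuid import uuid4


@dataclass
class Send:  # stand-in for the pydantic model
    node: str
    payload: dict[str, Any] = field(default_factory=dict)
    send_id: str = field(default_factory=lambda: uuid4().hex)  # ID format is an assumption
    metadata: dict[str, Any] = field(default_factory=dict)

    def with_payload(self, **kwargs: Any) -> "Send":
        # New keys win over existing ones, as in {**self.payload, **kwargs}.
        return replace(self, payload={**self.payload, **kwargs})

    def with_metadata(self, **kwargs: Any) -> "Send":
        return replace(self, metadata={**self.metadata, **kwargs})


base = Send(node="analyzer", payload={"data": "chunk-0"})
tagged = base.with_payload(retries=2).with_metadata(chunk_index=0)
```

Only the updated field changes in the copy. In this stand-in, `send_id` carries over, which matches what `model_copy(update=...)` does for fields not named in `update`.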

SendBatch

Bases: BaseModel

A batch of Send operations to execute together.

Used internally by the graph executor to group sends that should be processed in parallel.

Attributes:

Name Type Description
sends list[Send]

List of Send operations

source_node str

Node that generated these sends

aggregator_node str | None

Optional node to receive all results

target_nodes property

target_nodes: list[str]

Get unique target nodes.

count property

count: int

Number of sends in batch.

group_by_node

group_by_node() -> dict[str, list[Send]]

Group sends by target node.

Source code in src/locus/core/send.py
def group_by_node(self) -> dict[str, list[Send]]:
    """Group sends by target node."""
    groups: dict[str, list[Send]] = {}
    for send in self.sends:
        if send.node not in groups:
            groups[send.node] = []
        groups[send.node].append(send)
    return groups
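As a small illustration of the grouping, the sketch below builds a batch that targets two nodes and groups it. `Send` and `SendBatch` are dataclass stand-ins limited to the fields needed here, and the node names are made up.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Send:  # stand-in with only the fields used here
    node: str
    payload: dict[str, Any] = field(default_factory=dict)


@dataclass
class SendBatch:  # stand-in mirroring the documented fields
    sends: list[Send]
    source_node: str
    aggregator_node: Optional[str] = None

    def group_by_node(self) -> dict[str, list[Send]]:
        # Same result as the source listing: sends keep their original
        # order within each group.
        groups: dict[str, list[Send]] = {}
        for send in self.sends:
            groups.setdefault(send.node, []).append(send)
        return groups


batch = SendBatch(
    sends=[
        Send("summarize", {"doc": "a"}),
        Send("translate", {"doc": "a"}),
        Send("summarize", {"doc": "b"}),
    ],
    source_node="splitter",
)
groups = batch.group_by_node()
docs_per_node = {node: [s.payload["doc"] for s in sends] for node, sends in groups.items()}
```

Groups appear in the order their node is first seen, because Python dicts preserve insertion order.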

SendResult

Bases: BaseModel

Result from a Send operation.

Tracks the outcome of a parallel node execution spawned by Send.

Attributes:

Name Type Description
send_id str

ID of the original Send

node str

Target node that was executed

success bool

Whether execution succeeded

result Any

Output from the node

error str | None

Error message if failed

duration_ms float | None

Execution time in milliseconds

send

send(node: str, **payload: Any) -> Send

Create a Send to a target node.

Parameters:

Name Type Description Default
node str

Target node ID

required
**payload Any

Data to pass to the node

{}

Returns:

Type Description
Send

Send instance

Example

send("worker", task="process", data=[1, 2, 3])

Source code in src/locus/core/send.py
def send(node: str, **payload: Any) -> Send:
    """
    Create a Send to a target node.

    Args:
        node: Target node ID
        **payload: Data to pass to the node

    Returns:
        Send instance

    Example:
        >>> send("worker", task="process", data=[1, 2, 3])
    """
    return Send(node=node, payload=payload)

broadcast

broadcast(nodes: list[str], payload: dict[str, Any] | None = None) -> list[Send]

Create Sends to multiple nodes with same payload.

Parameters:

Name Type Description Default
nodes list[str]

List of target node IDs

required
payload dict[str, Any] | None

Shared payload for all nodes

None

Returns:

Type Description
list[Send]

List of Send instances

Example

broadcast(["worker1", "worker2", "worker3"], {"task": data})

Source code in src/locus/core/send.py
def broadcast(nodes: list[str], payload: dict[str, Any] | None = None) -> list[Send]:
    """
    Create Sends to multiple nodes with same payload.

    Args:
        nodes: List of target node IDs
        payload: Shared payload for all nodes

    Returns:
        List of Send instances

    Example:
        >>> broadcast(["worker1", "worker2", "worker3"], {"task": data})
    """
    payload = payload or {}
    return [Send(node=node, payload=payload) for node in nodes]

scatter

scatter(node: str, items: list[Any], key: str = 'item', include_index: bool = True) -> list[Send]

Scatter items to same node with different payloads.

Parameters:

Name Type Description Default
node str

Target node ID

required
items list[Any]

Items to distribute

required
key str

Key name for each item in payload

'item'
include_index bool

Whether to include each item's index and the total item count in the payload (under the keys "index" and "total")

True

Returns:

Type Description
list[Send]

List of Send instances

Example

>>> scatter("processor", [data1, data2, data3])
[Send(node="processor", payload={"item": data1, "index": 0, "total": 3}), ...]

Source code in src/locus/core/send.py
def scatter(
    node: str,
    items: list[Any],
    key: str = "item",
    include_index: bool = True,
) -> list[Send]:
    """
    Scatter items to same node with different payloads.

    Args:
        node: Target node ID
        items: Items to distribute
        key: Key name for each item in payload
        include_index: Whether to include index in payload

    Returns:
        List of Send instances

    Example:
        >>> scatter("processor", [data1, data2, data3])
        [Send(node="processor", payload={"item": data1, "index": 0, "total": 3}), ...]
    """
    sends = []
    for i, item in enumerate(items):
        payload: dict[str, Any] = {key: item}
        if include_index:
            payload["index"] = i
            payload["total"] = len(items)
        sends.append(Send(node=node, payload=payload))
    return sends
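To make the payload shape concrete, the sketch below runs the same loop against a dataclass stand-in for `Send` (the real class is a pydantic model). It shows that `include_index=True` adds both `"index"` and `"total"`, and that `key` renames the per-item entry.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Send:  # stand-in with only the fields used here
    node: str
    payload: dict[str, Any] = field(default_factory=dict)


def scatter(node: str, items: list[Any], key: str = "item", include_index: bool = True) -> list[Send]:
    # Same logic as the source listing above.
    sends = []
    for i, item in enumerate(items):
        payload: dict[str, Any] = {key: item}
        if include_index:
            payload["index"] = i
            payload["total"] = len(items)
        sends.append(Send(node=node, payload=payload))
    return sends


sends = scatter("processor", ["a", "b", "c"])
payloads = [s.payload for s in sends]

# A custom key and no positional information.
no_index = scatter("processor", ["a"], key="doc", include_index=False)
```

A target node can use `index` and `total` to detect the last item or to report progress without any shared state.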

is_send

is_send(value: Any) -> bool

Check if a value is a Send instance.

Source code in src/locus/core/send.py
def is_send(value: Any) -> bool:
    """Check if a value is a Send instance."""
    return isinstance(value, Send)

is_send_list

is_send_list(value: Any) -> bool

Check if a value is a list of Send instances.

Source code in src/locus/core/send.py
def is_send_list(value: Any) -> bool:
    """Check if a value is a list of Send instances."""
    return isinstance(value, list) and all(isinstance(v, Send) for v in value)

normalize_sends

normalize_sends(value: Any) -> list[Send] | None

Normalize output to list of Sends if applicable.

Parameters:

Name Type Description Default
value Any

Node output

required

Returns:

Type Description
list[Send] | None

List of Send objects, or None if not send output

Example

>>> normalize_sends(Send("node", {}))
[Send(node="node", ...)]
>>> normalize_sends([Send("a", {}), Send("b", {})])
[Send(node="a", ...), Send(node="b", ...)]
>>> normalize_sends({"result": 1})
None

Source code in src/locus/core/send.py
def normalize_sends(value: Any) -> list[Send] | None:
    """
    Normalize output to list of Sends if applicable.

    Args:
        value: Node output

    Returns:
        List of Send objects, or None if not send output

    Example:
        >>> normalize_sends(Send("node", {}))
        [Send(node="node", ...)]
        >>> normalize_sends([Send("a", {}), Send("b", {})])
        [Send(node="a", ...), Send(node="b", ...)]
        >>> normalize_sends({"result": 1})
        None
    """
    if isinstance(value, Send):
        return [value]
    if is_send_list(value):
        # ``is_send_list`` is a custom TypeGuard-style check; mypy can't
        # propagate the narrowing back to ``value`` here, so the return
        # would be Any otherwise.
        return value  # type: ignore[no-any-return]
    return None
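An executor could use this function to decide whether a node's return value is a fan-out directive or an ordinary state update. The sketch below illustrates that branching with a dataclass stand-in for `Send`. `route_output` is a hypothetical helper, not something the library is known to provide.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Send:  # stand-in with only the fields used here
    node: str
    payload: dict[str, Any] = field(default_factory=dict)


def is_send_list(value: Any) -> bool:
    return isinstance(value, list) and all(isinstance(v, Send) for v in value)


def normalize_sends(value: Any) -> Optional[list[Send]]:
    # Same branching as the source listing above.
    if isinstance(value, Send):
        return [value]
    if is_send_list(value):
        return value
    return None


def route_output(value: Any) -> tuple[str, Any]:
    """Hypothetical executor step: classify a node's return value."""
    sends = normalize_sends(value)
    if sends is None:
        return ("state_update", value)
    return ("fan_out", [s.node for s in sends])


single = route_output(Send("a"))
many = route_output([Send("a"), Send("b")])
plain = route_output({"result": 1})
mixed = route_output([Send("a"), 1])  # not every element is a Send
empty = route_output([])              # all() over an empty list is True
```

Two edge cases follow from the listing: a list containing any non-`Send` element is not treated as send output, and an empty list is, because `all()` over no elements is `True`.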

extract_send_results

extract_send_results(results: list[SendResult], key: str = 'result') -> dict[str, Any]

Extract results from Send operations into a dict.

Parameters:

Name Type Description Default
results list[SendResult]

List of SendResult objects

required
key str

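Key to extract from each result. The implementation shown below does not use this argument and always returns the full result value.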

'result'

Returns:

Type Description
dict[str, Any]

Dict mapping send_id to result for successful sends; failed sends are omitted

Example

>>> results = [SendResult(send_id="s1", result={"data": 1}, ...)]
>>> extract_send_results(results)
{"s1": {"data": 1}}

Source code in src/locus/core/send.py
def extract_send_results(
    results: list[SendResult],
    key: str = "result",
) -> dict[str, Any]:
    """
    Extract results from Send operations into a dict.

    Args:
        results: List of SendResult objects
        key: Key to extract from each result

    Returns:
        Dict mapping send_id to extracted value

    Example:
        >>> results = [SendResult(send_id="s1", result={"data": 1}, ...)]
        >>> extract_send_results(results)
        {"s1": {"data": 1}}
    """
    return {r.send_id: r.result for r in results if r.success}
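The listing filters on `success`, so failed sends never appear in the returned dict. The sketch below shows this with a dataclass stand-in for `SendResult`, along with one way a caller might collect the errors separately. The `failed` dict is illustrative and not part of the library.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SendResult:  # stand-in mirroring the documented fields
    send_id: str
    node: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    duration_ms: Optional[float] = None


def extract_send_results(results: list[SendResult], key: str = "result") -> dict[str, Any]:
    # Same body as the source listing above; note that `key` is not read.
    return {r.send_id: r.result for r in results if r.success}


results = [
    SendResult("s1", "worker", True, {"data": 1}),
    SendResult("s2", "worker", False, error="timeout"),
    SendResult("s3", "worker", True, {"data": 3}),
]

by_id = extract_send_results(results)

# Failures have to be gathered by the caller if they matter.
failed = {r.send_id: r.error for r in results if not r.success}
```

Passing a different `key` makes no difference with the body shown, so callers that need a sub-field have to index into each value themselves.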

aggregate_send_results

aggregate_send_results(results: list[SendResult], reducer: Any = None) -> Any

Aggregate results from Send operations.

Parameters:

Name Type Description Default
results list[SendResult]

List of SendResult objects

required
reducer Any

Optional reducer function (current, update) -> merged

None

Returns:

Type Description
Any

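List of successful results when no reducer is given; otherwise the left-to-right fold of the successful results, or None if there are none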

Example - Collect as list

>>> aggregate_send_results(results)
[result1, result2, result3]

Example - With reducer

>>> aggregate_send_results(results, reducer=lambda a, b: {**a, **b})
{combined_dict}

Source code in src/locus/core/send.py
def aggregate_send_results(
    results: list[SendResult],
    reducer: Any = None,
) -> Any:
    """
    Aggregate results from Send operations.

    Args:
        results: List of SendResult objects
        reducer: Optional reducer function (current, update) -> merged

    Returns:
        Aggregated result

    Example - Collect as list:
        >>> aggregate_send_results(results)
        [result1, result2, result3]

    Example - With reducer:
        >>> aggregate_send_results(results, reducer=lambda a, b: {**a, **b})
        {combined_dict}
    """
    successful = [r.result for r in results if r.success]

    if reducer is None:
        return successful

    if not successful:
        return None

    result = successful[0]
    for item in successful[1:]:
        result = reducer(result, item)
    return result
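The three return shapes (list, folded value, `None`) are easiest to see side by side. The sketch below copies the logic from the listing and runs it against a reduced dataclass stand-in for `SendResult`. The sample data is made up.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SendResult:  # stand-in with only the fields used here
    send_id: str
    success: bool
    result: Any = None


def aggregate_send_results(
    results: list[SendResult],
    reducer: Optional[Callable[[Any, Any], Any]] = None,
) -> Any:
    # Same logic as the source listing above.
    successful = [r.result for r in results if r.success]
    if reducer is None:
        return successful
    if not successful:
        return None
    result = successful[0]
    for item in successful[1:]:
        result = reducer(result, item)
    return result


results = [
    SendResult("s1", True, {"a": 1}),
    SendResult("s2", False),          # dropped: not successful
    SendResult("s3", True, {"b": 2}),
]

collected = aggregate_send_results(results)
merged = aggregate_send_results(results, reducer=lambda a, b: {**a, **b})
total = aggregate_send_results(
    [SendResult("n1", True, 2), SendResult("n2", True, 5)], reducer=operator.add
)
nothing = aggregate_send_results([], reducer=operator.add)
```

With a reducer and no successful results the function returns `None` rather than an empty container, so callers folding into a dict or number may want an explicit fallback.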

Reducers

Reducers combine state updates produced by parallel branches. Pick the built-in that matches your field's semantics (add_messages for tuple[Message, ...], merge_dict for dict, max_value / min_value / last_value for scalars).

Reducer

Bases: Protocol[T]

Protocol for state reducers.

A reducer takes the current value and an update value, returning the new merged value.

__call__

__call__(current: T, update: T) -> T

Merge current value with update.

Source code in src/locus/core/reducers.py
def __call__(self, current: T, update: T) -> T:
    """Merge current value with update."""
    ...

reducer

reducer(fn: Callable[[T, T], T]) -> Reducer[T]

Decorator to create a reducer from a function.

Source code in src/locus/core/reducers.py
def reducer(fn: Callable[[T, T], T]) -> Reducer[T]:
    """Decorator to create a reducer from a function."""
    return create_reducer(fn)

get_reducer

get_reducer(annotation: Any) -> Reducer[Any] | None

Extract reducer from an Annotated type hint.

Parameters:

Name Type Description Default
annotation Any

Type annotation, possibly Annotated[T, reducer]

required

Returns:

Type Description
Reducer[Any] | None

Reducer if found in annotation, None otherwise

Example

>>> from typing import Annotated
>>> hint = Annotated[list, add_messages]
>>> reducer = get_reducer(hint)
>>> reducer([msg1], [msg2])  # Returns merged list

Source code in src/locus/core/reducers.py
def get_reducer(annotation: Any) -> Reducer[Any] | None:
    """
    Extract reducer from an Annotated type hint.

    Args:
        annotation: Type annotation, possibly Annotated[T, reducer]

    Returns:
        Reducer if found in annotation, None otherwise

    Example:
        >>> from typing import Annotated
        >>> hint = Annotated[list, add_messages]
        >>> reducer = get_reducer(hint)
        >>> reducer([msg1], [msg2])  # Returns merged list
    """
    # Check if it's an Annotated type
    if get_origin(annotation) is not None:
        # For Python 3.9+ Annotated types
        try:
            from typing import Annotated
            from typing import get_origin as get_origin_typing

            if get_origin_typing(annotation) is Annotated:
                args = get_args(annotation)
                if len(args) >= 2:
                    # Second arg should be the reducer. ``get_args`` returns
                    # ``tuple[Any, ...]`` so the callable narrowing isn't
                    # visible to the type checker.
                    potential_reducer = args[1]
                    if callable(potential_reducer):
                        return potential_reducer  # type: ignore[no-any-return]
        except ImportError:
            pass

    return None
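One plausible use of this lookup is building a field-to-reducer map from a state class's annotations. The sketch below does that with the standard library only. `get_reducer` here is a simplified rewrite of the listing above, `append_list` is a plain-function stand-in for the built-in of the same name, and the `State` class is hypothetical.

```python
from typing import Annotated, Any, Callable, Optional, get_args, get_origin, get_type_hints


def append_list(current: list, update: list) -> list:
    """Stand-in reducer: concatenate two lists."""
    return [*current, *update]


def get_reducer(annotation: Any) -> Optional[Callable[[Any, Any], Any]]:
    """Simplified version of the lookup above: first metadata item, if callable."""
    if get_origin(annotation) is Annotated:
        args = get_args(annotation)  # (base_type, metadata_1, metadata_2, ...)
        if len(args) >= 2 and callable(args[1]):
            return args[1]
    return None


class State:  # hypothetical state schema
    log: Annotated[list, append_list]
    count: int


# include_extras=True keeps the Annotated metadata; without it the
# reducer would be stripped from the hint.
hints = get_type_hints(State, include_extras=True)
reducers = {name: r for name, hint in hints.items() if (r := get_reducer(hint)) is not None}
```

Only the first metadata argument is inspected, so a reducer placed after another annotation (for example `Annotated[list, "doc", append_list]`) would not be found by this logic.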

apply_reducers

apply_reducers(current: dict[str, Any], update: dict[str, Any], reducers: dict[str, Reducer[Any]]) -> dict[str, Any]

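Apply reducers to merge current state with update. A field that has a registered reducer and already exists in current is merged by that reducer; every other field in update overwrites the current value (last-write-wins).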

Parameters:

Name Type Description Default
current dict[str, Any]

Current state dict

required
update dict[str, Any]

Update to apply

required
reducers dict[str, Reducer[Any]]

Map of field names to reducers

required

Returns:

Type Description
dict[str, Any]

New merged state dict

Example

>>> reducers = {"messages": add_messages}
>>> current = {"messages": [msg1], "count": 1}
>>> update = {"messages": [msg2], "count": 5}
>>> result = apply_reducers(current, update, reducers)
>>> # result["messages"] is [msg1, msg2] (reduced)
>>> # result["count"] is 5 (last-write-wins)

Source code in src/locus/core/reducers.py
def apply_reducers(
    current: dict[str, Any],
    update: dict[str, Any],
    reducers: dict[str, Reducer[Any]],
) -> dict[str, Any]:
    """
    Apply reducers to merge current state with update.

    Args:
        current: Current state dict
        update: Update to apply
        reducers: Map of field names to reducers

    Returns:
        New merged state dict

    Example:
        >>> reducers = {"messages": add_messages}
        >>> current = {"messages": [msg1], "count": 1}
        >>> update = {"messages": [msg2], "count": 5}
        >>> result = apply_reducers(current, update, reducers)
        >>> # result["messages"] is [msg1, msg2] (reduced)
        >>> # result["count"] is 5 (last-write-wins)
    """
    result = dict(current)

    for key, value in update.items():
        if key in reducers and key in current:
            # Apply reducer
            result[key] = reducers[key](current[key], value)
        else:
            # Last-write-wins (default)
            result[key] = value

    return result
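The merge rule has three cases: reduced, overwritten, and newly added. The sketch below copies the function from the listing and exercises each one. The two reducers are plain-function stand-ins, not the library's built-ins, and the field names are made up.

```python
from typing import Any, Callable


def append(current: list, update: list) -> list:
    """Stand-in reducer: concatenate lists."""
    return [*current, *update]


def union(current: set, update: set) -> set:
    """Stand-in reducer: set union."""
    return current | update


def apply_reducers(
    current: dict[str, Any],
    update: dict[str, Any],
    reducers: dict[str, Callable[[Any, Any], Any]],
) -> dict[str, Any]:
    # Same logic as the source listing above.
    result = dict(current)
    for key, value in update.items():
        if key in reducers and key in current:
            result[key] = reducers[key](current[key], value)
        else:
            result[key] = value
    return result


current = {"log": ["start"], "count": 1}
update = {"log": ["step"], "count": 5, "seen": {"a"}}
reducers = {"log": append, "seen": union}

merged = apply_reducers(current, update, reducers)
# "log"   -> reduced (reducer registered, key present in current)
# "count" -> overwritten (no reducer)
# "seen"  -> stored as-is (reducer registered, but key absent from current)
```

The function copies only the top-level dict, and `current` is left unchanged as long as the reducers themselves return new values instead of mutating their arguments.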

Built-in reducers

add_messages module-attribute

add_messages = AddMessages()

merge_dict module-attribute

merge_dict = MergeDict()

deep_merge_dict module-attribute

deep_merge_dict = DeepMergeDict()

append_list module-attribute

append_list = AppendList()

unique_append_list module-attribute

unique_append_list = UniqueAppendList()

add_numbers module-attribute

add_numbers = AddNumbers()

max_value module-attribute

max_value = MaxValue()

min_value module-attribute

min_value = MinValue()

last_value module-attribute

last_value = LastValue()

first_value module-attribute

first_value = FirstValue()

set_union module-attribute

set_union = SetUnion()