Core primitives¶
The shared types every other module is built on. Most consumers don't
import from locus.core directly — the re-exports at the package root
(from locus import Message, etc.) are the supported entry points —
but the typed protocols, error classes, and control-flow helpers below
are the foundation underneath every API.
Configuration¶
Protocols¶
The duck-typed contracts that providers, tools, and checkpointers implement. Use these for type hints when accepting "any model" / "any checkpointer" / "any tool" without binding to a concrete class.
ModelProtocol ¶
Bases: Protocol
Protocol for LLM providers.
Implementations must support both completion and streaming.
complete
async
¶
complete(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> ModelResponse
Complete a chat request.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
list[Message]
|
Conversation history |
required |
tools
|
list[dict[str, Any]] | None
|
Tool schemas in OpenAI format |
None
|
**kwargs
|
Any
|
Provider-specific options |
{}
|
Returns:
| Type | Description |
|---|---|
ModelResponse
|
Model response with message and metadata |
Source code in src/locus/core/protocols.py
stream ¶
stream(messages: list[Message], tools: list[dict[str, Any]] | None = None, **kwargs: Any) -> AsyncIterator[ModelChunkEvent]
Stream a chat response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
messages
|
list[Message]
|
Conversation history |
required |
tools
|
list[dict[str, Any]] | None
|
Tool schemas in OpenAI format |
None
|
**kwargs
|
Any
|
Provider-specific options |
{}
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[ModelChunkEvent]
|
Streaming chunks with content and/or tool calls |
Source code in src/locus/core/protocols.py
ToolProtocol ¶
Bases: Protocol
Protocol for tools that can be called by agents.
Tools must have a name, description, parameters schema, and be callable.
execute
async
¶
Execute the tool with given arguments.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Any
|
Tool arguments matching the parameters schema |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
Tool result (will be converted to string for LLM) |
Source code in src/locus/core/protocols.py
CheckpointerProtocol ¶
Bases: Protocol
Protocol for state persistence.
Implementations handle saving and loading agent state. Extended methods are optional based on capabilities.
capabilities
property
¶
Return the capabilities of this checkpointer.
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
AgentState
|
Current agent state |
required |
thread_id
|
str
|
Unique identifier for the conversation thread |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint ID |
None
|
metadata
|
dict[str, Any] | None
|
Optional metadata for querying/filtering |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Checkpoint ID that can be used to restore |
Source code in src/locus/core/protocols.py
load
async
¶
Load agent state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
checkpoint_id
|
str | None
|
Optional specific checkpoint (latest if None) |
None
|
Returns:
| Type | Description |
|---|---|
AgentState | None
|
Restored state or None if not found |
Source code in src/locus/core/protocols.py
list_checkpoints
async
¶
List available checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
thread_id
|
str
|
Thread identifier |
required |
limit
|
int
|
Maximum number to return |
10
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of checkpoint IDs, newest first |
Source code in src/locus/core/protocols.py
Messages¶
The wire format the agent loop emits to model providers and receives back.
Message ¶
Bases: BaseModel
A message in the conversation.
system
classmethod
¶
user
classmethod
¶
assistant
classmethod
¶
Create an assistant message.
Source code in src/locus/core/messages.py
tool
classmethod
¶
Create a tool result message.
to_openai_format ¶
Convert to OpenAI API format.
Source code in src/locus/core/messages.py
Role ¶
Bases: StrEnum
Message role in conversation.
ToolCall ¶
Bases: BaseModel
A tool call requested by the model.
to_openai_format ¶
Convert to OpenAI API format.
ToolResult ¶
Bases: BaseModel
Result from a tool execution.
Errors¶
LocusError is the common base — every other exception below is a
subclass. Catch LocusError when you want one handler for the whole
SDK; catch the specific subclass when you need to recover differently
for, e.g., a checkpoint miss vs. a model throttle.
LocusError ¶
Bases: Exception
Base class for every exception raised by Locus.
The kind class attribute is a short stable identifier suitable
for structured logging and metrics. Subclasses override it.
Source code in src/locus/core/errors.py
ConfigError ¶
Bases: LocusError
Configuration is invalid or missing a required value.
Source code in src/locus/core/errors.py
ValidationError ¶
Bases: LocusError
Caller passed invalid or inconsistent input at a public API boundary.
Source code in src/locus/core/errors.py
Model errors¶
ModelError ¶
Bases: LocusError
Base for LLM provider failures.
Instances may optionally carry a :class:~locus.models.failover.FailoverReason
that tells retry / credential-rotation / compaction layers what to do,
independent of the exception class. See
:func:locus.models.failover.classify for the classifier that produces
the reason from an arbitrary provider SDK exception.
Source code in src/locus/core/errors.py
ModelAuthError ¶
Bases: ModelError
Authentication / authorization against the model provider failed.
Source code in src/locus/core/errors.py
ModelResponseError ¶
Bases: ModelError
Provider returned an unusable response (malformed JSON, empty, refused).
Source code in src/locus/core/errors.py
ModelThrottledError ¶
Bases: ModelError
Provider is rate-limiting or refusing capacity.
Source code in src/locus/core/errors.py
Tool errors¶
ToolError ¶
Bases: LocusError
Base for tool-related failures (registration, schema, execution).
Source code in src/locus/core/errors.py
ToolExecutionError ¶
ToolNotFoundError ¶
ToolValidationError ¶
Checkpoint errors¶
CheckpointError ¶
Bases: LocusError
Base for checkpointer / storage-backend failures.
Source code in src/locus/core/errors.py
CheckpointNotFoundError ¶
Bases: CheckpointError
Requested thread / checkpoint does not exist.
Source code in src/locus/core/errors.py
CheckpointSerializationError ¶
Bases: CheckpointError
Saving or loading failed during (de)serialization.
Source code in src/locus/core/errors.py
RAG errors¶
RAGError ¶
Bases: LocusError
Base for RAG-subsystem failures (embeddings, vector stores, retrieval).
Source code in src/locus/core/errors.py
EmbeddingError ¶
VectorStoreError ¶
Control flow¶
Command and friends are the return values graph / loop nodes use to
steer execution: goto("node"), end(value), resume_with(payload).
The agent loop also uses interrupt() for human-in-the-loop pauses.
Command¶
Command ¶
Bases: BaseModel
Control flow command for graph execution.
A Command can: 1. Update state with new values 2. Direct execution to specific node(s) 3. Resume from an interrupt with a value
This enables complex control flow patterns like: - Dynamic routing based on state - Branching to multiple parallel nodes - Returning from interrupts with data
Attributes:
| Name | Type | Description |
|---|---|---|
update |
dict[str, Any]
|
State updates to apply (merged with reducers if defined) |
goto |
str | list[str] | None
|
Target node(s) to execute next. Can be: - str: Single node ID - list[str]: Multiple nodes (parallel execution) - None: Continue with normal graph flow |
resume |
Any
|
Value to pass back when resuming from interrupt |
graph |
str | None
|
For subgraph commands, which graph to route to |
Example - Simple routing
Command(goto="next_node")
Example - State update with routing
Command(update={"processed": True, "result": data}, goto="output_node")
Example - Parallel fan-out
Command(goto=["worker_1", "worker_2", "worker_3"])
Example - Resume from interrupt
Command(resume="approved")
End ¶
Continue ¶
Bases: Command
Special command indicating normal flow continuation.
Useful when you want to update state but continue with default routing logic.
Example
async def process_node(inputs): result = process(inputs) return Continue(update={"processed": result})
goto ¶
Create a Command that routes to target node(s).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
target
|
str | list[str]
|
Node ID or list of node IDs |
required |
**updates
|
Any
|
Optional state updates |
{}
|
Returns:
| Type | Description |
|---|---|
Command
|
Command with goto and optional updates |
Example
goto("next_node") goto(["worker_1", "worker_2"], task_id=123)
Source code in src/locus/core/command.py
end ¶
Create an End command to terminate graph.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**updates
|
Any
|
Final state updates |
{}
|
Returns:
| Type | Description |
|---|---|
End
|
End command |
Example
end(result="success", data=processed_data)
Source code in src/locus/core/command.py
resume_with ¶
Create a Command to resume from interrupt.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
value
|
Any
|
Value to pass to interrupted node |
required |
**updates
|
Any
|
Optional state updates |
{}
|
Returns:
| Type | Description |
|---|---|
Command
|
Command with resume value |
Example
resume_with("approved") resume_with({"action": "modify", "changes": data})
Source code in src/locus/core/command.py
is_command ¶
normalize_node_output ¶
Normalize node output to (state_update, command).
Nodes can return: - dict: Treated as state update, no routing - Command: Extract update and routing - None: No update, no routing - Other: Wrapped as {"result": value}
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
output
|
Any
|
Raw output from node execution |
required |
Returns:
| Type | Description |
|---|---|
tuple[dict[str, Any], Command | None]
|
Tuple of (state_update_dict, optional_command) |
Source code in src/locus/core/command.py
Interrupt (HITL)¶
interrupt ¶
Pause graph execution and wait for human input.
When called, this function: 1. If resuming: Returns the resume value immediately 2. If not resuming: Raises InterruptException to pause execution
The graph executor catches the exception, saves state, and returns the interrupt to the caller. When the caller provides a response and resumes, this function returns that response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
payload
|
Any
|
Data to present to the human. Can be: - str: Simple message/question - dict: Structured data (action details, options, etc.) - Any serializable value |
required |
**metadata
|
Any
|
Additional context (not displayed, for tracking) |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
The value passed when resuming (via Command(resume=...)) |
Raises:
| Type | Description |
|---|---|
InterruptException
|
When not resuming, to pause execution |
Example - Simple approval
approval = interrupt("Approve this action?") if approval == "yes": ... execute_action()
Example - Structured data
response = interrupt( ... { ... "type": "confirmation", ... "action": "delete_account", ... "account_id": "12345", ... "options": ["confirm", "cancel", "modify"], ... } ... )
Source code in src/locus/core/interrupt.py
InterruptException ¶
Bases: Exception
Exception raised to pause graph execution for human input.
This is caught by the graph executor, which saves the current state and returns the interrupt value to the caller.
The caller can then present the interrupt to a human and resume execution with their response.
Source code in src/locus/core/interrupt.py
InterruptValue ¶
Bases: BaseModel
Value captured when an interrupt occurs.
Contains all information needed to display to the user and resume execution later.
Attributes:
| Name | Type | Description |
|---|---|---|
interrupt_id |
str
|
Unique identifier for this interrupt |
payload |
Any
|
Data to present to the human (question, context, etc.) |
node_id |
str | None
|
ID of the node that raised the interrupt |
graph_id |
str | None
|
ID of the graph being executed |
created_at |
datetime
|
When the interrupt occurred |
metadata |
dict[str, Any]
|
Additional context for the interrupt |
to_display ¶
Convert to a display-friendly format.
Source code in src/locus/core/interrupt.py
InterruptState ¶
Bases: BaseModel
State saved when graph is interrupted.
This is stored in the checkpoint to enable resumption.
Attributes:
| Name | Type | Description |
|---|---|---|
interrupt |
InterruptValue
|
The interrupt value that paused execution |
node_id |
str
|
Node to resume from |
pending_nodes |
list[str]
|
Nodes that were scheduled but not yet executed |
partial_results |
dict[str, Any]
|
Results from nodes completed before interrupt |
GraphInterrupted ¶
Bases: Exception
Raised when graph execution is paused for human input.
Contains all information needed to resume execution.
Source code in src/locus/core/interrupt.py
InterruptHandler ¶
Base class for handling interrupts.
Subclass this to create custom interrupt handlers (e.g., CLI prompts, web callbacks, message queues).
Example
class CLIInterruptHandler(InterruptHandler): async def handle(self, interrupt: InterruptValue) -> Any: print(f"Interrupt: {interrupt.payload}") return input("Your response: ")
handle
async
¶
Handle an interrupt and return the response.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interrupt
|
InterruptValue
|
The interrupt value to handle |
required |
Returns:
| Type | Description |
|---|---|
Any
|
Response value to pass back to the interrupted node |
Source code in src/locus/core/interrupt.py
AutoApproveHandler ¶
Bases: InterruptHandler
Interrupt handler that auto-approves everything.
Useful for testing and automated pipelines.
Source code in src/locus/core/interrupt.py
handle
async
¶
Send (map-reduce / fan-out)¶
Send lets a single node dispatch multiple child invocations and
gather their results. broadcast / scatter are convenience builders.
Send ¶
Bases: BaseModel
Directive to send data to a specific node for parallel execution.
When a node returns a list of Send objects, the graph executor spawns parallel executions of the target nodes with the given payloads.
Attributes:
| Name | Type | Description |
|---|---|---|
node |
str
|
Target node ID to execute |
payload |
dict[str, Any]
|
Data to pass to the target node |
send_id |
str
|
Unique identifier for this send (for result tracking) |
metadata |
dict[str, Any]
|
Additional context for the send operation |
Example - Simple send
Send("worker", {"task": "process_data"})
Example - With tracking
Send( ... node="analyzer", ... payload={"data": chunk}, ... metadata={"chunk_index": 0, "total_chunks": 10}, ... )
SendBatch ¶
Bases: BaseModel
A batch of Send operations to execute together.
Used internally by the graph executor to group sends that should be processed in parallel.
Attributes:
| Name | Type | Description |
|---|---|---|
sends |
list[Send]
|
List of Send operations |
source_node |
str
|
Node that generated these sends |
aggregator_node |
str | None
|
Optional node to receive all results |
group_by_node ¶
Group sends by target node.
Source code in src/locus/core/send.py
SendResult ¶
Bases: BaseModel
Result from a Send operation.
Tracks the outcome of a parallel node execution spawned by Send.
Attributes:
| Name | Type | Description |
|---|---|---|
send_id |
str
|
ID of the original Send |
node |
str
|
Target node that was executed |
success |
bool
|
Whether execution succeeded |
result |
Any
|
Output from the node |
error |
str | None
|
Error message if failed |
duration_ms |
float | None
|
Execution time in milliseconds |
send ¶
Create a Send to a target node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node
|
str
|
Target node ID |
required |
**payload
|
Any
|
Data to pass to the node |
{}
|
Returns:
| Type | Description |
|---|---|
Send
|
Send instance |
Example
send("worker", task="process", data=[1, 2, 3])
Source code in src/locus/core/send.py
broadcast ¶
Create Sends to multiple nodes with same payload.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
nodes
|
list[str]
|
List of target node IDs |
required |
payload
|
dict[str, Any] | None
|
Shared payload for all nodes |
None
|
Returns:
| Type | Description |
|---|---|
list[Send]
|
List of Send instances |
Example
broadcast(["worker1", "worker2", "worker3"], {"task": data})
Source code in src/locus/core/send.py
scatter ¶
Scatter items to same node with different payloads.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node
|
str
|
Target node ID |
required |
items
|
list[Any]
|
Items to distribute |
required |
key
|
str
|
Key name for each item in payload |
'item'
|
include_index
|
bool
|
Whether to include index in payload |
True
|
Returns:
| Type | Description |
|---|---|
list[Send]
|
List of Send instances |
Example
scatter("processor", [data1, data2, data3]) [Send(node="processor", payload={"item": data1, "index": 0}), ...]
Source code in src/locus/core/send.py
is_send ¶
is_send_list ¶
normalize_sends ¶
Normalize output to list of Sends if applicable.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
value
|
Any
|
Node output |
required |
Returns:
| Type | Description |
|---|---|
list[Send] | None
|
List of Send objects, or None if not send output |
Example
normalize_sends(Send("node", {})) [Send(node="node", ...)] normalize_sends([Send("a", {}), Send("b", {})]) [Send(node="a", ...), Send(node="b", ...)] normalize_sends({"result": 1}) None
Source code in src/locus/core/send.py
extract_send_results ¶
Extract results from Send operations into a dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
results
|
list[SendResult]
|
List of SendResult objects |
required |
key
|
str
|
Key to extract from each result |
'result'
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict mapping send_id to extracted value |
Example
results = [SendResult(send_id="s1", result={"data": 1}, ...)] extract_send_results(results) {"s1": {"data": 1}}
Source code in src/locus/core/send.py
aggregate_send_results ¶
Aggregate results from Send operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
results
|
list[SendResult]
|
List of SendResult objects |
required |
reducer
|
Any
|
Optional reducer function (current, update) -> merged |
None
|
Returns:
| Type | Description |
|---|---|
Any
|
Aggregated result |
Example - Collect as list
aggregate_send_results(results) [result1, result2, result3]
Example - With reducer
aggregate_send_results(results, reducer=lambda a, b: {a, b})
Source code in src/locus/core/send.py
Reducers¶
Reducers combine state updates produced by parallel branches. Pick the
built-in that matches your field's semantics (add_messages for
tuple[Message, ...], merge_dict for dict, max_value /
min_value / last_value for scalars).
Reducer ¶
Bases: Protocol[T]
Protocol for state reducers.
A reducer takes the current value and an update value, returning the new merged value.
reducer ¶
get_reducer ¶
Extract reducer from an Annotated type hint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
annotation
|
Any
|
Type annotation, possibly Annotated[T, reducer] |
required |
Returns:
| Type | Description |
|---|---|
Reducer[Any] | None
|
Reducer if found in annotation, None otherwise |
Example
from typing import Annotated hint = Annotated[list, add_messages] reducer = get_reducer(hint) reducer([msg1], [msg2]) # Returns merged list
Source code in src/locus/core/reducers.py
apply_reducers ¶
apply_reducers(current: dict[str, Any], update: dict[str, Any], reducers: dict[str, Reducer[Any]]) -> dict[str, Any]
Apply reducers to merge current state with update.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current
|
dict[str, Any]
|
Current state dict |
required |
update
|
dict[str, Any]
|
Update to apply |
required |
reducers
|
dict[str, Reducer[Any]]
|
Map of field names to reducers |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
New merged state dict |
Example
reducers = {"messages": add_messages} current = {"messages": [msg1], "count": 1} update = {"messages": [msg2], "count": 5} result = apply_reducers(current, update, reducers)
result["messages"] is [msg1, msg2] (reduced)¶
result["count"] is 5 (last-write-wins)¶