Checkpointers
Contract
BaseCheckpointer
Bases: ABC
Abstract base class for checkpointer implementations.
Checkpointers handle saving and loading agent state, enabling features like:
- Conversation persistence
- Session recovery
- Branching conversations
- State inspection and debugging
- Full-text search (backend-dependent)
- Metadata queries (backend-dependent)
All methods are async to support various backends (file, database, network storage, etc.).
Use the capabilities property to check which features are available
before calling extended methods.
Example

    if checkpointer.capabilities.search:
        results = await checkpointer.search("error handling")
    if checkpointer.capabilities.branching:
        await checkpointer.copy_thread("main", "experiment")
capabilities (property)
Return the capabilities of this checkpointer.
Override in subclasses to advertise supported features.
save (abstractmethod, async)
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state to persist | required |
| thread_id | str | Unique identifier for the conversation thread | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID. If not provided, a new ID will be generated. | None |
| metadata | dict[str, Any] \| None | Optional metadata for querying/filtering checkpoints | None |
Returns:
| Type | Description |
|---|---|
| str | Checkpoint ID that can be used to restore this state |
Source code in src/locus/memory/checkpointer.py
load (abstractmethod, async)
Load agent state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier to load from | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID. If not provided, loads the latest checkpoint. | None |
Returns:
| Type | Description |
|---|---|
| AgentState \| None | Restored AgentState or None if not found |
Source code in src/locus/memory/checkpointer.py
list_checkpoints (abstractmethod, async)
List available checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| limit | int | Maximum number of checkpoint IDs to return | 10 |
Returns:
| Type | Description |
|---|---|
| list[str] | List of checkpoint IDs, newest first |
Source code in src/locus/memory/checkpointer.py
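The three abstract methods above (save, load, list_checkpoints) can be illustrated with a toy in-memory class. This is only a sketch of the documented contract, not the library's implementation: `DictCheckpointer` is a hypothetical name, states are plain dicts rather than AgentState, generated IDs come from `uuid4` (an assumption), and metadata is accepted but ignored.

```python
import asyncio
import uuid
from typing import Any, Optional


class DictCheckpointer:
    """Hypothetical stand-in that follows the documented save/load/list contract."""

    def __init__(self) -> None:
        # thread_id -> list of (checkpoint_id, state), oldest first
        self._threads: dict[str, list[tuple[str, dict[str, Any]]]] = {}

    async def save(
        self,
        state: dict[str, Any],
        thread_id: str,
        checkpoint_id: Optional[str] = None,
        metadata: Optional[dict[str, Any]] = None,  # ignored in this sketch
    ) -> str:
        # Generate an ID when the caller does not supply one.
        checkpoint_id = checkpoint_id or uuid.uuid4().hex
        self._threads.setdefault(thread_id, []).append((checkpoint_id, dict(state)))
        return checkpoint_id

    async def load(
        self, thread_id: str, checkpoint_id: Optional[str] = None
    ) -> Optional[dict[str, Any]]:
        entries = self._threads.get(thread_id, [])
        if not entries:
            return None
        if checkpoint_id is None:
            return entries[-1][1]  # latest checkpoint
        for cid, state in reversed(entries):
            if cid == checkpoint_id:
                return state
        return None

    async def list_checkpoints(self, thread_id: str, limit: int = 10) -> list[str]:
        entries = self._threads.get(thread_id, [])
        # Newest first, truncated to the requested limit.
        return [cid for cid, _ in reversed(entries)][:limit]
```

Calling `await cp.load("some-thread")` without a checkpoint ID returns the most recently saved state, and an unknown thread yields `None`, matching the return descriptions in the tables above.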
delete (async)
Delete a checkpoint or all checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to delete. If None, deletes all checkpoints for the thread. | None |
Returns:
| Type | Description |
|---|---|
| bool | True if deletion was successful |
Source code in src/locus/memory/checkpointer.py
exists (async)
Check if a checkpoint exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to check. If None, checks if any checkpoint exists for the thread. | None |
Returns:
| Type | Description |
|---|---|
| bool | True if the checkpoint exists |
Source code in src/locus/memory/checkpointer.py
close (async)
search (async)
Full-text search across checkpoints.
Requires: capabilities.search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | Search query | required |
| limit | int | Maximum results | 10 |
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | List of matching checkpoints with scores |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | If backend doesn't support search |
Source code in src/locus/memory/checkpointer.py
query_by_metadata (async)
Query checkpoints by metadata field.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Metadata field name | required |
| value | Any | Value to match | required |
| limit | int | Maximum results | 100 |
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | List of matching checkpoints |
Source code in src/locus/memory/checkpointer.py
get_metadata (async)
Get checkpoint metadata.
Requires: capabilities.metadata_query = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint (latest if None) | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Metadata dict or None if not found |
Source code in src/locus/memory/checkpointer.py
vacuum (async)
Delete old checkpoints.
Requires: capabilities.vacuum = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_days | int | Delete checkpoints older than this many days | 30 |
Returns:
| Type | Description |
|---|---|
| int | Number of deleted checkpoints |
Source code in src/locus/memory/checkpointer.py
copy_thread (async)
Copy a thread to create a branch.
Requires: capabilities.branching = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_thread_id | str | Source thread to copy from | required |
| dest_thread_id | str | Destination thread ID | required |
Returns:
| Type | Description |
|---|---|
| bool | True if successful |
Source code in src/locus/memory/checkpointer.py
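Branching via copy_thread amounts to duplicating one thread's checkpoints under a new thread ID so the two can diverge. The sketch below shows that idea on a plain dict; the `store` layout and the choice to return False for a missing source are assumptions for illustration, not documented backend behavior.

```python
import copy
from typing import Any


def copy_thread_sketch(
    store: dict[str, list[dict[str, Any]]],
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """Copy every checkpoint of the source thread to the destination thread."""
    if source_thread_id not in store:
        return False  # assumption: a missing source is reported as failure
    # Deep copy so later edits to the branch never touch the original thread.
    store[dest_thread_id] = copy.deepcopy(store[source_thread_id])
    return True
```

After the copy, appending checkpoints to the destination thread leaves the source thread unchanged, which is what makes the branch safe to experiment on.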
list_threads (async)
List all thread IDs.
Requires: capabilities.list_threads = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum threads to return | 100 |
| pattern | str | Pattern to filter threads (backend-specific) | '*' |
Returns:
| Type | Description |
|---|---|
| list[str] | List of thread IDs |
Source code in src/locus/memory/checkpointer.py
list_with_metadata (async)
List checkpoints with their metadata.
Requires: capabilities.list_with_metadata = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum results | 100 |
Returns:
| Type | Description |
|---|---|
| list[dict[str, Any]] | List of {thread_id, checkpoint_id, metadata, ...} dicts |
Source code in src/locus/memory/checkpointer.py
CheckpointerCapabilities (dataclass)
CheckpointerCapabilities(search: bool = False, metadata_query: bool = False, vacuum: bool = False, branching: bool = False, ttl: bool = False, list_threads: bool = False, list_with_metadata: bool = False, persistent_checkpoint_ids: bool = False)
Capabilities supported by a checkpointer.
Use this to discover what features a checkpointer supports before calling optional methods.
Example

    if checkpointer.capabilities.search:
        results = await checkpointer.search("error handling")
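To make the capability check concrete, here is a self-contained sketch. `CapabilitiesSketch` is a stand-in that copies the field names and defaults from the constructor signature above; `StubCheckpointer`, `supported_features`, and `safe_search` are hypothetical helpers, and returning an empty list when search is unsupported is just one possible fallback.

```python
import asyncio
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class CapabilitiesSketch:
    """Stand-in mirroring the documented CheckpointerCapabilities fields."""
    search: bool = False
    metadata_query: bool = False
    vacuum: bool = False
    branching: bool = False
    ttl: bool = False
    list_threads: bool = False
    list_with_metadata: bool = False
    persistent_checkpoint_ids: bool = False


def supported_features(caps: CapabilitiesSketch) -> list[str]:
    """Return the names of all flags that are switched on, in field order."""
    return [f.name for f in fields(caps) if getattr(caps, f.name)]


class StubCheckpointer:
    """Hypothetical backend that only advertises list_threads."""
    capabilities = CapabilitiesSketch(list_threads=True)

    async def search(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
        raise NotImplementedError("search is not supported by this stub")


async def safe_search(checkpointer: Any, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Call search only when the backend advertises it; otherwise return []."""
    if not checkpointer.capabilities.search:
        return []
    return await checkpointer.search(query, limit=limit)
```

Checking the flag first avoids relying on NotImplementedError for control flow, at the cost of trusting that each backend reports its capabilities accurately.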
Built-in backends
MemoryCheckpointer
Bases: BaseCheckpointer
In-memory checkpointer for testing and development.
Stores all checkpoints in a dictionary. Data is not persistent and will be lost when the process terminates.
Useful for:
- Unit and integration testing
- Development and prototyping
- Short-lived agent sessions
- As a fast caching layer
Capabilities:
- list_threads: Yes
- persistent_checkpoint_ids: Yes (within process lifetime)
Example
Source code in src/locus/memory/backends/memory.py
Inherited from BaseCheckpointer and documented in the Contract section: exists, close, search, query_by_metadata, get_metadata, vacuum, copy_thread, list_with_metadata.
save (async)
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state to memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state | required |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID | None |
| metadata | dict[str, Any] \| None | Optional metadata for the checkpoint | None |
Returns:
| Type | Description |
|---|---|
| str | Checkpoint ID for the saved state |
Source code in src/locus/memory/backends/memory.py
load (async)
Load agent state from memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint ID (latest if None) | None |
Returns:
| Type | Description |
|---|---|
| AgentState \| None | Restored AgentState or None if not found |
Source code in src/locus/memory/backends/memory.py
list_checkpoints (async)
List available checkpoints for a thread.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| limit | int | Maximum number to return | 10 |
Returns:
| Type | Description |
|---|---|
| list[str] | List of checkpoint IDs, newest first |
Source code in src/locus/memory/backends/memory.py
delete (async)
Delete checkpoint(s) from memory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to delete (all if None) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if deletion was successful |
Source code in src/locus/memory/backends/memory.py
clear
get_thread_ids
list_threads (async)
List all thread IDs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum threads to return | 100 |
| pattern | str | Pattern to filter (supports * as wildcard) | '*' |
Returns:
| Type | Description |
|---|---|
| list[str] | List of thread IDs |
Source code in src/locus/memory/backends/memory.py
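The `*` wildcard filtering described for list_threads can be approximated with the standard library's `fnmatch` module. This is a sketch under the assumption that shell-style matching is intended; the backend's actual matcher is not shown here, and `fnmatchcase` also honors `?` and `[...]`, which may be broader than what the backend supports. `filter_threads` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def filter_threads(thread_ids: list[str], pattern: str = "*", limit: int = 100) -> list[str]:
    """Keep thread IDs matching a shell-style pattern, capped at `limit`."""
    matches = [tid for tid in thread_ids if fnmatchcase(tid, pattern)]
    return matches[:limit]
```

For example, the pattern `user-*` keeps `user-1` and `user-2` but drops `admin-1`, and the default `*` keeps everything up to the limit.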
get_checkpoint_count
Get count of stored checkpoints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str \| None | Specific thread (all threads if None) | None |
Returns:
| Type | Description |
|---|---|
| int | Number of checkpoints |
Source code in src/locus/memory/backends/memory.py
FileCheckpointer
Bases: BaseCheckpointer
File-based checkpointer for persistent local storage.
Stores each checkpoint as a JSON file, organized by thread ID. Provides durable storage that survives process restarts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_dir | str \| Path | Base directory for checkpoint storage. Defaults to ".locus_checkpoints" in the current directory. | '.locus_checkpoints' |
| pretty | bool | Whether to format JSON for readability | True |
Example
Source code in src/locus/memory/backends/file.py
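As a rough picture of "one JSON file per checkpoint, organized by thread ID", the sketch below writes each checkpoint to `<base_dir>/<thread_id>/<checkpoint_id>.json`. That exact path layout, the helper names, and the use of `indent=2` for the `pretty` option are assumptions made for illustration; the real on-disk format may differ. Thread and checkpoint IDs are assumed to be filesystem-safe.

```python
import json
from pathlib import Path
from typing import Any, Optional, Union


def write_checkpoint(
    base_dir: Union[str, Path],
    thread_id: str,
    checkpoint_id: str,
    state: dict[str, Any],
    pretty: bool = True,
) -> Path:
    """Write one checkpoint as a JSON file inside a per-thread directory."""
    thread_dir = Path(base_dir) / thread_id
    thread_dir.mkdir(parents=True, exist_ok=True)
    path = thread_dir / f"{checkpoint_id}.json"
    # indent=2 gives human-readable output; None keeps it on a single line.
    path.write_text(json.dumps(state, indent=2 if pretty else None), encoding="utf-8")
    return path


def read_checkpoint(
    base_dir: Union[str, Path], thread_id: str, checkpoint_id: str
) -> Optional[dict[str, Any]]:
    """Read a checkpoint back, or return None when the file does not exist."""
    path = Path(base_dir) / thread_id / f"{checkpoint_id}.json"
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))
```

Because each checkpoint is an ordinary file, the data survives process restarts and can be inspected with any text editor.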
capabilities (property)
Return the capabilities of this checkpointer.
Override in subclasses to advertise supported features.
Inherited from BaseCheckpointer and documented in the Contract section: exists, close, search, query_by_metadata, get_metadata, vacuum, copy_thread, list_threads, list_with_metadata.
save (async)
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state to a JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state | required |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID | None |
| metadata | dict[str, Any] \| None | Optional metadata for querying/filtering checkpoints | None |
Returns:
| Type | Description |
|---|---|
| str | Checkpoint ID for the saved state |
Source code in src/locus/memory/backends/file.py
load (async)
Load agent state from a JSON file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint ID (latest if None) | None |
Returns:
| Type | Description |
|---|---|
| AgentState \| None | Restored AgentState or None if not found |
Source code in src/locus/memory/backends/file.py
list_checkpoints (async)
List available checkpoints for a thread.
Reads checkpoint files and returns IDs sorted by creation time (newest first).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| limit | int | Maximum number to return | 10 |
Returns:
| Type | Description |
|---|---|
| list[str] | List of checkpoint IDs, newest first |
Source code in src/locus/memory/backends/file.py
delete (async)
Delete checkpoint file(s).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to delete (all if None) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if deletion was successful |
Source code in src/locus/memory/backends/file.py
get_storage_path
get_disk_usage (async)
Get total disk usage in bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str \| None | Specific thread (all threads if None) | None |
Returns:
| Type | Description |
|---|---|
| int | Total size in bytes |
Source code in src/locus/memory/backends/file.py
HTTPCheckpointer
HTTPCheckpointer(base_url: str, headers: dict[str, str] | None = None, auth: tuple[str, str] | None = None, timeout: float = 30.0)
Bases: BaseCheckpointer
HTTP API-based checkpointer for remote storage.
Stores checkpoints via HTTP API calls, suitable for distributed systems or cloud-based storage backends.
The API is expected to implement the following endpoints:
- POST /threads/{thread_id}/checkpoints - Create checkpoint
- GET /threads/{thread_id}/checkpoints/{checkpoint_id} - Get checkpoint
- GET /threads/{thread_id}/checkpoints - List checkpoints
- DELETE /threads/{thread_id}/checkpoints/{checkpoint_id} - Delete checkpoint
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_url | str | Base URL of the checkpoint API | required |
| headers | dict[str, str] \| None | Additional headers to include in requests | None |
| auth | tuple[str, str] \| None | Authentication tuple (username, password) for basic auth | None |
| timeout | float | Request timeout in seconds | 30.0 |
Example
Source code in src/locus/memory/backends/http.py
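The four endpoints listed for HTTPCheckpointer can be sketched as a small route builder. The helper name `checkpoint_routes` is hypothetical, and percent-encoding the IDs is an assumption about how a client might keep unusual thread IDs from breaking the path; the backend's own URL handling is not documented here.

```python
from typing import Optional
from urllib.parse import quote


def checkpoint_routes(
    base_url: str, thread_id: str, checkpoint_id: Optional[str] = None
) -> dict[str, tuple[str, str]]:
    """Map each operation to an (HTTP method, URL) pair for the expected API."""
    # Encode IDs so characters like "/" cannot alter the path structure.
    root = f"{base_url.rstrip('/')}/threads/{quote(thread_id, safe='')}/checkpoints"
    routes = {"create": ("POST", root), "list": ("GET", root)}
    if checkpoint_id is not None:
        item = f"{root}/{quote(checkpoint_id, safe='')}"
        routes["get"] = ("GET", item)
        routes["delete"] = ("DELETE", item)
    return routes
```

A server implementing these four routes is, per the description above, what HTTPCheckpointer expects on the other end.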
capabilities (property)
Return the capabilities of this checkpointer.
Override in subclasses to advertise supported features.
Inherited from BaseCheckpointer and documented in the Contract section: exists, search, query_by_metadata, get_metadata, vacuum, copy_thread, list_threads, list_with_metadata.
close (async)
save (async)
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state via HTTP POST.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state | required |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID | None |
| metadata | dict[str, Any] \| None | Optional metadata for querying/filtering checkpoints | None |
Returns:
| Type | Description |
|---|---|
| str | Checkpoint ID for the saved state |
Raises:
| Type | Description |
|---|---|
| HTTPError | If the request fails |
Source code in src/locus/memory/backends/http.py
load (async)
Load agent state via HTTP GET.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint ID (latest if None) | None |
Returns:
| Type | Description |
|---|---|
| AgentState \| None | Restored AgentState or None if not found |
Source code in src/locus/memory/backends/http.py
list_checkpoints (async)
List available checkpoints via HTTP GET.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| limit | int | Maximum number to return | 10 |
Returns:
| Type | Description |
|---|---|
| list[str] | List of checkpoint IDs, newest first |
Source code in src/locus/memory/backends/http.py
delete (async)
Delete checkpoint(s) via HTTP DELETE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint to delete (all if None) | None |
Returns:
| Type | Description |
|---|---|
| bool | True if deletion was successful |
Source code in src/locus/memory/backends/http.py
health_check (async)
Check if the API is reachable.
Returns:
| Type | Description |
|---|---|
| bool | True if the API responds successfully |
Source code in src/locus/memory/backends/http.py
__aenter__ (async)
OCIBucketBackend
OCIBucketBackend(bucket_name: str, namespace: str, prefix: str = 'locus/checkpoints/', profile_name: str = 'DEFAULT', auth_type: str = 'api_key', region: str | None = None, retry_strategy: Any = None, **kwargs: Any)
Bases: BaseCheckpointer
OCI Object Storage-backed checkpointer.
Durable, per-checkpoint storage with lifecycle-policy support. Pass the instance directly to locus.agent.Agent; no adapter is needed.
Example:

    checkpointer = OCIBucketBackend(
        bucket_name="my-checkpoints",
        namespace="yzhbfkqxqsx9",
        profile_name="API_KEY_AUTH",
    )
    agent = Agent(config=cfg, checkpointer=checkpointer)
With an OCI compute instance principal:

    checkpointer = OCIBucketBackend(
        bucket_name="my-checkpoints",
        namespace="yzhbfkqxqsx9",
        auth_type="instance_principal",
    )
Capabilities:
- list_threads: yes (via object prefix delimiter listing)
- list_with_metadata: yes
- metadata_query: yes (via get_metadata)
- branching: yes (via copy_thread)
- vacuum: yes (prefer bucket lifecycle policies for production)
- persistent_checkpoint_ids: yes
Source code in src/locus/memory/backends/oci_bucket.py
close (async)
Inherited from BaseCheckpointer and documented in the Contract section: search, query_by_metadata.
vacuum (async)
Delete threads whose latest checkpoint is older than the cutoff.
For production, prefer an OCI Object Storage lifecycle rule: it runs server-side and uses no client CPU.
Source code in src/locus/memory/backends/oci_bucket.py
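The vacuum rule described above (drop a thread when its latest checkpoint is older than the cutoff) comes down to one timestamp comparison per thread. The sketch below assumes the caller already has a mapping from thread ID to the timezone-aware timestamp of that thread's newest checkpoint; `threads_to_vacuum` is a hypothetical helper and performs no OCI calls.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def threads_to_vacuum(
    latest_by_thread: dict[str, datetime],
    older_than_days: int = 30,
    now: Optional[datetime] = None,
) -> list[str]:
    """Return thread IDs whose newest checkpoint predates the cutoff."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=older_than_days)
    # A thread is kept as long as its most recent checkpoint is at or after the cutoff.
    return sorted(tid for tid, ts in latest_by_thread.items() if ts < cutoff)
```

Judging a thread by its latest checkpoint means an active conversation is never partially trimmed; old checkpoints stay as long as the thread is still in use.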
copy_thread (async)
Copy every checkpoint under source to dest (for branching).
Source code in src/locus/memory/backends/oci_bucket.py
SQLiteBackend
Bases: BaseModel
SQLite checkpoint backend.
Persistent local storage using async SQLite.
Example

    backend = SQLiteBackend(path="./checkpoints.db")
    await backend.save("thread_1", state.model_dump())
    data = await backend.load("thread_1")
Source code in src/locus/memory/backends/sqlite.py
save (async)
Save checkpoint to SQLite.
Source code in src/locus/memory/backends/sqlite.py
load (async)
Load checkpoint from SQLite.
Source code in src/locus/memory/backends/sqlite.py
delete (async)
Delete checkpoint from SQLite.
Source code in src/locus/memory/backends/sqlite.py
exists (async)
Check if checkpoint exists.
Source code in src/locus/memory/backends/sqlite.py
list_threads (async)
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/sqlite.py
get_metadata (async)
Get checkpoint metadata (created_at, updated_at).
Source code in src/locus/memory/backends/sqlite.py
RedisBackend
Bases: BaseModel
Redis checkpoint backend.
Fast key-value storage with optional TTL for checkpoints.
Example

    backend = RedisBackend(url="redis://localhost:6379")
    await backend.save("thread_1", state.model_dump())
    data = await backend.load("thread_1")
Source code in src/locus/memory/backends/redis.py
save (async)
Save checkpoint to Redis.
Source code in src/locus/memory/backends/redis.py
load (async)
Load checkpoint from Redis.
Source code in src/locus/memory/backends/redis.py
delete (async)
exists (async)
Check if checkpoint exists.
list_threads (async)
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/redis.py
PostgreSQLBackend
PostgreSQLBackend(host: str = 'localhost', port: int = 5432, database: str = 'locus', user: str = 'postgres', password: str | SecretStr = '', dsn: str | None = None, **kwargs: Any)
Bases: BaseModel
PostgreSQL checkpoint backend.
Production-grade persistent storage with ACID guarantees.
Features:
- Connection pooling
- Transaction support
- JSON/JSONB storage
- Indexing for fast lookups
- Concurrent access safe
Example

    backend = PostgreSQLBackend(
        host="localhost",
        database="myapp",
        user="postgres",
        password="secret",
    )
    await backend.save("thread_1", state.model_dump())
    data = await backend.load("thread_1")
With DSN

    backend = PostgreSQLBackend(dsn="postgresql://user:pass@localhost:5432/mydb")
Source code in src/locus/memory/backends/postgresql.py
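The discrete connection parameters and the DSN form describe the same connection. As a sketch of how the two relate, the hypothetical helper below assembles a `postgresql://` URL from the documented constructor defaults. Whether the backend builds its DSN this way internally is not stated; the percent-encoding step is an assumption that keeps characters such as `@` in a password from corrupting the URL.

```python
from urllib.parse import quote


def build_dsn(
    host: str = "localhost",
    port: int = 5432,
    database: str = "locus",
    user: str = "postgres",
    password: str = "",
) -> str:
    """Assemble a postgresql:// connection URL from individual parameters."""
    auth = quote(user, safe="")
    if password:
        # Encode the password so reserved characters stay inside the userinfo part.
        auth += ":" + quote(password, safe="")
    return f"postgresql://{auth}@{host}:{port}/{quote(database, safe='')}"
```

With `user="user"`, `password="pass"`, and `database="mydb"`, this yields the same DSN string used in the example above.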
save (async)
save(thread_id: str, data: dict[str, Any], checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save checkpoint to PostgreSQL.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| data | dict[str, Any] | Checkpoint data | required |
| checkpoint_id | str \| None | Optional checkpoint ID | None |
| metadata | dict[str, Any] \| None | Optional metadata for querying | None |
Returns:
| Type | Description |
|---|---|
| str | Checkpoint ID |
Source code in src/locus/memory/backends/postgresql.py
load (async)
Load checkpoint from PostgreSQL.
Source code in src/locus/memory/backends/postgresql.py
delete (async)
Delete checkpoint from PostgreSQL.
Source code in src/locus/memory/backends/postgresql.py
exists (async)
Check if checkpoint exists.
Source code in src/locus/memory/backends/postgresql.py
list_threads (async)
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/postgresql.py
get_metadata (async)
Get checkpoint metadata.
Source code in src/locus/memory/backends/postgresql.py
query_by_metadata (async)
Query checkpoints by metadata field.
Uses PostgreSQL JSONB operators for efficient querying.
Source code in src/locus/memory/backends/postgresql.py
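One common way to match a metadata field with JSONB operators is the containment operator `@>`, which can use a GIN index on the column. The sketch below only builds the SQL text and its parameters. The table name `checkpoints`, the columns `metadata` and `created_at`, and the `$1`/`$2` placeholder style are all assumptions for illustration, not the backend's actual schema or driver.

```python
import json
from typing import Any


def metadata_query(key: str, value: Any, limit: int = 100) -> tuple[str, tuple[str, int]]:
    """Build a parameterized JSONB containment query for one metadata field."""
    sql = (
        "SELECT thread_id, checkpoint_id, metadata "
        "FROM checkpoints "                  # hypothetical table name
        "WHERE metadata @> $1::jsonb "       # row metadata must contain {key: value}
        "ORDER BY created_at DESC "          # hypothetical timestamp column
        "LIMIT $2"
    )
    # The key and value travel as a bound parameter, never as interpolated SQL.
    return sql, (json.dumps({key: value}), limit)
```

Passing the filter as a JSON document keeps the value's type intact, so the string `"5"` and the number `5` are matched as different values.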
search_data (async)
Search checkpoints by data field using JSON path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | JSON path (e.g., "messages", "confidence") | required |
| value | Any | Value to match | required |
| limit | int | Maximum results | 100 |
Example

    results = await backend.search_data("agent_id", "agent-123")
Source code in src/locus/memory/backends/postgresql.py
count
async
¶
Count checkpoints matching pattern.
Source code in src/locus/memory/backends/postgresql.py
vacuum
async
¶
Delete old checkpoints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_days | `int` | Delete checkpoints older than this many days | 30 |
Returns:
| Type | Description |
|---|---|
| `int` | Number of deleted rows |
Source code in src/locus/memory/backends/postgresql.py
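The cutoff arithmetic behind `older_than_days` can be sketched as follows. This is an in-memory illustration only: the `created_at` field and the `vacuum_rows` helper are hypothetical, and the real backend presumably performs the equivalent comparison in a SQL `DELETE`.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def vacuum_rows(
    rows: dict[str, dict[str, Any]],
    older_than_days: int = 30,
    now: Optional[datetime] = None,
) -> int:
    """Delete rows older than the cutoff in place and return how many were removed."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=older_than_days)
    # Hypothetical field: each row records when it was written as created_at.
    stale = [tid for tid, row in rows.items() if row["created_at"] < cutoff]
    for tid in stale:
        del rows[tid]
    return len(stale)


now = datetime(2025, 1, 31, tzinfo=timezone.utc)
rows = {
    "old": {"created_at": now - timedelta(days=45)},
    "recent": {"created_at": now - timedelta(days=3)},
}
deleted = vacuum_rows(rows, older_than_days=30, now=now)
```

Passing `now` explicitly keeps the sketch deterministic; a caller would normally leave it unset.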
OpenSearchBackend ¶
OpenSearchBackend(hosts: list[str] | None = None, index_name: str = 'locus-checkpoints', username: str | None = None, password: str | None = None, **kwargs: Any)
Bases: BaseModel
OpenSearch checkpoint backend.
Scalable document storage with full-text search capabilities.
Example
backend = OpenSearchBackend(hosts=["localhost:9200"])
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")
results = await backend.search("user query")
Source code in src/locus/memory/backends/opensearch.py
save
async
¶
Save checkpoint to OpenSearch.
Source code in src/locus/memory/backends/opensearch.py
load
async
¶
Load checkpoint from OpenSearch.
Source code in src/locus/memory/backends/opensearch.py
delete
async
¶
Delete checkpoint from OpenSearch.
Source code in src/locus/memory/backends/opensearch.py
exists
async
¶
Check if checkpoint exists.
Source code in src/locus/memory/backends/opensearch.py
list_threads
async
¶
List all thread IDs.
Source code in src/locus/memory/backends/opensearch.py
search
async
¶
Search checkpoints by content.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | `str` | Search query | required |
| limit | `int` | Maximum results | 10 |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | List of matching checkpoints with scores |
Source code in src/locus/memory/backends/opensearch.py
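A sketch of what a content search might look like at the OpenSearch level. The `query_string` query and the `hits.hits[]._score` response layout are standard OpenSearch features, but which query type the backend actually issues, and the exact keys of the dictionaries it returns, are assumptions here.

```python
from typing import Any


def build_search_body(query: str, limit: int = 10) -> dict[str, Any]:
    """Build a request body for a full-text search over the index's default fields."""
    return {"size": limit, "query": {"query_string": {"query": query}}}


def parse_hits(response: dict[str, Any]) -> list[dict[str, Any]]:
    """Flatten an OpenSearch search response into checkpoint dicts with scores."""
    results = []
    for hit in response["hits"]["hits"]:
        # Hypothetical result shape: document ID, relevance score, stored source.
        results.append({"id": hit["_id"], "score": hit["_score"], "data": hit["_source"]})
    return results


# A hand-written response in the shape OpenSearch returns, for illustration only.
fake_response = {
    "hits": {
        "hits": [
            {"_id": "thread_1", "_score": 1.7, "_source": {"messages": ["user query"]}},
        ]
    }
}
body = build_search_body("user query", limit=5)
results = parse_hits(fake_response)
```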
get_by_metadata
async
¶
Get checkpoints by metadata field.
Source code in src/locus/memory/backends/opensearch.py
OracleBackend ¶
OracleBackend(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, **kwargs: Any)
Bases: BaseModel
Oracle Database checkpoint backend.
Production-grade persistent storage with JSON support and full-text search.
Features:
- Connection pooling
- JSON column storage with search
- Metadata indexing
- Vacuum (cleanup old checkpoints)
- Works with Autonomous Database (wallet-based auth)
Example with DSN
backend = OracleBackend(
    dsn="mydb_high",  # TNS name from tnsnames.ora
    user="admin",
    password="secret",
    wallet_location="/path/to/wallet",
)
await backend.save(state, "thread_1")
Example with connection string
backend = OracleBackend(
    host="adb.us-ashburn-1.oraclecloud.com",
    port=1522,
    service_name="xxx_high.adb.oraclecloud.com",
    user="admin",
    password="secret",
)
Source code in src/locus/memory/backends/oracle.py
save
async
¶
save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str
Save agent state to Oracle Database.
Implements BaseCheckpointer.save. The parameter order is
(state, thread_id, ...) to match the abstract method. The earlier
(thread_id, data, ...) signature silently mismatched the agent
runtime, which calls save(state, thread_id) and would therefore
try to bind an AgentState to the VARCHAR2(255) thread_id column.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | `AgentState` | Current agent state. Serialized via :meth: | required |
| thread_id | `str` | Thread identifier (column primary key). | required |
| checkpoint_id | `str \| None` | Optional checkpoint ID. Generated if omitted. | None |
| metadata | `dict[str, Any] \| None` | Optional metadata for querying. | None |
Returns:
| Type | Description |
|---|---|
| `str` | Checkpoint ID. |
Source code in src/locus/memory/backends/oracle.py
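The mismatch described above is easy to reproduce with two stand-in functions. Both are hypothetical and only report what their `thread_id` parameter receives; the `AgentState` here is a placeholder dataclass, not the locus class.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AgentState:
    """Placeholder for the real AgentState, for illustration only."""

    messages: list[str] = field(default_factory=list)


async def save_old(thread_id: str, data: dict[str, Any]) -> Any:
    # Old order: the first positional argument is treated as the thread ID.
    return thread_id


async def save_new(state: AgentState, thread_id: str) -> Any:
    # Order matching BaseCheckpointer.save: state first, then thread ID.
    return thread_id


state = AgentState(messages=["hi"])
# The runtime calls save(state, thread_id) positionally.
bound_old = asyncio.run(save_old(state, "thread_1"))  # thread_id receives the state
bound_new = asyncio.run(save_new(state, "thread_1"))  # thread_id receives "thread_1"
```

Passing `thread_id=` by keyword at call sites is one way to make such a mismatch raise a `TypeError` instead of passing silently.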
load
async
¶
Load agent state from Oracle Database.
checkpoint_id is accepted for signature parity with BaseCheckpointer
but is ignored: this backend stores one row per thread_id (MERGE
upsert), so the latest state is the only retrievable checkpoint.
Source code in src/locus/memory/backends/oracle.py
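Because each save overwrites the single row for its thread, loading always yields the most recent state regardless of `checkpoint_id`. An in-memory sketch of that behavior follows, using a hypothetical `SingleRowStore` with plain dicts standing in for `AgentState` and for Oracle's MERGE.

```python
import asyncio
import uuid
from typing import Any, Optional


class SingleRowStore:
    """Hypothetical stand-in: one stored row per thread_id, like a MERGE upsert."""

    def __init__(self) -> None:
        self.rows: dict[str, dict[str, Any]] = {}

    async def save(
        self, state: dict[str, Any], thread_id: str, checkpoint_id: Optional[str] = None
    ) -> str:
        checkpoint_id = checkpoint_id or uuid.uuid4().hex
        # Upsert: a second save for the same thread replaces the first.
        self.rows[thread_id] = {"checkpoint_id": checkpoint_id, "state": state}
        return checkpoint_id

    async def load(
        self, thread_id: str, checkpoint_id: Optional[str] = None
    ) -> Optional[dict[str, Any]]:
        # checkpoint_id is accepted but ignored; only the latest row exists.
        row = self.rows.get(thread_id)
        return row["state"] if row else None


async def demo() -> Optional[dict[str, Any]]:
    store = SingleRowStore()
    first = await store.save({"step": 1}, "thread_1")
    await store.save({"step": 2}, "thread_1")
    # Asking for the first checkpoint still returns the latest state.
    return await store.load("thread_1", checkpoint_id=first)


loaded = asyncio.run(demo())
```

Callers that need to restore an earlier checkpoint would have to use a backend that keeps history per thread.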
delete
async
¶
Delete checkpoint from Oracle Database.
Source code in src/locus/memory/backends/oracle.py
exists
async
¶
Check if checkpoint exists.
Source code in src/locus/memory/backends/oracle.py
list_threads
async
¶
List all thread IDs matching pattern.
Source code in src/locus/memory/backends/oracle.py
get_metadata
async
¶
Get checkpoint metadata.
Source code in src/locus/memory/backends/oracle.py
query_by_metadata
async
¶
Query checkpoints by metadata field.
Uses Oracle JSON path expressions.
Source code in src/locus/memory/backends/oracle.py
search
async
¶
Search checkpoints by content.
Uses Oracle JSON_TEXTCONTAINS for full-text search within JSON.
Source code in src/locus/memory/backends/oracle.py
vacuum
async
¶
Delete old checkpoints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| older_than_days | `int` | Delete checkpoints older than this many days | 30 |
Returns:
| Type | Description |
|---|---|
| `int` | Number of deleted rows |
Source code in src/locus/memory/backends/oracle.py
count
async
¶
Count checkpoints matching pattern.