Memory¶
Three orthogonal layers, each with its own contract:
- Conversation management: what to do with the message history within a single run (keep, window, summarize) when it grows past the model's context window.
- Cross-thread store: a durable key-value store scoped by namespace, used for long-term memory entries the agent reads at session start and writes at session end.
- Long-term memory manager: the policy layer that decides what to extract from a run and when to retrieve, sitting on top of any BaseStore backend.
For checkpointing (state persistence between runs), see
Checkpointers — those backends are also in
locus.memory.backends, with Oracle ADB as the production target.
Conversation management¶
ConversationManager ¶
Bases: ABC
Base class for conversation management strategies.
Conversation managers handle how message history is maintained, including trimming, summarization, and other memory management.
apply
abstractmethod
¶
async_apply
async
¶
Async version of apply. Supports async summarization functions.
Default implementation delegates to the synchronous apply(). Override in subclasses that need async operations (e.g., LLM summarization).
Source code in src/locus/memory/conversation.py
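The apply / async_apply contract described above can be sketched with a stand-in base class. This is a minimal illustration, not the library's code: ConversationManagerSketch, DropToolMessages, and the dict-based message shape are assumptions, since the real method signatures are not shown on this page.

```python
import asyncio
from abc import ABC, abstractmethod

Message = dict  # stand-in; the real message type is not shown in this reference


class ConversationManagerSketch(ABC):
    """Hypothetical stand-in mirroring the documented apply/async_apply contract."""

    @abstractmethod
    def apply(self, messages: list[Message]) -> list[Message]:
        """Return the (possibly trimmed) history."""

    async def async_apply(self, messages: list[Message]) -> list[Message]:
        # Default behaviour: delegate to the synchronous implementation.
        return self.apply(messages)


class DropToolMessages(ConversationManagerSketch):
    """Example strategy: remove tool output from the history."""

    def apply(self, messages: list[Message]) -> list[Message]:
        return [m for m in messages if m.get("role") != "tool"]


history = [
    {"role": "user", "content": "hi"},
    {"role": "tool", "content": "{}"},
    {"role": "assistant", "content": "hello"},
]
trimmed = asyncio.run(DropToolMessages().async_apply(history))
```

A subclass that only needs synchronous work overrides apply alone; one that calls an LLM would override async_apply as well.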
NullManager ¶
Bases: ConversationManager
Null conversation manager - keeps all messages unchanged.
Use this when you want to preserve the entire conversation history without any modifications. Suitable for short conversations or when full history is required.
apply ¶
async_apply
async
¶
Async version of apply. Supports async summarization functions.
Default implementation delegates to the synchronous apply(). Override in subclasses that need async operations (e.g., LLM summarization).
Source code in src/locus/memory/conversation.py
SlidingWindowManager ¶
Bases: ConversationManager
Sliding window conversation manager - keeps last N messages.
Preserves the system message (if present) and the last N messages from the conversation. This is a simple and effective strategy for managing conversation length.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| window_size | int | Maximum number of messages to keep (excluding system message) | 20 |
| preserve_system | bool | Whether to preserve the system message at the start | True |
Source code in src/locus/memory/conversation.py
apply ¶
Apply sliding window to messages.
Keeps the system message (if preserve_system is True) and the last window_size messages.
Source code in src/locus/memory/conversation.py
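As a rough illustration of the windowing rule, the function below keeps an optional leading system message plus the last window_size messages. It is a self-contained sketch: sliding_window is a hypothetical helper, and messages are assumed to be dicts with a "role" key, which may differ from the library's message type.

```python
def sliding_window(messages, window_size=20, preserve_system=True):
    """Keep the leading system message (optionally) and the last window_size messages."""
    system, rest = [], list(messages)
    if preserve_system and rest and rest[0]["role"] == "system":
        system, rest = rest[:1], rest[1:]
    # Guard against window_size == 0, where rest[-0:] would return everything.
    tail = rest[-window_size:] if window_size > 0 else []
    return system + tail


history = [{"role": "system", "content": "Be brief."}] + [
    {"role": "user", "content": f"turn {i}"} for i in range(6)
]
windowed = sliding_window(history, window_size=3)
```

With six user turns and window_size=3, the result holds the system message followed by turns 3, 4, and 5.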
async_apply
async
¶
Async version of apply. Supports async summarization functions.
Default implementation delegates to the synchronous apply(). Override in subclasses that need async operations (e.g., LLM summarization).
Source code in src/locus/memory/conversation.py
SummarizingManager ¶
SummarizingManager(threshold: int = 30, keep_recent: int = 10, summarize_fn: Any | None = None, preserve_system: bool = True)
Bases: ConversationManager
Summarizing conversation manager - summarizes older messages.
When the conversation exceeds a threshold, older messages are summarized into a single summary message, preserving recent context.
This manager requires a summarization function to be provided, which can use an LLM to generate summaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| threshold | int | Number of messages before summarization kicks in | 30 |
| keep_recent | int | Number of recent messages to always preserve | 10 |
| summarize_fn | Any \| None | Async function that summarizes a list of messages | None |
| preserve_system | bool | Whether to preserve the system message | True |
Source code in src/locus/memory/conversation.py
apply ¶
Apply summarization to messages.
If total messages exceed threshold, older messages are summarized. Note: If no summarize_fn is provided, falls back to a simple summary.
Source code in src/locus/memory/conversation.py
async_apply
async
¶
Async apply with LLM summarization support.
If summarize_fn is an async function, it will be properly awaited. Falls back to synchronous _generate_summary() otherwise.
Source code in src/locus/memory/conversation.py
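The threshold / keep_recent behaviour, including awaiting an async summarize_fn, might look roughly like the sketch below. Everything here is a stand-in under stated assumptions: summarize_history and fallback_summary are hypothetical names, messages are plain dicts, the summary is emitted as a system message, and the threshold is assumed to count non-system messages only.

```python
import asyncio
import inspect


def fallback_summary(older):
    # Hypothetical stand-in for the "simple summary" used when no summarize_fn is given.
    return f"[Summary of {len(older)} earlier messages]"


async def summarize_history(messages, threshold=30, keep_recent=10,
                            summarize_fn=None, preserve_system=True):
    system, rest = [], list(messages)
    if preserve_system and rest and rest[0]["role"] == "system":
        system, rest = rest[:1], rest[1:]
    if len(rest) <= threshold:
        return system + rest  # under the threshold: nothing to do
    split = max(0, len(rest) - keep_recent)
    older, recent = rest[:split], rest[split:]
    if summarize_fn is None:
        summary = fallback_summary(older)
    elif inspect.iscoroutinefunction(summarize_fn):
        summary = await summarize_fn(older)  # async summarizer, e.g. an LLM call
    else:
        summary = summarize_fn(older)  # plain synchronous callable
    return system + [{"role": "system", "content": summary}] + recent


async def fake_llm_summary(older):
    return f"{len(older)} messages condensed"


history = [{"role": "system", "content": "Be brief."}] + [
    {"role": "user", "content": f"turn {i}"} for i in range(5)
]
condensed = asyncio.run(
    summarize_history(history, threshold=3, keep_recent=2, summarize_fn=fake_llm_summary)
)
```

Here five user turns exceed the threshold of three, so the three oldest are replaced by one summary message and the two most recent are kept verbatim.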
Cross-thread store¶
The durable KV the long-term memory manager writes to. InMemoryStore
is for tests; production stores live in locus.memory.backends (see
Checkpointers for Oracle / OCI Object Storage /
OpenSearch / Postgres / Redis backends, which all implement
BaseStore as well as BaseCheckpointer).
BaseStore ¶
Bases: ABC
Abstract base class for store implementations.
Provides common functionality and defines the interface.
capabilities
property
¶
Return capabilities. Override in subclasses.
put
abstractmethod
async
¶
get
abstractmethod
async
¶
delete
abstractmethod
async
¶
list_keys
abstractmethod
async
¶
exists
async
¶
get_item
async
¶
Get full item with metadata.
Source code in src/locus/memory/store.py
search
async
¶
Search for items in namespace.
Optional method. Backends advertise support via capabilities.search. Subclasses that support search must override this method; the default raises StoreCapabilityError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Namespace to search in | required |
| query | str \| None | Search query (implementation-specific) | None |
| limit | int | Maximum results | 10 |
Returns:
| Type | Description |
|---|---|
| list[StoreItem] | List of matching items |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support search. |
Source code in src/locus/memory/store.py
put_with_embedding
async
¶
put_with_embedding(namespace: tuple[str, ...], key: str, value: Any, embedding: list[float], metadata: dict[str, Any] | None = None) -> None
Store a value with its embedding vector for semantic search.
Optional method — backends advertise support via
capabilities.semantic_search. Subclasses that support
semantic search must override this method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Hierarchical namespace tuple | required |
| key | str | Key within the namespace | required |
| value | Any | Value to store (must be JSON-serializable) | required |
| embedding | list[float] | Vector embedding (e.g., from OpenAI, Cohere, etc.) | required |
| metadata | dict[str, Any] \| None | Optional metadata for filtering | None |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support semantic search. |
Example
    # Get embedding from your provider
    embedding = await embedder.embed("User prefers dark theme")
    await store.put_with_embedding(
        ("users", user_id, "memories"),
        "theme_preference",
        {"theme": "dark", "reason": "easier on eyes"},
        embedding=embedding,
        metadata={"category": "preferences"},
    )
Source code in src/locus/memory/store.py
search_by_embedding
async
¶
search_by_embedding(namespace: tuple[str, ...], query_embedding: list[float], limit: int = 10, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> list[SemanticSearchResult]
Search for similar items using vector similarity.
Optional method — see capabilities.semantic_search.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Namespace to search in | required |
| query_embedding | list[float] | Vector to compare against stored embeddings | required |
| limit | int | Maximum results to return | 10 |
| threshold | float \| None | Minimum similarity score (0.0-1.0), or None for no threshold | None |
| metadata_filter | dict[str, Any] \| None | Optional metadata constraints | None |
Returns:
| Type | Description |
|---|---|
| list[SemanticSearchResult] | List of SemanticSearchResult, sorted by similarity (highest first) |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support semantic search. |
Source code in src/locus/memory/store.py
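To make the limit / threshold / ordering semantics concrete, here is a small pure-Python ranking sketch. It assumes cosine similarity as the metric, which this page does not confirm (the distance type is backend-specific), and rank_by_similarity is a hypothetical helper rather than part of the store API.

```python
import math


def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat zero vectors as dissimilar to everything
    return dot / (norm_a * norm_b)


def rank_by_similarity(query_embedding, stored, limit=10, threshold=None):
    """Return (key, score) pairs, highest score first, after threshold and limit."""
    scored = [(key, cosine_similarity(query_embedding, vec)) for key, vec in stored.items()]
    if threshold is not None:
        scored = [(key, score) for key, score in scored if score >= threshold]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:limit]


stored = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
ranked = rank_by_similarity([1.0, 0.0], stored, limit=5, threshold=0.5)
```

Production backends would normally push this ranking into a vector index; the sketch only shows how the three parameters interact.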
get_embedding
async
¶
Get the embedding vector for a stored item.
Optional method — see capabilities.semantic_search.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Hierarchical namespace tuple | required |
| key | str | Key within the namespace | required |
Returns:
| Type | Description |
|---|---|
| list[float] \| None | Embedding vector or None if not found / no embedding stored. |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support semantic search. |
Source code in src/locus/memory/store.py
put_batch
async
¶
Store multiple items.
Backends that advertise capabilities.batch_operations should
override this method to dispatch atomically. The default falls
back to sequential put() calls so callers always get
functional batching, just without atomicity guarantees.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| items | list[tuple[tuple[str, ...], str, Any, dict[str, Any] \| None]] | List of (namespace, key, value, metadata) tuples | required |
Source code in src/locus/memory/store.py
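The sequential fallback described for put_batch can be sketched with a tiny dict-backed stand-in. TinyStore is hypothetical, and the put signature used here (namespace, key, value, metadata) is an assumption inferred from the batch tuple shape, not taken from the library.

```python
import asyncio
from typing import Any


class TinyStore:
    """Hypothetical dict-backed store used only to illustrate the fallback."""

    def __init__(self) -> None:
        self._data: dict[tuple[tuple[str, ...], str], tuple[Any, dict[str, Any]]] = {}

    async def put(self, namespace, key, value, metadata=None) -> None:
        self._data[(namespace, key)] = (value, metadata or {})

    async def put_batch(self, items) -> None:
        # Non-atomic fallback: one put() per item, in order.
        # If a put() raises midway, earlier items stay written.
        for namespace, key, value, metadata in items:
            await self.put(namespace, key, value, metadata)


batch_store = TinyStore()
asyncio.run(
    batch_store.put_batch(
        [
            (("users", "u1"), "theme", {"theme": "dark"}, None),
            (("users", "u1"), "lang", "en", {"source": "profile"}),
        ]
    )
)
```

A backend that advertises capabilities.batch_operations would replace the loop with a single transactional write.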
get_batch
async
¶
Retrieve multiple items.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[tuple[tuple[str, ...], str]] | List of (namespace, key) tuples | required |
Returns:
| Type | Description |
|---|---|
| dict[tuple[tuple[str, ...], str], Any] | Dict mapping (namespace, key) to value |
Source code in src/locus/memory/store.py
list_namespaces
async
¶
List all namespaces.
Optional method — see capabilities.list_namespaces.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | tuple[str, ...] \| None | Optional prefix to filter namespaces | None |
| limit | int | Maximum namespaces to return | 100 |
Returns:
| Type | Description |
|---|---|
| list[tuple[str, ...]] | List of namespace tuples |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support namespace listing. |
Source code in src/locus/memory/store.py
clear_namespace
async
¶
Delete all items in a namespace.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Namespace to clear | required |
Returns:
| Type | Description |
|---|---|
| int | Number of items deleted |
Source code in src/locus/memory/store.py
InMemoryStore ¶
Bases: BaseStore
In-memory store implementation.
Fast but not persistent - data is lost when process exits. Useful for testing and development.
Source code in src/locus/memory/store.py
exists
async
¶
put_with_embedding
async
¶
put_with_embedding(namespace: tuple[str, ...], key: str, value: Any, embedding: list[float], metadata: dict[str, Any] | None = None) -> None
Store a value with its embedding vector for semantic search.
Optional method — backends advertise support via
capabilities.semantic_search. Subclasses that support
semantic search must override this method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Hierarchical namespace tuple | required |
| key | str | Key within the namespace | required |
| value | Any | Value to store (must be JSON-serializable) | required |
| embedding | list[float] | Vector embedding (e.g., from OpenAI, Cohere, etc.) | required |
| metadata | dict[str, Any] \| None | Optional metadata for filtering | None |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support semantic search. |
Example
    # Get embedding from your provider
    embedding = await embedder.embed("User prefers dark theme")
    await store.put_with_embedding(
        ("users", user_id, "memories"),
        "theme_preference",
        {"theme": "dark", "reason": "easier on eyes"},
        embedding=embedding,
        metadata={"category": "preferences"},
    )
Source code in src/locus/memory/store.py
search_by_embedding
async
¶
search_by_embedding(namespace: tuple[str, ...], query_embedding: list[float], limit: int = 10, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> list[SemanticSearchResult]
Search for similar items using vector similarity.
Optional method — see capabilities.semantic_search.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Namespace to search in | required |
| query_embedding | list[float] | Vector to compare against stored embeddings | required |
| limit | int | Maximum results to return | 10 |
| threshold | float \| None | Minimum similarity score (0.0-1.0), or None for no threshold | None |
| metadata_filter | dict[str, Any] \| None | Optional metadata constraints | None |
Returns:
| Type | Description |
|---|---|
| list[SemanticSearchResult] | List of SemanticSearchResult, sorted by similarity (highest first) |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support semantic search. |
Source code in src/locus/memory/store.py
get_embedding
async
¶
Get the embedding vector for a stored item.
Optional method — see capabilities.semantic_search.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Hierarchical namespace tuple | required |
| key | str | Key within the namespace | required |
Returns:
| Type | Description |
|---|---|
| list[float] \| None | Embedding vector or None if not found / no embedding stored. |
Raises:
| Type | Description |
|---|---|
| StoreCapabilityError | If the backend does not support semantic search. |
Source code in src/locus/memory/store.py
get_batch
async
¶
Retrieve multiple items.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| keys | list[tuple[tuple[str, ...], str]] | List of (namespace, key) tuples | required |
Returns:
| Type | Description |
|---|---|
| dict[tuple[tuple[str, ...], str], Any] | Dict mapping (namespace, key) to value |
Source code in src/locus/memory/store.py
clear_namespace
async
¶
Delete all items in a namespace.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| namespace | tuple[str, ...] | Namespace to clear | required |
Returns:
| Type | Description |
|---|---|
| int | Number of items deleted |
Source code in src/locus/memory/store.py
NamespacedStore ¶
Store wrapper with a fixed namespace prefix.
Makes it easier to work with a specific scope.
Example
    user_store = NamespacedStore(store, ("users", user_id))
    await user_store.put("preferences", {"theme": "dark"})
    prefs = await user_store.get("preferences")
Source code in src/locus/memory/store.py
scoped ¶
StoreContext ¶
Context object passed to nodes for store access.
Provides convenient methods for common memory operations.
Example
    async def my_node(inputs, *, store: StoreContext):
        # Get user memory
        user_prefs = await store.get_user_memory("preferences")
        # Remember something
        await store.remember(
            "discussed_topic",
            {"topic": "python", "timestamp": now}
        )
        # Search memories
        related = await store.search_memories("python")
Source code in src/locus/memory/store.py
for_user ¶
Get namespaced store for a user.
If user_id is not provided, uses the user_id from the context.
Source code in src/locus/memory/store.py
for_session ¶
Get namespaced store for a session.
If session_id is not provided, uses the session_id from the context.
Source code in src/locus/memory/store.py
get_user_memory
async
¶
remember
async
¶
Store a memory for the current user.
Source code in src/locus/memory/store.py
forget
async
¶
search_memories
async
¶
Search user memories (full-text).
Source code in src/locus/memory/store.py
remember_with_embedding
async
¶
remember_with_embedding(key: str, value: Any, embedding: list[float], metadata: dict[str, Any] | None = None) -> None
Store a memory with its embedding for semantic search.
Requires: capabilities.semantic_search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Memory key | required |
| value | Any | Value to store | required |
| embedding | list[float] | Vector embedding for semantic search | required |
| metadata | dict[str, Any] \| None | Optional metadata | None |
Example
    embedding = await embedder.embed("User likes dark theme")
    await store.remember_with_embedding(
        "theme_pref",
        {"theme": "dark"},
        embedding=embedding,
    )
Source code in src/locus/memory/store.py
search_memories_semantic
async
¶
search_memories_semantic(query_embedding: list[float], limit: int = 10, threshold: float | None = None) -> list[SemanticSearchResult]
Search user memories by semantic similarity.
Requires: capabilities.semantic_search = True
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query_embedding | list[float] | Vector to search with | required |
| limit | int | Maximum results | 10 |
| threshold | float \| None | Minimum similarity score (0.0-1.0) | None |
Returns:
| Type | Description |
|---|---|
| list[SemanticSearchResult] | List of SemanticSearchResult sorted by similarity |
Example
    query_vec = await embedder.embed("user preferences")
    results = await store.search_memories_semantic(
        query_embedding=query_vec,
        limit=5,
        threshold=0.7,
    )
    for r in results:
        print(f"{r.item.key}: {r.score:.2f} similar")
Source code in src/locus/memory/store.py
get_global
async
¶
set_global
async
¶
StoreItem
dataclass
¶
StoreItem(namespace: tuple[str, ...], key: str, value: Any, metadata: dict[str, Any], created_at: datetime, updated_at: datetime, version: int = 1)
An item stored in the store.
Attributes:
| Name | Type | Description |
|---|---|---|
| namespace | tuple[str, ...] | The namespace tuple |
| key | str | The key within namespace |
| value | Any | The stored value |
| metadata | dict[str, Any] | Optional metadata |
| created_at | datetime | Creation timestamp |
| updated_at | datetime | Last update timestamp |
| version | int | Version counter for optimistic locking |
to_dict ¶
Convert to dictionary.
Source code in src/locus/memory/store.py
from_dict
classmethod
¶
Create from dictionary.
Source code in src/locus/memory/store.py
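A to_dict / from_dict round trip for an item of this shape could be sketched as follows. StoreItemSketch copies the field list from the signature above, but the serialized layout (ISO 8601 timestamps, namespace as a list) is an assumption; the library's actual wire format is not shown here.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class StoreItemSketch:
    """Hypothetical stand-in with the same fields as the documented StoreItem."""

    namespace: tuple[str, ...]
    key: str
    value: Any
    metadata: dict[str, Any]
    created_at: datetime
    updated_at: datetime
    version: int = 1

    def to_dict(self) -> dict[str, Any]:
        return {
            "namespace": list(self.namespace),  # tuples are not JSON types
            "key": self.key,
            "value": self.value,
            "metadata": self.metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "version": self.version,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "StoreItemSketch":
        return cls(
            namespace=tuple(data["namespace"]),
            key=data["key"],
            value=data["value"],
            metadata=data.get("metadata", {}),
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            version=data.get("version", 1),
        )


stamp = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
item = StoreItemSketch(("users", "u1"), "theme", {"theme": "dark"}, {}, stamp, stamp)
restored = StoreItemSketch.from_dict(json.loads(json.dumps(item.to_dict())))
```

Passing the dictionary through json.dumps and json.loads in the example checks that the representation survives a real serialization boundary, not only an in-memory copy.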
StoreCapabilities
dataclass
¶
StoreCapabilities(search: bool = False, semantic_search: bool = False, embedding_dimension: int | None = None, ttl: bool = False, list_namespaces: bool = False, batch_operations: bool = False, transactions: bool = False)
Capabilities supported by a store implementation.
Use this to discover what features a store supports.
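One way to use the capability flags is to pick a retrieval strategy before calling the store. The sketch below mirrors the field list from the signature above in a stand-in dataclass; StoreCapabilitiesSketch and pick_search_strategy are hypothetical names, and the strategy labels are illustrative only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StoreCapabilitiesSketch:
    """Stand-in with the same fields and defaults as the documented dataclass."""

    search: bool = False
    semantic_search: bool = False
    embedding_dimension: Optional[int] = None
    ttl: bool = False
    list_namespaces: bool = False
    batch_operations: bool = False
    transactions: bool = False


def pick_search_strategy(caps: StoreCapabilitiesSketch) -> str:
    """Prefer vector search, then full-text search, then a plain key listing."""
    if caps.semantic_search:
        return "semantic"
    if caps.search:
        return "full_text"
    return "list_keys"


vector_caps = StoreCapabilitiesSketch(search=True, semantic_search=True, embedding_dimension=1536)
basic_caps = StoreCapabilitiesSketch()
```

Checking the flags up front avoids relying on exceptions for control flow when the backend is known in advance.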
StoreCapabilityError ¶
Bases: NotImplementedError
Raised when a store operation is requested that the backend does not support.
Carries structured context (capability, store_class, hint) so callers can branch on the capability or surface a clean message.
Subclasses NotImplementedError so legacy except NotImplementedError blocks continue to catch it; this is strictly a richer payload, not a behaviour change.
Example:
    try:
        results = await store.search(("users", uid), query="...")
    except StoreCapabilityError as exc:
        log.info(
            "store %s does not support %s; falling back",
            exc.store_class,
            exc.capability,
        )
        results = await store.list_keys(("users", uid))
Source code in src/locus/memory/store.py
SemanticSearchResult
dataclass
¶
Result from semantic (vector similarity) search.
Attributes:
| Name | Type | Description |
|---|---|---|
| item | StoreItem | The matching store item |
| score | float | Similarity score (0.0 to 1.0, higher is more similar) |
| distance | float \| None | Raw distance metric (interpretation depends on distance type) |
Long-term memory manager¶
LLMMemoryManager is the default — it uses an auxiliary model to
extract and categorize memories at session end and retrieves the
top-k relevant entries at session start. NoopMemoryManager is the
pass-through used in tests.
BaseMemoryManager ¶
Bases: ABC
Abstract base for long-term memory managers.
Subclasses implement extract to decide what is worth remembering; the base class handles retrieval, injection, and the session-start / session-end lifecycle hooks that the agent calls automatically.
Two concrete implementations are provided:
- NoopMemoryManager: no-op, useful for testing.
- LLMMemoryManager: persists to any BaseStore backend; accepts an optional LLM-backed extraction function.
extract
abstractmethod
async
¶
Extract durable memories from a finished conversation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| messages | list[Message] | The full message history for the completed session. | required |
Returns:
| Type | Description |
|---|---|
| list[Memory] | List of Memory objects |
Source code in src/locus/memory/manager.py
retrieve
abstractmethod
async
¶
Retrieve stored memories for injection at session start.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| limit | int | Maximum number of memories to return. | 20 |
Returns:
| Type | Description |
|---|---|
| list[Memory] | List of Memory objects |
Source code in src/locus/memory/manager.py
save
abstractmethod
async
¶
Persist a list of memories to the backing store.
Implementations should upsert by key — re-extracting the same fact updates the record rather than creating a duplicate.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| memories | list[Memory] | Memories to persist. | required |
Source code in src/locus/memory/manager.py
on_session_start
async
¶
Retrieve memories and inject them into the agent state.
Called by the agent runtime at the start of every invocation,
after on_before_invocation hooks but before the first model
call. The default implementation retrieves all stored memories
and prepends a formatted system message to state.messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state (just-created or loaded from checkpointer). | required |
Returns:
| Type | Description |
|---|---|
| AgentState | Possibly-modified state with memory context injected. |
Source code in src/locus/memory/manager.py
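The injection step, retrieving memories and prepending a formatted system message, might be sketched as below. MemorySketch, format_memories, and inject_memories are hypothetical, the message list stands in for state.messages, and the wording of the injected text is an assumption since the real format is not documented on this page.

```python
from dataclasses import dataclass


@dataclass
class MemorySketch:
    """Stand-in for the documented Memory dataclass (type, key, content only)."""

    type: str
    key: str
    content: str


def format_memories(memories) -> str:
    lines = ["Known facts from earlier sessions:"]
    lines += [f"- [{m.type}] {m.key}: {m.content}" for m in memories]
    return "\n".join(lines)


def inject_memories(messages, memories):
    """Return a new message list with the memory context prepended."""
    if not memories:
        return list(messages)  # nothing stored yet: leave the history untouched
    context = {"role": "system", "content": format_memories(memories)}
    return [context] + list(messages)


memories = [MemorySketch("user", "theme", "Prefers dark theme")]
messages = [{"role": "user", "content": "Set up my editor."}]
injected = inject_memories(messages, memories)
```

Returning a new list rather than mutating the input keeps the original history available if the caller needs to retry without memory context.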
on_session_end
async
¶
Extract memories from the finished session and save them.
Called by the agent runtime in the finally block of every
invocation, after on_after_invocation hooks but before the
final checkpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Final agent state with the complete message history. | required |
Source code in src/locus/memory/manager.py
LLMMemoryManager ¶
LLMMemoryManager(store: BaseStore, *, extract_fn: ExtractFn | None = None, namespace_prefix: tuple[str, ...] = ('locus_memory',), max_memories: int = 50, retrieve_limit: int = 20)
Bases: BaseMemoryManager
Memory manager backed by any locus.memory.store.BaseStore.
Extraction uses either a caller-supplied async function
(extract_fn) or a built-in heuristic that pattern-matches
common conversational signals (corrections, confirmations, role
disclosures). Supply extract_fn for production use; the
heuristic is adequate for demos and tests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| store | BaseStore | Any BaseStore backend | required |
| extract_fn | ExtractFn \| None | Optional async callable | None |
| namespace_prefix | tuple[str, ...] | Store namespace prefix. Scope per user or tenant by passing a custom prefix, for example ("users", user_id). | ('locus_memory',) |
| max_memories | int | Hard cap on the total number of memories kept per type. Oldest entries are pruned when the limit is reached. | 50 |
| retrieve_limit | int | Maximum memories returned by retrieve | 20 |
|
Example:
    from locus.memory.manager import LLMMemoryManager
    from locus.memory.backends import OracleBackend
    manager = LLMMemoryManager(
        store=OracleBackend(dsn="mydb_high", user="admin", password="secret"),
        extract_fn=my_llm_extractor,
        namespace_prefix=("users", user_id),
    )
    agent = Agent(model="oci:openai.gpt-5", memory_manager=manager)
Source code in src/locus/memory/manager.py
extract
async
¶
Extract memories from a message list.
Uses extract_fn when provided; otherwise applies the
built-in heuristic.
Source code in src/locus/memory/manager.py
retrieve
async
¶
Retrieve all stored memories across every type.
Returns memories sorted by most-recently-updated first, up to
limit entries total.
Source code in src/locus/memory/manager.py
save
async
¶
Upsert memories into the backing store.
Memories with the same key and type overwrite the
previous entry — no duplicates accumulate.
Source code in src/locus/memory/manager.py
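Upsert-by-key combined with the per-type max_memories cap can be illustrated with an in-memory table. This is a sketch under assumptions: UpsertingMemoryTable and MemorySketch are hypothetical, and "oldest" is taken to mean least recently written, which the page does not spell out.

```python
from dataclasses import dataclass


@dataclass
class MemorySketch:
    type: str
    key: str
    content: str


class UpsertingMemoryTable:
    """Hypothetical in-memory table keyed by (type, key)."""

    def __init__(self, max_memories: int = 50) -> None:
        self.max_memories = max_memories
        self._rows: dict[tuple[str, str], MemorySketch] = {}

    def save(self, memories) -> None:
        for memory in memories:
            ident = (memory.type, memory.key)
            # Remove then re-insert so an updated entry counts as the newest one.
            self._rows.pop(ident, None)
            self._rows[ident] = memory
        self._prune()

    def _prune(self) -> None:
        # dicts preserve insertion order, so the first idents per type are the oldest.
        for mem_type in {t for t, _ in self._rows}:
            idents = [i for i in self._rows if i[0] == mem_type]
            for ident in idents[: max(0, len(idents) - self.max_memories)]:
                del self._rows[ident]

    def keys(self, mem_type: str) -> list[str]:
        return [k for t, k in self._rows if t == mem_type]


table = UpsertingMemoryTable(max_memories=2)
table.save([MemorySketch("user", "a", "v1"), MemorySketch("user", "b", "v1")])
table.save([MemorySketch("user", "a", "v2")])  # overwrites, no duplicate
table.save([MemorySketch("user", "c", "v1")])  # exceeds the cap, prunes "b"
```

After the three saves, the table holds "a" (with its updated content) and "c"; "b" was the least recently written entry and is dropped.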
on_session_start
async
¶
Retrieve memories and inject them into the agent state.
Called by the agent runtime at the start of every invocation,
after on_before_invocation hooks but before the first model
call. The default implementation retrieves all stored memories
and prepends a formatted system message to state.messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state (just-created or loaded from checkpointer). | required |
Returns:
| Type | Description |
|---|---|
| AgentState | Possibly-modified state with memory context injected. |
Source code in src/locus/memory/manager.py
on_session_end
async
¶
Extract memories from the finished session and save them.
Called by the agent runtime in the finally block of every
invocation, after on_after_invocation hooks but before the
final checkpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Final agent state with the complete message history. | required |
Source code in src/locus/memory/manager.py
NoopMemoryManager ¶
Bases: BaseMemoryManager
Pass-through memory manager — stores and retrieves nothing.
Useful as a test double or as a placeholder when you want the
agent-wiring (memory_manager= kwarg) without actual persistence.
Memory
dataclass
¶
A single durable memory entry.
Attributes:
| Name | Type | Description |
|---|---|---|
| type | MemoryType | Semantic category (user / feedback / project / reference). |
| key | str | Stable logical name within the category. The store uses this as the key, so re-extracting the same fact under the same key updates the record rather than creating a duplicate. |
| content | str | The actual fact or rule, as a human-readable string. |
| metadata | dict[str, Any] | Arbitrary extra fields. |
MemoryType ¶
Bases: StrEnum
Semantic category for a stored memory.
Delta checkpointing¶
Storage-efficient checkpointer that persists only the diff between
consecutive states (~77% storage savings on long conversations).
Layered on top of any DeltaStorage backend.
DeltaCheckpointer ¶
DeltaCheckpointer(storage: DeltaStorage | None = None, max_chain_depth: int = 5, compression_level: int = 6)
Delta-based checkpointer for efficient state persistence.
This checkpointer stores only the differences between states, achieving significant storage savings. It maintains a chain of deltas pointing back to a full checkpoint.
Features:
- Delta compression: Only store changes from parent state
- Chain depth limiting: Force full checkpoint after N deltas
- zlib compression: Additional compression of stored data
- ~77% storage reduction in typical usage
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| storage | DeltaStorage \| None | Storage backend for checkpoints | None |
| max_chain_depth | int | Maximum number of deltas before forcing full checkpoint | 5 |
| compression_level | int | zlib compression level (0-9, higher = better compression) | 6 |
Source code in src/locus/memory/delta.py
save
async
¶
Save agent state with delta compression.
If a parent checkpoint exists and chain depth allows, only the delta from parent is stored. Otherwise, a full checkpoint is created.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | AgentState | Current agent state | required |
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Optional specific checkpoint ID | None |
Returns:
| Type | Description |
|---|---|
| str | Checkpoint ID for the saved state |
Source code in src/locus/memory/delta.py
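The save-side decision (full checkpoint versus delta, then zlib compression) can be sketched as follows. This is an illustration only: states are modelled as flat JSON-serializable dicts, the diff is a shallow top-level comparison, and compute_delta, encode, and plan_checkpoint are hypothetical helpers; the library's real diff granularity is not described on this page.

```python
import json
import zlib


def compute_delta(parent: dict, current: dict) -> dict:
    """Shallow diff: keys that were added or changed, plus keys that disappeared."""
    changed = {k: v for k, v in current.items() if k not in parent or parent[k] != v}
    removed = [k for k in parent if k not in current]
    return {"changed": changed, "removed": removed}


def encode(payload: dict, compression_level: int = 6) -> bytes:
    raw = json.dumps(payload, sort_keys=True).encode("utf-8")
    return zlib.compress(raw, compression_level)


def plan_checkpoint(parent_state, parent_depth, state, max_chain_depth=5):
    """Return (is_full, chain_depth, blob) for the checkpoint about to be written."""
    if parent_state is None or parent_depth >= max_chain_depth:
        # No parent, or the chain is already as long as allowed: store everything.
        return True, 0, encode(state)
    return False, parent_depth + 1, encode(compute_delta(parent_state, state))


first = {"step": 1, "messages": ["hi"]}
second = {"step": 2, "messages": ["hi"]}
is_full, depth, blob = plan_checkpoint(first, 0, second)
```

Capping the chain depth bounds how many deltas a later load has to replay, at the cost of an occasional full-size write.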
load
async
¶
Load agent state from checkpoint.
If loading a delta checkpoint, reconstructs full state by walking the delta chain back to a full checkpoint.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| checkpoint_id | str \| None | Specific checkpoint ID (latest if None) | None |
Returns:
| Type | Description |
|---|---|
| AgentState \| None | Restored AgentState or None if not found |
Source code in src/locus/memory/delta.py
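Reconstruction by walking the delta chain back to a full checkpoint might look like this. The record layout (is_full, parent_id, payload) and the delta shape (changed / removed) are assumptions made for the sketch, and load_state and apply_delta are hypothetical helpers, not the library's functions.

```python
def apply_delta(state: dict, delta: dict) -> dict:
    """Apply one delta: drop removed keys, then overlay changed keys."""
    merged = {k: v for k, v in state.items() if k not in delta["removed"]}
    merged.update(delta["changed"])
    return merged


def load_state(checkpoints: dict, checkpoint_id: str):
    """Rebuild the full state for checkpoint_id, or return None if it is unknown."""
    current = checkpoints.get(checkpoint_id)
    if current is None:
        return None
    pending = []
    # Walk towards the root, collecting deltas until a full checkpoint is found.
    while not current["is_full"]:
        pending.append(current["payload"])
        current = checkpoints[current["parent_id"]]
    state = dict(current["payload"])
    # Deltas were collected newest-first, so replay them oldest-first.
    for delta in reversed(pending):
        state = apply_delta(state, delta)
    return state


checkpoints = {
    "c1": {"is_full": True, "parent_id": None, "payload": {"step": 1, "draft": "x"}},
    "c2": {"is_full": False, "parent_id": "c1",
           "payload": {"changed": {"step": 2}, "removed": ["draft"]}},
    "c3": {"is_full": False, "parent_id": "c2",
           "payload": {"changed": {"answer": "42"}, "removed": []}},
}
restored_state = load_state(checkpoints, "c3")
```

Loading c3 starts from the full state in c1, applies the c2 delta, then the c3 delta.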
list_checkpoints
async
¶
List available checkpoint IDs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| thread_id | str | Thread identifier | required |
| limit | int | Maximum number to return | 10 |
Returns:
| Type | Description |
|---|---|
| list[str] | List of checkpoint IDs, newest first |
Source code in src/locus/memory/delta.py
get_metadata
async
¶
Get metadata for a specific checkpoint.
Source code in src/locus/memory/delta.py
delete
async
¶
Delete checkpoint(s).
Source code in src/locus/memory/delta.py
get_storage_stats
async
¶
Get storage statistics for a thread.
Returns dict with:
- total_checkpoints: Number of checkpoints
- total_size: Uncompressed size
- compressed_size: Compressed size
- compression_ratio: Overall compression ratio
- full_checkpoints: Number of full checkpoints
- delta_checkpoints: Number of delta checkpoints
Source code in src/locus/memory/delta.py
DeltaCheckpoint
dataclass
¶
A delta checkpoint containing changes from parent.
compression_ratio
property
¶
Calculate compression ratio (uncompressed / compressed).
CheckpointMetadata
dataclass
¶
CheckpointMetadata(checkpoint_id: str, thread_id: str, parent_id: str | None = None, is_full: bool = True, chain_depth: int = 0, created_at: datetime = (lambda: datetime.now(UTC))(), size_bytes: int = 0, compressed_size_bytes: int = 0)
Metadata for a checkpoint.
DeltaStorage ¶
InMemoryDeltaStorage ¶
Bases: DeltaStorage
In-memory storage for delta checkpoints (for testing).
Source code in src/locus/memory/delta.py
Registry¶
String-based checkpointer lookup — used when configuration passes a
provider name (e.g. "oracle", "redis") instead of an instance.
Custom backends register themselves via register_checkpointer.
get_checkpointer ¶
Get a checkpointer from a string identifier.
Format: "provider" or "provider:config_hint"
The config_hint is provider-specific and is passed as a keyword argument.
Examples:
- "memory" -> MemoryCheckpointer
- "file:./checkpoints" -> FileCheckpointer(base_dir="./checkpoints")
- "redis:localhost:6379" -> RedisCheckpointer(url="redis://localhost:6379")
- "postgresql:mydb" -> PostgreSQLCheckpointer(database="mydb")
- "opensearch" -> OpenSearchCheckpointer()
- "oci:bucket/namespace" -> OCIBucketCheckpointer(bucket_name="bucket", namespace="namespace")
- "oracle:mydb" -> OracleCheckpointer(database="mydb")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
checkpointer_string
|
str
|
Checkpointer identifier |
required |
**kwargs
|
Any
|
Provider-specific configuration |
{}
|
Returns:
| Type | Description |
|---|---|
BaseCheckpointer
|
Checkpointer instance |
Raises:
| Type | Description |
|---|---|
ValueError
|
If provider is unknown |
Source code in src/locus/memory/registry.py
register_checkpointer ¶
Register a checkpointer provider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Provider name (e.g., "redis", "postgresql", "oracle") | required |
| factory | Callable[..., BaseCheckpointer] | Factory function that takes kwargs and returns a checkpointer | required |
Example
    def my_factory(**kwargs) -> BaseCheckpointer:
        return MyCustomCheckpointer(**kwargs)
    register_checkpointer("custom", my_factory)
Source code in src/locus/memory/registry.py
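The "provider" / "provider:config_hint" lookup can be sketched with a small stand-in registry. The names register, resolve, and available are hypothetical, factories return plain dicts instead of checkpointers, and the hint is forwarded under a single config_hint keyword, whereas the real registry maps it to a provider-specific argument (base_dir, url, database, and so on).

```python
from typing import Any, Callable

_REGISTRY: dict[str, Callable[..., Any]] = {}


def register(name: str, factory: Callable[..., Any]) -> None:
    _REGISTRY[name] = factory


def available() -> list[str]:
    return sorted(_REGISTRY)


def resolve(spec: str, **kwargs: Any) -> Any:
    # Split on the first colon only, so hints like "localhost:6379" stay intact.
    provider, _, hint = spec.partition(":")
    if provider not in _REGISTRY:
        raise ValueError(f"Unknown checkpointer provider: {provider!r}")
    if hint:
        kwargs.setdefault("config_hint", hint)
    return _REGISTRY[provider](**kwargs)


# Illustrative factories; real ones would construct checkpointer instances.
register("memory", lambda **kw: {"kind": "memory"})
register("file", lambda config_hint="./checkpoints", **kw: {"kind": "file", "base_dir": config_hint})
register("redis", lambda config_hint="localhost:6379", **kw: {"kind": "redis", "url": f"redis://{config_hint}"})

redis_cp = resolve("redis:localhost:6379")
file_cp = resolve("file:./checkpoints")
```

Using partition rather than split keeps everything after the first colon as one hint, which matters for host:port style values.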
list_checkpointers ¶
List available checkpointer providers.
Returns:
| Type | Description |
|---|---|
| list[str] | List of registered provider names |