Memory

Three orthogonal layers, each with its own contract:

  1. Conversation management: what to do with the message history within a single run (keep it, window it, or summarize it) once it grows too long for the model's context window.
  2. Cross-thread store: a durable key-value store, scoped by namespace, that holds the long-term memory entries the agent reads at session start and writes at session end.
  3. Long-term memory manager: the policy layer that decides what to extract from a run and when to retrieve it; it sits on top of any BaseStore backend.

For checkpointing (state persistence between runs), see Checkpointers — those backends are also in locus.memory.backends, with Oracle ADB as the production target.

Conversation management

ConversationManager

Bases: ABC

Base class for conversation management strategies.

Conversation managers decide how the message history is maintained: trimming, summarization, or other forms of memory management.

apply abstractmethod

apply(messages: list[Message]) -> list[Message]

Apply conversation management to messages.

Parameters:

  messages (list[Message], required): Full message history

Returns:

  list[Message]: Managed message list (potentially trimmed/summarized)

Source code in src/locus/memory/conversation.py
@abstractmethod
def apply(self, messages: list[Message]) -> list[Message]:
    """
    Apply conversation management to messages.

    Args:
        messages: Full message history

    Returns:
        Managed message list (potentially trimmed/summarized)
    """
    ...

async_apply async

async_apply(messages: list[Message]) -> list[Message]

Async version of apply. Supports async summarization functions.

Default implementation delegates to the synchronous apply(). Override in subclasses that need async operations (e.g., LLM summarization).

Source code in src/locus/memory/conversation.py
async def async_apply(self, messages: list[Message]) -> list[Message]:
    """
    Async version of apply. Supports async summarization functions.

    Default implementation delegates to the synchronous apply().
    Override in subclasses that need async operations (e.g., LLM summarization).
    """
    return self.apply(messages)
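To illustrate the contract, here is a self-contained sketch of a custom strategy. `Role`, `Message` and `ConversationManager` below are minimal stand-ins modeled on the documented signatures (the real classes live in `locus.core.messages` and `locus.memory.conversation`, and the role values are assumed); `LastUserTurnsManager` is a hypothetical subclass, not part of locus. Only `apply()` is implemented; the inherited `async_apply()` delegates to it, as in the default above.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum


class Role(str, Enum):
    # Stand-in for locus.core.messages.Role (values are assumptions).
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:
    # Stand-in for locus.core.messages.Message.
    role: Role
    content: str


class ConversationManager(ABC):
    # Mirrors the documented base: apply() is abstract,
    # async_apply() delegates to apply() by default.
    @abstractmethod
    def apply(self, messages: list[Message]) -> list[Message]: ...

    async def async_apply(self, messages: list[Message]) -> list[Message]:
        return self.apply(messages)


class LastUserTurnsManager(ConversationManager):
    """Hypothetical strategy: keep everything from the Nth-last user message on."""

    def __init__(self, turns: int = 2) -> None:
        self.turns = turns

    def apply(self, messages: list[Message]) -> list[Message]:
        user_indices = [i for i, m in enumerate(messages) if m.role == Role.USER]
        if len(user_indices) <= self.turns:
            return messages.copy()
        start = user_indices[-self.turns]
        # Keep system messages that precede the cut, then the recent turns.
        head = [m for m in messages[:start] if m.role == Role.SYSTEM]
        return head + messages[start:]


history = [
    Message(Role.SYSTEM, "be terse"),
    Message(Role.USER, "q1"), Message(Role.ASSISTANT, "a1"),
    Message(Role.USER, "q2"), Message(Role.ASSISTANT, "a2"),
    Message(Role.USER, "q3"), Message(Role.ASSISTANT, "a3"),
]
trimmed = asyncio.run(LastUserTurnsManager(turns=2).async_apply(history))
print([m.content for m in trimmed])  # ['be terse', 'q2', 'a2', 'q3', 'a3']
```

Subclasses that need to await something (an LLM call, say) would override `async_apply()` as well, as SummarizingManager does below.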

NullManager

Bases: ConversationManager

Null conversation manager - keeps all messages unchanged.

Use this when you want to preserve the entire conversation history without any modifications. Suitable for short conversations or when full history is required.

apply

apply(messages: list[Message]) -> list[Message]

Return a shallow copy of messages, unchanged.

Source code in src/locus/memory/conversation.py
def apply(self, messages: list[Message]) -> list[Message]:
    """Return messages unchanged."""
    return messages.copy()

async_apply async

async_apply(messages: list[Message]) -> list[Message]

Inherited unchanged from ConversationManager: the default implementation simply returns self.apply(messages).

SlidingWindowManager

SlidingWindowManager(window_size: int = 20, preserve_system: bool = True)

Bases: ConversationManager

Sliding window conversation manager: keeps the last N messages.

When preserve_system is True, every system message is kept and moved to the front of the result, followed by the last window_size non-system messages. This is a simple and effective strategy for bounding conversation length.

Parameters:

  window_size (int, default 20): Maximum number of non-system messages to keep; must be at least 1 (ValueError otherwise). When preserve_system is False, system messages count toward this limit.
  preserve_system (bool, default True): Whether to keep system messages outside the window

Source code in src/locus/memory/conversation.py
def __init__(self, window_size: int = 20, preserve_system: bool = True):
    if window_size < 1:
        raise ValueError("window_size must be at least 1")
    self.window_size = window_size
    self.preserve_system = preserve_system

apply

apply(messages: list[Message]) -> list[Message]

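Apply the sliding window to messages.

Keeps all system messages (if preserve_system is True) and the last window_size remaining messages. When messages are dropped, an EV_MEMORY_CONVERSATION_PRUNED event is emitted with the number removed.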

Source code in src/locus/memory/conversation.py
def apply(self, messages: list[Message]) -> list[Message]:
    """
    Apply sliding window to messages.

    Keeps the system message (if preserve_system is True) and
    the last window_size messages.
    """
    if not messages:
        return []

    from locus.core.messages import Role

    result: list[Message] = []
    non_system_messages: list[Message] = []

    for msg in messages:
        if msg.role == Role.SYSTEM and self.preserve_system:
            result.append(msg)
        else:
            non_system_messages.append(msg)

    # Keep only the last window_size messages
    before_count = len(non_system_messages)
    if before_count > self.window_size:
        non_system_messages = non_system_messages[-self.window_size :]
        from locus.observability.emit import (  # noqa: PLC0415
            EV_MEMORY_CONVERSATION_PRUNED,
            emit_sync,
        )

        emit_sync(
            EV_MEMORY_CONVERSATION_PRUNED,
            strategy="sliding_window",
            window_size=self.window_size,
            removed_count=before_count - self.window_size,
        )

    result.extend(non_system_messages)
    return result
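The windowing rule above can be reproduced in a few lines. This is a sketch, not the library code: `Role` and `Message` are stand-ins with assumed values, and `sliding_window` is a hypothetical function that mirrors `apply()` without the event emission. It makes two behaviors visible: a system message found mid-history is moved to the front, and with `preserve_system=False` a system message is treated like any other and can fall out of the window.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Role(str, Enum):
    # Stand-in for locus.core.messages.Role (values are assumptions).
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"


@dataclass
class Message:
    # Stand-in for locus.core.messages.Message.
    role: Role
    content: str


def sliding_window(
    messages: list[Message], window_size: int = 20, preserve_system: bool = True
) -> list[Message]:
    """Mirror of SlidingWindowManager.apply() logic, minus event emission."""
    system = [m for m in messages if preserve_system and m.role == Role.SYSTEM]
    others = [m for m in messages if not (preserve_system and m.role == Role.SYSTEM)]
    # System messages go first; only the tail of the rest is kept.
    return system + others[-window_size:]


msgs = [
    Message(Role.USER, "u1"),
    Message(Role.SYSTEM, "sys"),
    Message(Role.ASSISTANT, "a1"),
    Message(Role.USER, "u2"),
    Message(Role.ASSISTANT, "a2"),
]
print([m.content for m in sliding_window(msgs, window_size=2)])  # ['sys', 'u2', 'a2']
print([m.content for m in sliding_window(msgs, window_size=2, preserve_system=False)])  # ['u2', 'a2']
```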

async_apply async

async_apply(messages: list[Message]) -> list[Message]

Inherited unchanged from ConversationManager: the default implementation simply returns self.apply(messages).

SummarizingManager

SummarizingManager(threshold: int = 30, keep_recent: int = 10, summarize_fn: Any | None = None, preserve_system: bool = True)

Bases: ConversationManager

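Summarizing conversation manager: summarizes older messages.

When the number of non-system messages exceeds threshold, all but the most recent keep_recent of them are replaced by a single summary message (role SYSTEM), placed after the original system message; recent context is kept verbatim.

A summarization function, for example one that calls an LLM, can be supplied via summarize_fn. It is optional: when none is given, a simple built-in summary is used instead.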

Parameters:

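  threshold (int, default 30): Summarization runs once the number of non-system messages exceeds this value; must be at least 1
  keep_recent (int, default 10): Number of most recent messages always kept verbatim; must be at least 1 and less than threshold
  summarize_fn (Any | None, default None): Function that summarizes a list of messages; async_apply() awaits it if it is a coroutine function
  preserve_system (bool, default True): Whether to preserve the system message (if there are several, only the last one is kept)
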
Source code in src/locus/memory/conversation.py
def __init__(
    self,
    threshold: int = 30,
    keep_recent: int = 10,
    summarize_fn: Any | None = None,
    preserve_system: bool = True,
):
    if threshold < 1:
        raise ValueError("threshold must be at least 1")
    if keep_recent < 1:
        raise ValueError("keep_recent must be at least 1")
    if keep_recent >= threshold:
        raise ValueError("keep_recent must be less than threshold")

    self.threshold = threshold
    self.keep_recent = keep_recent
    self.summarize_fn = summarize_fn
    self.preserve_system = preserve_system
    self._summary_cache: dict[int, str] = {}

apply

apply(messages: list[Message]) -> list[Message]

Apply summarization to messages.

If the number of non-system messages exceeds threshold, the older ones are summarized. The summary comes from the private _generate_summary() helper, which falls back to a simple summary when no summarize_fn is provided.

Source code in src/locus/memory/conversation.py
def apply(self, messages: list[Message]) -> list[Message]:
    """
    Apply summarization to messages.

    If total messages exceed threshold, older messages are summarized.
    Note: If no summarize_fn is provided, falls back to a simple summary.
    """
    if not messages:
        return []

    from locus.core.messages import Message as Msg
    from locus.core.messages import Role

    result: list[Msg] = []
    non_system_messages: list[Msg] = []
    system_message: Msg | None = None

    for msg in messages:
        if msg.role == Role.SYSTEM and self.preserve_system:
            system_message = msg
        else:
            non_system_messages.append(msg)

    # If under threshold, no summarization needed
    if len(non_system_messages) <= self.threshold:
        if system_message:
            result.append(system_message)
        result.extend(non_system_messages)
        return result

    # Split into messages to summarize and recent messages
    to_summarize = non_system_messages[: -self.keep_recent]
    recent = non_system_messages[-self.keep_recent :]

    # Generate summary
    summary_text = self._generate_summary(to_summarize)

    # Build result
    if system_message:
        result.append(system_message)

    # Add summary as a system message
    summary_message = Msg(
        role=Role.SYSTEM,
        content=f"[Summary of previous conversation ({len(to_summarize)} messages)]:\n{summary_text}",
    )
    result.append(summary_message)
    result.extend(recent)

    return result
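The split performed by `apply()` can be sketched as below, assuming `preserve_system=True`. `Role` and `Message` are stand-ins, and because `_generate_summary()` is not shown on this page, the hypothetical `summarize_older` function takes the summarizer as a parameter (a joining lambda in the example). The summary header format is copied from the source above.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum


class Role(str, Enum):
    # Stand-in for locus.core.messages.Role (values are assumptions).
    SYSTEM = "system"
    USER = "user"


@dataclass
class Message:
    role: Role
    content: str


def summarize_older(
    messages: list[Message],
    threshold: int,
    keep_recent: int,
    summarize: Callable[[list[Message]], str],
) -> list[Message]:
    """Mirror of SummarizingManager.apply() with preserve_system=True."""
    system: Message | None = None
    rest: list[Message] = []
    for m in messages:
        if m.role == Role.SYSTEM:
            system = m  # only the last system message survives
        else:
            rest.append(m)
    head = [system] if system else []
    if len(rest) <= threshold:
        return head + rest
    older, recent = rest[:-keep_recent], rest[-keep_recent:]
    summary = Message(
        Role.SYSTEM,
        f"[Summary of previous conversation ({len(older)} messages)]:\n{summarize(older)}",
    )
    return head + [summary] + recent


msgs = [Message(Role.SYSTEM, "sys")] + [Message(Role.USER, f"m{i}") for i in range(6)]
out = summarize_older(
    msgs, threshold=4, keep_recent=2, summarize=lambda ms: " | ".join(m.content for m in ms)
)
print(len(out))  # 4: original system message, summary, m4, m5
```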

async_apply async

async_apply(messages: list[Message]) -> list[Message]

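Async apply with LLM summarization support.

If summarize_fn is an async function, it is awaited; otherwise the synchronous _generate_summary() is used. When summarization runs, EV_MEMORY_COMPACTOR_TRIGGERED is emitted before it and EV_MEMORY_COMPACTOR_COMPLETED (with message counts and duration_ms) after it.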

Source code in src/locus/memory/conversation.py
async def async_apply(self, messages: list[Message]) -> list[Message]:
    """
    Async apply with LLM summarization support.

    If summarize_fn is an async function, it will be properly awaited.
    Falls back to synchronous _generate_summary() otherwise.
    """
    if not messages:
        return []

    from locus.core.messages import Message as Msg
    from locus.core.messages import Role

    result: list[Msg] = []
    non_system_messages: list[Msg] = []
    system_message: Msg | None = None

    for msg in messages:
        if msg.role == Role.SYSTEM and self.preserve_system:
            system_message = msg
        else:
            non_system_messages.append(msg)

    if len(non_system_messages) <= self.threshold:
        if system_message:
            result.append(system_message)
        result.extend(non_system_messages)
        return result

    to_summarize = non_system_messages[: -self.keep_recent]
    recent = non_system_messages[-self.keep_recent :]

    # Use async summarize_fn if available
    import asyncio

    from locus.observability.emit import (  # noqa: PLC0415
        EV_MEMORY_COMPACTOR_COMPLETED,
        EV_MEMORY_COMPACTOR_TRIGGERED,
        emit,
    )

    await emit(
        EV_MEMORY_COMPACTOR_TRIGGERED,
        strategy="summarizing",
        messages_before=len(non_system_messages),
        threshold=self.threshold,
    )
    import time as _time  # noqa: PLC0415

    _started = _time.perf_counter()

    if self.summarize_fn is not None and asyncio.iscoroutinefunction(self.summarize_fn):
        summary_text = await self.summarize_fn(to_summarize)
    else:
        summary_text = self._generate_summary(to_summarize)

    if system_message:
        result.append(system_message)

    summary_message = Msg(
        role=Role.SYSTEM,
        content=f"[Summary of previous conversation ({len(to_summarize)} messages)]:\n{summary_text}",
    )
    result.append(summary_message)
    result.extend(recent)
    await emit(
        EV_MEMORY_COMPACTOR_COMPLETED,
        strategy="summarizing",
        messages_before=len(non_system_messages),
        messages_after=len(result),
        summarized_count=len(to_summarize),
        duration_ms=(_time.perf_counter() - _started) * 1000,
    )
    return result
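`async_apply()` awaits `summarize_fn` only when it is a coroutine function. The sketch below mirrors that dispatch with hypothetical helpers: `fake_llm_summarize` stands in for a real model call (no locus or provider API is used), and `fallback_summary` stands in for the private `_generate_summary()`, whose handling of a synchronous `summarize_fn` is not shown on this page. It uses `inspect.iscoroutinefunction`; the source above uses `asyncio.iscoroutinefunction`, which newer Python releases deprecate in its favor.

```python
import asyncio
import inspect


async def fake_llm_summarize(messages: list[str]) -> str:
    # Hypothetical stand-in for an LLM call; a real one would await a client.
    await asyncio.sleep(0)
    return f"{len(messages)} message(s) about: " + ", ".join(messages)


def fallback_summary(messages: list[str]) -> str:
    # Hypothetical stand-in for the manager's private _generate_summary().
    return f"summary of {len(messages)} message(s)"


async def pick_summary(summarize_fn, messages: list[str]) -> str:
    # Same branching as async_apply(): await coroutine functions, else fall back.
    if summarize_fn is not None and inspect.iscoroutinefunction(summarize_fn):
        return await summarize_fn(messages)
    return fallback_summary(messages)


print(asyncio.run(pick_summary(fake_llm_summarize, ["billing", "refunds"])))
# 2 message(s) about: billing, refunds
print(asyncio.run(pick_summary(None, ["billing"])))
# summary of 1 message(s)
```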

Cross-thread store

The durable key-value store that the long-term memory manager writes to. InMemoryStore is for tests; production stores live in locus.memory.backends (see Checkpointers for Oracle / OCI Object Storage / OpenSearch / Postgres / Redis backends, which all implement BaseStore as well as BaseCheckpointer).

BaseStore

Bases: ABC

Abstract base class for store implementations.

Provides common functionality and defines the interface.

capabilities property

capabilities: StoreCapabilities

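Return the StoreCapabilities this backend advertises (flags such as search, semantic_search, batch_operations and list_namespaces, which the optional methods below refer to). Override in subclasses.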

put abstractmethod async

put(namespace: tuple[str, ...], key: str, value: Any, metadata: dict[str, Any] | None = None) -> None

Store a value.

Source code in src/locus/memory/store.py
@abstractmethod
async def put(
    self,
    namespace: tuple[str, ...],
    key: str,
    value: Any,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Store a value."""
    ...

get abstractmethod async

get(namespace: tuple[str, ...], key: str) -> Any | None

Retrieve a value, or None if the key is not present.

Source code in src/locus/memory/store.py
@abstractmethod
async def get(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> Any | None:
    """Retrieve a value."""
    ...

delete abstractmethod async

delete(namespace: tuple[str, ...], key: str) -> bool

Delete a value. Returns True if an item was deleted.

Source code in src/locus/memory/store.py
@abstractmethod
async def delete(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> bool:
    """Delete a value."""
    ...

list_keys abstractmethod async

list_keys(namespace: tuple[str, ...], limit: int = 100) -> list[str]

List up to limit keys in a namespace.

Source code in src/locus/memory/store.py
@abstractmethod
async def list_keys(
    self,
    namespace: tuple[str, ...],
    limit: int = 100,
) -> list[str]:
    """List keys in namespace."""
    ...
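Only put, get, delete and list_keys are abstract, so a backend needs just those four to be usable. Below is a minimal sketch of that contract: `MiniBaseStore` is a stand-in for BaseStore reduced to its abstract methods, and `DictStore` is a hypothetical dict-backed backend. It assumes delete() returns True when something was removed, which is how clear_namespace() counts deletions.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Any

Namespace = tuple[str, ...]


class MiniBaseStore(ABC):
    # Stand-in for locus.memory.store.BaseStore: only the abstract methods.
    @abstractmethod
    async def put(
        self, namespace: Namespace, key: str, value: Any, metadata: dict[str, Any] | None = None
    ) -> None: ...

    @abstractmethod
    async def get(self, namespace: Namespace, key: str) -> Any | None: ...

    @abstractmethod
    async def delete(self, namespace: Namespace, key: str) -> bool: ...

    @abstractmethod
    async def list_keys(self, namespace: Namespace, limit: int = 100) -> list[str]: ...


class DictStore(MiniBaseStore):
    """Hypothetical backend keyed by (namespace, key); metadata is kept but unused."""

    def __init__(self) -> None:
        self._data: dict[tuple[Namespace, str], tuple[Any, dict[str, Any]]] = {}

    async def put(self, namespace, key, value, metadata=None):
        self._data[(namespace, key)] = (value, metadata or {})

    async def get(self, namespace, key):
        item = self._data.get((namespace, key))
        return None if item is None else item[0]

    async def delete(self, namespace, key):
        return self._data.pop((namespace, key), None) is not None

    async def list_keys(self, namespace, limit=100):
        return [k for (ns, k) in self._data if ns == namespace][:limit]


async def demo() -> tuple[Any, list[str], bool, bool]:
    store = DictStore()
    ns = ("users", "u-42", "memories")
    await store.put(ns, "theme", {"theme": "dark"})
    value = await store.get(ns, "theme")
    keys = await store.list_keys(ns)
    first = await store.delete(ns, "theme")   # removed
    second = await store.delete(ns, "theme")  # already gone
    return value, keys, first, second


print(asyncio.run(demo()))  # ({'theme': 'dark'}, ['theme'], True, False)
```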

exists async

exists(namespace: tuple[str, ...], key: str) -> bool

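Check if a key exists. The default implementation calls get(), so a key whose stored value is None is reported as missing.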

Source code in src/locus/memory/store.py
async def exists(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> bool:
    """Check if a key exists."""
    value = await self.get(namespace, key)
    return value is not None
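The None pitfall in the default exists() is easy to demonstrate. `NoneAwareDemo` is a hypothetical dict-backed class that pairs a plain get() with the same exists() logic shown above; it is not a locus backend. Falsy values such as False still count as present; only None does not. If a stored None must be distinguishable, one option is to wrap it, for example as {"value": None}.

```python
from __future__ import annotations

import asyncio
from typing import Any


class NoneAwareDemo:
    # Hypothetical store exposing the same get()/exists() pairing as BaseStore.
    def __init__(self) -> None:
        self._data: dict[tuple[tuple[str, ...], str], Any] = {}

    async def put(self, namespace: tuple[str, ...], key: str, value: Any) -> None:
        self._data[(namespace, key)] = value

    async def get(self, namespace: tuple[str, ...], key: str) -> Any | None:
        return self._data.get((namespace, key))

    async def exists(self, namespace: tuple[str, ...], key: str) -> bool:
        # Same logic as the BaseStore default.
        return await self.get(namespace, key) is not None


async def demo() -> tuple[bool, bool]:
    store = NoneAwareDemo()
    ns = ("flags",)
    await store.put(ns, "opt_out", None)  # stored, but the value is None
    await store.put(ns, "opt_in", False)  # falsy, but not None
    return await store.exists(ns, "opt_out"), await store.exists(ns, "opt_in")


print(asyncio.run(demo()))  # (False, True)
```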

get_item async

get_item(namespace: tuple[str, ...], key: str) -> StoreItem | None

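Get the full item with its metadata.

The default implementation wraps get(): metadata is empty and both created_at and updated_at are set to the current time. Backends that persist metadata and timestamps can override it.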

Source code in src/locus/memory/store.py
async def get_item(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> StoreItem | None:
    """Get full item with metadata."""
    # Default implementation - subclasses can override
    value = await self.get(namespace, key)
    if value is None:
        return None
    now = datetime.now(UTC)
    return StoreItem(
        namespace=namespace,
        key=key,
        value=value,
        metadata={},
        created_at=now,
        updated_at=now,
    )

search async

search(namespace: tuple[str, ...], query: str | None = None, limit: int = 10) -> list[StoreItem]

Search for items in namespace.

Optional method: backends advertise support via capabilities.search. Subclasses that support search must override this method; the default raises StoreCapabilityError.

Parameters:

  namespace (tuple[str, ...], required): Namespace to search in
  query (str | None, default None): Search query (implementation-specific)
  limit (int, default 10): Maximum results

Returns:

  list[StoreItem]: List of matching items

Raises:

  StoreCapabilityError: If the backend does not support search.

Source code in src/locus/memory/store.py
async def search(
    self,
    namespace: tuple[str, ...],
    query: str | None = None,
    limit: int = 10,
) -> list[StoreItem]:
    """
    Search for items in namespace.

    Optional method — backends advertise support via
    ``capabilities.search``. Subclasses that support search must
    override this method; the default raises
    :class:`StoreCapabilityError`.

    Args:
        namespace: Namespace to search in
        query: Search query (implementation-specific)
        limit: Maximum results

    Returns:
        List of matching items

    Raises:
        StoreCapabilityError: If the backend does not support search.
    """
    raise StoreCapabilityError(
        capability="search",
        store_class=type(self).__name__,
        hint=(
            "Choose a backend that advertises capabilities.search "
            "(e.g., InMemoryStore, OpenSearch, Oracle 26ai)."
        ),
    )
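Since search() is optional, callers can check the capability flag before relying on it. The sketch below uses stand-ins: `Caps` for StoreCapabilities (only the search flag), a `StoreCapabilityError` that mirrors the keyword arguments seen in the source, and a hypothetical `KeyOnlyStore` without search. The helper `find` is also hypothetical; its client-side fallback is a plain substring scan over list_keys() and get(), not a locus feature.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Caps:
    # Stand-in for StoreCapabilities; only the flag used here.
    search: bool = False


class StoreCapabilityError(Exception):
    # Stand-in mirroring the keyword arguments used in the source above.
    def __init__(self, capability: str, store_class: str, hint: str | None = None):
        super().__init__(f"{store_class} does not support {capability}. {hint or ''}".strip())
        self.capability = capability


class KeyOnlyStore:
    # Hypothetical backend without search: capabilities.search is False.
    capabilities = Caps(search=False)

    def __init__(self, data: dict[str, Any]) -> None:
        self._data = data

    async def list_keys(self, namespace: tuple[str, ...], limit: int = 100) -> list[str]:
        return list(self._data)[:limit]

    async def get(self, namespace: tuple[str, ...], key: str) -> Any | None:
        return self._data.get(key)

    async def search(self, namespace, query=None, limit=10):
        raise StoreCapabilityError(capability="search", store_class=type(self).__name__)


async def find(store, namespace: tuple[str, ...], needle: str, limit: int = 10) -> list[Any]:
    """Use native search when advertised, else scan keys client-side."""
    if store.capabilities.search:
        return [item.value for item in await store.search(namespace, needle, limit)]
    hits = []
    # The fallback only sees the first 100 keys (list_keys default limit).
    for key in await store.list_keys(namespace):
        value = await store.get(namespace, key)
        if needle in str(value):
            hits.append(value)
    return hits[:limit]


store = KeyOnlyStore({"a": "likes dark theme", "b": "prefers email"})
print(asyncio.run(find(store, ("users", "u1"), "dark")))  # ['likes dark theme']
```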

put_with_embedding async

put_with_embedding(namespace: tuple[str, ...], key: str, value: Any, embedding: list[float], metadata: dict[str, Any] | None = None) -> None

Store a value with its embedding vector for semantic search.

Optional method — backends advertise support via capabilities.semantic_search. Subclasses that support semantic search must override this method.

Parameters:

  namespace (tuple[str, ...], required): Hierarchical namespace tuple
  key (str, required): Key within the namespace
  value (Any, required): Value to store (must be JSON-serializable)
  embedding (list[float], required): Vector embedding (e.g., from OpenAI, Cohere, etc.)
  metadata (dict[str, Any] | None, default None): Optional metadata for filtering

Raises:

  StoreCapabilityError: If the backend does not support semantic search.

Example

# Get embedding from your provider
embedding = await embedder.embed("User prefers dark theme")

await store.put_with_embedding(
    ("users", user_id, "memories"),
    "theme_preference",
    {"theme": "dark", "reason": "easier on eyes"},
    embedding=embedding,
    metadata={"category": "preferences"},
)

Source code in src/locus/memory/store.py
async def put_with_embedding(
    self,
    namespace: tuple[str, ...],
    key: str,
    value: Any,
    embedding: list[float],
    metadata: dict[str, Any] | None = None,
) -> None:
    """
    Store a value with its embedding vector for semantic search.

    Optional method — backends advertise support via
    ``capabilities.semantic_search``. Subclasses that support
    semantic search must override this method.

    Args:
        namespace: Hierarchical namespace tuple
        key: Key within the namespace
        value: Value to store (must be JSON-serializable)
        embedding: Vector embedding (e.g., from OpenAI, Cohere, etc.)
        metadata: Optional metadata for filtering

    Raises:
        StoreCapabilityError: If the backend does not support
            semantic search.

    Example:
        # Get embedding from your provider
        embedding = await embedder.embed("User prefers dark theme")

        await store.put_with_embedding(
            ("users", user_id, "memories"),
            "theme_preference",
            {"theme": "dark", "reason": "easier on eyes"},
            embedding=embedding,
            metadata={"category": "preferences"}
        )
    """
    raise StoreCapabilityError(
        capability="semantic_search",
        store_class=type(self).__name__,
        hint="Use put() for plain storage, or pick a vector-capable backend.",
    )

search_by_embedding async

search_by_embedding(namespace: tuple[str, ...], query_embedding: list[float], limit: int = 10, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> list[SemanticSearchResult]

Search for similar items using vector similarity.

Optional method — see capabilities.semantic_search.

Parameters:

  namespace (tuple[str, ...], required): Namespace to search in
  query_embedding (list[float], required): Vector to compare against stored embeddings
  limit (int, default 10): Maximum results to return
  threshold (float | None, default None): Minimum similarity score (0.0-1.0), or None for no threshold
  metadata_filter (dict[str, Any] | None, default None): Optional metadata constraints

Returns:

  list[SemanticSearchResult]: List of SemanticSearchResult, sorted by similarity (highest first)

Raises:

  StoreCapabilityError: If the backend does not support semantic search.

Source code in src/locus/memory/store.py
async def search_by_embedding(
    self,
    namespace: tuple[str, ...],
    query_embedding: list[float],
    limit: int = 10,
    threshold: float | None = None,
    metadata_filter: dict[str, Any] | None = None,
) -> list[SemanticSearchResult]:
    """
    Search for similar items using vector similarity.

    Optional method — see ``capabilities.semantic_search``.

    Args:
        namespace: Namespace to search in
        query_embedding: Vector to compare against stored embeddings
        limit: Maximum results to return
        threshold: Minimum similarity score (0.0-1.0), or None for no threshold
        metadata_filter: Optional metadata constraints

    Returns:
        List of SemanticSearchResult, sorted by similarity (highest first)

    Raises:
        StoreCapabilityError: If the backend does not support
            semantic search.
    """
    raise StoreCapabilityError(
        capability="semantic_search",
        store_class=type(self).__name__,
        hint="Pick a vector-capable backend (e.g., Oracle 26ai, pgvector).",
    )
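What a vector-capable backend computes for search_by_embedding() can be sketched as a brute-force scan. This is an illustration under assumptions: the similarity metric is backend-specific (cosine similarity is assumed here), metadata_filter is assumed to mean exact equality on every listed key, and `Row`, `Hit` (a stand-in for SemanticSearchResult) and `search_by_embedding` are hypothetical local definitions. The ordering (highest first), threshold and limit follow the docstring above.

```python
from __future__ import annotations

import math
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Hit:
    # Hypothetical stand-in for SemanticSearchResult.
    key: str
    value: Any
    score: float


@dataclass
class Row:
    key: str
    value: Any
    embedding: list[float]
    metadata: dict[str, Any] = field(default_factory=dict)


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search_by_embedding(
    rows: list[Row],
    query_embedding: list[float],
    limit: int = 10,
    threshold: float | None = None,
    metadata_filter: dict[str, Any] | None = None,
) -> list[Hit]:
    hits = []
    for row in rows:
        # Assumed filter semantics: every key in metadata_filter must match exactly.
        if metadata_filter and any(row.metadata.get(k) != v for k, v in metadata_filter.items()):
            continue
        score = cosine(query_embedding, row.embedding)
        if threshold is None or score >= threshold:
            hits.append(Hit(row.key, row.value, score))
    hits.sort(key=lambda h: h.score, reverse=True)  # highest similarity first
    return hits[:limit]


rows = [
    Row("theme", {"theme": "dark"}, [1.0, 0.0], {"category": "preferences"}),
    Row("lang", {"lang": "fr"}, [0.6, 0.8], {"category": "preferences"}),
    Row("bill", {"plan": "pro"}, [0.0, 1.0], {"category": "billing"}),
]
print([h.key for h in search_by_embedding(rows, [1.0, 0.0], threshold=0.5)])  # ['theme', 'lang']
```

Real backends such as Oracle 26ai or pgvector would push this computation into the database rather than scanning in Python.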

get_embedding async

get_embedding(namespace: tuple[str, ...], key: str) -> list[float] | None

Get the embedding vector for a stored item.

Optional method — see capabilities.semantic_search.

Parameters:

  namespace (tuple[str, ...], required): Hierarchical namespace tuple
  key (str, required): Key within the namespace

Returns:

  list[float] | None: Embedding vector, or None if not found / no embedding stored.

Raises:

  StoreCapabilityError: If the backend does not support semantic search.

Source code in src/locus/memory/store.py
async def get_embedding(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> list[float] | None:
    """
    Get the embedding vector for a stored item.

    Optional method — see ``capabilities.semantic_search``.

    Args:
        namespace: Hierarchical namespace tuple
        key: Key within the namespace

    Returns:
        Embedding vector or None if not found / no embedding stored.

    Raises:
        StoreCapabilityError: If the backend does not support
            semantic search.
    """
    raise StoreCapabilityError(
        capability="semantic_search",
        store_class=type(self).__name__,
    )

put_batch async

put_batch(items: list[tuple[tuple[str, ...], str, Any, dict[str, Any] | None]]) -> None

Store multiple items.

Backends that advertise capabilities.batch_operations should override this method to dispatch atomically. The default falls back to sequential put() calls so callers always get functional batching, just without atomicity guarantees.

Parameters:

  items (list[tuple[tuple[str, ...], str, Any, dict[str, Any] | None]], required): List of (namespace, key, value, metadata) tuples

Source code in src/locus/memory/store.py
async def put_batch(
    self,
    items: list[tuple[tuple[str, ...], str, Any, dict[str, Any] | None]],
) -> None:
    """
    Store multiple items.

    Backends that advertise ``capabilities.batch_operations`` should
    override this method to dispatch atomically. The default falls
    back to sequential ``put()`` calls so callers always get
    functional batching, just without atomicity guarantees.

    Args:
        items: List of (namespace, key, value, metadata) tuples
    """
    # Sequential fallback — works for every backend, even those that
    # don't advertise batch_operations. Backends that *do* advertise
    # the capability override this method to dispatch in one
    # transaction / round-trip.
    for namespace, key, value, metadata in items:
        await self.put(namespace, key, value, metadata)

get_batch async

get_batch(keys: list[tuple[tuple[str, ...], str]]) -> dict[tuple[tuple[str, ...], str], Any]

Retrieve multiple items. Keys whose value is missing (get() returns None) are omitted from the result.

Parameters:

  keys (list[tuple[tuple[str, ...], str]], required): List of (namespace, key) tuples

Returns:

  dict[tuple[tuple[str, ...], str], Any]: Dict mapping (namespace, key) to value

Source code in src/locus/memory/store.py
async def get_batch(
    self,
    keys: list[tuple[tuple[str, ...], str]],
) -> dict[tuple[tuple[str, ...], str], Any]:
    """
    Retrieve multiple items.

    Args:
        keys: List of (namespace, key) tuples

    Returns:
        Dict mapping (namespace, key) to value
    """
    results = {}
    for namespace, key in keys:
        value = await self.get(namespace, key)
        if value is not None:
            results[(namespace, key)] = value
    return results
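The default batch methods are plain loops, so the main things to get right as a caller are the tuple shapes and the fact that missing keys are dropped from the result. `SequentialBatchStore` is a hypothetical class that reproduces the BaseStore default put_batch()/get_batch() loops over a dict; it is not a locus backend.

```python
from __future__ import annotations

import asyncio
from typing import Any

Namespace = tuple[str, ...]


class SequentialBatchStore:
    """Hypothetical store reproducing BaseStore's default put_batch/get_batch loops."""

    def __init__(self) -> None:
        self._data: dict[tuple[Namespace, str], Any] = {}

    async def put(self, namespace: Namespace, key: str, value: Any, metadata=None) -> None:
        self._data[(namespace, key)] = value

    async def get(self, namespace: Namespace, key: str) -> Any:
        return self._data.get((namespace, key))

    async def put_batch(self, items) -> None:
        # Items are (namespace, key, value, metadata) 4-tuples; no atomicity.
        for namespace, key, value, metadata in items:
            await self.put(namespace, key, value, metadata)

    async def get_batch(self, keys) -> dict[tuple[Namespace, str], Any]:
        results = {}
        for namespace, key in keys:
            value = await self.get(namespace, key)
            if value is not None:  # missing keys are simply absent from the result
                results[(namespace, key)] = value
        return results


async def demo():
    store = SequentialBatchStore()
    ns = ("users", "u-42")
    await store.put_batch([
        (ns, "theme", "dark", None),
        (ns, "lang", "fr", {"source": "profile"}),
    ])
    return await store.get_batch([(ns, "theme"), (ns, "missing")])


print(asyncio.run(demo()))  # {(('users', 'u-42'), 'theme'): 'dark'}
```

Because the fallback put_batch() issues one put() per item, a failure midway can leave earlier items written; only backends that advertise capabilities.batch_operations are meant to make the batch atomic.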

list_namespaces async

list_namespaces(prefix: tuple[str, ...] | None = None, limit: int = 100) -> list[tuple[str, ...]]

List all namespaces.

Optional method — see capabilities.list_namespaces.

Parameters:

  prefix (tuple[str, ...] | None, default None): Optional prefix to filter namespaces
  limit (int, default 100): Maximum namespaces to return

Returns:

  list[tuple[str, ...]]: List of namespace tuples

Raises:

  StoreCapabilityError: If the backend does not support namespace listing.

Source code in src/locus/memory/store.py
async def list_namespaces(
    self,
    prefix: tuple[str, ...] | None = None,
    limit: int = 100,
) -> list[tuple[str, ...]]:
    """
    List all namespaces.

    Optional method — see ``capabilities.list_namespaces``.

    Args:
        prefix: Optional prefix to filter namespaces
        limit: Maximum namespaces to return

    Returns:
        List of namespace tuples

    Raises:
        StoreCapabilityError: If the backend does not support
            namespace listing.
    """
    raise StoreCapabilityError(
        capability="list_namespaces",
        store_class=type(self).__name__,
    )
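How prefix matching behaves is up to each backend; the docstring only says the prefix filters namespaces. The sketch below assumes element-wise tuple prefixes, so ("users",) matches ("users", "u1", "memories") but not ("userspace",). `filter_namespaces` is a hypothetical helper, and sorting the result is a choice made here for deterministic output.

```python
from __future__ import annotations


def filter_namespaces(
    namespaces: set[tuple[str, ...]],
    prefix: tuple[str, ...] | None = None,
    limit: int = 100,
) -> list[tuple[str, ...]]:
    """Hypothetical filter: a namespace matches if it starts with `prefix` element-wise."""
    matches = sorted(ns for ns in namespaces if prefix is None or ns[: len(prefix)] == prefix)
    return matches[:limit]


known = {
    ("users", "u1", "memories"),
    ("users", "u2", "memories"),
    ("userspace",),
    ("teams", "t1"),
}
print(filter_namespaces(known, prefix=("users",)))
# [('users', 'u1', 'memories'), ('users', 'u2', 'memories')]
```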

clear_namespace async

clear_namespace(namespace: tuple[str, ...]) -> int

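Delete all items in a namespace.

The default implementation lists at most 10,000 keys and deletes them one at a time, so a larger namespace needs repeated calls.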

Parameters:

  namespace (tuple[str, ...], required): Namespace to clear

Returns:

  int: Number of items deleted

Source code in src/locus/memory/store.py
async def clear_namespace(
    self,
    namespace: tuple[str, ...],
) -> int:
    """
    Delete all items in a namespace.

    Args:
        namespace: Namespace to clear

    Returns:
        Number of items deleted
    """
    keys = await self.list_keys(namespace, limit=10000)
    count = 0
    for key in keys:
        if await self.delete(namespace, key):
            count += 1
    return count
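Because the default clear_namespace() makes one bounded listing pass, clearing a very large namespace means calling it until a pass deletes nothing. In the sketch below, `CappedListStore` is a hypothetical store whose list_keys() returns at most `cap` keys (a small cap stands in for the 10,000 limit), its clear_namespace() copies the default algorithm, and `clear_all` is a hypothetical helper, not a locus API.

```python
import asyncio


class CappedListStore:
    """Hypothetical store whose list_keys caps results, like the default's limit=10000."""

    def __init__(self, n: int, cap: int) -> None:
        self._keys = {f"k{i}" for i in range(n)}
        self._cap = cap

    async def list_keys(self, namespace, limit=100):
        return sorted(self._keys)[: min(limit, self._cap)]

    async def delete(self, namespace, key) -> bool:
        if key in self._keys:
            self._keys.remove(key)
            return True
        return False

    async def clear_namespace(self, namespace) -> int:
        # Same algorithm as the BaseStore default: one bounded listing pass.
        count = 0
        for key in await self.list_keys(namespace, limit=10000):
            if await self.delete(namespace, key):
                count += 1
        return count


async def clear_all(store, namespace) -> int:
    """Call clear_namespace() until a pass deletes nothing."""
    total = 0
    while (deleted := await store.clear_namespace(namespace)) > 0:
        total += deleted
    return total


print(asyncio.run(clear_all(CappedListStore(n=25, cap=10), ("tmp",))))  # 25
```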

close async

close() -> None

Release any resources held by the store. The default implementation does nothing.

Source code in src/locus/memory/store.py
async def close(self) -> None:
    """Close any resources."""

InMemoryStore

InMemoryStore()

Bases: BaseStore

In-memory store implementation.

Fast but not persistent: data is lost when the process exits. Useful for testing and development.

Source code in src/locus/memory/store.py
def __init__(self) -> None:
    self._data: dict[str, StoreItem] = {}
    self._namespaces: set[tuple[str, ...]] = set()
    self._lock = asyncio.Lock()

exists async

exists(namespace: tuple[str, ...], key: str) -> bool

Check if a key exists.

Source code in src/locus/memory/store.py
async def exists(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> bool:
    """Check if a key exists."""
    value = await self.get(namespace, key)
    return value is not None

put_with_embedding async

put_with_embedding(namespace: tuple[str, ...], key: str, value: Any, embedding: list[float], metadata: dict[str, Any] | None = None) -> None

Store a value with its embedding vector for semantic search.

Optional method — backends advertise support via capabilities.semantic_search. Subclasses that support semantic search must override this method.

Parameters:

Name Type Description Default
namespace tuple[str, ...]

Hierarchical namespace tuple

required
key str

Key within the namespace

required
value Any

Value to store (must be JSON-serializable)

required
embedding list[float]

Vector embedding (e.g., from OpenAI, Cohere, etc.)

required
metadata dict[str, Any] | None

Optional metadata for filtering

None

Raises:

Type Description
StoreCapabilityError

If the backend does not support semantic search.

Example

Get embedding from your provider

embedding = await embedder.embed("User prefers dark theme")

await store.put_with_embedding( ("users", user_id, "memories"), "theme_preference", {"theme": "dark", "reason": "easier on eyes"}, embedding=embedding, metadata={"category": "preferences"} )

Source code in src/locus/memory/store.py
async def put_with_embedding(
    self,
    namespace: tuple[str, ...],
    key: str,
    value: Any,
    embedding: list[float],
    metadata: dict[str, Any] | None = None,
) -> None:
    """
    Store a value with its embedding vector for semantic search.

    Optional method — backends advertise support via
    ``capabilities.semantic_search``. Subclasses that support
    semantic search must override this method.

    Args:
        namespace: Hierarchical namespace tuple
        key: Key within the namespace
        value: Value to store (must be JSON-serializable)
        embedding: Vector embedding (e.g., from OpenAI, Cohere, etc.)
        metadata: Optional metadata for filtering

    Raises:
        StoreCapabilityError: If the backend does not support
            semantic search.

    Example:
        # Get embedding from your provider
        embedding = await embedder.embed("User prefers dark theme")

        await store.put_with_embedding(
            ("users", user_id, "memories"),
            "theme_preference",
            {"theme": "dark", "reason": "easier on eyes"},
            embedding=embedding,
            metadata={"category": "preferences"}
        )
    """
    raise StoreCapabilityError(
        capability="semantic_search",
        store_class=type(self).__name__,
        hint="Use put() for plain storage, or pick a vector-capable backend.",
    )

search_by_embedding async

search_by_embedding(namespace: tuple[str, ...], query_embedding: list[float], limit: int = 10, threshold: float | None = None, metadata_filter: dict[str, Any] | None = None) -> list[SemanticSearchResult]

Search for similar items using vector similarity.

Optional method — see capabilities.semantic_search.

Parameters:

Name Type Description Default
namespace tuple[str, ...]

Namespace to search in

required
query_embedding list[float]

Vector to compare against stored embeddings

required
limit int

Maximum results to return

10
threshold float | None

Minimum similarity score (0.0-1.0), or None for no threshold

None
metadata_filter dict[str, Any] | None

Optional metadata constraints

None

Returns:

Type Description
list[SemanticSearchResult]

List of SemanticSearchResult, sorted by similarity (highest first)

Raises:

Type Description
StoreCapabilityError

If the backend does not support semantic search.

Source code in src/locus/memory/store.py
async def search_by_embedding(
    self,
    namespace: tuple[str, ...],
    query_embedding: list[float],
    limit: int = 10,
    threshold: float | None = None,
    metadata_filter: dict[str, Any] | None = None,
) -> list[SemanticSearchResult]:
    """
    Search for similar items using vector similarity.

    Optional method — see ``capabilities.semantic_search``.

    Args:
        namespace: Namespace to search in
        query_embedding: Vector to compare against stored embeddings
        limit: Maximum results to return
        threshold: Minimum similarity score (0.0-1.0), or None for no threshold
        metadata_filter: Optional metadata constraints

    Returns:
        List of SemanticSearchResult, sorted by similarity (highest first)

    Raises:
        StoreCapabilityError: If the backend does not support
            semantic search.
    """
    raise StoreCapabilityError(
        capability="semantic_search",
        store_class=type(self).__name__,
        hint="Pick a vector-capable backend (e.g., Oracle 26ai, pgvector).",
    )

get_embedding async

get_embedding(namespace: tuple[str, ...], key: str) -> list[float] | None

Get the embedding vector for a stored item.

Optional method — see capabilities.semantic_search.

Parameters:

Name Type Description Default
namespace tuple[str, ...]

Hierarchical namespace tuple

required
key str

Key within the namespace

required

Returns:

Type Description
list[float] | None

Embedding vector, or None if the item is not found or has no stored embedding.

Raises:

Type Description
StoreCapabilityError

If the backend does not support semantic search.

Source code in src/locus/memory/store.py
async def get_embedding(
    self,
    namespace: tuple[str, ...],
    key: str,
) -> list[float] | None:
    """
    Get the embedding vector for a stored item.

    Optional method — see ``capabilities.semantic_search``.

    Args:
        namespace: Hierarchical namespace tuple
        key: Key within the namespace

    Returns:
        Embedding vector or None if not found / no embedding stored.

    Raises:
        StoreCapabilityError: If the backend does not support
            semantic search.
    """
    raise StoreCapabilityError(
        capability="semantic_search",
        store_class=type(self).__name__,
    )

get_batch async

get_batch(keys: list[tuple[tuple[str, ...], str]]) -> dict[tuple[tuple[str, ...], str], Any]

Retrieve multiple items.

Parameters:

Name Type Description Default
keys list[tuple[tuple[str, ...], str]]

List of (namespace, key) tuples

required

Returns:

Type Description
dict[tuple[tuple[str, ...], str], Any]

Dict mapping (namespace, key) to value. Keys with no stored value are omitted.

Source code in src/locus/memory/store.py
async def get_batch(
    self,
    keys: list[tuple[tuple[str, ...], str]],
) -> dict[tuple[tuple[str, ...], str], Any]:
    """
    Retrieve multiple items.

    Args:
        keys: List of (namespace, key) tuples

    Returns:
        Dict mapping (namespace, key) to value
    """
    results = {}
    for namespace, key in keys:
        value = await self.get(namespace, key)
        if value is not None:
            results[(namespace, key)] = value
    return results

clear_namespace async

clear_namespace(namespace: tuple[str, ...]) -> int

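Delete all items in a namespace. The default implementation lists at most 10,000 keys per call, so larger namespaces may need repeated calls.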

Parameters:

Name Type Description Default
namespace tuple[str, ...]

Namespace to clear

required

Returns:

Type Description
int

Number of items deleted

Source code in src/locus/memory/store.py
async def clear_namespace(
    self,
    namespace: tuple[str, ...],
) -> int:
    """
    Delete all items in a namespace.

    Args:
        namespace: Namespace to clear

    Returns:
        Number of items deleted
    """
    keys = await self.list_keys(namespace, limit=10000)
    count = 0
    for key in keys:
        if await self.delete(namespace, key):
            count += 1
    return count
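Because the default clear_namespace fetches a single page of keys (limit=10000) and deletes them one by one, it can leave items behind in a very large namespace. A minimal sketch of the pattern, assuming a hypothetical ToyStore whose clear_namespace takes an illustrative page argument in place of the hard-coded 10,000, shows how a caller can loop until a call deletes nothing.

```python
import asyncio

Namespace = tuple[str, ...]


class ToyStore:
    """Hypothetical dict-backed store mirroring list_keys/delete/clear_namespace."""

    def __init__(self) -> None:
        self._data: dict[Namespace, dict[str, object]] = {}

    async def put(self, namespace: Namespace, key: str, value: object) -> None:
        self._data.setdefault(namespace, {})[key] = value

    async def list_keys(self, namespace: Namespace, limit: int = 100) -> list[str]:
        return list(self._data.get(namespace, {}))[:limit]

    async def delete(self, namespace: Namespace, key: str) -> bool:
        return self._data.get(namespace, {}).pop(key, None) is not None

    async def clear_namespace(self, namespace: Namespace, page: int = 10_000) -> int:
        # Default strategy: list one page of keys, then delete them individually.
        count = 0
        for key in await self.list_keys(namespace, limit=page):
            if await self.delete(namespace, key):
                count += 1
        return count


async def clear_all(store: ToyStore, namespace: Namespace, page: int) -> int:
    """Repeat clear_namespace until a call deletes nothing."""
    total = 0
    while (deleted := await store.clear_namespace(namespace, page=page)) > 0:
        total += deleted
    return total


async def demo() -> tuple[int, int]:
    store = ToyStore()
    for i in range(25):
        await store.put(("sessions", "s1"), f"k{i}", i)
    total = await clear_all(store, ("sessions", "s1"), page=10)
    return total, len(await store.list_keys(("sessions", "s1")))


total, remaining = asyncio.run(demo())
```

With a page of 10, the loop deletes 10, 10 and 5 items, then stops on the empty fourth pass.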

close async

close() -> None

Close any resources held by the store. The base implementation is a no-op.

Source code in src/locus/memory/store.py
async def close(self) -> None:
    """Close any resources."""

NamespacedStore

NamespacedStore(store: BaseStore, namespace: tuple[str, ...])

Store wrapper with a fixed namespace prefix.

Makes it easier to work with a specific scope.

Example

user_store = NamespacedStore(store, ("users", user_id))
await user_store.put("preferences", {"theme": "dark"})
prefs = await user_store.get("preferences")

Source code in src/locus/memory/store.py
def __init__(
    self,
    store: BaseStore,
    namespace: tuple[str, ...],
):
    self._store = store
    self._namespace = namespace

scoped

scoped(*suffix: str) -> NamespacedStore

Create new wrapper with extended namespace.

Source code in src/locus/memory/store.py
def scoped(self, *suffix: str) -> NamespacedStore:
    """Create new wrapper with extended namespace."""
    return NamespacedStore(self._store, (*self._namespace, *suffix))
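scoped() builds a new wrapper whose namespace is the old prefix followed by the given suffix, so scopes nest by tuple concatenation. The sketch below mirrors that idea with hypothetical DictStore and Scoped classes (not the locus implementations) to show which (namespace, key) slot a nested write actually lands in.

```python
from __future__ import annotations

import asyncio
from typing import Any


class DictStore:
    """Hypothetical minimal backend keyed by (namespace, key)."""

    def __init__(self) -> None:
        self.data: dict[tuple[tuple[str, ...], str], Any] = {}

    async def put(self, namespace: tuple[str, ...], key: str, value: Any) -> None:
        self.data[(namespace, key)] = value

    async def get(self, namespace: tuple[str, ...], key: str) -> Any | None:
        return self.data.get((namespace, key))


class Scoped:
    """Mirror of the NamespacedStore idea: a fixed prefix on every call."""

    def __init__(self, store: DictStore, namespace: tuple[str, ...]) -> None:
        self._store = store
        self._namespace = namespace

    def scoped(self, *suffix: str) -> Scoped:
        # Same composition as NamespacedStore.scoped: prefix + suffix.
        return Scoped(self._store, (*self._namespace, *suffix))

    async def put(self, key: str, value: Any) -> None:
        await self._store.put(self._namespace, key, value)

    async def get(self, key: str) -> Any | None:
        return await self._store.get(self._namespace, key)


async def demo() -> DictStore:
    backend = DictStore()
    user = Scoped(backend, ("users", "u42"))
    await user.scoped("prefs").put("theme", "dark")
    return backend


backend = asyncio.run(demo())
```

The write ends up under ("users", "u42", "prefs"), so reading "theme" through the unscoped user wrapper would return None.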

StoreContext

StoreContext(store: BaseStore, user_id: str | None = None, session_id: str | None = None)

Context object passed to nodes for store access.

Provides convenient methods for common memory operations.

Example

from datetime import UTC, datetime

async def my_node(inputs, *, store: StoreContext):
    # Get user memory
    user_prefs = await store.get_user_memory("preferences")

    # Remember something
    now = datetime.now(UTC).isoformat()
    await store.remember(
        "discussed_topic",
        {"topic": "python", "timestamp": now},
    )

    # Search memories
    related = await store.search_memories("python")

Source code in src/locus/memory/store.py
def __init__(
    self,
    store: BaseStore,
    user_id: str | None = None,
    session_id: str | None = None,
):
    self._store = store
    self._user_id = user_id
    self._session_id = session_id

store property

store: BaseStore

Access the underlying store.

for_user

for_user(user_id: str | None = None) -> NamespacedStore

Get namespaced store for a user.

If user_id is not provided, uses the user_id from the context.

Source code in src/locus/memory/store.py
def for_user(self, user_id: str | None = None) -> NamespacedStore:
    """Get namespaced store for a user.

    If user_id is not provided, uses the user_id from the context.
    """
    uid = user_id or self._user_id
    if not uid:
        raise ValueError("user_id must be provided or set in context")
    return NamespacedStore(self._store, ("users", uid))

for_session

for_session(session_id: str | None = None) -> NamespacedStore

Get namespaced store for a session.

If session_id is not provided, uses the session_id from the context.

Source code in src/locus/memory/store.py
def for_session(self, session_id: str | None = None) -> NamespacedStore:
    """Get namespaced store for a session.

    If session_id is not provided, uses the session_id from the context.
    """
    sid = session_id or self._session_id
    if not sid:
        raise ValueError("session_id must be provided or set in context")
    return NamespacedStore(self._store, ("sessions", sid))

get_user_memory async

get_user_memory(key: str) -> Any | None

Get a memory for the current user.

Source code in src/locus/memory/store.py
async def get_user_memory(self, key: str) -> Any | None:
    """Get a memory for the current user."""
    if not self._user_id:
        return None
    return await self._store.get(("users", self._user_id, "memories"), key)

remember async

remember(key: str, value: Any, metadata: dict[str, Any] | None = None) -> None

Store a memory for the current user.

Source code in src/locus/memory/store.py
async def remember(
    self,
    key: str,
    value: Any,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Store a memory for the current user."""
    if not self._user_id:
        raise ValueError("No user_id set in store context")
    await self._store.put(
        ("users", self._user_id, "memories"),
        key,
        value,
        metadata,
    )

forget async

forget(key: str) -> bool

Delete a memory for the current user.

Source code in src/locus/memory/store.py
async def forget(self, key: str) -> bool:
    """Delete a memory for the current user."""
    if not self._user_id:
        return False
    return await self._store.delete(("users", self._user_id, "memories"), key)

search_memories async

search_memories(query: str, limit: int = 10) -> list[StoreItem]

Search user memories (full-text).

Source code in src/locus/memory/store.py
async def search_memories(
    self,
    query: str,
    limit: int = 10,
) -> list[StoreItem]:
    """Search user memories (full-text)."""
    if not self._user_id:
        return []
    return await self._store.search(
        ("users", self._user_id, "memories"),
        query,
        limit,
    )

remember_with_embedding async

remember_with_embedding(key: str, value: Any, embedding: list[float], metadata: dict[str, Any] | None = None) -> None

Store a memory with its embedding for semantic search.

Requires: capabilities.semantic_search = True

Parameters:

Name Type Description Default
key str

Memory key

required
value Any

Value to store

required
embedding list[float]

Vector embedding for semantic search

required
metadata dict[str, Any] | None

Optional metadata

None
Example

embedding = await embedder.embed("User likes dark theme")
await store.remember_with_embedding(
    "theme_pref",
    {"theme": "dark"},
    embedding=embedding,
)

Source code in src/locus/memory/store.py
async def remember_with_embedding(
    self,
    key: str,
    value: Any,
    embedding: list[float],
    metadata: dict[str, Any] | None = None,
) -> None:
    """
    Store a memory with its embedding for semantic search.

    Requires: capabilities.semantic_search = True

    Args:
        key: Memory key
        value: Value to store
        embedding: Vector embedding for semantic search
        metadata: Optional metadata

    Example:
        embedding = await embedder.embed("User likes dark theme")
        await store.remember_with_embedding(
            "theme_pref",
            {"theme": "dark"},
            embedding=embedding,
        )
    """
    if not self._user_id:
        raise ValueError("No user_id set in store context")
    await self._store.put_with_embedding(
        ("users", self._user_id, "memories"),
        key,
        value,
        embedding,
        metadata,
    )

search_memories_semantic async

search_memories_semantic(query_embedding: list[float], limit: int = 10, threshold: float | None = None) -> list[SemanticSearchResult]

Search user memories by semantic similarity.

Requires: capabilities.semantic_search = True

Parameters:

Name Type Description Default
query_embedding list[float]

Vector to search with

required
limit int

Maximum results

10
threshold float | None

Minimum similarity score (0.0-1.0)

None

Returns:

Type Description
list[SemanticSearchResult]

List of SemanticSearchResult sorted by similarity

Example

query_vec = await embedder.embed("user preferences")
results = await store.search_memories_semantic(
    query_embedding=query_vec,
    limit=5,
    threshold=0.7,
)
for r in results:
    print(f"{r.item.key}: {r.score:.2f} similar")

Source code in src/locus/memory/store.py
async def search_memories_semantic(
    self,
    query_embedding: list[float],
    limit: int = 10,
    threshold: float | None = None,
) -> list[SemanticSearchResult]:
    """
    Search user memories by semantic similarity.

    Requires: capabilities.semantic_search = True

    Args:
        query_embedding: Vector to search with
        limit: Maximum results
        threshold: Minimum similarity score (0.0-1.0)

    Returns:
        List of SemanticSearchResult sorted by similarity

    Example:
        query_vec = await embedder.embed("user preferences")
        results = await store.search_memories_semantic(
            query_embedding=query_vec,
            limit=5,
            threshold=0.7,
        )
        for r in results:
            print(f"{r.item.key}: {r.score:.2f} similar")
    """
    if not self._user_id:
        return []
    return await self._store.search_by_embedding(
        ("users", self._user_id, "memories"),
        query_embedding,
        limit,
        threshold,
    )

get_global async

get_global(key: str) -> Any | None

Get a global value.

Source code in src/locus/memory/store.py
async def get_global(self, key: str) -> Any | None:
    """Get a global value."""
    return await self._store.get(("global",), key)

set_global async

set_global(key: str, value: Any, metadata: dict[str, Any] | None = None) -> None

Set a global value.

Source code in src/locus/memory/store.py
async def set_global(
    self,
    key: str,
    value: Any,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Set a global value."""
    await self._store.put(("global",), key, value, metadata)

StoreItem dataclass

StoreItem(namespace: tuple[str, ...], key: str, value: Any, metadata: dict[str, Any], created_at: datetime, updated_at: datetime, version: int = 1)

An item stored in the store.

Attributes:

Name Type Description
namespace tuple[str, ...]

The namespace tuple

key str

The key within namespace

value Any

The stored value

metadata dict[str, Any]

Optional metadata

created_at datetime

Creation timestamp

updated_at datetime

Last update timestamp

version int

Version counter for optimistic locking

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary.

Source code in src/locus/memory/store.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary."""
    return {
        "namespace": list(self.namespace),
        "key": self.key,
        "value": self.value,
        "metadata": self.metadata,
        "created_at": self.created_at.isoformat(),
        "updated_at": self.updated_at.isoformat(),
        "version": self.version,
    }

from_dict classmethod

from_dict(d: dict[str, Any]) -> StoreItem

Create from dictionary.

Source code in src/locus/memory/store.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> StoreItem:
    """Create from dictionary."""
    return cls(
        namespace=tuple(d["namespace"]),
        key=d["key"],
        value=d["value"],
        metadata=d.get("metadata", {}),
        created_at=datetime.fromisoformat(d["created_at"]),
        updated_at=datetime.fromisoformat(d["updated_at"]),
        version=d.get("version", 1),
    )

StoreCapabilities dataclass

StoreCapabilities(search: bool = False, semantic_search: bool = False, embedding_dimension: int | None = None, ttl: bool = False, list_namespaces: bool = False, batch_operations: bool = False, transactions: bool = False)

Capabilities supported by a store implementation.

Use this to discover what features a store supports.

StoreCapabilityError

StoreCapabilityError(capability: str, store_class: str, hint: str | None = None)

Bases: NotImplementedError

Raised when a store operation is requested that the backend does not support.

Carries structured context (capability, store_class, hint) so callers can branch on the capability or surface a clean message.

Subclasses NotImplementedError, so existing except NotImplementedError blocks still catch it. This only adds a richer payload; behaviour is unchanged.

Example

try:
    results = await store.search(("users", uid), query="...")
except StoreCapabilityError as exc:
    log.info(
        "store %s does not support %s; falling back",
        exc.store_class,
        exc.capability,
    )
    results = await store.list_keys(("users", uid))
Source code in src/locus/memory/store.py
def __init__(
    self,
    capability: str,
    store_class: str,
    hint: str | None = None,
) -> None:
    self.capability = capability
    self.store_class = store_class
    self.hint = hint
    msg = f"{store_class} does not support {capability!r}"
    if hint:
        msg = f"{msg}. {hint}"
    super().__init__(msg)

SemanticSearchResult dataclass

SemanticSearchResult(item: StoreItem, score: float, distance: float | None = None)

Result from semantic (vector similarity) search.

Attributes:

Name Type Description
item StoreItem

The matching store item

score float

Similarity score (0.0 to 1.0, higher is more similar)

distance float | None

Raw distance metric (interpretation depends on distance type)

Long-term memory manager

LLMMemoryManager is the default. At session end it extracts memories with a caller-supplied extract_fn (typically LLM-backed) or, if none is given, a built-in heuristic; at session start it retrieves the most recently updated entries and injects them. NoopMemoryManager is the pass-through used in tests.

BaseMemoryManager

Bases: ABC

Abstract base for long-term memory managers.

Subclasses implement extract (what is worth remembering), retrieve and save; the base class supplies the session-start / session-end lifecycle hooks that the agent calls automatically, which inject retrieved memories and persist extracted ones.

Two concrete implementations are provided:

  • NoopMemoryManager: no-op, useful for testing.
  • LLMMemoryManager: persists to any BaseStore backend; accepts an optional LLM-backed extraction function.

extract abstractmethod async

extract(messages: list[Message]) -> list[Memory]

Extract durable memories from a finished conversation.

Parameters:

Name Type Description Default
messages list[Message]

The full message history for the completed session.

required

Returns:

Type Description
list[Memory]

List of Memory objects to persist. May be empty.

Source code in src/locus/memory/manager.py
@abstractmethod
async def extract(self, messages: list[Message]) -> list[Memory]:
    """Extract durable memories from a finished conversation.

    Args:
        messages: The full message history for the completed session.

    Returns:
        List of :class:`Memory` objects to persist.  May be empty.
    """
    ...

retrieve abstractmethod async

retrieve(limit: int = 20) -> list[Memory]

Retrieve stored memories for injection at session start.

Parameters:

Name Type Description Default
limit int

Maximum number of memories to return.

20

Returns:

Type Description
list[Memory]

List of Memory objects, most recently updated first.

Source code in src/locus/memory/manager.py
@abstractmethod
async def retrieve(self, limit: int = 20) -> list[Memory]:
    """Retrieve stored memories for injection at session start.

    Args:
        limit: Maximum number of memories to return.

    Returns:
        List of :class:`Memory` objects, most recently updated first.
    """
    ...

save abstractmethod async

save(memories: list[Memory]) -> None

Persist a list of memories to the backing store.

Implementations should upsert by key — re-extracting the same fact updates the record rather than creating a duplicate.

Parameters:

Name Type Description Default
memories list[Memory]

Memories to persist.

required
Source code in src/locus/memory/manager.py
@abstractmethod
async def save(self, memories: list[Memory]) -> None:
    """Persist a list of memories to the backing store.

    Implementations should upsert by key — re-extracting the same
    fact updates the record rather than creating a duplicate.

    Args:
        memories: Memories to persist.
    """
    ...

on_session_start async

on_session_start(state: AgentState) -> AgentState

Retrieve memories and inject them into the agent state.

Called by the agent runtime at the start of every invocation, after on_before_invocation hooks but before the first model call. The default implementation retrieves all stored memories and prepends a formatted system message to state.messages.

Parameters:

Name Type Description Default
state AgentState

Current agent state (just-created or loaded from checkpointer).

required

Returns:

Type Description
AgentState

Possibly-modified state with memory context injected.

Source code in src/locus/memory/manager.py
async def on_session_start(self, state: AgentState) -> AgentState:
    """Retrieve memories and inject them into the agent state.

    Called by the agent runtime at the start of every invocation,
    after ``on_before_invocation`` hooks but before the first model
    call.  The default implementation retrieves all stored memories
    and prepends a formatted system message to ``state.messages``.

    Args:
        state: Current agent state (just-created or loaded from
            checkpointer).

    Returns:
        Possibly-modified state with memory context injected.
    """
    memories = await self.retrieve()
    if not memories:
        return state

    from locus.observability.emit import emit  # noqa: PLC0415

    injected_state = _inject_memories_into_state(state, memories)

    await emit(
        "memory.manager.injected",
        memory_count=len(memories),
        types=[m.type.value for m in memories],
    )

    return injected_state

on_session_end async

on_session_end(state: AgentState) -> None

Extract memories from the finished session and save them.

Called by the agent runtime in the finally block of every invocation, after on_after_invocation hooks but before the final checkpoint.

Parameters:

Name Type Description Default
state AgentState

Final agent state with the complete message history.

required
Source code in src/locus/memory/manager.py
async def on_session_end(self, state: AgentState) -> None:
    """Extract memories from the finished session and save them.

    Called by the agent runtime in the ``finally`` block of every
    invocation, after ``on_after_invocation`` hooks but before the
    final checkpoint.

    Args:
        state: Final agent state with the complete message history.
    """
    memories = await self.extract(list(state.messages))
    if not memories:
        return

    await self.save(memories)

    from locus.observability.emit import emit  # noqa: PLC0415

    await emit(
        "memory.manager.extracted",
        memory_count=len(memories),
        types=[m.type.value for m in memories],
        keys=[m.key for m in memories],
    )
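The two hooks above form a round trip: on_session_end runs extract then save, and the next on_session_start runs retrieve and prepends a system message. The self-contained sketch below walks through that cycle with hypothetical Message, State, Memory and DictMemoryManager types. The "remember: key = fact" extraction rule and the injected wording are invented for illustration; the real formatting lives in _inject_memories_into_state, which is not shown on this page.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class Message:
    role: str
    content: str


@dataclass
class State:
    messages: list[Message] = field(default_factory=list)


@dataclass
class Memory:
    type: str
    key: str
    content: str


class DictMemoryManager:
    """Hypothetical manager showing the start/end lifecycle, not the library's code."""

    def __init__(self) -> None:
        self._store: dict[tuple[str, str], Memory] = {}

    async def extract(self, messages: list[Message]) -> list[Memory]:
        # Toy rule: user turns of the form "remember: key = fact".
        found = []
        for m in messages:
            if m.role == "user" and m.content.startswith("remember:"):
                key, _, fact = m.content.removeprefix("remember:").partition("=")
                found.append(Memory("user", key.strip(), fact.strip()))
        return found

    async def retrieve(self, limit: int = 20) -> list[Memory]:
        return list(self._store.values())[:limit]

    async def save(self, memories: list[Memory]) -> None:
        for mem in memories:
            self._store[(mem.type, mem.key)] = mem  # upsert by (type, key)

    async def on_session_start(self, state: State) -> State:
        memories = await self.retrieve()
        if not memories:
            return state
        text = "Known facts:\n" + "\n".join(f"- {m.key}: {m.content}" for m in memories)
        return State(messages=[Message("system", text), *state.messages])

    async def on_session_end(self, state: State) -> None:
        memories = await self.extract(list(state.messages))
        if memories:
            await self.save(memories)


async def two_sessions() -> State:
    manager = DictMemoryManager()
    await manager.on_session_end(State([Message("user", "remember: editor = vim")]))
    return await manager.on_session_start(State([Message("user", "hi")]))


second = asyncio.run(two_sessions())
```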

LLMMemoryManager

LLMMemoryManager(store: BaseStore, *, extract_fn: ExtractFn | None = None, namespace_prefix: tuple[str, ...] = ('locus_memory',), max_memories: int = 50, retrieve_limit: int = 20)

Bases: BaseMemoryManager

Memory manager backed by any locus.memory.store.BaseStore.

Extraction uses either a caller-supplied async function (extract_fn) or a built-in heuristic that pattern-matches common conversational signals (corrections, confirmations, role disclosures). Supply extract_fn for production use; the heuristic is adequate for demos and tests.

Parameters:

Name Type Description Default
store BaseStore

Any locus.memory.store.BaseStore implementation. InMemoryStore works for development; for production, use RedisBackend, PostgreSQLBackend, OracleBackend or another persistent backend.

required
extract_fn ExtractFn | None

Optional async callable (messages: list[Message]) -> list[Memory]. When provided, the heuristic is bypassed.

None
namespace_prefix tuple[str, ...]

Store namespace prefix. Scope per user or tenant by passing ("tenants", tenant_id). Default ("locus_memory",).

('locus_memory',)
max_memories int

Hard cap on the number of memories kept per type. Oldest entries are pruned when the limit is reached.

50
retrieve_limit int

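Maximum memories returned by retrieve(). Note that retrieve() as shown takes its own limit argument (default 20) and on_session_start calls it without arguments, so the code on this page does not read this attribute.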

20

Example

from locus.memory.manager import LLMMemoryManager
from locus.memory.backends import OracleBackend

# my_llm_extractor (an async extract_fn) and user_id are supplied by the caller.
manager = LLMMemoryManager(
    store=OracleBackend(dsn="mydb_high", user="admin", password="secret"),
    extract_fn=my_llm_extractor,
    namespace_prefix=("users", user_id),
)

agent = Agent(model="oci:openai.gpt-5", memory_manager=manager)
Source code in src/locus/memory/manager.py
def __init__(
    self,
    store: BaseStore,
    *,
    extract_fn: ExtractFn | None = None,
    namespace_prefix: tuple[str, ...] = ("locus_memory",),
    max_memories: int = 50,
    retrieve_limit: int = 20,
) -> None:
    self.store = store
    self.extract_fn = extract_fn
    self.namespace_prefix = namespace_prefix
    self.max_memories = max_memories
    self.retrieve_limit = retrieve_limit

extract async

extract(messages: list[Message]) -> list[Memory]

Extract memories from a message list.

Uses extract_fn when provided; otherwise applies the built-in heuristic.

Source code in src/locus/memory/manager.py
async def extract(self, messages: list[Message]) -> list[Memory]:
    """Extract memories from a message list.

    Uses ``extract_fn`` when provided; otherwise applies the
    built-in heuristic.
    """
    if self.extract_fn is not None:
        return await self.extract_fn(messages)
    return _heuristic_extract(messages)
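An extract_fn is an async callable that takes the message list and returns a list of Memory objects. The sketch shows one hypothetical way to build one around any async text-completion function. It asks for JSON, tolerates an unparseable reply and keeps only entries with one of the four documented types (user, feedback, project, reference). The complete callable, the prompt wording, the local Memory mirror and the use of plain dicts for messages are all assumptions made so the block runs standalone; a real extractor would read the fields of locus Message objects.

```python
import asyncio
import json
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any

ALLOWED_TYPES = {"user", "feedback", "project", "reference"}


@dataclass
class Memory:
    # Local mirror of locus Memory's fields, for illustration only.
    type: str
    key: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)


def make_extract_fn(
    complete: Callable[[str], Awaitable[str]],
) -> Callable[[list[dict[str, str]]], Awaitable[list[Memory]]]:
    """Wrap a hypothetical async completion call in an extract_fn-shaped coroutine."""

    async def extract_fn(messages: list[dict[str, str]]) -> list[Memory]:
        transcript = "\n".join(f"{m['role']}: {m['content']}" for m in messages)
        prompt = (
            'Return a JSON list of durable facts as objects with "type", '
            '"key" and "content" fields.\n\n' + transcript
        )
        try:
            raw = json.loads(await complete(prompt))
        except json.JSONDecodeError:
            return []  # an unparseable reply yields no memories instead of an error
        memories = []
        for entry in (raw if isinstance(raw, list) else []):
            if not isinstance(entry, dict):
                continue
            if entry.get("type") in ALLOWED_TYPES and entry.get("key") and entry.get("content"):
                memories.append(
                    Memory(entry["type"], entry["key"], entry["content"], {"source": "llm"})
                )
        return memories

    return extract_fn


async def fake_model(prompt: str) -> str:
    # Stand-in for a real model call; the second entry has an invalid type.
    return json.dumps([
        {"type": "user", "key": "role", "content": "Data engineer"},
        {"type": "bogus", "key": "x", "content": "dropped"},
    ])


result = asyncio.run(make_extract_fn(fake_model)([{"role": "user", "content": "I'm a data engineer"}]))
```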

retrieve async

retrieve(limit: int = 20) -> list[Memory]

Retrieve all stored memories across every type.

Returns memories sorted by most-recently-updated first, up to limit entries total.

Source code in src/locus/memory/manager.py
async def retrieve(self, limit: int = 20) -> list[Memory]:
    """Retrieve all stored memories across every type.

    Returns memories sorted by most-recently-updated first, up to
    ``limit`` entries total.
    """
    memories: list[Memory] = []

    for memory_type in MemoryType:
        ns = self._ns(memory_type)
        try:
            items = await self.store.search(ns, query=None, limit=limit)
        except Exception:  # noqa: BLE001
            # Gracefully fall back for backends that don't support search.
            keys = await self.store.list_keys(ns, limit=limit)
            items = []
            for k in keys:
                raw = await self.store.get(ns, k)
                if raw is not None:
                    from datetime import UTC, datetime  # noqa: PLC0415

                    from locus.memory.store import StoreItem  # noqa: PLC0415

                    now = datetime.now(UTC)
                    items.append(
                        StoreItem(
                            namespace=ns,
                            key=k,
                            value=raw,
                            metadata={},
                            created_at=now,
                            updated_at=now,
                        )
                    )

        for item in items:
            try:
                memories.append(Memory.from_store_value(item.value))
            except (KeyError, ValueError):
                pass

    # Sort newest first by updated_at (best-effort — not all items carry it).
    memories.sort(
        key=lambda m: m.metadata.get("updated_at", ""),
        reverse=True,
    )
    return memories[:limit]

save async

save(memories: list[Memory]) -> None

Upsert memories into the backing store.

Memories with the same key and type overwrite the previous entry — no duplicates accumulate.

Source code in src/locus/memory/manager.py
async def save(self, memories: list[Memory]) -> None:
    """Upsert memories into the backing store.

    Memories with the same ``key`` and ``type`` overwrite the
    previous entry — no duplicates accumulate.
    """
    from datetime import UTC, datetime  # noqa: PLC0415

    now = datetime.now(UTC).isoformat()

    for memory in memories:
        ns = self._ns(memory.type)
        value = memory.to_store_value()
        value["metadata"]["updated_at"] = now

        await self.store.put(
            ns,
            memory.key,
            value,
            metadata={"type": memory.type.value, "updated_at": now},
        )

on_session_start and on_session_end are inherited unchanged from BaseMemoryManager (documented above).


NoopMemoryManager

Bases: BaseMemoryManager

Pass-through memory manager — stores and retrieves nothing.

Useful as a test double, or as a placeholder when you want the agent wiring (the memory_manager= keyword argument) without actual persistence.

Memory dataclass

Memory(type: MemoryType, key: str, content: str, metadata: dict[str, Any] = dict())

A single durable memory entry.

Attributes:

Name Type Description
type MemoryType

Semantic category (user / feedback / project / reference).

key str

Stable logical name within the category. The store uses this as the key, so re-extracting the same fact under the same key updates the record rather than creating a duplicate.

content str

The actual fact or rule, as a human-readable string.

metadata dict[str, Any]

Arbitrary extra fields — confidence, source, why, how_to_apply, ISO timestamps, etc.

to_store_value

to_store_value() -> dict[str, Any]

Serialise to a JSON-compatible dict for the store.

Source code in src/locus/memory/manager.py
def to_store_value(self) -> dict[str, Any]:
    """Serialise to a JSON-compatible dict for the store."""
    return {
        "type": self.type.value,
        "key": self.key,
        "content": self.content,
        "metadata": self.metadata,
    }

from_store_value classmethod

from_store_value(value: dict[str, Any]) -> Memory

Deserialise from a store value dict.

Source code in src/locus/memory/manager.py
@classmethod
def from_store_value(cls, value: dict[str, Any]) -> Memory:
    """Deserialise from a store value dict."""
    return cls(
        type=MemoryType(value["type"]),
        key=value["key"],
        content=value["content"],
        metadata=value.get("metadata", {}),
    )

MemoryType

Bases: StrEnum

Semantic category for a stored memory.

Delta checkpointing

Storage-efficient checkpointer that persists only the diff between consecutive states (~77% storage savings on long conversations). Layered on top of any DeltaStorage backend.

DeltaCheckpointer

DeltaCheckpointer(storage: DeltaStorage | None = None, max_chain_depth: int = 5, compression_level: int = 6)

Delta-based checkpointer for efficient state persistence.

This checkpointer stores only the differences between states, achieving significant storage savings. It maintains a chain of deltas pointing back to a full checkpoint.

Features:

  • Delta compression: only changes from the parent state are stored.
  • Chain depth limiting: a full checkpoint is forced after max_chain_depth deltas.
  • zlib compression: stored data is additionally compressed.
  • About 77% storage reduction in typical usage.

Parameters:

Name Type Description Default
storage DeltaStorage | None

Storage backend for checkpoints

None
max_chain_depth int

Maximum number of deltas before forcing full checkpoint

5
compression_level int

zlib compression level (0-9, higher = better compression)

6
Source code in src/locus/memory/delta.py
def __init__(
    self,
    storage: DeltaStorage | None = None,
    max_chain_depth: int = 5,
    compression_level: int = 6,
):
    self.storage = storage or InMemoryDeltaStorage()
    self.max_chain_depth = max_chain_depth
    self.compression_level = compression_level
    self._state_cache: dict[str, dict[str, dict[str, Any]]] = {}  # thread_id -> checkpoint_id -> state

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None) -> str

Save agent state with delta compression.

If a parent checkpoint exists and chain depth allows, only the delta from parent is stored. Otherwise, a full checkpoint is created.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Source code in src/locus/memory/delta.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> str:
    """
    Save agent state with delta compression.

    If a parent checkpoint exists and chain depth allows,
    only the delta from parent is stored. Otherwise, a full
    checkpoint is created.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID

    Returns:
        Checkpoint ID for the saved state
    """
    checkpoint_id = checkpoint_id or uuid4().hex

    # Get current state as dict
    current_data = state.to_checkpoint()

    # Check for existing checkpoints
    existing = await self.storage.list_checkpoints(thread_id, limit=1)

    if not existing:
        # No parent - create full checkpoint
        checkpoint = await self._create_full_checkpoint(current_data, thread_id, checkpoint_id)
    else:
        parent_meta = existing[0]

        # Check if we should create a full checkpoint due to chain depth
        if parent_meta.chain_depth >= self.max_chain_depth:
            checkpoint = await self._create_full_checkpoint(
                current_data, thread_id, checkpoint_id
            )
        else:
            # Create delta checkpoint
            parent_checkpoint = await self.storage.retrieve(
                thread_id, parent_meta.checkpoint_id
            )
            if parent_checkpoint is None:
                # Parent not found, create full
                checkpoint = await self._create_full_checkpoint(
                    current_data, thread_id, checkpoint_id
                )
            else:
                # Load parent state and compute delta
                parent_data = await self._load_full_state(thread_id, parent_meta.checkpoint_id)
                if parent_data is None:
                    checkpoint = await self._create_full_checkpoint(
                        current_data, thread_id, checkpoint_id
                    )
                else:
                    checkpoint = await self._create_delta_checkpoint(
                        current_data,
                        parent_data,
                        thread_id,
                        checkpoint_id,
                        parent_meta.checkpoint_id,
                        parent_meta.chain_depth + 1,
                    )

    await self.storage.store(thread_id, checkpoint_id, checkpoint)

    # Update cache
    if thread_id not in self._state_cache:
        self._state_cache[thread_id] = {}
    self._state_cache[thread_id][checkpoint_id] = current_data

    return checkpoint_id
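The branching in save() makes the chain depth cycle. A full checkpoint starts a chain, and each later save adds one delta until the parent's depth reaches max_chain_depth. The next save then writes a fresh full checkpoint. The following small simulation of that rule assumes a full checkpoint records chain_depth 0 (the CheckpointMetadata default), since _create_full_checkpoint itself is not shown. plan_chain is a hypothetical name.

```python
def plan_chain(num_saves: int, max_chain_depth: int = 5) -> list[tuple[str, int]]:
    """Return (kind, chain_depth) for each save, mirroring the rule in save()."""
    plan: list[tuple[str, int]] = []
    for _ in range(num_saves):
        if not plan:
            # No parent checkpoint yet: start with a full checkpoint.
            plan.append(("full", 0))
            continue
        parent_depth = plan[-1][1]
        if parent_depth >= max_chain_depth:
            # Parent is at the depth limit: force a new full checkpoint.
            plan.append(("full", 0))
        else:
            # Otherwise store a delta one level deeper than its parent.
            plan.append(("delta", parent_depth + 1))
    return plan


for kind, depth in plan_chain(8):
    print(kind, depth)
```

With the default max_chain_depth=5, at most five deltas follow each full checkpoint, so reconstructing any state replays at most five deltas. Raising the limit would presumably trade smaller storage for longer reconstruction on load.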

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from checkpoint.

If loading a delta checkpoint, reconstructs full state by walking the delta chain back to a full checkpoint.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/delta.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state from checkpoint.

    If loading a delta checkpoint, reconstructs full state
    by walking the delta chain back to a full checkpoint.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    from locus.core.state import AgentState

    if checkpoint_id is None:
        # Get latest checkpoint
        checkpoints = await self.storage.list_checkpoints(thread_id, limit=1)
        if not checkpoints:
            return None
        checkpoint_id = checkpoints[0].checkpoint_id

    # Try cache first
    if thread_id in self._state_cache:
        if checkpoint_id in self._state_cache[thread_id]:
            return AgentState.from_checkpoint(self._state_cache[thread_id][checkpoint_id])

    # Load and reconstruct state
    state_data = await self._load_full_state(thread_id, checkpoint_id)
    if state_data is None:
        return None

    return AgentState.from_checkpoint(state_data)

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoint IDs.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/delta.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoint IDs.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    metadata_list = await self.storage.list_checkpoints(thread_id, limit)
    return [m.checkpoint_id for m in metadata_list]

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str) -> CheckpointMetadata | None

Get metadata for a specific checkpoint.

Source code in src/locus/memory/delta.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str,
) -> CheckpointMetadata | None:
    """Get metadata for a specific checkpoint."""
    checkpoint = await self.storage.retrieve(thread_id, checkpoint_id)
    return checkpoint.metadata if checkpoint else None

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s).

Source code in src/locus/memory/delta.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """Delete checkpoint(s)."""
    # Clear cache
    if thread_id in self._state_cache:
        if checkpoint_id:
            self._state_cache[thread_id].pop(checkpoint_id, None)
        else:
            del self._state_cache[thread_id]

    return await self.storage.delete(thread_id, checkpoint_id)

get_storage_stats async

get_storage_stats(thread_id: str) -> dict[str, Any]

Get storage statistics for a thread.

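Returns a dict with:

  • total_checkpoints: number of checkpoints
  • total_size: uncompressed size in bytes
  • compressed_size: compressed size in bytes
  • compression_ratio: overall compression ratio (total_size / compressed_size, or 1.0 when nothing is stored)
  • full_checkpoints: number of full checkpoints
  • delta_checkpoints: number of delta checkpoints

The statistics cover at most the 1000 most recent checkpoints of the thread.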

Source code in src/locus/memory/delta.py
async def get_storage_stats(
    self,
    thread_id: str,
) -> dict[str, Any]:
    """
    Get storage statistics for a thread.

    Returns dict with:
    - total_checkpoints: Number of checkpoints
    - total_size: Uncompressed size
    - compressed_size: Compressed size
    - compression_ratio: Overall compression ratio
    - full_checkpoints: Number of full checkpoints
    - delta_checkpoints: Number of delta checkpoints
    """
    metadata_list = await self.storage.list_checkpoints(thread_id, limit=1000)

    if not metadata_list:
        return {
            "total_checkpoints": 0,
            "total_size": 0,
            "compressed_size": 0,
            "compression_ratio": 1.0,
            "full_checkpoints": 0,
            "delta_checkpoints": 0,
        }

    total_size = sum(m.size_bytes for m in metadata_list)
    compressed_size = sum(m.compressed_size_bytes for m in metadata_list)
    full_count = sum(1 for m in metadata_list if m.is_full)
    delta_count = sum(1 for m in metadata_list if not m.is_full)

    return {
        "total_checkpoints": len(metadata_list),
        "total_size": total_size,
        "compressed_size": compressed_size,
        "compression_ratio": total_size / compressed_size if compressed_size > 0 else 1.0,
        "full_checkpoints": full_count,
        "delta_checkpoints": delta_count,
    }
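get_storage_stats() is a plain aggregation over CheckpointMetadata. The following sketch applies the same arithmetic to a hypothetical thread with one full checkpoint and three deltas. Meta is a stand-in that keeps only the fields the stats read, and the byte sizes are illustrative inputs, not measurements.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Meta:
    # Stand-in for CheckpointMetadata with only the fields the stats use.
    is_full: bool
    size_bytes: int
    compressed_size_bytes: int


def storage_stats(metadata_list: list[Meta]) -> dict[str, Any]:
    total_size = sum(m.size_bytes for m in metadata_list)
    compressed_size = sum(m.compressed_size_bytes for m in metadata_list)
    return {
        "total_checkpoints": len(metadata_list),
        "total_size": total_size,
        "compressed_size": compressed_size,
        # Guard against division by zero, as the source does.
        "compression_ratio": total_size / compressed_size if compressed_size > 0 else 1.0,
        "full_checkpoints": sum(1 for m in metadata_list if m.is_full),
        "delta_checkpoints": sum(1 for m in metadata_list if not m.is_full),
    }


# Illustrative sizes only: 8000 + 3 * 400 = 9200 bytes raw, 2000 + 3 * 100 = 2300 compressed.
thread = [Meta(True, 8000, 2000)] + [Meta(False, 400, 100)] * 3
print(storage_stats(thread))  # compression_ratio is 9200 / 2300 = 4.0
```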

DeltaCheckpoint dataclass

DeltaCheckpoint(metadata: CheckpointMetadata, data: bytes, is_delta: bool = True)

A delta checkpoint containing changes from parent.

compression_ratio property

compression_ratio: float

Calculate compression ratio (uncompressed / compressed).

CheckpointMetadata dataclass

CheckpointMetadata(checkpoint_id: str, thread_id: str, parent_id: str | None = None, is_full: bool = True, chain_depth: int = 0, created_at: datetime = (lambda: datetime.now(UTC))(), size_bytes: int = 0, compressed_size_bytes: int = 0)

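Metadata for a checkpoint. The created_at default is evaluated per instance, so each checkpoint records the current UTC time at the moment its metadata is created.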

DeltaStorage

Bases: ABC

Abstract storage backend for delta checkpoints.

store abstractmethod async

store(thread_id: str, checkpoint_id: str, checkpoint: DeltaCheckpoint) -> None

Store a checkpoint.

Source code in src/locus/memory/delta.py
@abstractmethod
async def store(
    self,
    thread_id: str,
    checkpoint_id: str,
    checkpoint: DeltaCheckpoint,
) -> None:
    """Store a checkpoint."""
    ...

retrieve abstractmethod async

retrieve(thread_id: str, checkpoint_id: str) -> DeltaCheckpoint | None

Retrieve a checkpoint.

Source code in src/locus/memory/delta.py
@abstractmethod
async def retrieve(
    self,
    thread_id: str,
    checkpoint_id: str,
) -> DeltaCheckpoint | None:
    """Retrieve a checkpoint."""
    ...

list_checkpoints abstractmethod async

list_checkpoints(thread_id: str, limit: int = 10) -> list[CheckpointMetadata]

List checkpoint metadata, newest first.

Source code in src/locus/memory/delta.py
@abstractmethod
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[CheckpointMetadata]:
    """List checkpoint metadata, newest first."""
    ...

delete abstractmethod async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s).

Source code in src/locus/memory/delta.py
@abstractmethod
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """Delete checkpoint(s)."""
    ...

InMemoryDeltaStorage

InMemoryDeltaStorage()

Bases: DeltaStorage

In-memory storage for delta checkpoints (for testing).

Source code in src/locus/memory/delta.py
def __init__(self) -> None:
    self._storage: dict[str, dict[str, DeltaCheckpoint]] = {}

Registry

String-based checkpointer lookup, used when configuration passes a provider name (e.g. "oracle", "redis") instead of an instance. Custom backends register themselves via register_checkpointer.

get_checkpointer

get_checkpointer(checkpointer_string: str, **kwargs: Any) -> BaseCheckpointer

Get a checkpointer from a string identifier.

Format: "provider" or "provider:config_hint"

The config_hint is provider-specific. It is passed to the provider's factory as the config_hint keyword argument, and each factory decides how to interpret it.

Examples:

  • "memory" -> MemoryCheckpointer
  • "file:./checkpoints" -> FileCheckpointer(base_dir="./checkpoints")
  • "redis:localhost:6379" -> RedisCheckpointer(url="redis://localhost:6379")
  • "postgresql:mydb" -> PostgreSQLCheckpointer(database="mydb")
  • "opensearch" -> OpenSearchCheckpointer()
  • "oci:bucket/namespace" -> OCIBucketCheckpointer(bucket_name="bucket", namespace="namespace")
  • "oracle:mydb" -> OracleCheckpointer(database="mydb")

Parameters:

Name Type Description Default
checkpointer_string str

Checkpointer identifier

required
**kwargs Any

Provider-specific configuration

{}

Returns:

Type Description
BaseCheckpointer

Checkpointer instance

Raises:

Type Description
ValueError

If provider is unknown

Source code in src/locus/memory/registry.py
def get_checkpointer(checkpointer_string: str, **kwargs: Any) -> BaseCheckpointer:
    """
    Get a checkpointer from a string identifier.

    Format: "provider" or "provider:config_hint"

    The config_hint is provider-specific and is passed as a keyword argument.

    Examples:
        - "memory" -> MemoryCheckpointer
        - "file:./checkpoints" -> FileCheckpointer(base_dir="./checkpoints")
        - "redis:localhost:6379" -> RedisCheckpointer(url="redis://localhost:6379")
        - "postgresql:mydb" -> PostgreSQLCheckpointer(database="mydb")
        - "opensearch" -> OpenSearchCheckpointer()
        - "oci:bucket/namespace" -> OCIBucketCheckpointer(bucket_name="bucket", namespace="namespace")
        - "oracle:mydb" -> OracleCheckpointer(database="mydb")

    Args:
        checkpointer_string: Checkpointer identifier
        **kwargs: Provider-specific configuration

    Returns:
        Checkpointer instance

    Raises:
        ValueError: If provider is unknown
    """
    if ":" in checkpointer_string:
        provider, config_hint = checkpointer_string.split(":", 1)
    else:
        provider = checkpointer_string
        config_hint = None

    if provider not in _CHECKPOINTERS:
        available = list(_CHECKPOINTERS.keys())
        raise ValueError(
            f"Unknown checkpointer provider: '{provider}'. "
            f"Available providers: {available}. "
            f"Install optional dependencies or register a custom provider."
        )

    # Pass config_hint if provided
    if config_hint:
        kwargs["config_hint"] = config_hint

    return _CHECKPOINTERS[provider](**kwargs)
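get_checkpointer() splits on the first colon only. Everything after that colon, including further colons as in "redis:localhost:6379", becomes the config_hint. An empty hint ("memory:") is dropped because the code tests `if config_hint:`. The following self-contained sketch reproduces the lookup with a tiny registry. It uses str.partition in place of split(":", 1), which behaves the same here. lookup and the two factories are hypothetical stand-ins that return dicts instead of checkpointer instances.

```python
from collections.abc import Callable
from typing import Any

_REGISTRY: dict[str, Callable[..., Any]] = {
    # Hypothetical factories: they just echo their kwargs.
    "memory": lambda **kw: {"provider": "memory", **kw},
    "redis": lambda **kw: {"provider": "redis", **kw},
}


def lookup(spec: str, **kwargs: Any) -> Any:
    # Split on the first ":" only; the remainder is the provider-specific hint.
    provider, _, config_hint = spec.partition(":")
    if provider not in _REGISTRY:
        raise ValueError(
            f"Unknown checkpointer provider: '{provider}'. "
            f"Available providers: {list(_REGISTRY)}."
        )
    if config_hint:
        kwargs["config_hint"] = config_hint
    return _REGISTRY[provider](**kwargs)


print(lookup("redis:localhost:6379"))  # {'provider': 'redis', 'config_hint': 'localhost:6379'}
print(lookup("memory"))                # {'provider': 'memory'}
```

One consequence of this design is that a hint embedded in the string overrides any config_hint keyword the caller also passes, because the assignment happens after kwargs are collected.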

register_checkpointer

register_checkpointer(name: str, factory: Callable[..., BaseCheckpointer]) -> None

Register a checkpointer provider.

Parameters:

Name Type Description Default
name str

Provider name (e.g., "redis", "postgresql", "oracle")

required
factory Callable[..., BaseCheckpointer]

Factory function that takes kwargs and returns a checkpointer

required
Example

>>> def my_factory(**kwargs) -> BaseCheckpointer:
...     return MyCustomCheckpointer(**kwargs)
>>> register_checkpointer("custom", my_factory)

Source code in src/locus/memory/registry.py
def register_checkpointer(
    name: str,
    factory: Callable[..., BaseCheckpointer],
) -> None:
    """
    Register a checkpointer provider.

    Args:
        name: Provider name (e.g., "redis", "postgresql", "oracle")
        factory: Factory function that takes kwargs and returns a checkpointer

    Example:
        >>> def my_factory(**kwargs) -> BaseCheckpointer:
        ...     return MyCustomCheckpointer(**kwargs)
        >>> register_checkpointer("custom", my_factory)
    """
    _CHECKPOINTERS[name] = factory

list_checkpointers

list_checkpointers() -> list[str]

List available checkpointer providers.

Returns:

Type Description
list[str]

List of registered provider names

Source code in src/locus/memory/registry.py
def list_checkpointers() -> list[str]:
    """
    List available checkpointer providers.

    Returns:
        List of registered provider names
    """
    return list(_CHECKPOINTERS.keys())