Skip to content

Checkpointers

Contract

BaseCheckpointer

Bases: ABC

Abstract base class for checkpointer implementations.

Checkpointers handle saving and loading agent state, enabling features like:

- Conversation persistence
- Session recovery
- Branching conversations
- State inspection and debugging
- Full-text search (backend-dependent)
- Metadata queries (backend-dependent)

All methods are async to support various backends (file, database, network storage, etc.).

Use the capabilities property to check which features are available before calling extended methods.

Example

>>> if checkpointer.capabilities.search:
...     results = await checkpointer.search("error handling")
>>> if checkpointer.capabilities.branching:
...     await checkpointer.copy_thread("main", "experiment")

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

Override in subclasses to advertise supported features.

save abstractmethod async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state.

Parameters:

Name Type Description Default
state AgentState

Current agent state to persist

required
thread_id str

Unique identifier for the conversation thread

required
checkpoint_id str | None

Optional specific checkpoint ID. If not provided, a new ID will be generated.

None
metadata dict[str, Any] | None

Optional metadata for querying/filtering checkpoints

None

Returns:

Type Description
str

Checkpoint ID that can be used to restore this state

Source code in src/locus/memory/checkpointer.py
@abstractmethod
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Persist the given agent state under a conversation thread.

    Args:
        state: Current agent state to persist
        thread_id: Unique identifier for the conversation thread
        checkpoint_id: Optional explicit checkpoint ID; a fresh one is
                      generated when omitted.
        metadata: Optional metadata for querying/filtering checkpoints

    Returns:
        Checkpoint ID that can be used to restore this state
    """
    ...

load abstractmethod async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state.

Parameters:

Name Type Description Default
thread_id str

Thread identifier to load from

required
checkpoint_id str | None

Optional specific checkpoint ID. If not provided, loads the latest checkpoint.

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/checkpointer.py
@abstractmethod
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Restore agent state from a stored checkpoint.

    Args:
        thread_id: Thread identifier to load from
        checkpoint_id: Optional explicit checkpoint ID; the latest
                      checkpoint is used when omitted.

    Returns:
        Restored AgentState or None if not found
    """
    ...

list_checkpoints abstractmethod async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number of checkpoint IDs to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/checkpointer.py
@abstractmethod
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    Enumerate checkpoint IDs recorded for a thread.

    Args:
        thread_id: Thread identifier
        limit: Maximum number of checkpoint IDs to return

    Returns:
        List of checkpoint IDs, newest first
    """
    ...

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete a checkpoint or all checkpoints for a thread.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete. If None, deletes all checkpoints for the thread.

None

Returns:

Type Description
bool

True if deletion was successful

Source code in src/locus/memory/checkpointer.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete a checkpoint or all checkpoints for a thread.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete. If None,
                      deletes all checkpoints for the thread.

    Returns:
        True if deletion was successful
    """
    raise NotImplementedError("delete not implemented for this backend")

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Release any held resources (connections, files, etc.).

    The default implementation does nothing; subclasses override it when
    they have cleanup to perform.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints

    Raises:
        NotImplementedError: If backend doesn't support metadata queries.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """
    Get checkpoint metadata.

    Requires: capabilities.metadata_query = True

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """
    Delete old checkpoints.

    Requires: capabilities.vacuum = True

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints

    Raises:
        NotImplementedError: If backend doesn't support vacuuming.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """
    Copy a thread to create a branch.

    Requires: capabilities.branching = True

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful

    Raises:
        NotImplementedError: If backend doesn't support branching.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Requires: capabilities.list_threads = True

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter threads (backend-specific)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/checkpointer.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """
    List all thread IDs.

    Requires: capabilities.list_threads = True

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter threads (backend-specific)

    Returns:
        List of thread IDs

    Raises:
        NotImplementedError: If backend doesn't support listing threads.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("list_threads")
    raise NotImplementedError("list_threads not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    List checkpoints with their metadata.

    Requires: capabilities.list_with_metadata = True

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts

    Raises:
        NotImplementedError: If backend doesn't support this listing.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

CheckpointerCapabilities dataclass

CheckpointerCapabilities(search: bool = False, metadata_query: bool = False, vacuum: bool = False, branching: bool = False, ttl: bool = False, list_threads: bool = False, list_with_metadata: bool = False, persistent_checkpoint_ids: bool = False)

Capabilities supported by a checkpointer.

Use this to discover what features a checkpointer supports before calling optional methods.

Example

>>> if checkpointer.capabilities.search:
...     results = await checkpointer.search("error handling")

Built-in backends

MemoryCheckpointer

MemoryCheckpointer()

Bases: BaseCheckpointer

In-memory checkpointer for testing and development.

Stores all checkpoints in a dictionary. Data is not persistent and will be lost when the process terminates.

Useful for:

- Unit and integration testing
- Development and prototyping
- Short-lived agent sessions
- As a fast caching layer

Capabilities:

- list_threads: Yes
- persistent_checkpoint_ids: Yes (within process lifetime)

Example
checkpointer = MemoryCheckpointer()

# Save state
checkpoint_id = await checkpointer.save(state, "thread-1")

# Load state
restored = await checkpointer.load("thread-1")
Source code in src/locus/memory/backends/memory.py
def __init__(self) -> None:
    # Nested map: thread_id -> checkpoint_id -> (state_data, timestamp, metadata).
    self._storage: dict[str, dict[str, tuple[dict[str, Any], datetime, dict[str, Any]]]] = {}

capabilities property

capabilities: CheckpointerCapabilities

Memory checkpointer capabilities.

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Release any held resources (connections, files, etc.).

    The default implementation does nothing; subclasses override it when
    they have cleanup to perform.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints

    Raises:
        NotImplementedError: If backend doesn't support metadata queries.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> dict[str, Any] | None:
    """
    Get checkpoint metadata.

    Requires: capabilities.metadata_query = True

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(
    self,
    older_than_days: int = 30,
) -> int:
    """
    Delete old checkpoints.

    Requires: capabilities.vacuum = True

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints

    Raises:
        NotImplementedError: If backend doesn't support vacuuming.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """
    Copy a thread to create a branch.

    Requires: capabilities.branching = True

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful

    Raises:
        NotImplementedError: If backend doesn't support branching.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(
    self,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    List checkpoints with their metadata.

    Requires: capabilities.list_with_metadata = True

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts

    Raises:
        NotImplementedError: If backend doesn't support this listing.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state to memory.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for the checkpoint

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Source code in src/locus/memory/backends/memory.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state to the in-memory store.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for the checkpoint

    Returns:
        Checkpoint ID for the saved state
    """
    # Honor a caller-supplied ID; otherwise mint a fresh one.
    cp_id = checkpoint_id if checkpoint_id else uuid4().hex

    bucket = self._storage.setdefault(thread_id, {})

    # Record layout: (serialized state, save time, metadata).
    bucket[cp_id] = (
        state.to_checkpoint(),
        datetime.now(UTC),
        metadata or {},
    )

    return cp_id

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from memory.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/backends/memory.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state from memory.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    from locus.core.state import AgentState

    thread_data = self._storage.get(thread_id)
    if not thread_data:
        # Unknown thread, or a thread with no checkpoints left.
        return None

    target_id = checkpoint_id
    if target_id is None:
        # Latest checkpoint = the entry with the greatest stored timestamp.
        target_id = max(thread_data, key=lambda cid: thread_data[cid][1])

    entry = thread_data.get(target_id)
    if entry is None:
        return None

    # Entry layout is (state_data, timestamp, metadata); only state_data matters here.
    return AgentState.from_checkpoint(entry[0])

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/backends/memory.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints for a thread.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    thread_data = self._storage.get(thread_id)
    if thread_data is None:
        return []

    # Newest first: order by the stored timestamp, descending.
    ordered = sorted(
        thread_data,
        key=lambda cid: thread_data[cid][1],
        reverse=True,
    )
    return ordered[:limit]

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s) from memory.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete (all if None)

None

Returns:

Type Description
bool

True if deletion was successful

Source code in src/locus/memory/backends/memory.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete checkpoint(s) from memory.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete (all if None)

    Returns:
        True if deletion was successful
    """
    if thread_id not in self._storage:
        return False

    if checkpoint_id is None:
        # Delete all checkpoints for thread
        del self._storage[thread_id]
        return True
    if checkpoint_id in self._storage[thread_id]:
        del self._storage[thread_id][checkpoint_id]
        return True
    return False

clear

clear() -> None

Clear all stored checkpoints.

Source code in src/locus/memory/backends/memory.py
def clear(self) -> None:
    """Remove every stored checkpoint across all threads (in place)."""
    self._storage.clear()

get_thread_ids

get_thread_ids() -> list[str]

Get list of all thread IDs with checkpoints.

Source code in src/locus/memory/backends/memory.py
def get_thread_ids(self) -> list[str]:
    """Return every thread ID currently present in storage."""
    return [tid for tid in self._storage]

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter (supports * as wildcard)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/backends/memory.py
async def list_threads(
    self,
    limit: int = 100,
    pattern: str = "*",
) -> list[str]:
    """
    List all thread IDs.

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter (supports * as wildcard)

    Returns:
        List of thread IDs
    """
    import fnmatch

    matched = [
        tid
        for tid in self._storage
        # "*" matches everything, so skip the filter entirely in that case.
        if pattern == "*" or fnmatch.fnmatch(tid, pattern)
    ]
    return matched[:limit]

get_checkpoint_count

get_checkpoint_count(thread_id: str | None = None) -> int

Get count of stored checkpoints.

Parameters:

Name Type Description Default
thread_id str | None

Specific thread (all threads if None)

None

Returns:

Type Description
int

Number of checkpoints

Source code in src/locus/memory/backends/memory.py
def get_checkpoint_count(self, thread_id: str | None = None) -> int:
    """
    Get count of stored checkpoints.

    Args:
        thread_id: Specific thread (all threads if None)

    Returns:
        Number of checkpoints
    """
    if thread_id is not None:
        return len(self._storage.get(thread_id, {}))
    return sum(len(t) for t in self._storage.values())

FileCheckpointer

FileCheckpointer(base_dir: str | Path = '.locus_checkpoints', pretty: bool = True)

Bases: BaseCheckpointer

File-based checkpointer for persistent local storage.

Stores each checkpoint as a JSON file, organized by thread ID. Provides durable storage that survives process restarts.

Parameters:

Name Type Description Default
base_dir str | Path

Base directory for checkpoint storage. Defaults to ".locus_checkpoints" in current directory.

'.locus_checkpoints'
pretty bool

Whether to format JSON for readability (default True)

True
Example
checkpointer = FileCheckpointer("./checkpoints")

# Save state
checkpoint_id = await checkpointer.save(state, "thread-1")

# Load state
restored = await checkpointer.load("thread-1")

# Files are stored at: ./checkpoints/thread-1/{checkpoint_id}.json
Source code in src/locus/memory/backends/file.py
def __init__(
    self,
    base_dir: str | Path = ".locus_checkpoints",
    pretty: bool = True,
):
    # Normalize to a Path so the backend can do path arithmetic on it.
    self.base_dir = Path(base_dir)
    # Whether JSON output is formatted for readability.
    self.pretty = pretty
    # NOTE(review): async lock; presumably acquired around file I/O by the
    # backend's methods (not visible here) — confirm against the full source.
    self._lock = asyncio.Lock()

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

Override in subclasses to advertise supported features.

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Check if a checkpoint exists.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None,
                      checks if any checkpoint exists for the thread.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is None:
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        return len(checkpoints) > 0
    state = await self.load(thread_id, checkpoint_id)
    return state is not None

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """
    Release any held resources (connections, files, etc.).

    The default implementation does nothing; subclasses override it when
    they have cleanup to perform.
    """

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Full-text search across checkpoints.

    Requires: capabilities.search = True

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints

    Raises:
        NotImplementedError: If backend doesn't support metadata queries.
    """
    # Surface a capability error first so unsupported backends fail clearly.
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(self, thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None:
    """Fetch the metadata dict stored with a checkpoint (backend-optional).

    Only available when ``capabilities.metadata_query`` is True.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """Delete old checkpoints (backend-optional).

    Only available when ``capabilities.vacuum`` is True.

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(self, source_thread_id: str, dest_thread_id: str) -> bool:
    """Copy a thread to create a branch (backend-optional).

    Only available when ``capabilities.branching`` is True.

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Requires: capabilities.list_threads = True

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter threads (backend-specific)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/checkpointer.py
async def list_threads(self, limit: int = 100, pattern: str = "*") -> list[str]:
    """List all thread IDs (backend-optional).

    Only available when ``capabilities.list_threads`` is True.

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter threads (backend-specific)

    Returns:
        List of thread IDs

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("list_threads")
    raise NotImplementedError("list_threads not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(self, limit: int = 100) -> list[dict[str, Any]]:
    """List checkpoints with their metadata (backend-optional).

    Only available when ``capabilities.list_with_metadata`` is True.

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state to a JSON file.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for querying/filtering checkpoints

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Source code in src/locus/memory/backends/file.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save agent state to a JSON file.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for querying/filtering checkpoints

    Returns:
        Checkpoint ID for the saved state
    """
    checkpoint_id = checkpoint_id or uuid4().hex

    async with self._lock:
        thread_dir = self._get_thread_dir(thread_id)
        thread_dir.mkdir(parents=True, exist_ok=True)

        checkpoint_path = self._get_checkpoint_path(thread_id, checkpoint_id)

        # Prepare data with metadata
        data = {
            "checkpoint_id": checkpoint_id,
            "thread_id": thread_id,
            "created_at": datetime.now(UTC).isoformat(),
            "state": state.to_checkpoint(),
            "metadata": metadata or {},
        }

        # Write to file (run in executor to not block)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, self._write_json, checkpoint_path, data)

    return checkpoint_id

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from a JSON file.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/backends/file.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state from a JSON file.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    # Local import — presumably to avoid a circular import at module
    # load time; confirm against locus.core.state.
    from locus.core.state import AgentState

    thread_dir = self._get_thread_dir(thread_id)

    if not thread_dir.exists():
        return None

    if checkpoint_id is None:
        # No explicit checkpoint requested: resolve to the newest one.
        checkpoints = await self.list_checkpoints(thread_id, limit=1)
        if not checkpoints:
            return None
        checkpoint_id = checkpoints[0]

    checkpoint_path = self._get_checkpoint_path(thread_id, checkpoint_id)

    # Read off the event loop thread. get_running_loop() replaces the
    # get_event_loop() call, which is deprecated inside coroutines
    # since Python 3.10.
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, self._read_json, checkpoint_path)

    if data is None:
        return None

    return AgentState.from_checkpoint(data["state"])

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints for a thread.

Reads checkpoint files and returns IDs sorted by creation time (newest first).

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/backends/file.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints for a thread.

    Reads checkpoint files and returns IDs sorted by creation time
    (newest first). Files with an unparseable ``created_at`` value are
    skipped instead of aborting the whole listing.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    thread_dir = self._get_thread_dir(thread_id)

    if not thread_dir.exists():
        return []

    # Collect (checkpoint_id, created_at) pairs from each JSON file.
    checkpoints: list[tuple[str, datetime]] = []

    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated here since Python 3.10.
    loop = asyncio.get_running_loop()

    for path in thread_dir.glob("*.json"):
        data = await loop.run_in_executor(None, self._read_json, path)
        if not data or "checkpoint_id" not in data:
            continue
        try:
            created_at = datetime.fromisoformat(
                data.get("created_at", "1970-01-01T00:00:00+00:00")
            )
        except (ValueError, TypeError):
            # Corrupt timestamp: skip this file rather than fail the listing.
            continue
        checkpoints.append((data["checkpoint_id"], created_at))

    # Sort by creation time descending (newest first).
    checkpoints.sort(key=lambda item: item[1], reverse=True)

    return [cp_id for cp_id, _ in checkpoints[:limit]]

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint file(s).

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete (all if None)

None

Returns:

Type Description
bool

True if deletion was successful

Source code in src/locus/memory/backends/file.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete checkpoint file(s).

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete (all if None)

    Returns:
        True if deletion was successful
    """
    import shutil

    thread_dir = self._get_thread_dir(thread_id)

    if not thread_dir.exists():
        return False

    async with self._lock:
        if checkpoint_id is None:
            # Remove the entire thread directory off the event loop
            # thread. get_running_loop() replaces get_event_loop(),
            # which is deprecated inside coroutines since Python 3.10.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, lambda: shutil.rmtree(thread_dir, ignore_errors=True)
            )
            return True
        checkpoint_path = self._get_checkpoint_path(thread_id, checkpoint_id)
        if checkpoint_path.exists():
            checkpoint_path.unlink()
            return True
        return False

get_storage_path

get_storage_path() -> Path

Get the base storage directory path.

Source code in src/locus/memory/backends/file.py
def get_storage_path(self) -> Path:
    """Return the root directory under which all thread data is stored."""
    return self.base_dir

get_disk_usage async

get_disk_usage(thread_id: str | None = None) -> int

Get total disk usage in bytes.

Parameters:

Name Type Description Default
thread_id str | None

Specific thread (all threads if None)

None

Returns:

Type Description
int

Total size in bytes

Source code in src/locus/memory/backends/file.py
async def get_disk_usage(self, thread_id: str | None = None) -> int:
    """
    Get total disk usage in bytes.

    Args:
        thread_id: Specific thread (all threads if None)

    Returns:
        Total size in bytes
    """
    def _dir_size(directory: Path) -> int:
        # Only checkpoint JSON files are counted.
        return sum(f.stat().st_size for f in directory.glob("*.json"))

    if thread_id is not None:
        thread_dir = self._get_thread_dir(thread_id)
        return _dir_size(thread_dir) if thread_dir.exists() else 0

    if not self.base_dir.exists():
        return 0

    # Sum every per-thread subdirectory under the base directory.
    return sum(
        _dir_size(child) for child in self.base_dir.iterdir() if child.is_dir()
    )

HTTPCheckpointer

HTTPCheckpointer(base_url: str, headers: dict[str, str] | None = None, auth: tuple[str, str] | None = None, timeout: float = 30.0)

Bases: BaseCheckpointer

HTTP API-based checkpointer for remote storage.

Stores checkpoints via HTTP API calls, suitable for distributed systems or cloud-based storage backends.

The API is expected to implement the following endpoints: - POST /threads/{thread_id}/checkpoints - Create checkpoint - GET /threads/{thread_id}/checkpoints/{checkpoint_id} - Get checkpoint - GET /threads/{thread_id}/checkpoints - List checkpoints - DELETE /threads/{thread_id}/checkpoints/{checkpoint_id} - Delete checkpoint

Parameters:

Name Type Description Default
base_url str

Base URL of the checkpoint API

required
headers dict[str, str] | None

Additional headers to include in requests

None
auth tuple[str, str] | None

Authentication tuple (username, password) for basic auth

None
timeout float

Request timeout in seconds

30.0
Example
checkpointer = HTTPCheckpointer(
    base_url="https://api.example.com/v1",
    headers={"Authorization": "Bearer token"},
)

# Save state
checkpoint_id = await checkpointer.save(state, "thread-1")

# Load state
restored = await checkpointer.load("thread-1")
Source code in src/locus/memory/backends/http.py
def __init__(
    self,
    base_url: str,
    headers: dict[str, str] | None = None,
    auth: tuple[str, str] | None = None,
    timeout: float = 30.0,
):
    """Store connection settings; the HTTP client itself is created lazily.

    Args:
        base_url: Base URL of the checkpoint API (a trailing slash is
            stripped so endpoint paths can always be appended with "/").
        headers: Extra headers to send with every request.
        auth: Optional (username, password) pair for basic auth.
        timeout: Request timeout in seconds.
    """
    self.base_url = base_url.rstrip("/")
    self.headers = headers or {}
    self.auth = auth
    self.timeout = timeout
    # Lazily-created HTTP client; None until first use (see close()).
    self._client: Any = None

capabilities property

capabilities: CheckpointerCapabilities

Return the capabilities of this checkpointer.

Override in subclasses to advertise supported features.

exists async

exists(thread_id: str, checkpoint_id: str | None = None) -> bool

Check if a checkpoint exists.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to check. If None, checks if any checkpoint exists for the thread.

None

Returns:

Type Description
bool

True if the checkpoint exists

Source code in src/locus/memory/checkpointer.py
async def exists(self, thread_id: str, checkpoint_id: str | None = None) -> bool:
    """Report whether a checkpoint is present.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to check. If None, any
            checkpoint for the thread counts as existing.

    Returns:
        True if the checkpoint exists
    """
    if checkpoint_id is not None:
        # A specific checkpoint exists iff it can actually be loaded.
        return (await self.load(thread_id, checkpoint_id)) is not None
    latest = await self.list_checkpoints(thread_id, limit=1)
    return len(latest) > 0

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Full-text search across checkpoints (backend-optional).

    Only available when ``capabilities.search`` is True.

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(self, key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]:
    """Query checkpoints by a metadata field (backend-optional).

    Only available when ``capabilities.metadata_query`` is True.

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

get_metadata async

get_metadata(thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None

Get checkpoint metadata.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint (latest if None)

None

Returns:

Type Description
dict[str, Any] | None

Metadata dict or None if not found

Source code in src/locus/memory/checkpointer.py
async def get_metadata(self, thread_id: str, checkpoint_id: str | None = None) -> dict[str, Any] | None:
    """Fetch the metadata dict stored with a checkpoint (backend-optional).

    Only available when ``capabilities.metadata_query`` is True.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint (latest if None)

    Returns:
        Metadata dict or None if not found

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("metadata_query")
    raise NotImplementedError("get_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Requires: capabilities.vacuum = True

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted checkpoints

Source code in src/locus/memory/checkpointer.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """Delete old checkpoints (backend-optional).

    Only available when ``capabilities.vacuum`` is True.

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted checkpoints

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("vacuum")
    raise NotImplementedError("vacuum not implemented")

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy a thread to create a branch.

Requires: capabilities.branching = True

Parameters:

Name Type Description Default
source_thread_id str

Source thread to copy from

required
dest_thread_id str

Destination thread ID

required

Returns:

Type Description
bool

True if successful

Source code in src/locus/memory/checkpointer.py
async def copy_thread(self, source_thread_id: str, dest_thread_id: str) -> bool:
    """Copy a thread to create a branch (backend-optional).

    Only available when ``capabilities.branching`` is True.

    Args:
        source_thread_id: Source thread to copy from
        dest_thread_id: Destination thread ID

    Returns:
        True if successful

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("branching")
    raise NotImplementedError("copy_thread not implemented")

list_threads async

list_threads(limit: int = 100, pattern: str = '*') -> list[str]

List all thread IDs.

Requires: capabilities.list_threads = True

Parameters:

Name Type Description Default
limit int

Maximum threads to return

100
pattern str

Pattern to filter threads (backend-specific)

'*'

Returns:

Type Description
list[str]

List of thread IDs

Source code in src/locus/memory/checkpointer.py
async def list_threads(self, limit: int = 100, pattern: str = "*") -> list[str]:
    """List all thread IDs (backend-optional).

    Only available when ``capabilities.list_threads`` is True.

    Args:
        limit: Maximum threads to return
        pattern: Pattern to filter threads (backend-specific)

    Returns:
        List of thread IDs

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("list_threads")
    raise NotImplementedError("list_threads not implemented")

list_with_metadata async

list_with_metadata(limit: int = 100) -> list[dict[str, Any]]

List checkpoints with their metadata.

Requires: capabilities.list_with_metadata = True

Parameters:

Name Type Description Default
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of {thread_id, checkpoint_id, metadata, ...} dicts

Source code in src/locus/memory/checkpointer.py
async def list_with_metadata(self, limit: int = 100) -> list[dict[str, Any]]:
    """List checkpoints with their metadata (backend-optional).

    Only available when ``capabilities.list_with_metadata`` is True.

    Args:
        limit: Maximum results

    Returns:
        List of {thread_id, checkpoint_id, metadata, ...} dicts

    Raises:
        NotImplementedError: If the backend does not override this.
    """
    self._require_capability("list_with_metadata")
    raise NotImplementedError("list_with_metadata not implemented")

close async

close() -> None

Close the HTTP client.

Source code in src/locus/memory/backends/http.py
async def close(self) -> None:
    """Close the cached HTTP client, if one was ever created."""
    if self._client is None:
        return
    # Close first, then drop the reference so a failed close keeps it.
    await self._client.aclose()
    self._client = None

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state via HTTP POST.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required
thread_id str

Thread identifier

required
checkpoint_id str | None

Optional specific checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for querying/filtering checkpoints

None

Returns:

Type Description
str

Checkpoint ID for the saved state

Raises:

Type Description
HTTPError

If the request fails

Source code in src/locus/memory/backends/http.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Persist agent state by POSTing it to the checkpoint API.

    Args:
        state: Current agent state
        thread_id: Thread identifier
        checkpoint_id: Optional specific checkpoint ID
        metadata: Optional metadata for querying/filtering checkpoints

    Returns:
        Checkpoint ID for the saved state

    Raises:
        HTTPError: If the request fails
    """
    cp_id = checkpoint_id if checkpoint_id else uuid4().hex

    client = await self._get_client()

    body = {
        "checkpoint_id": cp_id,
        "thread_id": thread_id,
        "created_at": datetime.now(UTC).isoformat(),
        "state": state.to_checkpoint(),
        "metadata": metadata or {},
    }

    endpoint = f"/threads/{_encode_path_segment(thread_id)}/checkpoints"
    response = await client.post(endpoint, json=body)
    response.raise_for_status()

    # Prefer the ID echoed back by the server, falling back to ours.
    result: dict[str, Any] = response.json()
    returned_id: str = result.get("checkpoint_id", cp_id)
    return returned_id

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state via HTTP GET.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint ID (latest if None)

None

Returns:

Type Description
AgentState | None

Restored AgentState or None if not found

Source code in src/locus/memory/backends/http.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """
    Load agent state via HTTP GET.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint ID (latest if None)

    Returns:
        Restored AgentState or None if not found
    """
    from locus.core.state import AgentState

    client = await self._get_client()

    target = checkpoint_id
    if target is None:
        # No explicit checkpoint: fall back to the most recent one.
        recent = await self.list_checkpoints(thread_id, limit=1)
        if not recent:
            return None
        target = recent[0]

    url = (
        f"/threads/{_encode_path_segment(thread_id)}"
        f"/checkpoints/{_encode_path_segment(target)}"
    )
    try:
        response = await client.get(url)
        response.raise_for_status()
    except Exception:  # noqa: BLE001 — missing/unreachable == absent by design
        return None

    payload = response.json()

    # Accept either a {"state": {...}} envelope or a bare state dict.
    return AgentState.from_checkpoint(payload.get("state", payload))

list_checkpoints async

list_checkpoints(thread_id: str, limit: int = 10) -> list[str]

List available checkpoints via HTTP GET.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
limit int

Maximum number to return

10

Returns:

Type Description
list[str]

List of checkpoint IDs, newest first

Source code in src/locus/memory/backends/http.py
async def list_checkpoints(
    self,
    thread_id: str,
    limit: int = 10,
) -> list[str]:
    """
    List available checkpoints via HTTP GET.

    Args:
        thread_id: Thread identifier
        limit: Maximum number to return

    Returns:
        List of checkpoint IDs, newest first
    """
    client = await self._get_client()

    try:
        response = await client.get(
            f"/threads/{_encode_path_segment(thread_id)}/checkpoints",
            params={"limit": limit},
        )
        response.raise_for_status()
    except Exception:  # noqa: BLE001 — unreachable == empty by design
        return []

    data = response.json()

    # The server may return a bare list or wrap it in an envelope.
    if isinstance(data, list):
        entries = data
    elif isinstance(data, dict):
        entries = data.get("checkpoints", data.get("data", []))
    else:
        return []

    # Entries are either plain ID strings or dicts carrying the ID.
    if entries and isinstance(entries[0], str):
        sliced: list[str] = entries[:limit]
        return sliced
    if entries and isinstance(entries[0], dict):
        return [entry["checkpoint_id"] for entry in entries[:limit]]
    return []

delete async

delete(thread_id: str, checkpoint_id: str | None = None) -> bool

Delete checkpoint(s) via HTTP DELETE.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
checkpoint_id str | None

Specific checkpoint to delete (all if None)

None

Returns:

Type Description
bool

True if deletion was successful

Source code in src/locus/memory/backends/http.py
async def delete(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> bool:
    """
    Delete checkpoint(s) via HTTP DELETE.

    Args:
        thread_id: Thread identifier
        checkpoint_id: Specific checkpoint to delete (all if None)

    Returns:
        True if deletion was successful
    """
    client = await self._get_client()

    url = f"/threads/{_encode_path_segment(thread_id)}/checkpoints"
    if checkpoint_id is not None:
        # Target a single checkpoint; otherwise the whole thread.
        url = f"{url}/{_encode_path_segment(checkpoint_id)}"

    try:
        response = await client.delete(url)
        response.raise_for_status()
    except Exception:  # noqa: BLE001 — delete is idempotent; report boolean result
        return False
    return True

health_check async

health_check() -> bool

Check if the API is reachable.

Returns:

Type Description
bool

True if the API responds successfully

Source code in src/locus/memory/backends/http.py
async def health_check(self) -> bool:
    """Probe the remote API's /health endpoint.

    Returns:
        True when the endpoint answers with a status code below 400;
        False on any transport error or HTTP error status.
    """
    client = await self._get_client()
    try:
        response = await client.get("/health")
        return bool(response.status_code < 400)
    except Exception:  # noqa: BLE001 — health check is a boolean probe
        return False

__aenter__ async

__aenter__() -> Self

Enter async context manager.

Source code in src/locus/memory/backends/http.py
async def __aenter__(self) -> Self:
    """Enter async context manager.

    Warms up the underlying client via ``_get_client()`` so the first
    request inside the ``async with`` block does not pay setup cost,
    then returns this checkpointer.
    """
    await self._get_client()
    return self

__aexit__ async

__aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Exit async context manager.

Source code in src/locus/memory/backends/http.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit async context manager.

    Closes the HTTP client unconditionally; returning None means any
    in-flight exception is propagated, never suppressed.
    """
    await self.close()

OCIBucketBackend

OCIBucketBackend(bucket_name: str, namespace: str, prefix: str = 'locus/checkpoints/', profile_name: str = 'DEFAULT', auth_type: str = 'api_key', region: str | None = None, retry_strategy: Any = None, **kwargs: Any)

Bases: BaseCheckpointer

OCI Object Storage-backed checkpointer.

Durable, per-checkpoint storage with lifecycle-policy support. Pass the instance directly to :class:~locus.agent.Agent — no adapter needed.

Example::

checkpointer = OCIBucketBackend(
    bucket_name="my-checkpoints",
    namespace="yzhbfkqxqsx9",
    profile_name="API_KEY_AUTH",
)
agent = Agent(config=cfg, checkpointer=checkpointer)

With an OCI compute instance principal::

checkpointer = OCIBucketBackend(
    bucket_name="my-checkpoints",
    namespace="yzhbfkqxqsx9",
    auth_type="instance_principal",
)
Capabilities
  • list_threads — yes (via object prefix delimiter listing)
  • list_with_metadata — yes
  • metadata_query — yes (via get_metadata)
  • branching — yes (via copy_thread)
  • vacuum — yes (prefer bucket lifecycle policies for prod)
  • persistent_checkpoint_ids — yes
Source code in src/locus/memory/backends/oci_bucket.py
def __init__(
    self,
    bucket_name: str,
    namespace: str,
    prefix: str = "locus/checkpoints/",
    profile_name: str = "DEFAULT",
    auth_type: str = "api_key",
    region: str | None = None,
    retry_strategy: Any = None,
    **kwargs: Any,
) -> None:
    """Capture bucket settings; the OCI client itself is created lazily.

    Args:
        bucket_name: Target Object Storage bucket.
        namespace: Object Storage namespace owning the bucket.
        prefix: Key prefix under which checkpoints are stored.
        profile_name: OCI config profile (relevant for api_key auth).
        auth_type: Authentication mode (e.g. "api_key",
            "instance_principal").
        region: Optional region override.
        retry_strategy: Optional OCI retry strategy; None resolves to
            the SDK default on first call.
        **kwargs: Extra options forwarded to OCIBucketConfig.
    """
    self.config = OCIBucketConfig(
        bucket_name=bucket_name,
        namespace=namespace,
        prefix=prefix,
        profile_name=profile_name,
        auth_type=auth_type,
        region=region,
        **kwargs,
    )
    # Lazily-created Object Storage client; set up on first use.
    self._client: ObjectStorageClient | None = None
    self._initialized = False
    # Override the default retry strategy by passing one explicitly.
    # Default (None) resolves to ``oci.retry.DEFAULT_RETRY_STRATEGY`` at
    # first call — exponential backoff with jitter on 429 / 5xx /
    # transport errors. Pass ``oci.retry.NoneRetryStrategy()`` to
    # disable retries (e.g. for tests).
    self._retry_strategy: Any = retry_strategy

close async

close() -> None

Close any resources (connections, files, etc.).

Override in subclasses if cleanup is needed.

Source code in src/locus/memory/checkpointer.py
async def close(self) -> None:
    """Release any held resources (connections, files, etc.).

    The base implementation is a no-op; backends that own connections
    should override it.
    """
    return None

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Full-text search across checkpoints.

Requires: capabilities.search = True

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Raises:

Type Description
NotImplementedError

If backend doesn't support search

Source code in src/locus/memory/checkpointer.py
async def search(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
    """Full-text search across checkpoints (backend-optional).

    Only available when ``capabilities.search`` is True.

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores

    Raises:
        NotImplementedError: If backend doesn't support search
    """
    self._require_capability("search")
    raise NotImplementedError("search not implemented")

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Requires: capabilities.metadata_query = True

Parameters:

Name Type Description Default
key str

Metadata field name

required
value Any

Value to match

required
limit int

Maximum results

100

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints

Source code in src/locus/memory/checkpointer.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Requires: capabilities.metadata_query = True

    Base implementation: validates the capability, then always raises.
    Backends advertising ``metadata_query`` must override this method.

    Args:
        key: Metadata field name
        value: Value to match
        limit: Maximum results

    Returns:
        List of matching checkpoints

    Raises:
        NotImplementedError: If backend doesn't support metadata queries
    """
    # Capability check first — distinguishes "unsupported backend"
    # from "supported but not yet overridden".
    self._require_capability("metadata_query")
    raise NotImplementedError("query_by_metadata not implemented")

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete threads whose latest checkpoint is older than the cutoff.

For production, prefer an OCI Object Storage lifecycle rule — it runs server-side and costs nothing in client CPU.

Source code in src/locus/memory/backends/oci_bucket.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """Delete threads whose latest checkpoint is older than the cutoff.

    For production, prefer an OCI Object Storage lifecycle rule — it runs
    server-side and costs nothing in client CPU.

    Args:
        older_than_days: Threads whose ``updated_at`` is older than this
            many days are deleted.

    Returns:
        Number of threads deleted.
    """
    await self._ensure_bucket()

    cutoff = datetime.now(UTC) - timedelta(days=older_than_days)
    # NOTE(review): capped at 1000 threads per pass — threads beyond
    # that are silently skipped until a later vacuum run.
    threads = await self.list_threads(limit=1000)

    # Bound fan-out so vacuuming a large bucket doesn't open hundreds
    # of concurrent requests against Object Storage at once.
    gate = asyncio.Semaphore(16)

    async def _maybe_delete(thread_id: str) -> bool:
        async with gate:
            meta = await self.get_metadata(thread_id)
            if not meta:
                return False
            updated = meta.get("updated_at")
            # Guard the type explicitly: the previous code called
            # ``updated.replace(...)`` on whatever was stored, which
            # raised an *uncaught* AttributeError for non-string values
            # (only ValueError/TypeError were handled).
            if not isinstance(updated, str):
                return False
            try:
                updated_dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
            except ValueError:
                # Unparseable timestamp: keep the thread rather than
                # deleting on bad metadata.
                return False
            if updated_dt >= cutoff:
                return False
            await self.delete(thread_id)
            return True

    results = await asyncio.gather(*(_maybe_delete(t) for t in threads))
    return sum(1 for r in results if r)

copy_thread async

copy_thread(source_thread_id: str, dest_thread_id: str) -> bool

Copy every checkpoint under source to dest (for branching).

Source code in src/locus/memory/backends/oci_bucket.py
async def copy_thread(
    self,
    source_thread_id: str,
    dest_thread_id: str,
) -> bool:
    """Copy every checkpoint under source to dest (for branching).

    Args:
        source_thread_id: Thread to copy from.
        dest_thread_id: Thread to copy into.

    Returns:
        True if at least one checkpoint existed and the copy ran;
        False when the source thread has no checkpoints.
    """
    await self._ensure_bucket()

    # NOTE(review): capped at 1000 checkpoints — longer threads are
    # only partially copied; confirm against expected thread sizes.
    checkpoints = await self.list_checkpoints(source_thread_id, limit=1000)
    if not checkpoints:
        return False

    for cp_id in checkpoints:
        # Fetch payload and its sidecar metadata concurrently.
        payload, meta = await asyncio.gather(
            self._get_json(self._checkpoint_key(source_thread_id, cp_id)),
            self._get_json(self._meta_key(source_thread_id, cp_id)),
        )
        if payload is None:
            # Checkpoint object vanished between list and get; skip it.
            continue
        # Rewrite ownership so the copies belong to the destination.
        payload["thread_id"] = dest_thread_id
        if meta is not None:
            meta["thread_id"] = dest_thread_id
            await asyncio.gather(
                self._put_json(self._checkpoint_key(dest_thread_id, cp_id), payload),
                self._put_json(self._meta_key(dest_thread_id, cp_id), meta),
            )
        else:
            # No metadata object for this checkpoint — copy payload only.
            await self._put_json(self._checkpoint_key(dest_thread_id, cp_id), payload)

    # Point dest's latest at the most-recent source checkpoint.
    # NOTE(review): assumes list_checkpoints returns newest-first, so
    # index 0 is the latest — confirm against its implementation.
    await self._put_bytes(
        self._latest_key(dest_thread_id),
        checkpoints[0].encode("utf-8"),
        "text/plain",
    )
    return True

SQLiteBackend

SQLiteBackend(path: str = 'locus_checkpoints.db', **kwargs: Any)

Bases: BaseModel

SQLite checkpoint backend.

Persistent local storage using async SQLite.

Example

backend = SQLiteBackend(path="./checkpoints.db")
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")

Source code in src/locus/memory/backends/sqlite.py
def __init__(self, path: str = "locus_checkpoints.db", **kwargs: Any) -> None:
    """Build the SQLite backend config and delegate to the base class."""
    super().__init__(config=SQLiteConfig(path=path, **kwargs))

save async

save(thread_id: str, data: dict[str, Any]) -> None

Save checkpoint to SQLite.

Source code in src/locus/memory/backends/sqlite.py
async def save(self, thread_id: str, data: dict[str, Any]) -> None:
    """Save checkpoint to SQLite.

    Serializes ``data`` to JSON and upserts it keyed by ``thread_id``.
    On update, ``created_at`` is preserved and only ``data`` and
    ``updated_at`` change.

    Args:
        thread_id: Thread identifier (unique key in the table).
        data: Checkpoint payload; must be JSON-serializable.
    """
    # Lazy import so the optional aiosqlite dependency is only needed
    # when this backend is actually used.
    import aiosqlite

    await self._ensure_table()

    now = datetime.now(UTC).isoformat()
    json_data = json.dumps(data)

    # A fresh connection per call: simple and safe for concurrent use,
    # at the cost of connection setup on every save.
    async with aiosqlite.connect(self.config.path) as db:
        await db.execute(
            f"""
            INSERT INTO {self.config.table_name} (thread_id, data, created_at, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(thread_id) DO UPDATE SET
                data = excluded.data,
                updated_at = excluded.updated_at
            """,
            (thread_id, json_data, now, now),
        )
        await db.commit()

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from SQLite.

Source code in src/locus/memory/backends/sqlite.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Fetch and deserialize the checkpoint for *thread_id*.

    Returns:
        The decoded checkpoint payload, or ``None`` when no row exists.
    """
    # Optional dependency: imported only when the backend is used.
    import aiosqlite

    await self._ensure_table()

    query = f"SELECT data FROM {self.config.table_name} WHERE thread_id = ?"
    async with aiosqlite.connect(self.config.path) as db:
        async with db.execute(query, (thread_id,)) as cursor:
            row = await cursor.fetchone()

    if row is None:
        return None

    payload: dict[str, Any] = json.loads(row[0])
    return payload

delete async

delete(thread_id: str) -> bool

Delete checkpoint from SQLite.

Source code in src/locus/memory/backends/sqlite.py
async def delete(self, thread_id: str) -> bool:
    """Remove the checkpoint row for *thread_id*.

    Returns:
        True when a row was actually deleted, False otherwise.
    """
    # Optional dependency: imported only when the backend is used.
    import aiosqlite

    await self._ensure_table()

    stmt = f"DELETE FROM {self.config.table_name} WHERE thread_id = ?"
    async with aiosqlite.connect(self.config.path) as db:
        cursor = await db.execute(stmt, (thread_id,))
        await db.commit()
        removed: bool = cursor.rowcount > 0
        return removed

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/sqlite.py
async def exists(self, thread_id: str) -> bool:
    """Return True when a checkpoint row exists for *thread_id*."""
    # Optional dependency: imported only when the backend is used.
    import aiosqlite

    await self._ensure_table()

    query = f"SELECT 1 FROM {self.config.table_name} WHERE thread_id = ?"
    async with aiosqlite.connect(self.config.path) as db:
        async with db.execute(query, (thread_id,)) as cursor:
            row = await cursor.fetchone()

    return row is not None

list_threads async

list_threads(limit: int = 100, offset: int = 0, pattern: str = '%') -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/sqlite.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
    pattern: str = "%",
) -> list[str]:
    """List all thread IDs matching pattern.

    Results are ordered newest-first by ``updated_at``, so the
    offset/limit window pages through most-recently-updated threads.

    Args:
        limit: Maximum number of IDs to return.
        offset: Number of rows to skip.
        pattern: SQL ``LIKE`` pattern matched against thread IDs.

    Returns:
        Matching thread IDs, most recently updated first.
    """
    # Lazy import keeps aiosqlite an optional dependency.
    import aiosqlite

    await self._ensure_table()

    async with (
        aiosqlite.connect(self.config.path) as db,
        db.execute(
            f"""SELECT thread_id FROM {self.config.table_name}
            WHERE thread_id LIKE ?
            ORDER BY updated_at DESC
            LIMIT ? OFFSET ?""",
            (pattern, limit, offset),
        ) as cursor,
    ):
        rows = await cursor.fetchall()

    return [row[0] for row in rows]

get_metadata async

get_metadata(thread_id: str) -> dict[str, Any] | None

Get checkpoint metadata (created_at, updated_at).

Source code in src/locus/memory/backends/sqlite.py
async def get_metadata(self, thread_id: str) -> dict[str, Any] | None:
    """Return the thread's timestamps, or ``None`` when it is unknown.

    Returns:
        ``{"created_at": ..., "updated_at": ...}`` or ``None``.
    """
    # Optional dependency: imported only when the backend is used.
    import aiosqlite

    await self._ensure_table()

    query = (
        f"SELECT created_at, updated_at FROM {self.config.table_name} WHERE thread_id = ?"
    )
    async with aiosqlite.connect(self.config.path) as db:
        async with db.execute(query, (thread_id,)) as cursor:
            row = await cursor.fetchone()

    if row is None:
        return None

    created, updated = row
    return {"created_at": created, "updated_at": updated}

RedisBackend

RedisBackend(url: str = 'redis://localhost:6379', **kwargs: Any)

Bases: BaseModel

Redis checkpoint backend.

Fast key-value storage with optional TTL for checkpoints.

Example

backend = RedisBackend(url="redis://localhost:6379")
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")

Source code in src/locus/memory/backends/redis.py
def __init__(self, url: str = "redis://localhost:6379", **kwargs: Any) -> None:
    """Build a RedisConfig from the URL and delegate to the base class."""
    super().__init__(config=RedisConfig(url=url, **kwargs))

save async

save(thread_id: str, data: dict[str, Any]) -> None

Save checkpoint to Redis.

Source code in src/locus/memory/backends/redis.py
async def save(self, thread_id: str, data: dict[str, Any]) -> None:
    """Serialize *data* to JSON and store it under the thread's key.

    When the config carries a TTL, the key expires after
    ``ttl_seconds``; otherwise it persists until deleted.
    """
    client = await self._get_client()
    redis_key = self._key(thread_id)
    payload = json.dumps(data)

    ttl = self.config.ttl_seconds
    if ttl:
        await client.setex(redis_key, ttl, payload)
    else:
        await client.set(redis_key, payload)

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from Redis.

Source code in src/locus/memory/backends/redis.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Fetch and decode the checkpoint stored under the thread's key.

    Returns:
        The decoded payload, or ``None`` when the key is absent
        (never stored, deleted, or expired).
    """
    client = await self._get_client()
    raw = await client.get(self._key(thread_id))

    if raw is None:
        return None

    payload: dict[str, Any] = json.loads(raw)
    return payload

delete async

delete(thread_id: str) -> bool

Delete checkpoint from Redis.

Source code in src/locus/memory/backends/redis.py
async def delete(self, thread_id: str) -> bool:
    """Remove the thread's key; True when a key was actually deleted."""
    client = await self._get_client()
    removed: int = await client.delete(self._key(thread_id))
    return removed > 0

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/redis.py
async def exists(self, thread_id: str) -> bool:
    """Return True when the thread's key is present in Redis."""
    client = await self._get_client()
    found: int = await client.exists(self._key(thread_id))
    return found > 0

list_threads async

list_threads(limit: int = 100, offset: int = 0, pattern: str = '*') -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/redis.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
    pattern: str = "*",
) -> list[str]:
    """List all thread IDs matching pattern.

    Uses ``SCAN`` (via ``scan_iter``) instead of ``KEYS``: ``KEYS`` is
    O(N) in a single server-blocking call and is discouraged on
    production datasets. Keys are decoded when the client returns
    ``bytes`` (i.e. ``decode_responses`` is off), preserving the
    ``list[str]`` contract, and the result is sorted so the
    offset/limit window is stable across calls — SCAN/KEYS ordering is
    otherwise unspecified.

    Args:
        limit: Maximum number of IDs to return.
        offset: Number of IDs to skip (after sorting).
        pattern: Glob-style pattern matched against the un-prefixed
            thread ID.

    Returns:
        Sorted list of matching thread IDs.
    """
    client = await self._get_client()
    full_pattern = f"{self.config.prefix}{pattern}"
    prefix_len = len(self.config.prefix)

    threads: list[str] = []
    async for key in client.scan_iter(match=full_pattern):
        text = key.decode() if isinstance(key, bytes) else key
        threads.append(text[prefix_len:])

    threads.sort()
    return threads[offset : offset + limit]

close async

close() -> None

Close Redis connection.

Source code in src/locus/memory/backends/redis.py
async def close(self) -> None:
    """Tear down the cached Redis client, if one was ever created."""
    if self._client is not None:
        await self._client.close()
        self._client = None

PostgreSQLBackend

PostgreSQLBackend(host: str = 'localhost', port: int = 5432, database: str = 'locus', user: str = 'postgres', password: str | SecretStr = '', dsn: str | None = None, **kwargs: Any)

Bases: BaseModel

PostgreSQL checkpoint backend.

Production-grade persistent storage with ACID guarantees.

Features:
- Connection pooling
- Transaction support
- JSON/JSONB storage
- Indexing for fast lookups
- Concurrent access safe

Example

backend = PostgreSQLBackend(
    host="localhost",
    database="myapp",
    user="postgres",
    password="secret",
)
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")

With DSN

backend = PostgreSQLBackend(dsn="postgresql://user:pass@localhost:5432/mydb")

Source code in src/locus/memory/backends/postgresql.py
def __init__(
    self,
    host: str = "localhost",
    port: int = 5432,
    database: str = "locus",
    user: str = "postgres",
    password: str | SecretStr = "",
    dsn: str | None = None,
    **kwargs: Any,
) -> None:
    """Assemble a PostgreSQLConfig and delegate to the base initializer.

    A plain-string password is wrapped in ``SecretStr`` so it is never
    exposed in reprs/logs; an existing ``SecretStr`` passes through.
    """
    secret = SecretStr(password) if isinstance(password, str) else password
    super().__init__(
        config=PostgreSQLConfig(
            host=host,
            port=port,
            database=database,
            user=user,
            password=secret,
            dsn=dsn,
            **kwargs,
        )
    )

save async

save(thread_id: str, data: dict[str, Any], checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save checkpoint to PostgreSQL.

Parameters:

Name Type Description Default
thread_id str

Thread identifier

required
data dict[str, Any]

Checkpoint data

required
checkpoint_id str | None

Optional checkpoint ID

None
metadata dict[str, Any] | None

Optional metadata for querying

None

Returns:

Type Description
str

Checkpoint ID

Source code in src/locus/memory/backends/postgresql.py
async def save(
    self,
    thread_id: str,
    data: dict[str, Any],
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Save checkpoint to PostgreSQL.

    Upserts one row per thread: an existing row keeps its
    ``created_at`` and gets new data/metadata/``updated_at``.

    Args:
        thread_id: Thread identifier
        data: Checkpoint data
        checkpoint_id: Optional checkpoint ID
        metadata: Optional metadata for querying

    Returns:
        Checkpoint ID
    """
    await self._ensure_table()
    pool = await self._get_pool()

    # Local import keeps module import time minimal.
    from uuid import uuid4

    # Generate an ID when the caller didn't supply one.
    checkpoint_id = checkpoint_id or uuid4().hex
    now = datetime.now(UTC)

    async with pool.acquire() as conn:
        # JSON is sent as text and cast to jsonb server-side.
        await conn.execute(
            f"""
            INSERT INTO {self._full_table_name}
                (thread_id, checkpoint_id, data, created_at, updated_at, metadata)
            VALUES ($1, $2, $3::jsonb, $4, $5, $6::jsonb)
            ON CONFLICT (thread_id) DO UPDATE SET
                checkpoint_id = EXCLUDED.checkpoint_id,
                data = EXCLUDED.data,
                updated_at = EXCLUDED.updated_at,
                metadata = EXCLUDED.metadata
            """,
            thread_id,
            checkpoint_id,
            json.dumps(data),
            now,
            now,
            json.dumps(metadata or {}),
        )

    return checkpoint_id

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from PostgreSQL.

Source code in src/locus/memory/backends/postgresql.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Fetch and decode the checkpoint row for *thread_id*.

    Returns:
        The decoded checkpoint payload, or ``None`` when no row exists.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    query = f"SELECT data FROM {self._full_table_name} WHERE thread_id = $1"
    async with pool.acquire() as conn:
        record = await conn.fetchrow(query, thread_id)

    if record is None:
        return None

    payload: dict[str, Any] = json.loads(record["data"])
    return payload

delete async

delete(thread_id: str) -> bool

Delete checkpoint from PostgreSQL.

Source code in src/locus/memory/backends/postgresql.py
async def delete(self, thread_id: str) -> bool:
    """Remove the checkpoint row for *thread_id*.

    Returns:
        True when exactly one row was removed.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    stmt = f"DELETE FROM {self._full_table_name} WHERE thread_id = $1"
    async with pool.acquire() as conn:
        status: str = await conn.execute(stmt, thread_id)

    # asyncpg returns a command tag such as "DELETE 1".
    return status == "DELETE 1"

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/postgresql.py
async def exists(self, thread_id: str) -> bool:
    """Return True when a checkpoint row exists for *thread_id*."""
    await self._ensure_table()
    pool = await self._get_pool()

    query = f"SELECT 1 FROM {self._full_table_name} WHERE thread_id = $1"
    async with pool.acquire() as conn:
        found = await conn.fetchrow(query, thread_id)

    return found is not None

list_threads async

list_threads(pattern: str = '%', limit: int = 100, offset: int = 0) -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/postgresql.py
async def list_threads(
    self,
    pattern: str = "%",
    limit: int = 100,
    offset: int = 0,
) -> list[str]:
    """List all thread IDs matching pattern.

    Ordered newest-first by ``updated_at``; offset/limit page through
    most-recently-updated threads.

    Args:
        pattern: SQL ``LIKE`` pattern matched against thread IDs.
        limit: Maximum number of IDs to return.
        offset: Number of rows to skip.

    Returns:
        Matching thread IDs, most recently updated first.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT thread_id FROM {self._full_table_name}
            WHERE thread_id LIKE $1
            ORDER BY updated_at DESC
            LIMIT $2 OFFSET $3
            """,
            pattern,
            limit,
            offset,
        )

    return [row["thread_id"] for row in rows]

get_metadata async

get_metadata(thread_id: str) -> dict[str, Any] | None

Get checkpoint metadata.

Source code in src/locus/memory/backends/postgresql.py
async def get_metadata(self, thread_id: str) -> dict[str, Any] | None:
    """Get checkpoint metadata.

    Args:
        thread_id: Thread identifier.

    Returns:
        Dict with ``checkpoint_id``, ISO-formatted ``created_at`` /
        ``updated_at``, and the stored ``metadata`` dict (empty dict
        when the column is NULL/empty) — or ``None`` when the thread
        is unknown.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            f"""
            SELECT checkpoint_id, created_at, updated_at, metadata
            FROM {self._full_table_name}
            WHERE thread_id = $1
            """,
            thread_id,
        )

    if row is None:
        return None

    # Timestamps come back as datetime objects; normalize to ISO strings.
    return {
        "checkpoint_id": row["checkpoint_id"],
        "created_at": row["created_at"].isoformat(),
        "updated_at": row["updated_at"].isoformat(),
        "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
    }

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Uses PostgreSQL JSONB operators for efficient querying.

Source code in src/locus/memory/backends/postgresql.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by metadata field.

    Uses PostgreSQL JSONB operators for efficient querying.

    Args:
        key: Metadata field name.
        value: Value the field must equal (matched via JSONB
            containment).
        limit: Maximum results.

    Returns:
        Matching checkpoints (``thread_id``, decoded ``data``,
        ISO ``updated_at``), most recently updated first.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        # The @> containment operator can use a GIN index on metadata.
        rows = await conn.fetch(
            f"""
            SELECT thread_id, data, updated_at
            FROM {self._full_table_name}
            WHERE metadata @> $1::jsonb
            ORDER BY updated_at DESC
            LIMIT $2
            """,
            json.dumps({key: value}),
            limit,
        )

    return [
        {
            "thread_id": row["thread_id"],
            "data": json.loads(row["data"]),
            "updated_at": row["updated_at"].isoformat(),
        }
        for row in rows
    ]

search_data async

search_data(path: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Search checkpoints by data field using JSON path.

Parameters:

Name Type Description Default
path str

JSON path (e.g., "messages", "confidence")

required
value Any

Value to match

required
limit int

Maximum results

100
Example

results = await backend.search_data("agent_id", "agent-123")

Source code in src/locus/memory/backends/postgresql.py
async def search_data(
    self,
    path: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Search checkpoints by data field using JSON path.

    Matches via JSONB containment, i.e. finds rows whose top-level
    ``data`` contains ``{path: value}``.

    Args:
        path: JSON path (e.g., "messages", "confidence")
        value: Value to match
        limit: Maximum results

    Returns:
        Matching checkpoints (``thread_id``, decoded ``data``,
        ISO ``updated_at``), most recently updated first.

    Example:
        >>> results = await backend.search_data("agent_id", "agent-123")
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        # @> containment against the data column; index-friendly with GIN.
        rows = await conn.fetch(
            f"""
            SELECT thread_id, data, updated_at
            FROM {self._full_table_name}
            WHERE data @> $1::jsonb
            ORDER BY updated_at DESC
            LIMIT $2
            """,
            json.dumps({path: value}),
            limit,
        )

    return [
        {
            "thread_id": row["thread_id"],
            "data": json.loads(row["data"]),
            "updated_at": row["updated_at"].isoformat(),
        }
        for row in rows
    ]

count async

count(pattern: str = '%') -> int

Count checkpoints matching pattern.

Source code in src/locus/memory/backends/postgresql.py
async def count(self, pattern: str = "%") -> int:
    """Count checkpoint rows whose thread_id matches the LIKE *pattern*."""
    await self._ensure_table()
    pool = await self._get_pool()

    query = (
        f"SELECT COUNT(*) as cnt FROM {self._full_table_name} WHERE thread_id LIKE $1"
    )
    async with pool.acquire() as conn:
        record = await conn.fetchrow(query, pattern)

    return record["cnt"] if record else 0

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted rows

Source code in src/locus/memory/backends/postgresql.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """
    Delete old checkpoints.

    Deletion happens entirely server-side via a single DELETE with an
    interval comparison against ``updated_at``.

    Args:
        older_than_days: Delete checkpoints older than this

    Returns:
        Number of deleted rows
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn:
        result = await conn.execute(
            f"""
            DELETE FROM {self._full_table_name}
            WHERE updated_at < NOW() - make_interval(days => $1)
            """,
            older_than_days,
        )

    # Parse "DELETE N"
    # asyncpg's execute returns the command tag as a string; fall back
    # to 0 if it ever arrives in an unexpected shape.
    try:
        return int(result.split()[1])
    except (IndexError, ValueError):
        return 0

close async

close() -> None

Close connection pool.

Source code in src/locus/memory/backends/postgresql.py
async def close(self) -> None:
    """Dispose of the connection pool, if one has been created."""
    if self._pool is not None:
        await self._pool.close()
        self._pool = None

OpenSearchBackend

OpenSearchBackend(hosts: list[str] | None = None, index_name: str = 'locus-checkpoints', username: str | None = None, password: str | None = None, **kwargs: Any)

Bases: BaseModel

OpenSearch checkpoint backend.

Scalable document storage with full-text search capabilities.

Example

backend = OpenSearchBackend(hosts=["localhost:9200"])
await backend.save("thread_1", state.model_dump())
data = await backend.load("thread_1")
results = await backend.search("user query")

Source code in src/locus/memory/backends/opensearch.py
def __init__(
    self,
    hosts: list[str] | None = None,
    index_name: str = "locus-checkpoints",
    username: str | None = None,
    password: str | None = None,
    **kwargs: Any,
) -> None:
    """Build an OpenSearchConfig (defaulting hosts) and delegate up."""
    # Falsy hosts (None or empty list) fall back to the local default.
    resolved_hosts = hosts or ["localhost:9200"]
    super().__init__(
        config=OpenSearchConfig(
            hosts=resolved_hosts,
            index_name=index_name,
            username=username,
            password=password,
            **kwargs,
        )
    )

save async

save(thread_id: str, data: dict[str, Any], metadata: dict[str, Any] | None = None) -> None

Save checkpoint to OpenSearch.

Source code in src/locus/memory/backends/opensearch.py
async def save(
    self,
    thread_id: str,
    data: dict[str, Any],
    metadata: dict[str, Any] | None = None,
) -> None:
    """Save checkpoint to OpenSearch.

    Indexes one document per thread (doc id == thread_id), carrying
    both the structured payload and a JSON-string copy for full-text
    search. ``refresh=True`` makes the write immediately searchable at
    the cost of extra indexing load.

    Args:
        thread_id: Thread identifier (used as the document ID).
        data: Checkpoint payload; must be JSON-serializable.
        metadata: Optional metadata stored alongside the payload.
    """
    await self._ensure_index()
    client = await self._get_client()

    now = datetime.now(UTC).isoformat()

    doc = {
        "thread_id": thread_id,
        "data": data,
        "data_json": json.dumps(data),  # For text search
        "updated_at": now,
        "metadata": metadata or {},
    }

    # Check if exists for created_at
    # NOTE(review): this get-then-index is not atomic — two concurrent
    # saves for the same thread can race on created_at; last write wins.
    try:
        existing = await client.get(
            index=self.config.index_name,
            id=thread_id,
        )
        doc["created_at"] = existing["_source"].get("created_at", now)
    except Exception:  # noqa: BLE001 — first-write path; any lookup failure == "no prior"
        doc["created_at"] = now

    await client.index(
        index=self.config.index_name,
        id=thread_id,
        body=doc,
        refresh=True,
    )

load async

load(thread_id: str) -> dict[str, Any] | None

Load checkpoint from OpenSearch.

Source code in src/locus/memory/backends/opensearch.py
async def load(self, thread_id: str) -> dict[str, Any] | None:
    """Return the stored checkpoint payload, or ``None`` when absent."""
    await self._ensure_index()
    client = await self._get_client()

    try:
        hit = await client.get(
            index=self.config.index_name,
            id=thread_id,
        )
        payload: dict[str, Any] = hit["_source"]["data"]
    except Exception:  # noqa: BLE001 — missing document == None by design
        return None
    return payload

delete async

delete(thread_id: str) -> bool

Delete checkpoint from OpenSearch.

Source code in src/locus/memory/backends/opensearch.py
async def delete(self, thread_id: str) -> bool:
    """Delete the checkpoint document; False when nothing was deleted."""
    await self._ensure_index()
    client = await self._get_client()

    try:
        await client.delete(
            index=self.config.index_name,
            id=thread_id,
            refresh=True,
        )
    except Exception:  # noqa: BLE001 — delete is idempotent; report boolean result
        return False
    return True

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/opensearch.py
async def exists(self, thread_id: str) -> bool:
    """Return True when a checkpoint document exists for *thread_id*."""
    await self._ensure_index()
    client = await self._get_client()

    found: bool = await client.exists(
        index=self.config.index_name,
        id=thread_id,
    )
    return found

list_threads async

list_threads(limit: int = 100, offset: int = 0) -> list[str]

List all thread IDs.

Source code in src/locus/memory/backends/opensearch.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
) -> list[str]:
    """Return thread IDs ordered most-recently-updated first."""
    await self._ensure_index()
    client = await self._get_client()

    # Only the thread_id field is fetched; sorting is done server-side.
    request_body = {
        "query": {"match_all": {}},
        "size": limit,
        "from": offset,
        "_source": ["thread_id"],
        "sort": [{"updated_at": "desc"}],
    }
    response = await client.search(
        index=self.config.index_name,
        body=request_body,
    )

    hits = response["hits"]["hits"]
    return [hit["_source"]["thread_id"] for hit in hits]

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Search checkpoints by content.

Parameters:

Name Type Description Default
query str

Search query

required
limit int

Maximum results

10

Returns:

Type Description
list[dict[str, Any]]

List of matching checkpoints with scores

Source code in src/locus/memory/backends/opensearch.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Search checkpoints by content.

    Runs a ``multi_match`` query over the serialized payload
    (``data_json``) and the thread ID, returning relevance-scored hits.

    Args:
        query: Search query
        limit: Maximum results

    Returns:
        List of matching checkpoints with scores
    """
    await self._ensure_index()
    client = await self._get_client()

    result = await client.search(
        index=self.config.index_name,
        body={
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["data_json", "thread_id"],
                }
            },
            "size": limit,
            "_source": ["thread_id", "data", "updated_at"],
        },
    )

    # Flatten each hit into a plain dict including its relevance score.
    return [
        {
            "thread_id": hit["_source"]["thread_id"],
            "data": hit["_source"]["data"],
            "score": hit["_score"],
            "updated_at": hit["_source"]["updated_at"],
        }
        for hit in result["hits"]["hits"]
    ]

get_by_metadata async

get_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Get checkpoints by metadata field.

Source code in src/locus/memory/backends/opensearch.py
async def get_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Return checkpoints whose metadata field *key* equals *value*."""
    await self._ensure_index()
    client = await self._get_client()

    # Exact (term-level) match on the nested metadata field.
    request_body = {
        "query": {"term": {f"metadata.{key}": value}},
        "size": limit,
    }
    response = await client.search(
        index=self.config.index_name,
        body=request_body,
    )

    hits = response["hits"]["hits"]
    return [
        {
            "thread_id": hit["_source"]["thread_id"],
            "data": hit["_source"]["data"],
        }
        for hit in hits
    ]

close async

close() -> None

Close OpenSearch connection.

Source code in src/locus/memory/backends/opensearch.py
async def close(self) -> None:
    """Shut down the cached OpenSearch client, if one was created."""
    if self._client is not None:
        await self._client.close()
        self._client = None

OracleBackend

OracleBackend(dsn: str | None = None, user: str = 'admin', password: str | SecretStr = '', wallet_location: str | None = None, wallet_password: str | SecretStr | None = None, host: str | None = None, port: int = 1521, service_name: str | None = None, **kwargs: Any)

Bases: BaseModel

Oracle Database checkpoint backend.

Production-grade persistent storage with JSON support and full-text search.

Features: - Connection pooling - JSON column storage with search - Metadata indexing - Vacuum (cleanup old checkpoints) - Works with Autonomous Database (wallet-based auth)

Example with DSN

backend = OracleBackend( ... dsn="mydb_high", # TNS name from tnsnames.ora ... user="admin", ... password="secret", ... wallet_location="/path/to/wallet", ... ) await backend.save("thread_1", state.model_dump())

Example with connection string

backend = OracleBackend( ... host="adb.us-ashburn-1.oraclecloud.com", ... port=1522, ... service_name="xxx_high.adb.oraclecloud.com", ... user="admin", ... password="secret", ... )

Source code in src/locus/memory/backends/oracle.py
def __init__(
    self,
    dsn: str | None = None,
    user: str = "admin",
    password: str | SecretStr = "",
    wallet_location: str | None = None,
    wallet_password: str | SecretStr | None = None,
    host: str | None = None,
    port: int = 1521,
    service_name: str | None = None,
    **kwargs: Any,
) -> None:
    """Normalize secrets, build the OracleConfig, and delegate up.

    Plain-string passwords are wrapped in ``SecretStr`` so they never
    leak via reprs/logs; ``SecretStr`` (or ``None`` for the wallet
    password) passes through untouched.
    """
    secret = SecretStr(password) if isinstance(password, str) else password
    wallet_secret = (
        SecretStr(wallet_password)
        if isinstance(wallet_password, str)
        else wallet_password
    )
    super().__init__(
        config=OracleConfig(
            dsn=dsn,
            user=user,
            password=secret,
            wallet_location=wallet_location,
            wallet_password=wallet_secret,
            host=host,
            port=port,
            service_name=service_name,
            **kwargs,
        )
    )

save async

save(state: AgentState, thread_id: str, checkpoint_id: str | None = None, metadata: dict[str, Any] | None = None) -> str

Save agent state to Oracle Database.

Implements :meth:BaseCheckpointer.save. Parameter order is (state, thread_id, ...) to match the abstract — the prior (thread_id, data, ...) signature silently mismatched the agent runtime, which calls save(state, thread_id) and would end up trying to bind an :class:AgentState to the VARCHAR2(255) thread_id column.

Parameters:

Name Type Description Default
state AgentState

Current agent state. Serialized via :meth:AgentState.to_checkpoint (Pydantic JSON dump).

required
thread_id str

Thread identifier (column primary key).

required
checkpoint_id str | None

Optional checkpoint ID. Generated if omitted.

None
metadata dict[str, Any] | None

Optional metadata for querying.

None

Returns:

Type Description
str

Checkpoint ID.

Source code in src/locus/memory/backends/oracle.py
async def save(
    self,
    state: AgentState,
    thread_id: str,
    checkpoint_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> str:
    """
    Persist agent state to Oracle Database via a MERGE upsert.

    Implements :meth:`BaseCheckpointer.save` with the abstract's
    ``(state, thread_id, ...)`` parameter order. The table keeps a
    single row per ``thread_id``, so saving overwrites any previous
    checkpoint for that thread.

    Args:
        state: Agent state to persist; serialized with
            :meth:`AgentState.to_checkpoint`. A plain dict is stored
            as given.
        thread_id: Conversation thread identifier (table primary key).
        checkpoint_id: Explicit checkpoint ID; a random hex UUID is
            generated when omitted.
        metadata: Optional metadata stored alongside the state for
            later querying.

    Returns:
        The checkpoint ID under which the state was stored.
    """
    from uuid import uuid4

    await self._ensure_table()
    pool = await self._get_pool()

    if checkpoint_id is None:
        checkpoint_id = uuid4().hex
    payload = state.to_checkpoint() if isinstance(state, AgentState) else state

    binds = {
        "thread_id": thread_id,
        "checkpoint_id": checkpoint_id,
        "data": json.dumps(payload, default=str),
        "metadata": json.dumps(metadata or {}),
    }
    async with pool.acquire() as connection, connection.cursor() as cur:
        # MERGE gives upsert semantics: one row per thread_id.
        await cur.execute(
            f"""
                MERGE INTO {self._full_table_name} t
                USING (SELECT :thread_id AS thread_id FROM dual) s
                ON (t.thread_id = s.thread_id)
                WHEN MATCHED THEN
                    UPDATE SET
                        checkpoint_id = :checkpoint_id,
                        data = :data,
                        updated_at = SYSTIMESTAMP,
                        metadata = :metadata
                WHEN NOT MATCHED THEN
                    INSERT (thread_id, checkpoint_id, data, metadata)
                    VALUES (:thread_id, :checkpoint_id, :data, :metadata)
                """,
            binds,
        )
        await connection.commit()

    return checkpoint_id

load async

load(thread_id: str, checkpoint_id: str | None = None) -> AgentState | None

Load agent state from Oracle Database.

checkpoint_id is accepted for :class:BaseCheckpointer signature parity but ignored — this backend stores one row per thread_id (MERGE upsert), so latest-state is the only retrievable checkpoint.

Source code in src/locus/memory/backends/oracle.py
async def load(
    self,
    thread_id: str,
    checkpoint_id: str | None = None,
) -> AgentState | None:
    """Restore the latest agent state for ``thread_id`` from Oracle.

    ``checkpoint_id`` exists only for :class:`BaseCheckpointer`
    signature parity and is ignored: the backend keeps one row per
    thread (MERGE upsert), so the latest state is the only checkpoint
    retrievable. Returns ``None`` when the thread has no row.
    """
    del checkpoint_id  # one-row-per-thread; arg present for parity
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"SELECT data FROM {self._full_table_name} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        record = await cur.fetchone()

    if record is None:
        return None

    raw = record[0]
    # CLOB columns come back as LOB handles; materialize them first.
    if hasattr(raw, "read"):
        raw = raw.read()

    # The driver may hand JSON columns back pre-decoded as a dict.
    payload = raw if isinstance(raw, dict) else json.loads(raw)
    return AgentState.from_checkpoint(payload)

delete async

delete(thread_id: str) -> bool

Delete checkpoint from Oracle Database.

Source code in src/locus/memory/backends/oracle.py
async def delete(self, thread_id: str) -> bool:
    """Delete the checkpoint row for ``thread_id``.

    Returns ``True`` when a row was actually removed.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"DELETE FROM {self._full_table_name} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        # rowcount reflects the DELETE before the commit.
        removed: bool = cur.rowcount > 0
        await connection.commit()

    return removed

exists async

exists(thread_id: str) -> bool

Check if checkpoint exists.

Source code in src/locus/memory/backends/oracle.py
async def exists(self, thread_id: str) -> bool:
    """Return ``True`` when a checkpoint row exists for ``thread_id``."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"SELECT 1 FROM {self._full_table_name} WHERE thread_id = :thread_id",
            {"thread_id": thread_id},
        )
        return (await cur.fetchone()) is not None

list_threads async

list_threads(limit: int = 100, offset: int = 0, pattern: str = '%') -> list[str]

List all thread IDs matching pattern.

Source code in src/locus/memory/backends/oracle.py
async def list_threads(
    self,
    limit: int = 100,
    offset: int = 0,
    pattern: str = "%",
) -> list[str]:
    """Return thread IDs whose ``thread_id`` matches the SQL LIKE ``pattern``.

    Results are ordered most recently updated first and paginated
    with ``offset``/``limit``.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    binds = {"pattern": pattern, "limit": limit, "offset": offset}
    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"""
                SELECT thread_id FROM {self._full_table_name}
                WHERE thread_id LIKE :pattern
                ORDER BY updated_at DESC
                OFFSET :offset ROWS FETCH NEXT :limit ROWS ONLY
                """,
            binds,
        )
        rows = await cur.fetchall()

    # Each row is a one-column tuple.
    return [thread_id for (thread_id,) in rows]

get_metadata async

get_metadata(thread_id: str) -> dict[str, Any] | None

Get checkpoint metadata.

Source code in src/locus/memory/backends/oracle.py
async def get_metadata(self, thread_id: str) -> dict[str, Any] | None:
    """Return checkpoint bookkeeping for ``thread_id``, or ``None``.

    The result contains ``checkpoint_id``, ISO-formatted
    ``created_at``/``updated_at`` timestamps (or ``None``), and the
    stored ``metadata`` dict.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"""
                SELECT checkpoint_id, created_at, updated_at, metadata
                FROM {self._full_table_name}
                WHERE thread_id = :thread_id
                """,
            {"thread_id": thread_id},
        )
        record = await cur.fetchone()

    if record is None:
        return None

    checkpoint_id, created_at, updated_at, raw_meta = record
    # CLOB columns come back as LOB handles; materialize them first.
    if hasattr(raw_meta, "read"):
        raw_meta = raw_meta.read()
    # The driver may return JSON pre-decoded as a dict, as a JSON
    # string, or NULL — normalize all three shapes to a dict.
    if isinstance(raw_meta, str):
        raw_meta = json.loads(raw_meta) if raw_meta else {}
    elif raw_meta is None:
        raw_meta = {}

    return {
        "checkpoint_id": checkpoint_id,
        "created_at": created_at.isoformat() if created_at else None,
        "updated_at": updated_at.isoformat() if updated_at else None,
        "metadata": raw_meta,
    }

query_by_metadata async

query_by_metadata(key: str, value: Any, limit: int = 100) -> list[dict[str, Any]]

Query checkpoints by metadata field.

Uses Oracle JSON path expressions.

Source code in src/locus/memory/backends/oracle.py
async def query_by_metadata(
    self,
    key: str,
    value: Any,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """
    Query checkpoints by a top-level metadata field.

    Uses an Oracle ``JSON_VALUE`` path expression on the ``metadata``
    column. ``value`` is compared as a string (``str(value)``).

    Args:
        key: Top-level metadata key. Must be a valid Python
            identifier; since ``key`` is interpolated into the SQL
            text, this check is the JSON-path-injection guard.
        value: Value to match (stringified for comparison).
        limit: Maximum number of rows to return.

    Returns:
        Dicts with ``thread_id``, deserialized ``data``, and
        ISO-formatted ``updated_at``, most recently updated first.

    Raises:
        ValueError: If ``key`` is not a valid identifier.
    """
    # Validate before any I/O: previously this ran only after
    # _ensure_table() and acquiring a pooled connection, so a bad key
    # cost table DDL and a connection just to raise. Fail fast instead.
    if not key.isidentifier():
        raise ValueError(f"Invalid metadata key: {key!r}")

    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        # Use JSON_VALUE for querying
        await cursor.execute(
            f"""
                SELECT thread_id, data, updated_at
                FROM {self._full_table_name}
                WHERE JSON_VALUE(metadata, '$.{key}') = :value
                ORDER BY updated_at DESC
                FETCH FIRST :limit ROWS ONLY
                """,
            {"value": str(value), "limit": limit},
        )
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        data = row[1]
        # CLOB columns come back as LOB handles; materialize them first.
        if hasattr(data, "read"):
            data = data.read()
        # oracledb might already return JSON as dict
        if isinstance(data, str):
            data = json.loads(data)
        results.append(
            {
                "thread_id": row[0],
                "data": data,
                "updated_at": row[2].isoformat() if row[2] else None,
            }
        )

    return results

search async

search(query: str, limit: int = 10) -> list[dict[str, Any]]

Search checkpoints by content.

Performs a case-insensitive substring (LIKE) match over the serialized JSON data — not true full-text search. For production use, add an Oracle Text or JSON search index.

Source code in src/locus/memory/backends/oracle.py
async def search(
    self,
    query: str,
    limit: int = 10,
) -> list[dict[str, Any]]:
    """
    Search checkpoints by content.

    Performs a case-insensitive substring (``LIKE``) match over the
    serialized ``data`` column — this is NOT full-text search, despite
    earlier docs claiming ``JSON_TEXTCONTAINS``. LIKE wildcards
    (``%``/``_``) inside ``query`` are not escaped and act as
    wildcards. For production-grade search, add an Oracle Text or
    JSON search index and query that instead.

    Args:
        query: Substring to look for (matched case-insensitively).
        limit: Maximum number of rows to return.

    Returns:
        Dicts with ``thread_id``, deserialized ``data``, and
        ISO-formatted ``updated_at``, most recently updated first.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as conn, conn.cursor() as cursor:
        # Simple LIKE search on JSON content
        # For production, use Oracle Text or JSON search index
        await cursor.execute(
            f"""
                SELECT thread_id, data, updated_at
                FROM {self._full_table_name}
                WHERE LOWER(data) LIKE LOWER(:query_pattern)
                ORDER BY updated_at DESC
                FETCH FIRST :limit ROWS ONLY
                """,
            {"query_pattern": f"%{query}%", "limit": limit},
        )
        rows = await cursor.fetchall()

    results = []
    for row in rows:
        data = row[1]
        # CLOB columns come back as LOB handles; materialize them first.
        if hasattr(data, "read"):
            data = data.read()
        # oracledb might already return JSON as dict
        if isinstance(data, str):
            data = json.loads(data)
        results.append(
            {
                "thread_id": row[0],
                "data": data,
                "updated_at": row[2].isoformat() if row[2] else None,
            }
        )

    return results

vacuum async

vacuum(older_than_days: int = 30) -> int

Delete old checkpoints.

Parameters:

Name Type Description Default
older_than_days int

Delete checkpoints older than this

30

Returns:

Type Description
int

Number of deleted rows

Source code in src/locus/memory/backends/oracle.py
async def vacuum(self, older_than_days: int = 30) -> int:
    """Delete checkpoints not updated within the last ``older_than_days`` days.

    Args:
        older_than_days: Age cutoff in days, measured against the
            ``updated_at`` timestamp.

    Returns:
        Number of rows deleted.
    """
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as connection, connection.cursor() as cur:
        # NUMTODSINTERVAL lets the cutoff arrive as a bind variable
        # instead of a literal INTERVAL expression.
        await cur.execute(
            f"""
                DELETE FROM {self._full_table_name}
                WHERE updated_at < SYSTIMESTAMP - NUMTODSINTERVAL(:days, 'DAY')
                """,
            {"days": older_than_days},
        )
        removed: int = cur.rowcount
        await connection.commit()

    return removed

count async

count(pattern: str = '%') -> int

Count checkpoints matching pattern.

Source code in src/locus/memory/backends/oracle.py
async def count(self, pattern: str = "%") -> int:
    """Return how many checkpoints have a ``thread_id`` matching the LIKE ``pattern``."""
    await self._ensure_table()
    pool = await self._get_pool()

    async with pool.acquire() as connection, connection.cursor() as cur:
        await cur.execute(
            f"SELECT COUNT(*) FROM {self._full_table_name} WHERE thread_id LIKE :pattern",
            {"pattern": pattern},
        )
        record = await cur.fetchone()

    return 0 if record is None else record[0]

close async

close() -> None

Close connection pool.

Source code in src/locus/memory/backends/oracle.py
async def close(self) -> None:
    """Close the connection pool, if one was ever created."""
    if not self._pool:
        return
    await self._pool.close()
    self._pool = None  # allow a fresh pool on next use