Tutorial 20: Checkpoint Backends¶
This tutorial demonstrates different checkpoint storage backends for persisting agent state and conversation history.
Topics covered:
- Memory checkpointer (development)
- SQLite backend (local persistence)
- File checkpointer (simple storage)
- Backend interface and operations
- Backend selection patterns
Note: Redis, PostgreSQL, and cloud backends require additional setup.
Run with: python examples/tutorial_20_checkpoint_backends.py
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 20: Checkpoint Backends
This tutorial demonstrates different checkpoint storage backends
for persisting agent state and conversation history.
Topics covered:
1. Memory checkpointer (development)
2. SQLite backend (local persistence)
3. File checkpointer (simple storage)
4. Backend interface and operations
5. Backend selection patterns
Note: Redis, PostgreSQL, and cloud backends require additional setup.
Run with:
python examples/tutorial_20_checkpoint_backends.py
"""
import asyncio
import os
import tempfile
from locus.core.messages import Message
from locus.core.state import AgentState
from locus.memory.backends import (
FileCheckpointer,
MemoryCheckpointer,
SQLiteBackend,
)
# SQLite backend requires aiosqlite - check if it's available
try:
import aiosqlite
HAS_SQLITE = True
except ImportError:
HAS_SQLITE = False
async def main():
print("=" * 60)
print("Tutorial 20: Checkpoint Backends")
print("=" * 60)
# Create temp directory for demo
temp_dir = tempfile.mkdtemp()
# =========================================================================
# Part 1: Memory Checkpointer
# =========================================================================
print("\n=== Part 1: Memory Checkpointer ===\n")
# Memory checkpointer for development and testing
memory_cp = MemoryCheckpointer()
# Create an agent state
state = AgentState(agent_id="demo_agent")
state = state.with_message(Message.user("Hello!"))
state = state.with_message(Message.assistant("Hi there!"))
# Save checkpoint
checkpoint_id = await memory_cp.save(state, "thread_1")
print(f"Saved checkpoint: {checkpoint_id}")
# Load checkpoint
loaded = await memory_cp.load("thread_1")
print(f"Loaded state with {len(loaded.messages)} messages")
# List checkpoints
checkpoints = await memory_cp.list_checkpoints("thread_1")
print(f"Available checkpoints: {checkpoints}")
# Memory checkpointer is cleared on restart
print("\nNote: Memory checkpointer loses data on restart")
# =========================================================================
# Part 2: SQLite Backend (Dict-based)
# =========================================================================
print("\n=== Part 2: SQLite Backend ===\n")
sqlite_backend = None # Will be set if aiosqlite is available
if HAS_SQLITE:
db_path = os.path.join(temp_dir, "checkpoints.db")
sqlite_backend = SQLiteBackend(path=db_path)
# Save raw dict checkpoints
for i in range(3):
await sqlite_backend.save(
f"thread_{i}",
{
"agent_id": f"agent_{i}",
"messages": [{"role": "user", "content": f"Message {i}"}],
"iteration": i,
},
)
print("Saved 3 threads to SQLite")
# Load and verify
data = await sqlite_backend.load("thread_1")
print(f"Loaded thread_1: {data}")
# List threads
threads = await sqlite_backend.list_threads()
print(f"All threads: {threads}")
# Check exists
exists = await sqlite_backend.exists("thread_1")
print(f"Thread exists: {exists}")
# Delete a thread
deleted = await sqlite_backend.delete("thread_2")
print(f"Deleted thread_2: {deleted}")
# List again
threads = await sqlite_backend.list_threads()
print(f"Remaining threads: {threads}")
print(f"\nSQLite database: {db_path}")
else:
print("SQLite backend requires 'aiosqlite' package.")
print("Install with: pip install aiosqlite")
print("Skipping SQLite demo...")
# =========================================================================
# Part 3: File Checkpointer
# =========================================================================
print("\n=== Part 3: File Checkpointer ===\n")
file_dir = os.path.join(temp_dir, "checkpoints")
file_cp = FileCheckpointer(base_dir=file_dir)
# Save agent states
state1 = AgentState(agent_id="file_agent_1")
state1 = state1.with_message(Message.system("You are helpful."))
state1 = state1.with_message(Message.user("Help me code."))
await file_cp.save(state1, "conversation_a")
state2 = AgentState(agent_id="file_agent_2")
state2 = state2.with_message(Message.user("Different conversation"))
await file_cp.save(state2, "conversation_b")
print("Saved to file checkpointer")
# Load and verify
loaded = await file_cp.load("conversation_a")
print(f"Loaded: {len(loaded.messages)} messages")
# Check if list_threads is supported
if file_cp.capabilities.list_threads:
threads = await file_cp.list_threads()
print(f"Saved conversations: {threads}")
else:
print("Note: FileCheckpointer doesn't support list_threads")
print(f"\nFile storage: {file_dir}")
# =========================================================================
# Part 4: Checkpointer Interface
# =========================================================================
print("\n=== Part 4: Checkpointer Interface ===\n")
print("Checkpointers (MemoryCheckpointer, FileCheckpointer) implement:")
print(" save(state, thread_id) - Save AgentState")
print(" load(thread_id) - Load AgentState")
print(" delete(thread_id) - Delete checkpoint")
print(" list_checkpoints(thread_id)- List checkpoint IDs")
print(" list_threads() - List all thread IDs")
print("\nBackends (SQLiteBackend, RedisBackend) work with dicts:")
print(" save(thread_id, data) - Save dict data")
print(" load(thread_id) - Load dict data")
print(" delete(thread_id) - Delete data")
print(" exists(thread_id) - Check existence")
print(" list_threads() - List thread IDs")
# =========================================================================
# Part 5: Checkpointer Capabilities
# =========================================================================
print("\n=== Part 5: Checkpointer Capabilities ===\n")
# Each checkpointer reports its capabilities
print("Memory checkpointer capabilities:")
print(f" list_threads: {memory_cp.capabilities.list_threads}")
print(f" persistent_checkpoint_ids: {memory_cp.capabilities.persistent_checkpoint_ids}")
print("\nFile checkpointer capabilities:")
print(f" list_threads: {file_cp.capabilities.list_threads}")
print(f" persistent_checkpoint_ids: {file_cp.capabilities.persistent_checkpoint_ids}")
# =========================================================================
# Part 6: Multiple Checkpoints per Thread
# =========================================================================
print("\n=== Part 6: Multiple Checkpoints ===\n")
# Create multiple checkpoints for the same thread
thread_id = "multi_checkpoint_thread"
state = AgentState(agent_id="agent")
# Checkpoint 1
state = state.with_message(Message.user("First message"))
cp1 = await memory_cp.save(state, thread_id)
# Checkpoint 2 (more progress)
state = state.with_message(Message.assistant("Response"))
state = state.with_iteration(1)
cp2 = await memory_cp.save(state, thread_id)
# Checkpoint 3 (even more progress)
state = state.with_message(Message.user("Follow up"))
cp3 = await memory_cp.save(state, thread_id)
# List all checkpoints
all_cps = await memory_cp.list_checkpoints(thread_id)
print(f"Checkpoints for {thread_id}: {len(all_cps)}")
for cp_id in all_cps:
print(f" - {cp_id}")
# Load specific checkpoint
loaded = await memory_cp.load(thread_id, checkpoint_id=cp1)
print(f"\nLoaded checkpoint 1: {len(loaded.messages)} messages")
# Load latest (default)
latest = await memory_cp.load(thread_id)
print(f"Loaded latest: {len(latest.messages)} messages")
# =========================================================================
# Part 7: Backend Selection Patterns
# =========================================================================
print("\n=== Part 7: Backend Selection ===\n")
def get_checkpointer(environment: str):
"""Select checkpointer based on environment."""
if environment == "development":
return MemoryCheckpointer()
elif environment == "testing":
return MemoryCheckpointer() # Fast, in-memory
elif environment == "production":
# In production, use persistent storage
return FileCheckpointer(base_dir="/var/lib/locus/checkpoints")
else:
raise ValueError(f"Unknown environment: {environment}")
for env in ["development", "testing", "production"]:
cp = get_checkpointer(env)
print(f" {env}: {type(cp).__name__}")
# =========================================================================
# Part 8: Available Backends
# =========================================================================
print("\n=== Part 8: Available Backends ===\n")
backends = [
("MemoryCheckpointer", "In-memory, no dependencies", "Development, testing"),
("FileCheckpointer", "JSON files, no dependencies", "Simple persistence"),
("SQLiteBackend", "Local file, requires aiosqlite", "Single-node storage"),
("RedisBackend", "Redis server, requires redis", "Distributed, high performance"),
("PostgreSQLBackend", "PostgreSQL, requires asyncpg", "Production, ACID compliance"),
("OCIBucketBackend", "OCI Object Storage", "Cloud, scalable storage"),
("OpenSearchBackend", "OpenSearch/Elasticsearch", "Searchable checkpoints"),
("OracleBackend", "Oracle Database", "Enterprise, JSON support"),
]
print("Backend options:")
for name, deps, use_case in backends:
print(f"\n {name}")
print(f" Dependencies: {deps}")
print(f" Use case: {use_case}")
# =========================================================================
# Part 9: Thread Listing and Filtering
# =========================================================================
print("\n=== Part 9: Thread Management ===\n")
if sqlite_backend is not None:
# Create multiple threads with pattern
for user in ["alice", "bob", "charlie"]:
for session in range(2):
thread_id = f"user_{user}_session_{session}"
await sqlite_backend.save(thread_id, {"user": user, "session": session})
# List all threads
all_threads = await sqlite_backend.list_threads()
print(f"Total threads: {len(all_threads)}")
# List with pattern (SQLite supports LIKE patterns)
alice_threads = await sqlite_backend.list_threads(pattern="user_alice%")
print(f"Alice's threads: {alice_threads}")
# List with pagination
page1 = await sqlite_backend.list_threads(limit=3, offset=0)
page2 = await sqlite_backend.list_threads(limit=3, offset=3)
print(f"Page 1: {page1}")
print(f"Page 2: {page2}")
else:
print("SQLite not available - skipping thread management demo")
print("Install aiosqlite to see this functionality")
# =========================================================================
# Part 10: Best Practices
# =========================================================================
print("\n=== Part 10: Best Practices ===\n")
print("1. Use MemoryCheckpointer for unit tests")
print("2. Use FileCheckpointer for development")
print("3. Use Redis/PostgreSQL for production")
print("4. Use meaningful thread IDs (user_id + session)")
print("5. Implement cleanup for old checkpoints")
print("6. Test checkpoint restore after changes")
print("7. Consider encryption for sensitive data")
print("8. Monitor storage usage over time")
# Cleanup
import shutil
shutil.rmtree(temp_dir)
# =========================================================================
print("\n" + "=" * 60)
print("Next: Tutorial 21 - SSE Streaming")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())