Skip to content

Checkpoint Backends

The checkpointer contract is backend-agnostic, but the production default on OCI is Oracle 26ai — native JSON columns, vector and text indexes in one schema, and the full capability set (list_threads, search, vacuum) over a single durable store. Notebook 06 covers the checkpointer contract itself; this notebook drives it against a real ADB.

  • Save and load AgentState via oracle_checkpointer.
  • Inspect the reported capabilities.
  • Walk thread history with list_threads / list_checkpoints.
  • Vacuum old checkpoints with OracleBackend.vacuum.
  • Full-text search across stored conversations.

Run it (requires a running Autonomous Database with its wallet on disk):

export ORACLE_DSN=mydb_low                   # tnsnames alias
export ORACLE_USER=locus_app                 # least-privileged
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb
export ORACLE_WALLET_PASSWORD='<wallet-pw>'  # if encrypted
python examples/notebook_54_checkpoint_backends.py

Without the env vars the notebook prints what's missing and exits cleanly so CI stays green. The in-memory checkpointer covered in notebook 10 is the developer default; the Oracle Database 26ai checkpointer covered in notebook 06 is the production default.

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Notebook 49: Checkpoint backends on Oracle Autonomous Database 26ai.

The checkpointer contract is backend-agnostic, but the production
default on OCI is Oracle 26ai — native JSON columns, vector and
text indexes in one schema, and the full capability set (list_threads,
search, vacuum) over a single durable store. Notebook 06 covers the
checkpointer contract itself; this notebook drives it against a real
ADB.

- Save and load AgentState via oracle_checkpointer.
- Inspect the reported capabilities.
- Walk thread history with list_threads / list_checkpoints.
- Vacuum old checkpoints with OracleBackend.vacuum.
- Full-text search across stored conversations.

Run it
    # Requires a running Autonomous Database with its wallet on disk:
    export ORACLE_DSN=mydb_low                   # tnsnames alias
    export ORACLE_USER=locus_app                 # least-privileged
    export ORACLE_PASSWORD='<app-password>'
    export ORACLE_WALLET=~/.oci/wallets/mydb
    export ORACLE_WALLET_PASSWORD='<wallet-pw>'  # if encrypted
    python examples/notebook_54_checkpoint_backends.py

Without the env vars the notebook prints what's missing and exits cleanly
so CI stays green. The in-memory checkpointer covered in notebook 06 is
the developer default; Oracle 26ai is the production default.
"""

import asyncio
import os
import sys

from locus.core.messages import Message
from locus.core.state import AgentState
from locus.memory.backends import oracle_checkpointer
from locus.memory.backends.oracle import OracleBackend


_REQUIRED_ENV = (
    "ORACLE_DSN",
    "ORACLE_USER",
    "ORACLE_PASSWORD",
    "ORACLE_WALLET",
)


def _missing_env() -> list[str]:
    return [name for name in _REQUIRED_ENV if not os.environ.get(name)]


def _common_kwargs() -> dict:
    return {
        "dsn": os.environ["ORACLE_DSN"],
        "user": os.environ["ORACLE_USER"],
        "password": os.environ["ORACLE_PASSWORD"],
        "wallet_location": os.path.expanduser(os.environ["ORACLE_WALLET"]),
        "wallet_password": os.environ.get("ORACLE_WALLET_PASSWORD", ""),
    }


async def main() -> None:
    print("=" * 60)
    print("Notebook 49: Checkpoint backends on Oracle 26ai")
    print("=" * 60)

    missing = _missing_env()
    if missing:
        print(
            "\nRequired environment variables not set; skipping the live "
            "demo so this file still runs cleanly in CI.\n"
        )
        for name in missing:
            print(f"  - {name}")
        print(
            "\nProvision an Autonomous Database, drop its wallet on disk, "
            "then set the variables above and re-run."
        )
        return

    table = "locus_notebook_54"

    # Part 1: round-trip an AgentState through the Checkpointer contract.
    print("\n=== Part 1: Save / load via oracle_checkpointer ===\n")
    cp = oracle_checkpointer(table_name=table, **_common_kwargs())

    state = AgentState(agent_id="demo_agent")
    state = state.with_message(Message.user("Hello from Oracle 26ai!"))
    state = state.with_message(Message.assistant("Hi — your state lives in ADB."))

    checkpoint_id = await cp.save(state, "thread_1")
    print(f"Saved checkpoint id={checkpoint_id} into table={table}")

    loaded = await cp.load("thread_1")
    print(f"Loaded thread_1 with {len(loaded.messages)} messages")

    # Part 2: the capability descriptor — drives feature detection at
    # runtime so generic code can ask whether search or vacuum exist.
    print("\n=== Part 2: Reported capabilities ===\n")
    caps = cp.capabilities
    print(f"  list_threads:              {caps.list_threads}")
    print(f"  persistent_checkpoint_ids: {caps.persistent_checkpoint_ids}")
    print(f"  search:                    {caps.search}")
    print(f"  metadata_query:            {caps.metadata_query}")
    print(f"  vacuum:                    {caps.vacuum}")

    # Part 3: enumerate stored conversations and their checkpoint history.
    print("\n=== Part 3: Enumerate stored conversations ===\n")
    # Save a second thread so the listing has something to show.
    other = AgentState(agent_id="demo_agent")
    other = other.with_message(Message.user("Second thread"))
    await cp.save(other, "thread_2")

    threads = await cp.list_threads()
    print(f"Threads on this backend: {threads}")
    for tid in threads:
        cps = await cp.list_checkpoints(tid)
        print(f"  {tid}: {len(cps)} checkpoint(s)")

    # Part 4: vacuum old rows. Production deployments want a periodic
    # job that prunes checkpoints older than the retention window.
    print("\n=== Part 4: Vacuum old checkpoints ===\n")
    backend = OracleBackend(table_name=table, **_common_kwargs())
    removed = await backend.vacuum(older_than_days=30)
    print(f"vacuum(older_than_days=30) removed {removed} stale row(s).")

    # Part 5: full-text search across every stored thread.
    print("\n=== Part 5: Search across checkpoints ===\n")
    hits = await backend.search("Oracle")
    print(f"search('Oracle') returned {len(hits)} thread id(s): {hits[:5]}")

    print("\nDone — every checkpoint above is durable in Oracle 26ai.")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        sys.exit(130)