Skip to content

Oracle 26ai Checkpointer agent

In-memory and on-disk checkpointers are fine for local development — when the process restarts, the conversation is gone. This notebook wires Locus's oracle_checkpointer into an Agent so the agent loop itself writes AgentState to Oracle 26ai on every iteration. A second Agent instance — built fresh, simulating a different process — loads the same thread_id and the conversation continues uninterrupted. The user never calls cp.save — the agent does that for them.

What this covers

  • oracle_checkpointer(...) plugged into Agent(checkpointer=...). The agent loop persists AgentState after every iteration.
  • agent.run_sync(prompt, thread_id="...") loading state for the thread before the first model call and saving after each iteration.
  • checkpoint_every_n_iterations=1 — a mid-loop crash recovers to the last completed iteration, not the start of the turn.
  • A brand-new Agent object resuming the same thread_id and recalling turn 1 without being told what was said.
  • cp.list_threads() as the admin view dashboards reach for.

Prerequisites

The checkpointer and the vector store from notebook 05 can share a single Autonomous Database:

export ORACLE_DSN=mydb_low                   # tnsnames alias in the wallet
export ORACLE_USER=locus_app                 # least-privileged app schema
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb     # directory holding tnsnames.ora
export ORACLE_WALLET_PASSWORD='<wallet-pw>'  # if wallet is encrypted

If ORACLE_DSN / ORACLE_PASSWORD aren't set the notebook prints the wiring snippet and exits cleanly — no traceback, no half-initialised state.

Run

python examples/notebook_07_oracle_26ai_checkpointer.py

Schema hygiene

This notebook writes to locus_notebook_06 (overridable via table_name=). For production, pre-create the table out-of-band as a least-privileged app schema owner and run the application user with INSERT / SELECT / UPDATE / DELETE only — see the checkpointer concepts page for the DDL and the rotation guidance.

See also

Source

#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/

"""Notebook 07: durable agent threads on Oracle Database 26ai.

In-memory and on-disk checkpointers are fine for local development —
when the process restarts, the conversation is gone. This notebook
wires Locus's ``oracle_checkpointer`` into an ``Agent`` so the agent
loop itself writes ``AgentState`` (messages, metrics, all of it) to
Oracle 26ai after every iteration. A second ``Agent`` instance — built
fresh, simulating a different process — loads the same ``thread_id``
and the conversation continues uninterrupted.

The point: it isn't a state-blob primitive demo. The *Locus agent
runtime* is the protagonist; Oracle 26ai is the durable substrate it
checkpoints into. The user never calls ``cp.save`` — the agent does
that for them.

Key concepts:

- ``oracle_checkpointer(...)`` returns a ``StorageBackendAdapter``
  wrapped around ``OracleBackend``. Pass it to ``Agent(checkpointer=...)``
  and the agent loop persists state on every iteration.
- ``agent.run_sync(prompt, thread_id="...")`` loads state for that
  thread before the first model call and saves after each iteration.
- ``checkpoint_every_n_iterations=1`` means a mid-loop crash recovers
  to the last completed iteration, not the start of the turn.
- ``cp.list_threads()`` enumerates every persisted conversation — the
  primitive admin dashboards use to enumerate sessions.

The demo simulates two sessions. Session 1 starts a conversation and
the agent persists the state to Oracle automatically. Session 2 spins
up a brand-new ``Agent`` instance — different Python object, same
``thread_id``, same Oracle pool — and the agent recalls turn 1 without
being told what was said.

Run it::

    export ORACLE_DSN=mydb_low                   # tnsnames alias
    export ORACLE_USER=locus_app                 # least-privileged app schema
    export ORACLE_PASSWORD='<app-password>'
    export ORACLE_WALLET=~/.oci/wallets/mydb
    export ORACLE_WALLET_PASSWORD='<wallet-pw>'  # if the wallet is encrypted

    python examples/notebook_07_oracle_26ai_checkpointer.py

If ``ORACLE_DSN`` / ``ORACLE_PASSWORD`` aren't set the notebook prints
the wiring snippet and exits cleanly — no traceback, no
half-initialised state.

Difficulty: Intermediate. Self-contained — no prior notebook required.
"""

from __future__ import annotations

import asyncio
import os
import sys

from config import get_model, print_config

from locus.agent import Agent
from locus.memory.backends import oracle_checkpointer


_REQUIRED_ENV = (
    "ORACLE_DSN",
    "ORACLE_USER",
    "ORACLE_PASSWORD",
    "ORACLE_WALLET",
)


THREAD_ID = "notebook_07_thread"
TABLE_NAME = "locus_notebook_07"


def _missing_env() -> list[str]:
    return [name for name in _REQUIRED_ENV if not os.environ.get(name)]


def _print_skip_banner(missing: list[str]) -> None:
    print("\n--- Notebook 07: Oracle 26ai durable agent thread ---")
    print(
        "Required environment variables not set; skipping the live demo so "
        "this file still runs cleanly in CI.\n"
    )
    print("Missing:")
    for name in missing:
        print(f"  - {name}")
    print(
        "\nProvision an Autonomous Database, drop its wallet under "
        "$ORACLE_WALLET, then set the variables above and re-run."
    )
    print("\nMinimal wiring (what the live path below builds):")
    print(
        """
    from locus.agent import Agent
    from locus.memory.backends import oracle_checkpointer

    cp = oracle_checkpointer(
        dsn=os.environ["ORACLE_DSN"],
        user=os.environ["ORACLE_USER"],
        password=os.environ["ORACLE_PASSWORD"],
        wallet_location=os.environ["ORACLE_WALLET"],
        wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
        table_name="locus_checkpoints",
    )
    agent = Agent(model=model, checkpointer=cp, checkpoint_every_n_iterations=1)
    agent.run_sync("My name is Alice.", thread_id="t1")
    agent.run_sync("What's my name?", thread_id="t1")  # recalls turn 1
        """.rstrip()
    )


def _build_checkpointer():
    return oracle_checkpointer(
        dsn=os.environ["ORACLE_DSN"],
        user=os.environ["ORACLE_USER"],
        password=os.environ["ORACLE_PASSWORD"],
        wallet_location=os.path.expanduser(os.environ["ORACLE_WALLET"]),
        wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
        table_name=TABLE_NAME,
    )


def _build_agent():
    """Fresh ``Agent`` instance, fresh checkpointer pool — same Oracle table.

    Returning a new ``Agent`` per call is the whole point: it proves the
    durability lives in 26ai, not in the Python object. The two agents
    below share nothing in memory; the conversation survives because
    the Locus agent loop loaded state from Oracle before its first
    model call.
    """
    return Agent(
        model=get_model(max_tokens=120),
        system_prompt=(
            "You are an on-call SRE assistant. Be concise — one or two "
            "sentences per turn. Remember everything the user has told "
            "you earlier in this thread."
        ),
        checkpointer=_build_checkpointer(),
        # Persist after every iteration, not just at session end. A
        # mid-loop crash on a tool call recovers to the last completed
        # iteration instead of the start of the turn.
        checkpoint_every_n_iterations=1,
    )


async def first_session() -> None:
    """Session 1 — operator starts a conversation; agent persists turn 1."""
    print("\n--- Session 1: agent persists turn 1 to Oracle 26ai ---")
    agent = _build_agent()

    result = agent.run_sync(
        "p99 on checkout-api spiked at 14:02. The deploy at 13:58 looks suspicious.",
        thread_id=THREAD_ID,
    )
    print(f"User : p99 on checkout-api spiked at 14:02 (after the 13:58 deploy).")
    print(f"Agent: {result.message.strip()}")
    print(f"  state size after this turn: {len(result.state.messages)} messages")

    # The state hit Oracle, not just memory. Confirm via the
    # checkpointer's admin surface — the same one a dashboard would use.
    cp = _build_checkpointer()
    threads = await cp.list_threads()
    print(f"  threads currently in {TABLE_NAME}: {threads}")


async def second_session() -> None:
    """Session 2 — different ``Agent`` object, same ``thread_id``.

    The new agent loads state from Oracle on the first ``run_sync``
    call. The model never sees turn 1 in the prompt — Locus reads
    ``thread_id`` from the checkpointer and injects the recovered
    messages into the agent loop.
    """
    print("\n--- Session 2: a brand-new Agent resumes the thread ---")
    agent = _build_agent()

    result = agent.run_sync(
        "Has anything like that happened in the last fortnight?",
        thread_id=THREAD_ID,
    )
    print("User : Has anything like that happened in the last fortnight?")
    print(f"Agent: {result.message.strip()}")
    print(f"  state size after this turn: {len(result.state.messages)} messages")

    print(
        "\nThe second Agent loaded prior turns from Oracle 26ai before "
        "the first model call — no AgentState was hand-built, no save/load "
        "was called manually. The Locus agent loop did it on every iteration."
    )


async def main() -> None:
    missing = _missing_env()
    if missing:
        _print_skip_banner(missing)
        return

    print_config()
    await first_session()
    await second_session()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        sys.exit(130)