Oracle 26ai Checkpointer agent¶
In-memory and on-disk checkpointers are fine for local development —
when the process restarts, the conversation is gone. This notebook
wires Locus's oracle_checkpointer into an Agent so the agent loop
itself writes AgentState to Oracle 26ai on every iteration. A second
Agent instance — built fresh, simulating a different process — loads
the same thread_id and the conversation continues uninterrupted.
The user never calls cp.save — the agent does that for them.
What this covers¶
oracle_checkpointer(...)plugged intoAgent(checkpointer=...). The agent loop persistsAgentStateafter every iteration.agent.run_sync(prompt, thread_id="...")loading state for the thread before the first model call and saving after each iteration.checkpoint_every_n_iterations=1— a mid-loop crash recovers to the last completed iteration, not the start of the turn.- A brand-new
Agentobject resuming the samethread_idand recalling turn 1 without being told what was said. cp.list_threads()as the admin view dashboards reach for.
Prerequisites¶
The checkpointer and the vector store from notebook 05 can share a single Autonomous Database:
export ORACLE_DSN=mydb_low # tnsnames alias in the wallet
export ORACLE_USER=locus_app # least-privileged app schema
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb # directory holding tnsnames.ora
export ORACLE_WALLET_PASSWORD='<wallet-pw>' # if wallet is encrypted
If ORACLE_DSN / ORACLE_PASSWORD aren't set the notebook prints the
wiring snippet and exits cleanly — no traceback, no half-initialised
state.
Run¶
Schema hygiene¶
This notebook writes to locus_notebook_06 (overridable via
table_name=). For production, pre-create the table out-of-band as a
least-privileged app schema owner and run the application user with
INSERT / SELECT / UPDATE / DELETE only — see the
checkpointer concepts page for the DDL
and the rotation guidance.
See also¶
- Concepts — Checkpointers & Store
- Notebook 05 — Oracle 26ai RAG (shares the same wallet)
- Oracle Database 26ai — documentation
- Oracle Autonomous Database — documentation
- python-oracledb driver
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Notebook 07: durable agent threads on Oracle Database 26ai.
In-memory and on-disk checkpointers are fine for local development —
when the process restarts, the conversation is gone. This notebook
wires Locus's ``oracle_checkpointer`` into an ``Agent`` so the agent
loop itself writes ``AgentState`` (messages, metrics, all of it) to
Oracle 26ai after every iteration. A second ``Agent`` instance — built
fresh, simulating a different process — loads the same ``thread_id``
and the conversation continues uninterrupted.
The point: it isn't a state-blob primitive demo. The *Locus agent
runtime* is the protagonist; Oracle 26ai is the durable substrate it
checkpoints into. The user never calls ``cp.save`` — the agent does
that for them.
Key concepts:
- ``oracle_checkpointer(...)`` returns a ``StorageBackendAdapter``
wrapped around ``OracleBackend``. Pass it to ``Agent(checkpointer=...)``
and the agent loop persists state on every iteration.
- ``agent.run_sync(prompt, thread_id="...")`` loads state for that
thread before the first model call and saves after each iteration.
- ``checkpoint_every_n_iterations=1`` means a mid-loop crash recovers
to the last completed iteration, not the start of the turn.
- ``cp.list_threads()`` enumerates every persisted conversation — the
primitive admin dashboards use to enumerate sessions.
The demo simulates two sessions. Session 1 starts a conversation and
the agent persists the state to Oracle automatically. Session 2 spins
up a brand-new ``Agent`` instance — different Python object, same
``thread_id``, same Oracle pool — and the agent recalls turn 1 without
being told what was said.
Run it::
export ORACLE_DSN=mydb_low # tnsnames alias
export ORACLE_USER=locus_app # least-privileged app schema
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb
export ORACLE_WALLET_PASSWORD='<wallet-pw>' # if the wallet is encrypted
python examples/notebook_07_oracle_26ai_checkpointer.py
If ``ORACLE_DSN`` / ``ORACLE_PASSWORD`` aren't set the notebook prints
the wiring snippet and exits cleanly — no traceback, no
half-initialised state.
Difficulty: Intermediate. Self-contained — no prior notebook required.
"""
from __future__ import annotations
import asyncio
import os
import sys
from config import get_model, print_config
from locus.agent import Agent
from locus.memory.backends import oracle_checkpointer
_REQUIRED_ENV = (
"ORACLE_DSN",
"ORACLE_USER",
"ORACLE_PASSWORD",
"ORACLE_WALLET",
)
THREAD_ID = "notebook_07_thread"
TABLE_NAME = "locus_notebook_07"
def _missing_env() -> list[str]:
return [name for name in _REQUIRED_ENV if not os.environ.get(name)]
def _print_skip_banner(missing: list[str]) -> None:
print("\n--- Notebook 07: Oracle 26ai durable agent thread ---")
print(
"Required environment variables not set; skipping the live demo so "
"this file still runs cleanly in CI.\n"
)
print("Missing:")
for name in missing:
print(f" - {name}")
print(
"\nProvision an Autonomous Database, drop its wallet under "
"$ORACLE_WALLET, then set the variables above and re-run."
)
print("\nMinimal wiring (what the live path below builds):")
print(
"""
from locus.agent import Agent
from locus.memory.backends import oracle_checkpointer
cp = oracle_checkpointer(
dsn=os.environ["ORACLE_DSN"],
user=os.environ["ORACLE_USER"],
password=os.environ["ORACLE_PASSWORD"],
wallet_location=os.environ["ORACLE_WALLET"],
wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
table_name="locus_checkpoints",
)
agent = Agent(model=model, checkpointer=cp, checkpoint_every_n_iterations=1)
agent.run_sync("My name is Alice.", thread_id="t1")
agent.run_sync("What's my name?", thread_id="t1") # recalls turn 1
""".rstrip()
)
def _build_checkpointer():
return oracle_checkpointer(
dsn=os.environ["ORACLE_DSN"],
user=os.environ["ORACLE_USER"],
password=os.environ["ORACLE_PASSWORD"],
wallet_location=os.path.expanduser(os.environ["ORACLE_WALLET"]),
wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
table_name=TABLE_NAME,
)
def _build_agent():
"""Fresh ``Agent`` instance, fresh checkpointer pool — same Oracle table.
Returning a new ``Agent`` per call is the whole point: it proves the
durability lives in 26ai, not in the Python object. The two agents
below share nothing in memory; the conversation survives because
the Locus agent loop loaded state from Oracle before its first
model call.
"""
return Agent(
model=get_model(max_tokens=120),
system_prompt=(
"You are an on-call SRE assistant. Be concise — one or two "
"sentences per turn. Remember everything the user has told "
"you earlier in this thread."
),
checkpointer=_build_checkpointer(),
# Persist after every iteration, not just at session end. A
# mid-loop crash on a tool call recovers to the last completed
# iteration instead of the start of the turn.
checkpoint_every_n_iterations=1,
)
async def first_session() -> None:
"""Session 1 — operator starts a conversation; agent persists turn 1."""
print("\n--- Session 1: agent persists turn 1 to Oracle 26ai ---")
agent = _build_agent()
result = agent.run_sync(
"p99 on checkout-api spiked at 14:02. The deploy at 13:58 looks suspicious.",
thread_id=THREAD_ID,
)
print(f"User : p99 on checkout-api spiked at 14:02 (after the 13:58 deploy).")
print(f"Agent: {result.message.strip()}")
print(f" state size after this turn: {len(result.state.messages)} messages")
# The state hit Oracle, not just memory. Confirm via the
# checkpointer's admin surface — the same one a dashboard would use.
cp = _build_checkpointer()
threads = await cp.list_threads()
print(f" threads currently in {TABLE_NAME}: {threads}")
async def second_session() -> None:
"""Session 2 — different ``Agent`` object, same ``thread_id``.
The new agent loads state from Oracle on the first ``run_sync``
call. The model never sees turn 1 in the prompt — Locus reads
``thread_id`` from the checkpointer and injects the recovered
messages into the agent loop.
"""
print("\n--- Session 2: a brand-new Agent resumes the thread ---")
agent = _build_agent()
result = agent.run_sync(
"Has anything like that happened in the last fortnight?",
thread_id=THREAD_ID,
)
print("User : Has anything like that happened in the last fortnight?")
print(f"Agent: {result.message.strip()}")
print(f" state size after this turn: {len(result.state.messages)} messages")
print(
"\nThe second Agent loaded prior turns from Oracle 26ai before "
"the first model call — no AgentState was hand-built, no save/load "
"was called manually. The Locus agent loop did it on every iteration."
)
async def main() -> None:
missing = _missing_env()
if missing:
_print_skip_banner(missing)
return
print_config()
await first_session()
await second_session()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
sys.exit(130)