Oracle 26ai versioned checkpoint agent¶
A Locus Agent driving the versioned OracleCheckpointSaver. The
agent runs three turns; after each one, the post-turn AgentState
gets serialised via AgentState.to_checkpoint and written to the
saver with a monotonic checkpoint_id and a parent link. The result:
an auditable lineage of the agent's life inside Oracle 26ai, queryable
by list_checkpoints / get(checkpoint_id=...).
Contrast with the simpler oracle_checkpointer from notebook 07:
that one MERGEs a single row per thread (latest wins). Reach for
OracleCheckpointSaver when you need history (time-travel, fork) or
intra-step durability (put_writes / get_writes). This notebook
runs both side-by-side — the single-row checkpointer for live thread
continuity, the versioned saver for the audit trail.
What this covers¶
Agent(checkpointer=oracle_checkpointer(...))providing per-turn state continuity for the live thread.- After each
agent.run_sync(prompt, thread_id=...),saver.put(thread_id, checkpoint_id=vN, parent_checkpoint_id=vN-1, checkpoint_data=result.state.to_checkpoint())appending a new versioned row. saver.list_checkpoints(thread_id=...)showing the lineage; each row carries themetadata(turn number, stop reason).saver.get(thread_id, checkpoint_id="v1")plusAgentState.from_checkpointto rehydrate the historical state for branching.saver.put_writes(...)for idempotent intra-step pending writes against the agent's latest checkpoint.
Prerequisites¶
export ORACLE_DSN=mydb_low
export ORACLE_USER=locus_app
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb
export ORACLE_WALLET_PASSWORD='<wallet-pw>' # if encrypted
If those env vars aren't set the notebook prints a skip-banner and exits cleanly — no traceback.
Run¶
See also¶
- Oracle 26ai checkpointer (single-row)
- Concepts → Checkpointers & Store
- Oracle Database 26ai — documentation
- Oracle Autonomous Database — documentation
- python-oracledb driver
Source¶
#!/usr/bin/env python3
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""Notebook 12: versioned agent history on Oracle Database 26ai.
Notebook 07's ``oracle_checkpointer`` keeps **one row per thread** —
``MERGE`` on save means the latest write wins and history is
destructive. That's the right shape for most agent threads. But when
you need a LangGraph-shape *history-preserving* saver — one row per
``(thread_id, checkpoint_ns, checkpoint_id)`` plus a sibling table for
intra-step pending writes — reach for
:class:`OracleCheckpointSaver`.
This notebook puts a Locus ``Agent`` on top of it. After every
``run_sync`` the agent's :meth:`AgentState.to_checkpoint` snapshot is
written to the saver with a monotonic ``checkpoint_id`` and a parent
link, so the saver ends up with the agent's full audit trail. We then
demonstrate ``list_checkpoints`` (lineage), ``get(checkpoint_id=v1)``
(rewind read), and ``put_writes`` (intra-step durability) — every
operation driven by the agent's actual state, not synthetic blobs.
It is the locus-native equivalent of ``langgraph-oracledb``'s
``OracleSaver`` / ``AsyncOracleSaver`` — same two-table schema, same
method surface — but with **zero** langchain / langgraph imports. The
same Autonomous Database wallet drives both this saver and the
single-row ``oracle_checkpointer`` side-by-side, so you can mix and
match per workload.
Key concepts:
- Two physical tables: ``<table>_checkpoints`` (one row per
``(thread_id, checkpoint_ns, checkpoint_id)`` with
``parent_checkpoint_id`` walking the lineage) and ``<table>_writes``
(pending intra-step writes keyed by ``task_id`` + monotonic ``idx``).
- ``put(...)`` inserts; never upserts. Re-saving a different
``checkpoint_id`` for the same thread *appends* — that's the whole
point of the versioned shape.
- ``get(thread_id=...)`` with no ``checkpoint_id`` returns the newest
row; ``list_checkpoints(...)`` enumerates the lineage newest-first.
- ``put_writes(...)`` is idempotent: it replaces any prior writes for
the same ``(thread, ns, checkpoint, task)`` tuple, so retries are
safe.
Run it::
export ORACLE_DSN=mydb_low # tnsnames alias
export ORACLE_USER=locus_app # least-privileged app schema
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb
export ORACLE_WALLET_PASSWORD='<wallet-pw>' # if encrypted
python examples/notebook_12_oracle_versioned_saver.py
If those env vars aren't set the notebook prints the wiring snippet and
exits cleanly — no traceback, no half-initialised state.
Difficulty: Intermediate. Self-contained — no prior notebook required.
"""
from __future__ import annotations
import asyncio
import os
import sys
from config import get_model, print_config
from locus.agent import Agent
from locus.core.state import AgentState
from locus.memory.backends import OracleCheckpointSaver, oracle_checkpointer
_REQUIRED_ENV = (
"ORACLE_DSN",
"ORACLE_USER",
"ORACLE_PASSWORD",
"ORACLE_WALLET",
)
TABLE_PREFIX = "locus_notebook_12"
THREAD_ID = "agent-incident-001"
_TURNS = (
"Open the incident: checkout-api p99 is up 4× over the last 15 minutes.",
"What service did we deploy at 13:58 and is anything correlated?",
"Draft a one-sentence customer-facing status update.",
)
def _missing_env() -> list[str]:
return [name for name in _REQUIRED_ENV if not os.environ.get(name)]
def _print_skip_banner(missing: list[str]) -> None:
print("\n--- Notebook 12: OracleCheckpointSaver agent ---")
print(
"Required environment variables not set; skipping the live demo so "
"this file still runs cleanly in CI.\n"
)
print("Missing:")
for name in missing:
print(f" - {name}")
print(
"\nProvision an Autonomous Database, drop its wallet under "
"$ORACLE_WALLET, then set the variables above and re-run."
)
print("\nMinimal wiring (what the live path below builds):")
print(
"""
from locus.agent import Agent
from locus.memory.backends import OracleCheckpointSaver
cp = oracle_checkpointer(dsn=..., table_name="locus_notebook_12_live")
saver = OracleCheckpointSaver(dsn=..., user=..., wallet_location=...,
table_name="locus_notebook_12")
agent = Agent(model=model, system_prompt="...", checkpointer=cp,
checkpoint_every_n_iterations=1)
parent = None
for i, turn in enumerate(turns, start=1):
result = agent.run_sync(turn, thread_id="t1") # thread_id continues
cid = f"v{i}"
await saver.put(thread_id="t1", checkpoint_id=cid,
parent_checkpoint_id=parent,
checkpoint_data=result.state.to_checkpoint())
parent = cid
""".rstrip()
)
def _build_saver() -> OracleCheckpointSaver:
return OracleCheckpointSaver(
dsn=os.environ["ORACLE_DSN"],
user=os.environ["ORACLE_USER"],
password=os.environ["ORACLE_PASSWORD"],
wallet_location=os.path.expanduser(os.environ["ORACLE_WALLET"]),
wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
table_name=TABLE_PREFIX,
)
async def _drop_tables(saver: OracleCheckpointSaver) -> None:
"""Drop both demo tables so the notebook is re-runnable."""
pool = await saver._get_pool() # noqa: SLF001 — cleanup helper, not public API
async with pool.acquire() as conn, conn.cursor() as cursor:
for table in (f"{TABLE_PREFIX}_writes", f"{TABLE_PREFIX}_checkpoints"):
try:
await cursor.execute(f"DROP TABLE {table} PURGE")
except Exception as exc: # pragma: no cover — cleanup is best-effort
print(f" (cleanup) DROP TABLE {table} skipped: {exc}")
await conn.commit()
async def run_agent_with_versioned_history(agent: Agent, saver: OracleCheckpointSaver) -> None:
"""Drive the agent through three turns, snapshotting each one.
Each ``run_sync`` returns an :class:`AgentResult` whose ``.state``
is the post-turn :class:`AgentState`. We serialise that with
:meth:`AgentState.to_checkpoint` (the same call the in-process
checkpointer uses) and write it to the saver under a fresh
``checkpoint_id`` linked to the previous one — building a lineage
inside Oracle 26ai.
"""
print("\n--- 1. Agent runs three turns; each snapshot lands in Oracle ---")
from locus.core.events import TerminateEvent # noqa: PLC0415
parent: str | None = None
for i, prompt in enumerate(_TURNS, start=1):
# The agent's per-iteration checkpointer keeps the live thread
# going (turn N sees turn N-1's messages). We additionally
# snapshot the post-turn state into the versioned saver below.
# Using agent.run() (async generator) directly — we're already
# inside an event loop, so agent.run_sync would clash.
final = ""
stop_reason = "complete"
async for ev in agent.run(prompt, thread_id=THREAD_ID):
if isinstance(ev, TerminateEvent):
final = ev.final_message or ""
stop_reason = ev.reason or "complete"
state = agent._last_run_state # type: ignore[attr-defined] # set by run()
if state is None:
raise RuntimeError("agent.run() did not populate _last_run_state")
cid = f"v{i}"
snapshot = state.to_checkpoint()
await saver.put(
thread_id=THREAD_ID,
checkpoint_id=cid,
parent_checkpoint_id=parent,
checkpoint_data=snapshot,
metadata={
"turn": i,
"iterations": len(state.messages),
"stop_reason": str(stop_reason),
},
)
print(f"\n Turn {i} ({cid}, parent={parent!r}):")
print(f" User : {prompt}")
print(f" Agent: {final.strip()[:160]}")
print(
f" saver.put -> checkpoint_id={cid!r}, "
f"messages_in_snapshot={len(snapshot.get('messages', []))}"
)
parent = cid
async def list_lineage(saver: OracleCheckpointSaver) -> None:
print("\n--- 2. list_checkpoints — the agent's full lineage ---")
history = await saver.list_checkpoints(thread_id=THREAD_ID)
print(f" list_checkpoints({THREAD_ID!r}) -> {len(history)} rows (newest first):")
for row in history:
cid = row["checkpoint_id"]
parent = row["parent_checkpoint_id"]
meta = row.get("metadata") or {}
print(
f" - id={cid!r} parent={parent!r} "
f"turn={meta.get('turn')!r} stop_reason={meta.get('stop_reason')!r}"
)
async def rewind_to_v1(saver: OracleCheckpointSaver) -> None:
"""Pull a specific historical checkpoint back as an ``AgentState``."""
print("\n--- 3. get(checkpoint_id='v1') — rewind read ---")
row = await saver.get(thread_id=THREAD_ID, checkpoint_id="v1")
if row is None:
print(" no v1 row found (saver state inconsistent)")
return
rehydrated = AgentState.from_checkpoint(row["checkpoint"])
print(f" hydrated AgentState: {len(rehydrated.messages)} messages")
for m in rehydrated.messages:
role = getattr(m, "role", "?")
text = (getattr(m, "content", "") or "").replace("\n", " ")[:80]
print(f" [{role}] {text}")
print(
"\n This is enough to spin up an alternate-branch agent on a new "
"thread by writing the snapshot back into its checkpointer. The "
"lineage in 26ai is the source of truth — the live thread is "
"just one realisation of it."
)
async def write_pending(saver: OracleCheckpointSaver) -> None:
"""Demonstrate ``put_writes`` against the agent's latest checkpoint."""
print("\n--- 4. put_writes — intra-step pending writes against v3 ---")
await saver.put_writes(
thread_id=THREAD_ID,
checkpoint_id="v3",
task_id="status-page-update",
writes=[
("channel:status", "we are investigating"),
("channel:pagerduty", "warmed"),
],
)
writes = await saver.get_writes(thread_id=THREAD_ID, checkpoint_id="v3")
print(f" get_writes(thread={THREAD_ID!r}, checkpoint='v3') -> {len(writes)} rows:")
for w in writes:
print(
f" - task={w['task_id']!r} idx={w['idx']} "
f"channel={w['channel']!r} value={w['value']!r}"
)
print(
"\n put_writes is idempotent: a retry for the same (thread, ns, "
"checkpoint, task) replaces the prior set inside one transaction. "
"Safe to call from a tool that may execute twice on retry."
)
async def main() -> None:
missing = _missing_env()
if missing:
_print_skip_banner(missing)
return
print_config()
print("Opening pool against Oracle 26ai…")
saver = _build_saver()
# Live thread continuity comes from the single-row checkpointer
# (notebook 07's primitive); the versioned saver adds an audit
# trail on top. Both ride the same Autonomous Database wallet.
live_cp = oracle_checkpointer(
dsn=os.environ["ORACLE_DSN"],
user=os.environ["ORACLE_USER"],
password=os.environ["ORACLE_PASSWORD"],
wallet_location=os.path.expanduser(os.environ["ORACLE_WALLET"]),
wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
table_name=f"{TABLE_PREFIX}_live",
)
agent = Agent(
model=get_model(max_tokens=180),
system_prompt=(
"You are the on-call incident commander. Each turn, advance "
"the incident: clarify what's known, identify likely causes, "
"and draft the next public update. Stay concise — one short "
"paragraph per turn."
),
checkpointer=live_cp,
checkpoint_every_n_iterations=1,
)
try:
await run_agent_with_versioned_history(agent, saver)
await list_lineage(saver)
await rewind_to_v1(saver)
await write_pending(saver)
await saver.delete_thread(THREAD_ID)
print(
"\nThe agent ran live; Oracle 26ai kept the full lineage of "
"its state. Use the single-row oracle_checkpointer (notebook "
"07) for live durability and OracleCheckpointSaver for the "
"auditable version history on top — same wallet, same pool."
)
finally:
print("\n--- Cleanup: drop the demo tables ---")
await _drop_tables(saver)
# Also delete the live thread row from the single-row checkpointer.
try:
await live_cp.delete(THREAD_ID)
except Exception as exc: # pragma: no cover — best-effort
print(f" (cleanup) live_cp.delete skipped: {exc}")
await saver.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
sys.exit(130)