Skip to content

Agent Memory

Give an agent a checkpointer and every conversation turn is persisted to a real database. Restart the process, attach a new agent to the same ADB and the same thread_id, and the conversation resumes — messages, tool history, confidence score and all.

What you'll learn:

  • Building an oracle_checkpointer against an Autonomous Database 26ai.
  • Keying conversations with thread_id.
  • Writing a checkpoint after every iteration so a crash mid-tool-call still recovers.
  • Loading the saved AgentState and inspecting it field by field.
  • Running many independent threads against a single ADB.

This notebook does not fall back to in-memory storage — Oracle is the only backend exercised here.

Run it:

export ORACLE_DSN=mydb_low
export ORACLE_USER=locus_app
export ORACLE_PASSWORD='<app-password>'
export ORACLE_WALLET=~/.oci/wallets/mydb
export ORACLE_WALLET_PASSWORD='<wallet-pw>'  # if encrypted
.venv/bin/python examples/notebook_16_agent_memory.py

If those env vars are unset the script prints a skip banner and exits 0 — convenient for CI. The agent's model goes through the default OCI Generative AI provider (canonical id: openai.gpt-4.1 or meta.llama-3.3-70b-instruct). For offline runs set LOCUS_MODEL_PROVIDER=mock; OpenAI, Anthropic and Ollama also work as the LLM.

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 10: agent memory backed by Oracle Autonomous Database 26ai.

Give an agent a checkpointer and every turn is written to a real
database. Restart the process, attach a fresh agent to the same
connection, hand it the same ``thread_id``, and the conversation picks
up where it left off. This is the production memory story for Locus.

Key ideas:
- ``oracle_checkpointer(...)`` opens a connection pool to an Autonomous
  Database 26ai and stores agent state in a table you name.
- ``thread_id`` keys conversations — one ADB can host many independent
  threads side by side.
- ``checkpoint_every_n_iterations=1`` writes after every loop iteration,
  so a crash mid-tool-call still resumes cleanly.
- The saved state is rich: messages, tool history, iteration count,
  Reflexion confidence — all loadable for inspection.

Run it:
    export ORACLE_DSN=mydb_low                   # tnsnames alias
    export ORACLE_USER=locus_app
    export ORACLE_PASSWORD='<app-password>'
    export ORACLE_WALLET=~/.oci/wallets/mydb
    export ORACLE_WALLET_PASSWORD='<wallet-pw>'  # if the wallet is encrypted
    .venv/bin/python examples/notebook_16_agent_memory.py

The agent itself goes through the default OCI Generative AI provider
(canonical pick: ``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``).
Set ``LOCUS_MODEL_PROVIDER=mock`` to bypass the model for offline runs;
OpenAI, Anthropic, and Ollama also work as the LLM.

If the Oracle env vars are missing the script prints a skip banner and
exits cleanly — it never falls back to an in-memory checkpointer.
"""

import asyncio
import os
import sys

from config import get_model, print_config

from locus.agent import Agent
from locus.memory.backends import oracle_checkpointer
from locus.tools import tool


_REQUIRED_ENV = (
    "ORACLE_DSN",
    "ORACLE_USER",
    "ORACLE_PASSWORD",
    "ORACLE_WALLET",
)


def _missing_env() -> list[str]:
    return [name for name in _REQUIRED_ENV if not os.environ.get(name)]


def _build_checkpointer(table_suffix: str = "default"):
    """Build an oracle_checkpointer against the ADB described by env vars."""
    return oracle_checkpointer(
        dsn=os.environ["ORACLE_DSN"],
        user=os.environ["ORACLE_USER"],
        password=os.environ["ORACLE_PASSWORD"],
        wallet_location=os.path.expanduser(os.environ["ORACLE_WALLET"]),
        wallet_password=os.environ.get("ORACLE_WALLET_PASSWORD", ""),
        table_name=f"locus_notebook_16_{table_suffix}",
    )


# =============================================================================
# Part 1: a two-turn conversation that survives in Oracle
# =============================================================================


def example_conversation_memory():
    """Same thread_id across two run_sync calls — the agent recalls turn one."""
    print("=== Part 1: Conversation memory (Oracle 26ai) ===\n")

    model = get_model(max_tokens=100)
    checkpointer = _build_checkpointer("convo")

    agent = Agent(
        model=model,
        system_prompt="You are a helpful assistant. Remember what the user tells you.",
        checkpointer=checkpointer,
    )

    thread_id = "conversation_001"

    result1 = agent.run_sync("My name is Alice.", thread_id=thread_id)
    print("User: My name is Alice.")
    print(f"Agent: {result1.message}")

    # Same thread_id — the agent loads prior state from Oracle before
    # the next model call.
    result2 = agent.run_sync("What's my name?", thread_id=thread_id)
    print("\nUser: What's my name?")
    print(f"Agent: {result2.message}")
    print()


# =============================================================================
# Part 2: write a checkpoint after every iteration
# =============================================================================


_NOTES: list[str] = []


@tool
def save_note(content: str) -> str:
    """Save a note for later reference."""
    _NOTES.append(content)
    return f"Note saved: {content}"


@tool
def get_notes() -> str:
    """Get all saved notes."""
    if not _NOTES:
        return "No notes saved yet."
    lines = "\n".join(f"- {n}" for n in _NOTES)
    return f"You have {len(_NOTES)} note(s):\n{lines}"


def example_checkpointing_with_tools():
    """checkpoint_every_n_iterations=1 means a mid-loop crash still recovers."""
    print("=== Part 2: Checkpoint after each iteration ===\n")

    model = get_model(max_tokens=150)
    checkpointer = _build_checkpointer("notes")

    agent = Agent(
        model=model,
        tools=[save_note, get_notes],
        system_prompt="You are a note-taking assistant.",
        checkpointer=checkpointer,
        checkpoint_every_n_iterations=1,
    )

    thread_id = "notes_session"

    result1 = agent.run_sync("Save a note: Buy groceries", thread_id=thread_id)
    print("User: Save a note: Buy groceries")
    print(f"Agent: {result1.message}")
    print(f"Tool calls: {result1.metrics.tool_calls}")

    result2 = agent.run_sync("What notes do I have?", thread_id=thread_id)
    print("\nUser: What notes do I have?")
    print(f"Agent: {result2.message}")
    print()


# =============================================================================
# Part 3: a fresh agent picks up where the old one stopped
# =============================================================================


def example_persistence_across_processes():
    """Two Agent objects, one DSN. The second loads the first's state from Oracle."""
    print("=== Part 3: Cross-process persistence ===\n")

    model = get_model(max_tokens=100)

    agent1 = Agent(
        model=model,
        system_prompt="You are a helpful assistant.",
        checkpointer=_build_checkpointer("persist"),
    )

    thread_id = "persistent_chat"

    result1 = agent1.run_sync("Remember: The secret code is 42.", thread_id=thread_id)
    print("User: Remember: The secret code is 42.")
    print(f"Agent: {result1.message}")

    # Pretend the process restarted. A *new* Agent + checkpointer object
    # opens its own pool against the same ADB and reads back the state.
    agent2 = Agent(
        model=model,
        system_prompt="You are a helpful assistant.",
        checkpointer=_build_checkpointer("persist"),
    )

    result2 = agent2.run_sync("What was the secret code?", thread_id=thread_id)
    print("\n[New process — same Oracle DSN]")
    print("User: What was the secret code?")
    print(f"Agent: {result2.message}")
    print()


# =============================================================================
# Part 4: many threads sharing one database
# =============================================================================


def example_multiple_threads():
    """Two users, two thread_ids, one ADB — no cross-talk."""
    print("=== Part 4: Multiple threads ===\n")

    model = get_model(max_tokens=100)
    checkpointer = _build_checkpointer("multi")

    agent = Agent(
        model=model,
        system_prompt="You are a helpful assistant.",
        checkpointer=checkpointer,
    )

    thread_alice = "thread_alice"
    thread_bob = "thread_bob"

    agent.run_sync("I'm Alice and I like pizza.", thread_id=thread_alice)
    agent.run_sync("I'm Bob and I like sushi.", thread_id=thread_bob)

    result_alice = agent.run_sync("What's my favorite food?", thread_id=thread_alice)
    print("Thread 'alice': What's my favorite food?")
    print(f"Agent: {result_alice.message}")

    result_bob = agent.run_sync("What's my favorite food?", thread_id=thread_bob)
    print("\nThread 'bob': What's my favorite food?")
    print(f"Agent: {result_bob.message}")
    print()


# =============================================================================
# Part 5: load and walk a saved checkpoint
# =============================================================================


async def example_inspect_checkpoint():
    """Load the persisted AgentState directly and look at every field.

    Reflexion's confidence score only moves when tools succeed, so the
    inspector gets a ``record_fact`` tool. After two turns that each
    fire a tool call, ``state.confidence`` should be > 0.
    """
    print("=== Part 5: Inspecting an Oracle-backed checkpoint ===\n")

    model = get_model(max_tokens=200)
    checkpointer = _build_checkpointer("inspect")

    _facts: list[str] = []

    @tool
    def record_fact(fact: str) -> str:
        """Persist a fact the user has shared so the agent can remember it."""
        _facts.append(fact)
        return f"Recorded fact #{len(_facts)}: {fact}"

    agent = Agent(
        agent_id="inspector",
        model=model,
        system_prompt=(
            "You are a helpful assistant. Whenever the user shares a fact "
            "about themselves (their name, job, hobbies, etc.), call "
            "record_fact exactly once with a short summary of that fact, "
            "then reply naturally."
        ),
        tools=[record_fact],
        checkpointer=checkpointer,
        reflexion=True,
    )

    thread_id = "inspect_thread"

    agent.run_sync("Hello, my name is Charlie.", thread_id=thread_id)
    agent.run_sync("I work as a data scientist.", thread_id=thread_id)

    state = await checkpointer.load(thread_id)

    if state:
        print(f"Thread ID: {thread_id}")
        print(f"Agent ID: {state.agent_id}")
        print(f"Iteration: {state.iteration}")
        print(f"Message count: {len(state.messages)}")
        print(f"Tool calls so far: {len(state.tool_history)}")
        print(f"Confidence: {state.confidence:.2f}")
        print(f"Confidence history: {[round(c, 2) for c in state.confidence_history]}")

        print("\nMessages:")
        for i, msg in enumerate(state.messages):
            content = (
                msg.content[:50] + "..." if msg.content and len(msg.content) > 50 else msg.content
            )
            print(f"  {i}. [{msg.role.value}] {content}")

    print(f"\nFacts recorded by record_fact: {_facts}")
    print()


# =============================================================================
# Main
# =============================================================================


def _print_skip_banner(missing: list[str]) -> None:
    print("\n--- Notebook 10: Agent Memory & Checkpointing ---")
    print(
        "Required environment variables not set; skipping the live demo "
        "so this file still runs cleanly in CI.\n"
    )
    for name in missing:
        print(f"  - {name}")
    print(
        "\nProvision an Autonomous Database 26ai, drop its wallet on disk, "
        "set the variables above and re-run."
    )


def main():
    """Run all notebook parts."""
    print("=" * 60)
    print("Notebook 10: Agent Memory & Checkpointing on Oracle 26ai")
    print("=" * 60)
    print()

    missing = _missing_env()
    if missing:
        _print_skip_banner(missing)
        return

    print_config()
    print()

    example_conversation_memory()
    example_checkpointing_with_tools()
    example_persistence_across_processes()
    example_multiple_threads()
    asyncio.run(example_inspect_checkpoint())

    print("=" * 60)
    print("Next: Notebook 11 — Agent Streaming")
    print("=" * 60)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(130)