DeepAgent

Two complementary primitives for long-horizon research:

  • create_deepagent — a plain Agent with reflexion + grounding, typed termination, and optional filesystem / todo / subagent layers. Best for single-agent loops.
  • create_research_workflow — a StateGraph with a post-execution quality loop: execute (ReAct) → summarize → grounding eval → replan if needed. Best for production research where you need verifiable, grounded summaries.

Factory — single agent

create_deepagent

create_deepagent(*, model: str | Any, tools: list[Any], system_prompt: str, output_schema: type[BaseModel] | None = None, submit_tool: str = 'submit_research', min_confidence: float = 0.8, max_tokens: int = 80000, max_iterations: int = 40, reflexion: bool = True, grounding: bool = True, checkpointer: Any | None = None, enable_filesystem: bool = False, backend: Any | None = None, enable_todos: bool = False, todo_state: Any | None = None, memory_files: list[str] | tuple[str, ...] | None = None, subagents: list[Any] | None = None, summarize_after_messages: int | None = None, summarize_keep_recent: int = 10, **agent_kwargs: Any) -> Any

Construct a research-shaped locus.Agent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `str \| Any` | A locus model string (`"oci:openai.gpt-5.5"`) or a `ModelProtocol` instance built via `locus.models.get_model`. | *required* |
| `tools` | `list[Any]` | All tools the agent can call: MCP-derived tools, `@tool`-decorated Python tools, and the `submit_tool` that ends the loop. | *required* |
| `system_prompt` | `str` | The agent's identity, rules, and contract for calling `submit_tool` with the structured payload. | *required* |
| `output_schema` | `type[BaseModel] \| None` | Pydantic model the agent's `submit_tool` payload must validate against. Locus uses the model provider's strict structured-output mode to enforce it. | `None` |
| `submit_tool` | `str` | The tool name whose call signals "I'm done; here is my structured answer". | `'submit_research'` |
| `min_confidence` | `float` | Confidence threshold the submission must clear for early exit. | `0.8` |
| `max_tokens` | `int` | Token budget for the run. | `80000` |
| `max_iterations` | `int` | Cap on reasoning steps. | `40` |
| `reflexion` | `bool` | Self-critique pass after each step. | `True` |
| `grounding` | `bool` | Citation-grounding eval against tool-call evidence. | `True` |
| `checkpointer` | `Any \| None` | Optional `locus.memory` checkpointer for resume. `None` means no persistence. | `None` |
| `enable_filesystem` | `bool` | When True, attaches the six filesystem-as-memory tools (`write_file`, `read_file`, `ls`, `edit_file`, `glob`, `grep`) to the agent's tool list so it can use a scratchspace for intermediate work. | `False` |
| `backend` | `Any \| None` | Optional `BackendProtocol` used by the FS tools. Honored only when `enable_filesystem=True`. Defaults to a fresh `StateBackend` (in-memory, ephemeral, scoped to the agent run). Pass a `FilesystemBackend` for real-disk persistence. | `None` |
| `enable_todos` | `bool` | When True, attaches `write_todos` and `read_todos` tools backed by an in-memory `TodoState`, letting the agent maintain a structured task list across reasoning steps. | `False` |
| `todo_state` | `Any \| None` | Optional pre-built `TodoState`. Honored only when `enable_todos=True`. Pass one to inspect the list externally after the agent runs. | `None` |
| `memory_files` | `list[str] \| tuple[str, ...] \| None` | Optional list of `AGENTS.md`-style Markdown file paths whose contents are joined and prepended to the `system_prompt`. Missing paths are skipped silently, so defaults like `["~/AGENTS.md", "./AGENTS.md"]` work without checking each one. | `None` |
| `subagents` | `list[Any] \| None` | Optional list of `SubAgentDef` declaring subagents the parent can spawn mid-run via a `task()` tool. Each subagent runs as a stateless one-shot. | `None` |
| `summarize_after_messages` | `int \| None` | If set, attaches locus's `SummarizingManager` so older messages are condensed once the conversation exceeds this count. The most recent `summarize_keep_recent` messages are always preserved verbatim. `None` disables summarization: every message is kept verbatim, which may blow the context window on long runs. | `None` |
| `summarize_keep_recent` | `int` | How many recent messages `SummarizingManager` preserves untouched. Honored only when `summarize_after_messages` is set. | `10` |
| `**agent_kwargs` | `Any` | Forwarded to `locus.Agent` for advanced knobs (hooks, `conversation_manager`, plugins, …). | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | A configured `locus.Agent` ready for `agent.run(prompt)` or `agent.run_sync(prompt)`. |
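
The exit criterion is built from a small termination algebra. The classes below are a minimal behavioral sketch, not locus's real `locus.core.termination` implementation (whose internal state shape this page does not show); they only illustrate how `&` and `|` compose criteria, with a hypothetical dict-shaped run snapshot standing in for the agent state.

```python
# Hypothetical stand-ins for locus.core.termination, sketching the algebra.
class Criterion:
    def __and__(self, other):
        return _Combine(self, other, all)   # both sides must be met

    def __or__(self, other):
        return _Combine(self, other, any)   # either side suffices

    def met(self, state: dict) -> bool:     # state shape is an assumption
        raise NotImplementedError


class _Combine(Criterion):
    def __init__(self, left, right, mode):
        self.left, self.right, self.mode = left, right, mode

    def met(self, state):
        return self.mode([self.left.met(state), self.right.met(state)])


class ToolCalled(Criterion):
    def __init__(self, name):
        self.name = name

    def met(self, state):
        return self.name in state.get("tools_called", ())


class ConfidenceMet(Criterion):
    def __init__(self, threshold):
        self.threshold = threshold

    def met(self, state):
        return state.get("confidence", 0.0) >= self.threshold


class TokenLimit(Criterion):
    def __init__(self, limit):
        self.limit = limit

    def met(self, state):
        return state.get("tokens_used", 0) >= self.limit


class MaxIterations(Criterion):
    def __init__(self, limit):
        self.limit = limit

    def met(self, state):
        return state.get("iterations", 0) >= self.limit


# The same shape the factory builds: early-exit on a confident submission,
# otherwise stop on token or iteration budget.
termination = (
    (ToolCalled("submit_research") & ConfidenceMet(0.8))
    | TokenLimit(80_000)
    | MaxIterations(40)
)
```

The factory source below composes the real classes identically.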

Source code in src/locus/deepagent/factory.py
def create_deepagent(
    *,
    model: str | Any,
    tools: list[Any],
    system_prompt: str,
    output_schema: type[BaseModel] | None = None,
    submit_tool: str = "submit_research",
    min_confidence: float = 0.8,
    max_tokens: int = 80_000,
    max_iterations: int = 40,
    reflexion: bool = True,
    grounding: bool = True,
    checkpointer: Any | None = None,
    enable_filesystem: bool = False,
    backend: Any | None = None,
    enable_todos: bool = False,
    todo_state: Any | None = None,
    memory_files: list[str] | tuple[str, ...] | None = None,
    subagents: list[Any] | None = None,
    summarize_after_messages: int | None = None,
    summarize_keep_recent: int = 10,
    **agent_kwargs: Any,
) -> Any:
    """Construct a research-shaped ``locus.Agent``.

    Args:
        model: A locus model string (``"oci:openai.gpt-5.5"``) or a
            ``ModelProtocol`` instance built via
            ``locus.models.get_model``.
        tools: All tools the agent can call — MCP-derived,
            ``@tool``-decorated Python tools, and the ``submit_tool``
            that ends the loop.
        system_prompt: The agent's identity, rules, and contract for
            calling ``submit_tool`` with the structured payload.
        output_schema: Pydantic model the agent's ``submit_tool``
            payload must validate against. Locus uses the model
            provider's strict structured-output mode to enforce it.
        submit_tool: The tool name whose call signals "I'm done — here
            is my structured answer". Default ``submit_research``.
        min_confidence: Confidence threshold the submission must clear
            for early-exit. Default 0.8.
        max_tokens: Token budget. Default 80k.
        max_iterations: Cap on reasoning steps. Default 40.
        reflexion: Self-critique pass after each step. Default True.
        grounding: Citation-grounding eval against tool-call evidence.
            Default True.
        checkpointer: Optional ``locus.memory`` checkpointer for
            resume. Default None (no persistence).
        enable_filesystem: When True, attaches the six filesystem-as-
            memory tools (``write_file``, ``read_file``, ``ls``,
            ``edit_file``, ``glob``, ``grep``) to the agent's tool list
            so it can use a scratchspace for intermediate work. Default
            False.
        backend: Optional :class:`BackendProtocol` used by the FS
            tools. Honored only when ``enable_filesystem=True``.
            Defaults to a fresh :class:`StateBackend` (in-memory,
            ephemeral, scoped to the agent run). Pass a
            :class:`FilesystemBackend` for real-disk persistence.
        enable_todos: When True, attaches ``write_todos`` and
            ``read_todos`` tools backed by an in-memory
            :class:`TodoState`. Lets the agent maintain a structured
            task list across reasoning steps. Default False.
        todo_state: Optional pre-built :class:`TodoState`. Honored
            only when ``enable_todos=True``. Pass one to inspect the
            list externally after the agent runs.
        memory_files: Optional list of ``AGENTS.md``-style Markdown
            file paths whose contents are joined and prepended to the
            ``system_prompt``. Missing paths are skipped silently
            (so defaults like ``["~/AGENTS.md", "./AGENTS.md"]`` work
            without checking each one).
        subagents: Optional list of :class:`SubAgentDef` declaring
            subagents the parent can spawn mid-run via a ``task()``
            tool. Each subagent runs as a stateless one-shot.
        summarize_after_messages: If set, attaches locus's
            :class:`SummarizingManager` so older messages are
            condensed once the conversation exceeds this count.
            Recent ``summarize_keep_recent`` messages are always
            preserved verbatim. Default ``None`` (no summarization
            — all messages kept verbatim, may blow context on long
            runs).
        summarize_keep_recent: How many recent messages
            ``SummarizingManager`` preserves untouched. Default 10.
            Honored only when ``summarize_after_messages`` is set.
        **agent_kwargs: Forwarded to ``locus.Agent`` for advanced
            knobs (hooks, conversation_manager, plugins, …).

    Returns:
        A configured ``locus.Agent`` ready for ``agent.run(prompt)``
        or ``agent.run_sync(prompt)``.
    """
    from locus.agent.agent import (
        Agent,
    )  # direct import — avoids the lazy-import-as-object mypy false positive
    from locus.core.termination import (
        ConfidenceMet,
        MaxIterations,
        TokenLimit,
        ToolCalled,
    )

    termination = (
        (ToolCalled(submit_tool) & ConfidenceMet(min_confidence))
        | TokenLimit(max_tokens)
        | MaxIterations(max_iterations)
    )

    # Splice filesystem-as-memory tools into the user-supplied list
    # before constructing the Agent. The default backend is an
    # ephemeral in-memory StateBackend so callers who flip the flag
    # don't have to think about cleanup.
    final_tools = list(tools)
    if enable_filesystem:
        from locus.deepagent.backends import StateBackend
        from locus.deepagent.tools import make_filesystem_tools

        fs_backend = backend if backend is not None else StateBackend()
        final_tools = [*final_tools, *make_filesystem_tools(fs_backend)]

    if enable_todos:
        from locus.deepagent.todos import TodoState, make_todo_tools

        td_state = todo_state if todo_state is not None else TodoState()
        final_tools = [*final_tools, *make_todo_tools(td_state)]

    # Subagent dispatch: attach a single ``task()`` tool the parent
    # can call to spawn one-shot subagents mid-run. The tool's catalog
    # of available subagents is implicit in its docstring; callers
    # who want it surfaced in the parent's system prompt can do so
    # via ``memory_files`` or by appending to ``system_prompt``.
    if subagents:
        from locus.deepagent.subagent import task_tool

        final_tools = [
            *final_tools,
            task_tool(subagents, parent_model=model),
        ]

    # Memory files: prepend to the system prompt so AGENTS.md-style
    # instructions land in front of the recipe-specific identity
    # block. Layered so users can stack base / user / project files.
    final_system_prompt = system_prompt
    if memory_files:
        from locus.deepagent.memory import load_agents_md

        memory_block = load_agents_md(list(memory_files))
        if memory_block:
            final_system_prompt = f"{memory_block}\n\n---\n\n{system_prompt}"

    # Summarization: thin pass-through to locus's SummarizingManager.
    # Active only when the caller asks for it; otherwise the agent
    # keeps every message (default locus behavior). Tier-3 knob —
    # avoids reinventing what locus already ships.
    conversation_manager: Any | None = None
    if summarize_after_messages is not None:
        from locus.memory.conversation import SummarizingManager

        conversation_manager = SummarizingManager(
            threshold=summarize_after_messages,
            keep_recent=summarize_keep_recent,
        )

    kwargs: dict[str, Any] = {
        "model": model,
        "tools": final_tools,
        "system_prompt": final_system_prompt,
        "max_iterations": max_iterations,
        "reflexion": reflexion,
        "grounding": grounding,
    }
    if conversation_manager is not None:
        kwargs["conversation_manager"] = conversation_manager
    if output_schema is not None:
        kwargs["output_schema"] = output_schema
    if checkpointer is not None:
        kwargs["checkpointer"] = checkpointer
    kwargs.update(agent_kwargs)

    agent = Agent(**kwargs)
    # Locus's Agent constructor accepts ``max_iterations`` but the
    # typed ``termination`` is the load-bearing exit criterion; attach
    # it via the public AgentConfig setter so the algebra runs.
    config = getattr(agent, "config", None)
    if config is not None and hasattr(config, "termination"):
        config.termination = termination
    return agent
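
The `memory_files` layering at the end of the factory can be sketched in isolation. This is a behavioral approximation: the real `load_agents_md` lives in `locus.deepagent.memory` and is not shown here, so the exact join separator between files is an assumption; only the skip-missing behavior and the `---` rule before the identity block are taken from the source above.

```python
import os


def load_agents_md_sketch(paths: list[str]) -> str:
    """Hypothetical stand-in for locus.deepagent.memory.load_agents_md:
    read each existing AGENTS.md-style file, skip missing paths silently,
    and join the surviving contents (separator is an assumption)."""
    blocks = []
    for p in paths:
        full = os.path.expanduser(p)
        if os.path.isfile(full):
            with open(full, encoding="utf-8") as f:
                blocks.append(f.read().strip())
    return "\n\n".join(blocks)


def layered_system_prompt(memory_files: list[str], system_prompt: str) -> str:
    # Mirrors the factory: the memory block (if any) lands in front of the
    # recipe-specific identity block, separated by a horizontal rule.
    memory_block = load_agents_md_sketch(memory_files)
    if memory_block:
        return f"{memory_block}\n\n---\n\n{system_prompt}"
    return system_prompt
```

Because missing paths are skipped, callers can stack base, user, and project files without existence checks.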

Research workflow — StateGraph with quality loop

create_research_workflow

create_research_workflow(*, model: Any, tools: list[Any], system_prompt: str = '', output_schema: type[BaseModel] | None = None, grounding_threshold: float = 0.65, max_replans: int = 2, max_regenerations: int = 1, max_iterations: int = 20, summarization_model: Any | None = None, grounding_model: Any | None = None, reflexion: bool = True, causal_inference: bool = True, checkpointer: Any | None = None) -> Any

Compose the standard research workflow from individual node primitives.

Graph topology:

START → execute → [causal_inference →] summarize → grounding_eval
                                                        ↓
                                    score ≥ threshold → END
                                    regens < max      → regenerate → grounding_eval
                                    else              → replan → execute

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `Any` | Primary model for the execute and causal-inference phases. | *required* |
| `tools` | `list[Any]` | Tools available to the execute agent. | *required* |
| `system_prompt` | `str` | Domain identity for the execute agent. | `''` |
| `output_schema` | `type[BaseModel] \| None` | Optional Pydantic model for structured output. | `None` |
| `grounding_threshold` | `float` | Minimum grounding score to accept. | `0.65` |
| `max_replans` | `int` | Maximum full-execute retries. | `2` |
| `max_regenerations` | `int` | Maximum lightweight summary rewrites. | `1` |
| `max_iterations` | `int` | Max ReAct iterations per execute phase. | `20` |
| `summarization_model` | `Any \| None` | Model for the summarize and regenerate nodes; falls back to `model`. | `None` |
| `grounding_model` | `Any \| None` | Model for the grounding_eval node; falls back to `model`. | `None` |
| `reflexion` | `bool` | Enable per-turn self-evaluation in the execute agent. | `True` |
| `causal_inference` | `bool` | Insert a causal_inference node before summarize. | `True` |
| `checkpointer` | `Any \| None` | Optional locus checkpointer for the StateGraph. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | A compiled `locus.StateGraph`. |
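
The conditional router returned by `route_after_grounding` is not reproduced on this page; the sketch below reconstructs its decision order from the documented topology. The state-key names (`grounding_score`, `regeneration_count`, `replan_count`) and the budget-exhausted fallback to `END` are assumptions, not confirmed API.

```python
END = "__end__"  # stand-in for locus.multiagent.graph.END (assumed sentinel)


def route_after_grounding(threshold: float, max_replans: int,
                          max_regenerations: int):
    """Hypothetical reconstruction of the post-grounding router."""
    def _route(state: dict) -> str:
        if state.get("grounding_score", 0.0) >= threshold:
            return END           # summary is grounded: accept
        if state.get("regeneration_count", 0) < max_regenerations:
            return "regenerate"  # cheap: rewrite summary, keep evidence
        if state.get("replan_count", 0) < max_replans:
            return "replan"      # expensive: focused re-execute pass
        return END               # budgets exhausted: accept best effort
    return _route
```

Regeneration is always tried before replanning because it reuses the collected evidence, so a low score costs one extra LLM call rather than a full tool-using run.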

Source code in src/locus/deepagent/workflow.py
def create_research_workflow(
    *,
    model: Any,
    tools: list[Any],
    system_prompt: str = "",
    output_schema: type[BaseModel] | None = None,
    grounding_threshold: float = 0.65,
    max_replans: int = 2,
    max_regenerations: int = 1,
    max_iterations: int = 20,
    summarization_model: Any | None = None,
    grounding_model: Any | None = None,
    reflexion: bool = True,
    causal_inference: bool = True,
    checkpointer: Any | None = None,
) -> Any:
    """Compose the standard research workflow from individual node primitives.

    Graph topology::

        START → execute → [causal_inference →] summarize → grounding_eval

                                            score ≥ threshold → END
                                            regens < max      → regenerate → grounding_eval
                                            else              → replan → execute

    Args:
        model: Primary model for execute + causal inference phases.
        tools: Tools available to the execute agent.
        system_prompt: Domain identity for the execute agent.
        output_schema: Optional Pydantic model for structured output.
        grounding_threshold: Minimum grounding score to accept (default 0.65).
        max_replans: Maximum full-execute retries (default 2).
        max_regenerations: Maximum lightweight summary rewrites (default 1).
        max_iterations: Max ReAct iterations per execute phase (default 20).
        summarization_model: Model for summarize + regenerate nodes.
        grounding_model: Model for grounding_eval node.
        reflexion: Enable per-turn self-evaluation in execute agent.
        causal_inference: Insert causal_inference node before summarize.
        checkpointer: Optional locus checkpointer for the StateGraph.

    Returns:
        A compiled ``locus.StateGraph``.
    """
    from locus.multiagent.graph import END, START, StateGraph  # noqa: PLC0415

    _sum_model = summarization_model or model
    _grd_model = grounding_model or model

    graph = StateGraph()

    graph.add_node(
        "execute",
        make_execute_node(model, tools, system_prompt, reflexion, max_iterations),
    )
    if causal_inference:
        graph.add_node("causal_inference", make_causal_inference_node(model))
    graph.add_node("summarize", make_summarize_node(_sum_model, output_schema))
    graph.add_node("grounding_eval", make_grounding_eval_node(_grd_model))
    graph.add_node("regenerate", make_regenerate_summary_node(_sum_model, output_schema))
    graph.add_node("replan", make_replan_node())

    router = route_after_grounding(grounding_threshold, max_replans, max_regenerations)

    graph.add_edge(START, "execute")
    if causal_inference:
        graph.add_edge("execute", "causal_inference")
        graph.add_edge("causal_inference", "summarize")
    else:
        graph.add_edge("execute", "summarize")
    graph.add_edge("summarize", "grounding_eval")
    graph.add_conditional_edges(
        "grounding_eval",
        router,
        {"regenerate": "regenerate", "replan": "replan", END: END},
    )
    graph.add_edge("regenerate", "grounding_eval")
    graph.add_edge("replan", "execute")

    kwargs: dict[str, Any] = {}
    if checkpointer:
        kwargs["checkpointer"] = checkpointer

    return graph.compile(**kwargs)

make_execute_node

make_execute_node(model: Any, tools: list[Any], system_prompt: str = '', reflexion: bool = True, max_iterations: int = 20) -> Any

Return an async node that runs the ReAct agent loop.

Reads: `KEY_EXECUTE_PROMPT` (falls back to `KEY_PROMPT`)
Writes: `KEY_EVIDENCE`, `KEY_GROUNDING_FACTS`

Each tool result is appended to evidence as a raw string and also stored in grounding_facts with a stable fact_id so downstream nodes (grounding_eval, regenerate) can cite specific pieces of evidence.
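
That bookkeeping can be isolated as a small helper. A sketch only (the actual node inlines this logic, as the source below shows): results are truncated to 2000 characters and fact ids are zero-padded so their ordering stays stable.

```python
def record_fact(evidence: list, grounding_facts: list,
                raw: str, source: str) -> str:
    """Append one tool result both as raw evidence and as a citable fact."""
    text = raw[:2000]                            # cap stored result size
    fact_id = f"fact_{len(grounding_facts):03d}" # stable, zero-padded id
    evidence.append(text)
    grounding_facts.append({"id": fact_id, "text": text, "source": source})
    return fact_id
```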

Source code in src/locus/deepagent/workflow.py
def make_execute_node(
    model: Any,
    tools: list[Any],
    system_prompt: str = "",
    reflexion: bool = True,
    max_iterations: int = 20,
) -> Any:
    """Return an async node that runs the ReAct agent loop.

    Reads:  ``KEY_EXECUTE_PROMPT`` (falls back to ``KEY_PROMPT``)
    Writes: ``KEY_EVIDENCE``, ``KEY_GROUNDING_FACTS``

    Each tool result is appended to ``evidence`` as a raw string and also
    stored in ``grounding_facts`` with a stable ``fact_id`` so downstream
    nodes (grounding_eval, regenerate) can cite specific pieces of evidence.
    """
    from locus.agent.agent import Agent  # noqa: PLC0415
    from locus.core.events import TerminateEvent, ToolCompleteEvent  # noqa: PLC0415
    from locus.observability.emit import (  # noqa: PLC0415
        EV_RESEARCH_EXECUTE_COMPLETED,
        EV_RESEARCH_EXECUTE_STARTED,
        emit,
    )

    base_prompt = system_prompt or (
        "You are a research agent. Use tools to investigate the given topic. "
        "Gather as much evidence as possible before concluding."
    )

    async def _execute(state: dict[str, Any]) -> dict[str, Any]:
        prompt = state.get(KEY_EXECUTE_PROMPT) or state.get(KEY_PROMPT, "")
        replan_count = state.get(KEY_REPLAN_COUNT, 0)

        await emit(EV_RESEARCH_EXECUTE_STARTED, prompt_preview=prompt[:120], replan=replan_count)

        agent = Agent(
            model=model,
            tools=tools,
            system_prompt=base_prompt,
            reflexion=reflexion,
            max_iterations=max_iterations,
        )

        evidence: list[str] = []
        grounding_facts: list[dict[str, Any]] = []
        fact_idx = 0

        async for event in agent.run(prompt):
            if isinstance(event, ToolCompleteEvent) and event.result:
                raw = str(event.result)[:2000]
                evidence.append(raw)
                grounding_facts.append(
                    {
                        "id": f"fact_{fact_idx:03d}",
                        "text": raw,
                        "source": event.tool_name,
                    }
                )
                fact_idx += 1
            elif isinstance(event, TerminateEvent) and event.final_message:
                conclusion = f"[conclusion] {event.final_message[:2000]}"
                evidence.append(conclusion)
                grounding_facts.append(
                    {
                        "id": f"fact_{fact_idx:03d}",
                        "text": conclusion,
                        "source": "agent_conclusion",
                    }
                )

        await emit(EV_RESEARCH_EXECUTE_COMPLETED, fact_count=len(grounding_facts))
        return {KEY_EVIDENCE: evidence, KEY_GROUNDING_FACTS: grounding_facts}

    return _execute

make_causal_inference_node

make_causal_inference_node(model: Any, max_nodes: int = 10) -> Any

Return an async node that builds a causal chain from evidence.

Uses an LLM call to extract causal events from the evidence, then constructs a locus.reasoning.causal.CausalChain and identifies the primary root-cause hypothesis.

Reads: `KEY_EVIDENCE`, `KEY_PROMPT`
Writes: `KEY_CAUSAL_CHAIN`, `KEY_CAUSAL_HYPOTHESIS`, `KEY_CAUSAL_CONFIDENCE`
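
The node parses the extractor's reply tolerantly rather than requiring clean JSON. That step can be factored out as below; a sketch mirroring the bracket-slicing in the source that follows, where any failure simply yields an empty event list.

```python
import json


def extract_json_array(raw: str) -> list:
    """Slice the first '[' through the last ']' out of a model reply and
    parse it, returning [] when no parseable array is present."""
    start = raw.find("[")
    end = raw.rfind("]") + 1
    if start >= 0 and end > start:
        try:
            return json.loads(raw[start:end])
        except (json.JSONDecodeError, ValueError):
            return []
    return []
```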

Source code in src/locus/deepagent/workflow.py
def make_causal_inference_node(
    model: Any,
    max_nodes: int = 10,
) -> Any:
    """Return an async node that builds a causal chain from evidence.

    Uses an LLM call to extract causal events from the evidence, then
    constructs a ``locus.reasoning.causal.CausalChain`` and identifies
    the primary root-cause hypothesis.

    Reads:  ``KEY_EVIDENCE``, ``KEY_PROMPT``
    Writes: ``KEY_CAUSAL_CHAIN``, ``KEY_CAUSAL_HYPOTHESIS``, ``KEY_CAUSAL_CONFIDENCE``
    """
    from locus.agent.agent import Agent  # noqa: PLC0415
    from locus.observability.emit import EV_RESEARCH_CAUSAL_BUILT, emit  # noqa: PLC0415
    from locus.reasoning.causal import build_causal_chain  # noqa: PLC0415

    async def _causal_inference(state: dict[str, Any]) -> dict[str, Any]:
        evidence = state.get(KEY_EVIDENCE, [])
        prompt = state.get(KEY_PROMPT, "")

        if not evidence:
            return {
                KEY_CAUSAL_CHAIN: None,
                KEY_CAUSAL_HYPOTHESIS: "",
                KEY_CAUSAL_CONFIDENCE: 0.0,
            }

        evidence_block = "\n".join(f"- {e[:500]}" for e in evidence[:20])
        causal_prompt = (
            f"Research goal: {prompt}\n\n"
            f"Evidence:\n{evidence_block}\n\n"
            f"Extract up to {max_nodes} causal events as a JSON array. "
            f'Each item: {{"label": "event description", "causes": ["prior event"], '
            f'"type": "root_cause|intermediate|symptom|unknown", "confidence": 0.0-1.0}}.\n'
            f"Order from root cause to final symptom. Return ONLY the JSON array."
        )

        extractor = Agent(
            model=model,
            system_prompt="You extract causal chains from evidence. Return only valid JSON.",
        )
        result = extractor.run_sync(causal_prompt)

        events: list[dict[str, Any]] = []
        try:
            raw = result.message or ""
            start = raw.find("[")
            end = raw.rfind("]") + 1
            if start >= 0 and end > start:
                events = json.loads(raw[start:end])
        except (json.JSONDecodeError, ValueError):
            pass

        if not events:
            return {
                KEY_CAUSAL_CHAIN: None,
                KEY_CAUSAL_HYPOTHESIS: "",
                KEY_CAUSAL_CONFIDENCE: 0.0,
            }

        chain = build_causal_chain(events)

        # Extract primary hypothesis from root-cause nodes
        from locus.reasoning.causal import NodeType  # noqa: PLC0415

        root_causes = [n for n in chain.nodes.values() if n.node_type == NodeType.ROOT_CAUSE]
        hypothesis = (
            root_causes[0].label
            if root_causes
            else (next(iter(chain.nodes.values())).label if chain.nodes else "")
        )
        confidence = root_causes[0].confidence if root_causes else 0.5

        await emit(
            EV_RESEARCH_CAUSAL_BUILT,
            node_count=len(chain.nodes),
            hypothesis_preview=hypothesis[:120],
            confidence=confidence,
        )
        return {
            KEY_CAUSAL_CHAIN: chain,
            KEY_CAUSAL_HYPOTHESIS: hypothesis,
            KEY_CAUSAL_CONFIDENCE: confidence,
        }

    return _causal_inference

make_summarize_node

make_summarize_node(model: Any, output_schema: type[BaseModel] | None = None) -> Any

Return an async node that distills evidence + causal context into a summary.

Incorporates the causal hypothesis (if present) so the summary narrative is causally grounded before claims are evaluated.

Reads: `KEY_EVIDENCE`, `KEY_PROMPT`, `KEY_CAUSAL_HYPOTHESIS`
Writes: `KEY_SUMMARY`, `KEY_STRUCTURED_OUTPUT` (if `output_schema` set)

Source code in src/locus/deepagent/workflow.py
def make_summarize_node(
    model: Any,
    output_schema: type[BaseModel] | None = None,
) -> Any:
    """Return an async node that distills evidence + causal context into a summary.

    Incorporates the causal hypothesis (if present) so the summary narrative
    is causally grounded before claims are evaluated.

    Reads:  ``KEY_EVIDENCE``, ``KEY_PROMPT``, ``KEY_CAUSAL_HYPOTHESIS``
    Writes: ``KEY_SUMMARY``, ``KEY_STRUCTURED_OUTPUT`` (if output_schema set)
    """
    from locus.agent.agent import Agent  # noqa: PLC0415
    from locus.observability.emit import EV_RESEARCH_SUMMARIZE_COMPLETED, emit  # noqa: PLC0415

    async def _summarize(state: dict[str, Any]) -> dict[str, Any]:
        evidence = state.get(KEY_EVIDENCE, [])
        prompt = state.get(KEY_PROMPT, "")
        causal_hypothesis = state.get(KEY_CAUSAL_HYPOTHESIS, "")

        evidence_block = "\n\n".join(f"[{i + 1}] {e}" for i, e in enumerate(evidence))

        causal_context = f"\n\nCausal hypothesis: {causal_hypothesis}" if causal_hypothesis else ""

        schema_hint = ""
        if output_schema:
            schema_hint = (
                f"\n\nReturn your answer as a JSON object matching this schema:\n"
                f"{json.dumps(output_schema.model_json_schema(), indent=2)}"
            )

        summarize_prompt = (
            f"Research goal: {prompt}{causal_context}\n\n"
            f"Evidence:\n{evidence_block}\n\n"
            f"Write a concise, factually grounded summary of your findings. "
            f"Only assert what the evidence supports.{schema_hint}"
        )

        summarizer = Agent(
            model=model,
            system_prompt="You are a precise summarizer. Cite only what the evidence supports.",
            output_schema=output_schema,
        )
        result = summarizer.run_sync(summarize_prompt)

        update: dict[str, Any] = {KEY_SUMMARY: result.message or ""}
        if output_schema and result.parsed:
            update[KEY_STRUCTURED_OUTPUT] = result.parsed

        await emit(
            EV_RESEARCH_SUMMARIZE_COMPLETED,
            summary_length=len(result.message or ""),
            has_structured_output=bool(output_schema and result.parsed),
        )
        return update

    return _summarize

make_grounding_eval_node

make_grounding_eval_node(model: Any) -> Any

Return an async node that scores summary claims against evidence (LLM-as-judge).

Reads: `KEY_SUMMARY`, `KEY_EVIDENCE`
Writes: `KEY_GROUNDING_SCORE`, `KEY_UNGROUNDED_CLAIMS`
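
Before judging, the node splits the summary into claims with a plain string heuristic; a sketch of the same logic used in the source below:

```python
def split_claims(summary: str) -> list[str]:
    """Heuristic sentence split used before grounding evaluation: flatten
    newlines, split on periods, and drop fragments of 10 characters or
    fewer as non-claims."""
    return [
        s.strip()
        for s in summary.replace("\n", " ").split(".")
        if len(s.strip()) > 10
    ]
```

Note the heuristic is deliberately crude: abbreviations and decimal numbers will over-split, but short fragments are filtered out rather than sent to the judge.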

Source code in src/locus/deepagent/workflow.py
def make_grounding_eval_node(model: Any) -> Any:
    """Return an async node that scores summary claims against evidence (LLM-as-judge).

    Reads:  ``KEY_SUMMARY``, ``KEY_EVIDENCE``
    Writes: ``KEY_GROUNDING_SCORE``, ``KEY_UNGROUNDED_CLAIMS``
    """
    from locus.observability.emit import EV_RESEARCH_GROUNDING_EVALUATED, emit  # noqa: PLC0415
    from locus.reasoning.grounding import GroundingEvaluator  # noqa: PLC0415

    evaluator = GroundingEvaluator()

    async def _grounding_eval(state: dict[str, Any]) -> dict[str, Any]:
        summary = state.get(KEY_SUMMARY, "")
        evidence = state.get(KEY_EVIDENCE, [])

        if not summary or not evidence:
            return {KEY_GROUNDING_SCORE: 0.0, KEY_UNGROUNDED_CLAIMS: []}

        claims = [s.strip() for s in summary.replace("\n", " ").split(".") if len(s.strip()) > 10]

        grounding_result = await evaluator.evaluate_with_llm(
            claims=claims,
            evidence=evidence,
            model=model,
        )

        await emit(
            EV_RESEARCH_GROUNDING_EVALUATED,
            score=grounding_result.score,
            claims_evaluated=len(claims),
            ungrounded_count=len(grounding_result.ungrounded_claims),
            requires_replan=grounding_result.requires_replan,
        )
        return {
            KEY_GROUNDING_SCORE: grounding_result.score,
            KEY_UNGROUNDED_CLAIMS: grounding_result.ungrounded_claims,
        }

    return _grounding_eval

make_regenerate_summary_node

make_regenerate_summary_node(model: Any, output_schema: type[BaseModel] | None = None) -> Any

Return an async node that rewrites the summary using grounding feedback.

Cheaper than a full replan: preserves all tool outputs and only re-synthesizes the narrative, targeting ungrounded claims specifically.

Reads: `KEY_SUMMARY`, `KEY_EVIDENCE`, `KEY_UNGROUNDED_CLAIMS`, `KEY_GROUNDING_FACTS`, `KEY_REGENERATION_COUNT`
Writes: `KEY_SUMMARY`, `KEY_STRUCTURED_OUTPUT`, `KEY_REGENERATION_COUNT`

Source code in src/locus/deepagent/workflow.py
def make_regenerate_summary_node(
    model: Any,
    output_schema: type[BaseModel] | None = None,
) -> Any:
    """Return an async node that rewrites the summary using grounding feedback.

    Cheaper than a full replan: preserves all tool outputs and only
    re-synthesizes the narrative, targeting ungrounded claims specifically.

    Reads:  ``KEY_SUMMARY``, ``KEY_EVIDENCE``, ``KEY_UNGROUNDED_CLAIMS``,
            ``KEY_GROUNDING_FACTS``, ``KEY_REGENERATION_COUNT``
    Writes: ``KEY_SUMMARY``, ``KEY_STRUCTURED_OUTPUT``, ``KEY_REGENERATION_COUNT``
    """
    from locus.agent.agent import Agent  # noqa: PLC0415
    from locus.observability.emit import (  # noqa: PLC0415
        EV_RESEARCH_REGENERATE_COMPLETED,
        EV_RESEARCH_REGENERATE_STARTED,
        emit,
    )

    async def _regenerate(state: dict[str, Any]) -> dict[str, Any]:
        ungrounded = state.get(KEY_UNGROUNDED_CLAIMS, [])
        evidence = state.get(KEY_EVIDENCE, [])
        old_summary = state.get(KEY_SUMMARY, "")
        regen_count = state.get(KEY_REGENERATION_COUNT, 0)

        await emit(EV_RESEARCH_REGENERATE_STARTED, ungrounded_count=len(ungrounded))

        ungrounded_block = "\n".join(f"- {c}" for c in ungrounded[:8])
        evidence_block = "\n\n".join(f"[{i + 1}] {e}" for i, e in enumerate(evidence))

        schema_hint = ""
        if output_schema:
            schema_hint = (
                f"\n\nReturn as JSON matching:\n"
                f"{json.dumps(output_schema.model_json_schema(), indent=2)}"
            )

        regen_prompt = (
            f"Previous summary:\n{old_summary}\n\n"
            f"The following claims were NOT grounded by the evidence:\n"
            f"{ungrounded_block}\n\n"
            f"Evidence available:\n{evidence_block}\n\n"
            f"Rewrite the summary. Remove or qualify any ungrounded claims. "
            f"Only assert what the evidence explicitly supports.{schema_hint}"
        )

        agent = Agent(
            model=model,
            system_prompt="You are a precise editor. Remove ungrounded claims.",
            output_schema=output_schema,
        )
        result = agent.run_sync(regen_prompt)

        update: dict[str, Any] = {
            KEY_SUMMARY: result.message or old_summary,
            KEY_REGENERATION_COUNT: regen_count + 1,
        }
        if output_schema and result.parsed:
            update[KEY_STRUCTURED_OUTPUT] = result.parsed

        await emit(EV_RESEARCH_REGENERATE_COMPLETED, regeneration=regen_count + 1)
        return update

    return _regenerate

make_replan_node

make_replan_node() -> Any

Return an async node that generates a focused re-plan prompt.

Unlike regenerate_summary, this triggers a full execute phase with a narrower scope targeting the ungrounded claims.

Reads: KEY_PROMPT, KEY_UNGROUNDED_CLAIMS, KEY_REPLAN_COUNT

Writes: KEY_EXECUTE_PROMPT, KEY_REPLAN_COUNT

Source code in src/locus/deepagent/workflow.py
def make_replan_node() -> Any:
    """Return an async node that generates a focused re-plan prompt.

    Unlike regenerate_summary, this triggers a full execute phase with a
    narrower scope targeting the ungrounded claims.

    Reads:  ``KEY_PROMPT``, ``KEY_UNGROUNDED_CLAIMS``, ``KEY_REPLAN_COUNT``
    Writes: ``KEY_EXECUTE_PROMPT``, ``KEY_REPLAN_COUNT``
    """

    from locus.observability.emit import EV_RESEARCH_REPLAN, emit  # noqa: PLC0415

    async def _replan(state: dict[str, Any]) -> dict[str, Any]:
        ungrounded = state.get(KEY_UNGROUNDED_CLAIMS, [])
        prompt = state.get(KEY_PROMPT, "")
        replan_count = state.get(KEY_REPLAN_COUNT, 0)

        if ungrounded:
            focused = "\n".join(f"- {c}" for c in ungrounded[:6])
            execute_prompt = (
                f"Previous investigation of '{prompt}' left these claims unverified:\n"
                f"{focused}\n\n"
                f"Gather specific evidence to verify or refute each claim. "
                f"Focus only on the gaps above."
            )
        else:
            execute_prompt = (
                f"Re-investigate '{prompt}' with a focus on gathering "
                f"stronger, more specific evidence."
            )

        await emit(
            EV_RESEARCH_REPLAN,
            replan=replan_count + 1,
            ungrounded_count=len(ungrounded),
            prompt_preview=execute_prompt[:120],
        )
        return {
            KEY_EXECUTE_PROMPT: execute_prompt,
            KEY_REPLAN_COUNT: replan_count + 1,
        }

    return _replan

route_after_grounding

route_after_grounding(threshold: float = 0.65, max_replans: int = 2, max_regenerations: int = 1) -> Any

Return a routing function for the grounding_eval conditional edge.

Recovery strategy (mirrors Optic's two-level approach):

  1. First failure → "regenerate" (cheap: rewrite without re-running tools)
  2. Subsequent failures → "replan" (expensive: full execute retry)
  3. Limits reached → END

Compatible with StateGraph.add_conditional_edges.

Source code in src/locus/deepagent/workflow.py
def route_after_grounding(
    threshold: float = 0.65,
    max_replans: int = 2,
    max_regenerations: int = 1,
) -> Any:
    """Return a routing function for the grounding_eval conditional edge.

    Recovery strategy (mirrors Optic's two-level approach):
    1. First failure  → ``"regenerate"``  (cheap: rewrite without re-running tools)
    2. Subsequent     → ``"replan"``      (expensive: full execute retry)
    3. Limits reached → ``END``

    Compatible with ``StateGraph.add_conditional_edges``.
    """
    from locus.multiagent.graph import END  # noqa: PLC0415

    def _route(state: dict[str, Any]) -> str:
        score = state.get(KEY_GROUNDING_SCORE, 0.0)
        replans = state.get(KEY_REPLAN_COUNT, 0)
        regens = state.get(KEY_REGENERATION_COUNT, 0)

        if score >= threshold:
            return END
        if replans >= max_replans and regens >= max_regenerations:
            return END
        if regens < max_regenerations:
            return "regenerate"
        return "replan"

    return _route
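The routing contract is easy to exercise standalone. This sketch mirrors the decision table shown above, with END stubbed as a plain string (the real sentinel comes from locus.multiagent.graph):

```python
END = "__end__"  # stand-in; the real sentinel comes from locus.multiagent.graph

def route(score: float, replans: int, regens: int,
          *, threshold: float = 0.65, max_replans: int = 2,
          max_regenerations: int = 1) -> str:
    # Mirrors route_after_grounding's decision table.
    if score >= threshold:
        return END               # grounded enough: stop
    if replans >= max_replans and regens >= max_regenerations:
        return END               # both recovery budgets spent: stop
    if regens < max_regenerations:
        return "regenerate"      # cheap path first
    return "replan"              # expensive path once regenerations run out

print(route(0.80, 0, 0))  # __end__
print(route(0.40, 0, 0))  # regenerate
print(route(0.40, 0, 1))  # replan
print(route(0.40, 2, 1))  # __end__
```

Note the ordering: a failing score first consumes the regeneration budget, then the replan budget, so the expensive full-execute retry only happens after the cheap rewrite has been tried.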

Subagents

SubAgentDef

Bases: BaseModel

Declarative definition of a subagent the parent can spawn.

Mirrors deepagents' SubAgent TypedDict, mapped to a Pydantic model so locus's typed-config conventions hold.

task_tool

task_tool(subagents: list[SubAgentDef], *, parent_model: Any) -> Any

Build a single task() tool the parent agent can call.

The tool's parameters are flat: subagent_type (str) and description (str). On call:

  1. Look up subagent_type in the registered subagents.
  2. Spawn a fresh locus.Agent with the subagent's tools, prompt, and (optional) model override.
  3. Run the subagent on description using agent.run() and capture the final TerminateEvent.final_message.
  4. Return that string as the tool's output to the parent.

Parameters:

Name Type Description Default
subagents list[SubAgentDef]

List of SubAgentDef declaring the available subagent types.

required
parent_model Any

Model used when a subagent doesn't override.

required

Returns:

Type Description
Any

A locus Tool instance ready to splice into an Agent's tool list.

Source code in src/locus/deepagent/subagent.py
def task_tool(
    subagents: list[SubAgentDef],
    *,
    parent_model: Any,
) -> Any:
    """Build a single ``task()`` tool the parent agent can call.

    The tool's parameters are flat: ``subagent_type`` (str) and
    ``description`` (str). On call:

    1. Look up ``subagent_type`` in the registered subagents.
    2. Spawn a fresh ``locus.Agent`` with the subagent's tools, prompt,
       and (optional) model override.
    3. Run the subagent on ``description`` using ``agent.run()`` and
       capture the final ``TerminateEvent.final_message``.
    4. Return that string as the tool's output to the parent.

    Args:
        subagents: List of :class:`SubAgentDef` declaring the
            available subagent types.
        parent_model: Model used when a subagent doesn't override.

    Returns:
        A locus ``Tool`` instance ready to splice into an Agent's
        tool list.
    """
    by_name: dict[str, SubAgentDef] = {sa.name: sa for sa in subagents}
    if not by_name:
        msg = "task_tool requires at least one SubAgentDef"
        raise ValueError(msg)

    # Build the docstring at runtime so the parent's prompt sees the
    # actual subagent catalogue.
    catalog = "\n".join(f"- {sa.name}: {sa.description}" for sa in subagents)
    available = ", ".join(sorted(by_name))

    @tool
    async def task(subagent_type: str, description: str) -> str:
        """Spawn a one-shot subagent to handle a focused subproblem.

        Args:
            subagent_type: Which subagent to spawn (one of: see catalogue
                in the system prompt).
            description: Detailed instructions for the subagent.

        Returns:
            The subagent's final message.
        """
        if subagent_type not in by_name:
            return f"unknown subagent_type {subagent_type!r}; available: {available}"
        defn = by_name[subagent_type]

        import time as _time

        from locus.agent.agent import Agent
        from locus.core.events import TerminateEvent
        from locus.observability.emit import (
            EV_DEEPAGENT_SUBAGENT_COMPLETED,
            EV_DEEPAGENT_SUBAGENT_SPAWNED,
            emit,
        )

        await emit(
            EV_DEEPAGENT_SUBAGENT_SPAWNED,
            subagent_type=subagent_type,
            description_preview=description[:200],
            max_iterations=defn.max_iterations,
        )
        _started = _time.perf_counter()

        sub_model = defn.model if defn.model is not None else parent_model
        sub_agent = Agent(
            model=sub_model,
            tools=defn.tools,
            system_prompt=defn.system_prompt,
            max_iterations=defn.max_iterations,
            reflexion=False,  # Subagents are short-lived; reflexion is overkill.
            grounding=False,
        )
        final_message = ""
        async for event in sub_agent.run(description):
            if isinstance(event, TerminateEvent):
                final_message = event.final_message or ""
        await emit(
            EV_DEEPAGENT_SUBAGENT_COMPLETED,
            subagent_type=subagent_type,
            output_length=len(final_message),
            duration_ms=(_time.perf_counter() - _started) * 1000,
            success=bool(final_message),
        )
        return final_message

    # Stash the catalogue on the tool so callers can inject it into
    # the parent's prompt if they want the parent to see what's
    # available without overloading the docstring.
    task._subagent_catalog = catalog  # type: ignore[attr-defined]  # noqa: SLF001 — extension point

    return task
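The lookup-and-error contract can be seen in isolation. A minimal sketch, with SubAgentDef reduced to a dataclass stand-in carrying only the fields used here (names are illustrative):

```python
from dataclasses import dataclass

@dataclass
class SubAgentDefStub:  # stand-in for the Pydantic SubAgentDef
    name: str
    description: str

subagents = [
    SubAgentDefStub("searcher", "Runs web searches and extracts key passages."),
    SubAgentDefStub("analyst", "Cross-checks claims against gathered evidence."),
]
by_name = {sa.name: sa for sa in subagents}

# The catalogue string task_tool stashes on the tool for prompt injection.
catalog = "\n".join(f"- {sa.name}: {sa.description}" for sa in subagents)
available = ", ".join(sorted(by_name))

def dispatch(subagent_type: str) -> str:
    # Unknown types return an error string instead of raising, so the
    # parent agent can read the message and retry with a valid type.
    if subagent_type not in by_name:
        return f"unknown subagent_type {subagent_type!r}; available: {available}"
    return f"spawning {by_name[subagent_type].name}"

print(dispatch("searcher"))  # spawning searcher
print(dispatch("writer"))    # unknown subagent_type 'writer'; available: analyst, searcher
```

Returning the error as a string rather than raising keeps the failure inside the tool-call loop, where the parent model can see it and self-correct.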

Todos

TodoState

TodoState()

Thread-safe holder for the agent's TODO list.

A single TodoState is shared between write_todos and read_todos for one agent run. Pass the same state to make_todo_tools if you want to inspect the list externally after the run.

Source code in src/locus/deepagent/todos.py
def __init__(self) -> None:
    self._todos: list[Todo] = []
    self._lock = threading.Lock()

Todo

Bases: BaseModel

One task on the agent's structured task list.

make_todo_tools

make_todo_tools(state: TodoState | None = None) -> list[Any]

Build the write_todos and read_todos tools.

Both tools take / return a JSON string (a list of {content, status} objects). Strings keep OpenAI's strict structured-output mode happy — nested object schemas would require recursive additionalProperties: false annotations.

Source code in src/locus/deepagent/todos.py
def make_todo_tools(state: TodoState | None = None) -> list[Any]:
    """Build the ``write_todos`` and ``read_todos`` tools.

    Both tools take / return a JSON string (a list of
    ``{content, status}`` objects). Strings keep OpenAI's strict
    structured-output mode happy — nested object schemas would
    require recursive ``additionalProperties: false`` annotations.
    """
    state = state if state is not None else TodoState()

    @tool
    def write_todos(todos_json: str) -> str:
        """Replace the agent's TODO list.

        ``todos_json`` is a JSON-encoded list of ``{content, status}``
        objects. Status is one of ``pending | in_progress | completed``.
        Returns the canonical list as JSON so the agent can confirm
        what was stored.

        Use this aggressively for multi-step research: write the plan
        as a list of pending TODOs at the start, mark each
        ``in_progress`` before working it, mark it ``completed`` after.
        """
        try:
            raw = json.loads(todos_json)
        except json.JSONDecodeError as exc:
            return f"invalid JSON: {exc}"
        if not isinstance(raw, list):
            return "todos_json must be a JSON array of {content, status} objects"
        items: list[Todo] = []
        for entry in raw:
            if not isinstance(entry, dict):
                return "each todo must be an object with `content` and optional `status`"
            try:
                items.append(Todo(**entry))
            except (TypeError, ValueError) as exc:
                return f"invalid todo entry {entry!r}: {exc}"
        stored = state.replace(items)
        from locus.observability.emit import (
            EV_DEEPAGENT_TODO_ADDED,
            EV_DEEPAGENT_TODO_COMPLETED,
            emit_sync,
        )

        for todo in stored:
            ev = (
                EV_DEEPAGENT_TODO_COMPLETED
                if todo.status == "completed"
                else EV_DEEPAGENT_TODO_ADDED
            )
            emit_sync(ev, content=todo.content, status=todo.status)
        return json.dumps([t.model_dump() for t in stored])

    @tool
    def read_todos() -> str:
        """Return the current TODO list as a JSON-encoded array."""
        return json.dumps([t.model_dump() for t in state.snapshot()])

    return [write_todos, read_todos]
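Because both tools speak JSON strings, the payload contract can be checked without an agent in the loop. A sketch of a well-formed write_todos argument (contents are illustrative):

```python
import json

# A well-formed write_todos payload: a JSON array of {content, status}
# objects, with status one of pending | in_progress | completed.
plan = [
    {"content": "enumerate candidate sources", "status": "completed"},
    {"content": "verify pricing claims", "status": "in_progress"},
    {"content": "draft grounded summary", "status": "pending"},
]
todos_json = json.dumps(plan)

# Mirror of the tool's own validation: must decode to a list of objects.
decoded = json.loads(todos_json)
assert isinstance(decoded, list)
assert all(isinstance(t, dict) and "content" in t for t in decoded)
print(len(decoded))  # 3
```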

Filesystem

make_filesystem_tools

make_filesystem_tools(backend: BackendProtocol) -> list[Any]

Build the six filesystem-as-memory tools bound to backend.

Returns:

Type Description
list[Any]

List of locus Tool instances ready to splice into an Agent's tools=[...] argument.

Source code in src/locus/deepagent/tools/filesystem.py
def make_filesystem_tools(backend: BackendProtocol) -> list[Any]:
    """Build the six filesystem-as-memory tools bound to ``backend``.

    Returns:
        list of locus ``Tool`` instances ready to splice into an
        ``Agent``'s ``tools=[...]`` argument.
    """

    @tool
    def write_file(path: str, contents: str) -> str:
        """Write ``contents`` to ``path``, creating or overwriting the
        file. Parent directories are created implicitly. Returns the
        path on success, or a standardized error code on failure
        (``"is_directory"``, ``"permission_denied"``, ``"invalid_path"``).
        """
        from locus.observability.emit import EV_DEEPAGENT_FS_WRITE, emit_sync

        try:
            backend.write(path, contents)
        except BackendError as exc:
            return exc.code
        emit_sync(EV_DEEPAGENT_FS_WRITE, path=path, byte_count=len(contents))
        return path

    @tool
    def read_file(path: str, offset: int = 0, limit: int = 100) -> str:
        """Read up to ``limit`` lines from ``path`` starting at the
        0-indexed line ``offset``. Output is line-numbered (1-based,
        ``cat -n`` style). Default ``limit=100`` to avoid blowing the
        token budget on large files; raise it explicitly when needed.

        Returns the file content slice or a standardized error code
        (``"file_not_found"``, ``"is_directory"``,
        ``"permission_denied"``, ``"invalid_path"``).
        """
        from locus.observability.emit import EV_DEEPAGENT_FS_READ, emit_sync

        try:
            content = backend.read(path, offset=offset, limit=limit)
        except BackendError as exc:
            return exc.code
        emit_sync(EV_DEEPAGENT_FS_READ, path=path, byte_count=len(content))
        return content

    @tool
    def ls(path: str = "/", recursive: bool = False) -> str:
        """List entries under ``path``. Set ``recursive=True`` to walk
        the entire subtree. Returns a JSON-encoded list of
        ``{path, is_dir, size, modified_at}`` records, or a
        standardized error code on failure.
        """
        try:
            entries = backend.ls(path, recursive=recursive)
        except BackendError as exc:
            return exc.code
        return json.dumps(
            [
                {
                    "path": fi.path,
                    "is_dir": fi.is_dir,
                    "size": fi.size,
                    "modified_at": fi.modified_at.isoformat() if fi.modified_at else None,
                }
                for fi in entries
            ]
        )

    @tool
    def edit_file(path: str, old_str: str, new_str: str) -> str:
        """Replace exactly one occurrence of ``old_str`` with ``new_str``
        in ``path``. The match must be unique — supply a longer or more
        specific snippet if the same substring appears multiple times.

        Returns the path on success, the standardized error code on
        backend failure, or a human-readable message describing the
        zero/multi-match condition (so the agent can retry with a
        better snippet).
        """
        try:
            backend.edit(path, old_str, new_str)
        except BackendError as exc:
            return exc.code
        except ValueError as exc:
            return str(exc)
        return path

    @tool
    def glob(pattern: str, path: str = "/") -> str:
        """POSIX-glob match files under ``path`` against ``pattern``
        (e.g. ``"**/*.md"`` or ``"notes-*.txt"``). Returns a
        JSON-encoded list of ``FileInfo`` records. Soft-bounded at 20
        seconds (the contract surface — the in-memory backend resolves
        instantly; on-disk backends terminate within budget).
        """
        try:
            entries = backend.glob(pattern, path=path, timeout_s=20.0)
        except BackendError as exc:
            return exc.code
        return json.dumps(
            [
                {
                    "path": fi.path,
                    "is_dir": fi.is_dir,
                    "size": fi.size,
                    "modified_at": fi.modified_at.isoformat() if fi.modified_at else None,
                }
                for fi in entries
            ]
        )

    @tool
    def grep(pattern: str, path: str = "/", recursive: bool = True) -> str:
        """Regex-search file contents under ``path``. Returns a
        JSON-encoded list of ``{path, line, text}`` matches with
        1-based line numbers. ``recursive=True`` (default) walks
        nested directories.
        """
        try:
            matches = backend.grep(pattern, path=path, recursive=recursive)
        except BackendError as exc:
            return exc.code
        return json.dumps([asdict(m) for m in matches])

    return [write_file, read_file, ls, edit_file, glob, grep]

FilesystemBackend

FilesystemBackend(root: str | PathLike[str])

On-disk backend rooted at a caller-supplied directory.

The agent sees / as the root; on disk this maps to root. All resolved targets must remain under root — symlink traversal and absolute-path escapes are rejected.

Source code in src/locus/deepagent/backends/filesystem.py
def __init__(self, root: str | os.PathLike[str]) -> None:
    self._root = Path(root).resolve(strict=False)
    self._root.mkdir(parents=True, exist_ok=True)

StateBackend

StateBackend()

In-memory dict-of-paths-to-bytes backend.

Files are stored as UTF-8 text. Directories are implicit: a file at "/notes/draft.md" makes "/notes" a directory in listings even though no explicit entry exists for it.

Source code in src/locus/deepagent/backends/state.py
def __init__(self) -> None:
    self._files: dict[str, str] = {}
    self._mtime: dict[str, datetime] = {}
    self._lock = threading.Lock()

Knowledge protocol

KnowledgeProvider

Bases: Protocol

The contract a research domain implements.

The deepagent runtime calls these methods in order:

  1. open() once per scan
  2. discover(query) to enumerate ItemRefs
  3. for each item:
     a. ground(item) for live evidence
     b. agent runs with tools_for_agent() and submits structured output
     c. merge_to_row(item, grounding, research, model_id, prompt_hash)
  4. close() at end of scan

All methods are async. The runtime calls them from its event loop.
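The call order can be illustrated with a stub provider. Everything below is a stand-in: a real provider implements the async protocol methods against a live data source, and the runtime supplies the agent run where this sketch hard-codes a result.

```python
import asyncio

class StubProvider:
    """Records the order in which a runtime-style driver calls it."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def open(self): self.calls.append("open")
    async def discover(self, query): self.calls.append("discover"); return ["item-1"]
    async def ground(self, item): self.calls.append("ground"); return {"evidence": []}
    async def merge_to_row(self, item, grounding, research, model_id, prompt_hash):
        self.calls.append("merge_to_row")
        return {"key": item, **research}
    async def close(self): self.calls.append("close")

async def scan(provider, query: str) -> list[dict]:
    # Mirrors the documented lifecycle: open, discover, then per item
    # ground -> agent -> merge_to_row, and close at the end of the scan.
    await provider.open()
    try:
        rows = []
        for item in await provider.discover(query):
            grounding = await provider.ground(item)
            research = {"summary": "..."}  # stand-in for the agent run + submit
            rows.append(await provider.merge_to_row(
                item, grounding, research, "model-id", "prompt-hash"))
        return rows
    finally:
        await provider.close()

provider = StubProvider()
rows = asyncio.run(scan(provider, "q"))
print(provider.calls)  # ['open', 'discover', 'ground', 'merge_to_row', 'close']
```

Putting close() in a finally block matters: a provider holding a connection or file handle is released even if grounding or the agent step fails mid-scan.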

tools_for_agent

tools_for_agent() -> list[Any]

Locus tools the deepagent should attach for this provider.

Source code in src/locus/deepagent/protocol.py
def tools_for_agent(self) -> list[Any]:
    """Locus tools the deepagent should attach for this provider."""
    ...

output_schema

output_schema() -> type[BaseModel]

Pydantic schema the deepagent's submit_research expects.

Source code in src/locus/deepagent/protocol.py
def output_schema(self) -> type[BaseModel]:
    """Pydantic schema the deepagent's ``submit_research`` expects."""
    ...

KnowledgeRow

Bases: BaseModel

One catalog row — what the agent ultimately produces per item.

Concrete providers subclass this to add typed fields. The base keeps only the cross-provider invariants so a generic catalog layer can search / persist across providers without coupling to any one schema.

ItemRef

Bases: BaseModel

A discoverable research target.

The key uniquely identifies the item across runs (used as a checkpointer thread id and as a catalog primary key). The provider field tags which provider produced the ref so a cross-provider catalog can keep them straight.