Skip to content

ReAct loop

The low-level ReAct loop primitives. Most users should reach for the high-level Agent API (see Agent) — these classes are for when you need to compose your own loop shape (different node order, custom routing, batched execution).

Loop

ReActLoop

Bases: BaseModel

ReAct (Reason + Act) loop implementation.

Implements the Think -> Execute -> Reflect cycle with:

- Streaming events via AsyncIterator
- Conditional routing based on state
- Confidence-based termination
- Tool loop detection

Usage

loop = ReActLoop(model=my_model, registry=my_tools)

async for event in loop.run("Solve this problem"):
    match event:
        case ThinkEvent():
            print(f"Thinking: {event.reasoning}")
        case ToolCompleteEvent():
            print(f"Tool {event.tool_name}: {event.result}")
        case TerminateEvent():
            print(f"Done: {event.reason}")

run async

run(prompt: str, initial_state: AgentState | None = None, **state_kwargs: Any) -> AsyncIterator[LoopEvent]

Run the ReAct loop.

Parameters:

Name Type Description Default
prompt str

User prompt to process

required
initial_state AgentState | None

Optional pre-configured state

None
**state_kwargs Any

Additional state configuration

{}

Yields:

Type Description
AsyncIterator[LoopEvent]

Loop events (ThinkEvent, ToolStartEvent, ToolCompleteEvent, ReflectEvent, TerminateEvent)

Source code in src/locus/loop/react.py
async def run(
    self,
    prompt: str,
    initial_state: AgentState | None = None,
    **state_kwargs: Any,
) -> AsyncIterator[LoopEvent]:
    """
    Stream the events of a single ReAct run for ``prompt``.

    Execution starts at the Think node. After every node the router is
    asked where to go next, and the loop ends as soon as it answers
    ``NodeType.TERMINATE``. One ``TerminateEvent`` is always yielded last.

    Args:
        prompt: User prompt to process
        initial_state: Optional pre-configured state; the prompt is
            appended to it as a user message
        **state_kwargs: Forwarded to ``_create_initial_state`` when no
            ``initial_state`` is given (unused otherwise)

    Yields:
        The events produced by each executed node, in order, followed by
        a final TerminateEvent
    """
    # Seed the state: extend the caller's state, or build a fresh one.
    state = (
        self._create_initial_state(prompt, **state_kwargs)
        if initial_state is None
        else initial_state.with_message(Message.user(prompt))
    )

    nodes = self._create_nodes()
    router = self._create_router()

    active = NodeType.THINK
    while active != NodeType.TERMINATE:
        handler = nodes.get(active)

        # A node type without a registered handler is skipped; routing
        # still happens so the loop can move on.
        if handler is not None:
            outcome = await handler.execute(state)
            state = outcome.state
            for emitted in outcome.events:
                yield emitted

        active = router.route(active, state).next_node

        # Re-entering Think marks the start of a new iteration.
        if active == NodeType.THINK:
            state = state.next_iteration()

    # Summarise the run in a closing event.
    _should_stop, reason = state.should_terminate
    yield TerminateEvent(
        reason=reason or "complete",
        iterations_used=state.iteration,
        final_confidence=state.confidence,
        total_tool_calls=len(state.tool_executions),
    )

run_to_completion async

run_to_completion(prompt: str, initial_state: AgentState | None = None, **state_kwargs: Any) -> tuple[AgentState, list[LoopEvent]]

Run the loop and collect all events.

Convenience method that collects all events and returns the final state along with the event history.

Parameters:

Name Type Description Default
prompt str

User prompt to process

required
initial_state AgentState | None

Optional pre-configured state

None
**state_kwargs Any

Additional state configuration

{}

Returns:

Type Description
tuple[AgentState, list[LoopEvent]]

Tuple of (final_state, events)

Source code in src/locus/loop/react.py
async def run_to_completion(
    self,
    prompt: str,
    initial_state: AgentState | None = None,
    **state_kwargs: Any,
) -> tuple[AgentState, list[LoopEvent]]:
    """
    Drive the loop to termination and return everything it produced.

    Performs the same node/router cycle as the streaming entry point,
    but buffers the events instead of yielding them so the final state
    can be handed back together with the full history.

    Args:
        prompt: User prompt to process
        initial_state: Optional pre-configured state; the prompt is
            appended to it as a user message
        **state_kwargs: Forwarded to ``_create_initial_state`` when no
            ``initial_state`` is given (unused otherwise)

    Returns:
        Tuple of (final_state, events); the last event is always a
        TerminateEvent
    """
    history: list[LoopEvent] = []

    # Seed the state: extend the caller's state, or build a fresh one.
    state = (
        self._create_initial_state(prompt, **state_kwargs)
        if initial_state is None
        else initial_state.with_message(Message.user(prompt))
    )

    nodes = self._create_nodes()
    router = self._create_router()

    active = NodeType.THINK
    while active != NodeType.TERMINATE:
        handler = nodes.get(active)

        if handler is not None:
            outcome = await handler.execute(state)
            state = outcome.state
            history.extend(outcome.events)

        active = router.route(active, state).next_node

        # Re-entering Think marks the start of a new iteration.
        if active == NodeType.THINK:
            state = state.next_iteration()

    # Close the history with a summary of the run.
    _should_stop, reason = state.should_terminate
    history.append(
        TerminateEvent(
            reason=reason or "complete",
            iterations_used=state.iteration,
            final_confidence=state.confidence,
            total_tool_calls=len(state.tool_executions),
        )
    )

    return state, history

with_config

with_config(**updates: Any) -> ReActLoop

Create a new loop with updated configuration.

Returns a new ReActLoop instance (immutable).

Source code in src/locus/loop/react.py
def with_config(self, **updates: Any) -> ReActLoop:
    """
    Derive a loop whose configuration has ``updates`` applied.

    Neither this loop nor its config is modified: the config is copied
    with the overrides, and the loop is copied with that new config.

    Args:
        **updates: Configuration fields to override

    Returns:
        A new ReActLoop carrying the updated configuration
    """
    return self.model_copy(
        update={"config": self.config.model_copy(update=updates)},
    )

ReActLoopConfig

Bases: BaseModel

Configuration for the ReAct loop.

create_react_loop

create_react_loop(model: ModelProtocol, registry: ToolRegistry, *, max_iterations: int = 20, confidence_threshold: float = 0.85, enable_reflection: bool = True, system_prompt: str | None = None) -> ReActLoop

Factory function to create a ReActLoop.

Parameters:

Name Type Description Default
model ModelProtocol

LLM model to use for reasoning

required
registry ToolRegistry

Tool registry with available tools

required
max_iterations int

Maximum iterations before forced termination

20
confidence_threshold float

Confidence level for completion

0.85
enable_reflection bool

Whether to include reflection step

True
system_prompt str | None

Optional system prompt

None

Returns:

Type Description
ReActLoop

Configured ReActLoop instance

Source code in src/locus/loop/react.py
def create_react_loop(
    model: ModelProtocol,
    registry: ToolRegistry,
    *,
    max_iterations: int = 20,
    confidence_threshold: float = 0.85,
    enable_reflection: bool = True,
    system_prompt: str | None = None,
) -> ReActLoop:
    """
    Build a ReActLoop from a model, a tool registry and loop settings.

    The keyword-only settings are bundled into a ``ReActLoopConfig``
    which is then attached to the new loop.

    Args:
        model: LLM model to use for reasoning
        registry: Tool registry with available tools
        max_iterations: Maximum iterations before forced termination
        confidence_threshold: Confidence level for completion
        enable_reflection: Whether to include reflection step
        system_prompt: Optional system prompt

    Returns:
        Configured ReActLoop instance
    """
    settings = {
        "max_iterations": max_iterations,
        "confidence_threshold": confidence_threshold,
        "enable_reflection": enable_reflection,
        "system_prompt": system_prompt,
    }
    loop_config = ReActLoopConfig(**settings)
    return ReActLoop(model=model, registry=registry, config=loop_config)

Nodes

Node

Bases: BaseModel, ABC

Base class for ReAct loop nodes.

execute abstractmethod async

execute(state: AgentState) -> NodeResult

Execute the node.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required

Returns:

Type Description
NodeResult

Updated state and any events produced

Source code in src/locus/loop/nodes.py
@abstractmethod
async def execute(self, state: AgentState) -> NodeResult:
    """
    Run this node against ``state``.

    Abstract hook: concrete nodes supply the implementation.

    Args:
        state: Current agent state

    Returns:
        The state after the node ran, plus any events it produced
    """

NodeResult

Bases: BaseModel

Result from executing a node.

ThinkNode

Bases: Node

Node that invokes the LLM to generate reasoning and/or tool calls.

This is the "Reason" part of ReAct.

execute async

execute(state: AgentState) -> NodeResult

Generate the next thought and/or tool calls.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required

Returns:

Type Description
NodeResult

Updated state with assistant message and ThinkEvent

Source code in src/locus/loop/nodes.py
async def execute(self, state: AgentState) -> NodeResult:
    """
    Ask the model for the next thought and/or tool calls.

    Args:
        state: Current agent state

    Returns:
        State extended with the assistant message, and a single
        ThinkEvent describing the reasoning and requested tool calls
    """
    conversation = list(state.messages)

    # Prepend the configured system prompt unless the conversation
    # already opens with a system message.
    if self.system_prompt:
        opens_with_system = bool(conversation) and conversation[0].role.value == "system"
        if not opens_with_system:
            conversation.insert(0, Message.system(self.system_prompt))

    # Only pass tool schemas when the registry is non-empty.
    schemas = None
    if len(self.registry) > 0:
        schemas = self.registry.to_openai_schemas()

    response: ModelResponse = await self.model.complete(
        messages=conversation,
        tools=schemas,
    )
    reply = response.message

    thought = ThinkEvent(
        iteration=state.iteration,
        reasoning=reply.content,
        tool_calls=list(reply.tool_calls),
    )

    return NodeResult(state=state.with_message(reply), events=[thought])

ExecuteNode

Bases: Node

Node that executes tool calls.

This is the "Act" part of ReAct.

execute async

execute(state: AgentState) -> NodeResult

Execute pending tool calls.

This is the "Act" part of ReAct. Tool calls whose tool declared idempotent=True are de-duplicated against prior executions on the current state: if the same (tool_name, arguments) pair has already been executed during this agent run, the prior result is reused and the tool function is NOT invoked again. This prevents models that re-emit the same call from causing duplicate side-effects (double bookings, double transfers, etc.).

Parameters:

Name Type Description Default
state AgentState

Current agent state with tool calls

required

Returns:

Type Description
NodeResult

Updated state with tool results and events

Source code in src/locus/loop/nodes.py
async def execute(self, state: AgentState) -> NodeResult:
    """
    Run the tool calls requested by the last assistant message.

    This is the "Act" part of ReAct. A call to a tool that declares
    ``idempotent=True`` is not re-run when ``_find_matching_execution``
    finds a prior execution with the same (tool_name, arguments) on the
    current state; the earlier outcome (result or error) is replayed
    with a duration of ``0.0`` instead. This keeps a model that re-emits
    the same call from triggering duplicate side-effects (double
    bookings, double transfers, etc.).

    Args:
        state: Current agent state with tool calls

    Returns:
        State extended with one tool execution and one tool message per
        result, plus a ToolStartEvent for every call followed by a
        ToolCompleteEvent for every result
    """
    from locus.tools.executor import ToolResult

    pending = state.last_tool_calls
    # ``NodeResult.events`` is typed with the broad event union, so the
    # local list is declared with the same union to type-check on return.
    events: list[ThinkEvent | ToolStartEvent | ToolCompleteEvent | ReflectEvent] = []

    if not pending:
        return NodeResult(state=state, events=[])

    ctx_factory = ToolContextFactory(
        run_id=state.run_id,
        agent_id=state.agent_id,
        iteration=state.iteration,
    )

    # Announce every call up front; observers cannot tell replayed calls
    # from freshly executed ones.
    events.extend(
        ToolStartEvent(
            tool_name=call.name,
            tool_call_id=call.id,
            arguments=call.arguments,
        )
        for call in pending
    )

    # Partition the calls: replayable (idempotent with a prior match)
    # versus those that really have to run.
    replayed: dict[str, ToolResult] = {}
    to_run = []
    for call in pending:
        tool = self.registry.get(call.name) if hasattr(self.registry, "get") else None
        prior = None
        if tool is not None and getattr(tool, "idempotent", False):
            prior = _find_matching_execution(state, call.name, dict(call.arguments))
        if prior is None:
            to_run.append(call)
            continue
        replayed[call.id] = ToolResult(
            tool_call_id=call.id,
            name=call.name,
            content=prior.result if prior.result is not None else "",
            error=prior.error,
            duration_ms=0.0,
        )

    # Skip the executor entirely when there is nothing new to run.
    if to_run:
        executed = await self.executor.execute(
            tool_calls=to_run,
            registry=self.registry,
            ctx_factory=ctx_factory,
        )
    else:
        executed = []

    # Merge both sources and restore the order the model asked for.
    # Calls without any result are left out.
    by_id: dict[str, ToolResult] = {
        **{res.tool_call_id: res for res in executed},
        **replayed,
    }
    ordered = [by_id[call.id] for call in pending if call.id in by_id]

    new_state = state
    tool_messages: list[Message] = []

    for res in ordered:
        # Content is only reported for successful results.
        payload = res.content if res.success else None
        call_args = next(
            (call.arguments for call in pending if call.id == res.tool_call_id),
            {},
        )

        new_state = new_state.with_tool_execution(
            ToolExecution(
                tool_name=res.name,
                tool_call_id=res.tool_call_id,
                arguments=call_args,
                result=payload,
                error=res.error,
                duration_ms=res.duration_ms,
            )
        )

        tool_messages.append(Message.tool(res))

        events.append(
            ToolCompleteEvent(
                tool_name=res.name,
                tool_call_id=res.tool_call_id,
                result=payload,
                error=res.error,
                duration_ms=res.duration_ms,
            )
        )

    # Tool messages are attached in one go after all executions are recorded.
    new_state = new_state.with_messages(tool_messages)

    return NodeResult(state=new_state, events=events)

ReflectNode

Bases: Node

Node that evaluates progress and adjusts confidence.

This implements a simplified Reflexion-style self-evaluation.

execute async

execute(state: AgentState) -> NodeResult

Reflect on the current progress and update confidence.

Parameters:

Name Type Description Default
state AgentState

Current agent state

required

Returns:

Type Description
NodeResult

Updated state with adjusted confidence and ReflectEvent

Source code in src/locus/loop/nodes.py
async def execute(self, state: AgentState) -> NodeResult:
    """
    Assess progress, adjust confidence and log the reasoning step.

    Args:
        state: Current agent state

    Returns:
        State with the adjusted confidence and an appended reasoning
        step, plus a single ReflectEvent
    """
    verdict, advice = self._assess_progress(state)

    # Assessments without a configured adjustment leave confidence as is.
    shift = self.confidence_adjustments.get(verdict, 0.0)
    adjusted = state.adjust_confidence(shift, diminishing=True)

    # Pair the last tool calls with the same number of trailing executions.
    calls = state.last_tool_calls
    if calls:
        recent_results = list(state.tool_executions[-len(calls) :])
    else:
        recent_results = []

    adjusted = adjusted.with_reasoning_step(
        ReasoningStep(
            iteration=state.iteration,
            thought=self._get_last_thought(state),
            tool_calls=calls,
            tool_results=recent_results,
            reflection=advice,
            confidence_delta=shift,
        )
    )

    reflection = ReflectEvent(
        iteration=state.iteration,
        assessment=verdict,
        confidence_delta=shift,
        new_confidence=adjusted.confidence,
        guidance=advice,
    )

    return NodeResult(state=adjusted, events=[reflection])

Router

Router

Bases: BaseModel

Conditional routing logic for the ReAct loop.

Determines which node to execute next based on the current state.

route_from_think

route_from_think(state: AgentState) -> RouteDecision

Route from the Think node.

After thinking, the checks run in this order:

- If a termination condition is met -> Terminate
- If tool calls exist -> Execute
- If no tool calls -> Reflect (if enabled and not skipped without tools) or Terminate

Source code in src/locus/loop/router.py
def route_from_think(self, state: AgentState) -> RouteDecision:
    """
    Decide where to go after the Think node.

    Checks, in order:
    - Termination condition met -> Terminate
    - Tool calls requested -> Execute
    - No tool calls -> Reflect (when reflection is enabled and not
      skipped for tool-less responses), otherwise Terminate
    """
    should_stop, reason = state.should_terminate
    if should_stop:
        return RouteDecision(
            next_node=NodeType.TERMINATE,
            reason=f"Termination condition met: {reason}",
            metadata={"termination_reason": reason},
        )

    requested = state.last_tool_calls
    if requested:
        return RouteDecision(
            next_node=NodeType.EXECUTE,
            reason=f"Executing {len(requested)} tool call(s)",
            metadata={"tool_count": len(requested)},
        )

    # The agent answered without asking for any action: finish here
    # unless a reflection pass on the plain response is wanted.
    if not self.enable_reflection or self.skip_reflect_without_tools:
        return RouteDecision(
            next_node=NodeType.TERMINATE,
            reason="No tool calls - completing",
            metadata={"termination_reason": "no_tools"},
        )

    return RouteDecision(
        next_node=NodeType.REFLECT,
        reason="No tool calls - reflecting on response",
    )

route_from_execute

route_from_execute(state: AgentState) -> RouteDecision

Route from the Execute node.

After executing tools: - If should terminate -> Terminate - If reflection enabled and interval met -> Reflect - Otherwise -> Think

Source code in src/locus/loop/router.py
def route_from_execute(self, state: AgentState) -> RouteDecision:
    """
    Decide where to go after the Execute node.

    Checks, in order:
    - Termination condition met -> Terminate
    - ``_should_reflect(state)`` is true -> Reflect
    - Otherwise -> Think
    """
    should_stop, reason = state.should_terminate

    if should_stop:
        decision = RouteDecision(
            next_node=NodeType.TERMINATE,
            reason=f"Termination condition met: {reason}",
            metadata={"termination_reason": reason},
        )
    elif self._should_reflect(state):
        decision = RouteDecision(
            next_node=NodeType.REFLECT,
            reason="Reflecting on progress",
            metadata={"iteration": state.iteration},
        )
    else:
        decision = RouteDecision(
            next_node=NodeType.THINK,
            reason="Continuing to next thought",
        )

    return decision

route_from_reflect

route_from_reflect(state: AgentState) -> RouteDecision

Route from the Reflect node.

After reflecting: - If should terminate -> Terminate - Otherwise -> Think (with new iteration)

Source code in src/locus/loop/router.py
def route_from_reflect(self, state: AgentState) -> RouteDecision:
    """
    Decide where to go after the Reflect node.

    - Termination condition met -> Terminate
    - Otherwise -> Think (start of the next iteration)
    """
    should_stop, reason = state.should_terminate

    if not should_stop:
        return RouteDecision(
            next_node=NodeType.THINK,
            reason="Starting next iteration",
        )

    return RouteDecision(
        next_node=NodeType.TERMINATE,
        reason=f"Termination condition met: {reason}",
        metadata={"termination_reason": reason},
    )

route

route(current_node: NodeType, state: AgentState) -> RouteDecision

Route from the given node based on current state.

Parameters:

Name Type Description Default
current_node NodeType

The node that just completed

required
state AgentState

Current agent state

required

Returns:

Type Description
RouteDecision

Decision about which node to execute next

Source code in src/locus/loop/router.py
def route(self, current_node: NodeType, state: AgentState) -> RouteDecision:
    """
    Route from the given node based on current state.

    Args:
        current_node: The node that just completed
        state: Current agent state

    Returns:
        Decision about which node to execute next
    """
    if current_node == NodeType.THINK:
        return self.route_from_think(state)
    if current_node == NodeType.EXECUTE:
        return self.route_from_execute(state)
    if current_node == NodeType.REFLECT:
        return self.route_from_reflect(state)
    # Terminate node - stay terminated
    return RouteDecision(
        next_node=NodeType.TERMINATE,
        reason="Already terminated",
    )

ConditionalRouter

Bases: Router

Router with custom condition functions.

Allows injecting custom routing logic for advanced use cases.

add_condition

add_condition(name: str, condition: Any) -> ConditionalRouter

Add a custom routing condition.

The condition function receives the state and returns either: - A RouteDecision to override default routing - None to continue with default routing

Returns a new router with the condition added (immutable).

Source code in src/locus/loop/router.py
def add_condition(
    self,
    name: str,
    condition: Any,  # Callable[[AgentState], RouteDecision | None]
) -> ConditionalRouter:
    """
    Return a copy of this router with one more routing condition.

    A condition is called with the state and answers with either:
    - A RouteDecision, which overrides default routing
    - None, which lets routing continue as usual

    The new ``(name, condition)`` pair goes to the end of the list, and
    this router itself is left untouched.
    """
    extended = list(self.custom_conditions)
    extended.append((name, condition))
    return self.model_copy(update={"custom_conditions": extended})

route

route(current_node: NodeType, state: AgentState) -> RouteDecision

Route with custom conditions checked first.

Custom conditions are checked in order. The first one to return a RouteDecision wins.

Source code in src/locus/loop/router.py
def route(self, current_node: NodeType, state: AgentState) -> RouteDecision:
    """
    Route with custom conditions checked first.

    Custom conditions are called with ``state`` in registration order.
    The first one to return a RouteDecision wins; its metadata is
    extended with ``"custom_condition": <name>`` so the origin of the
    decision can be traced. When none of them decides, the default
    routing of the parent class is used.

    A condition that raises is skipped (best effort), but the failure is
    logged as a warning with its traceback so a broken condition does
    not go unnoticed.

    Args:
        current_node: The node that just completed
        state: Current agent state

    Returns:
        The first custom decision, or the default routing decision
    """
    import logging

    logger = logging.getLogger(__name__)

    for name, condition in self.custom_conditions:
        try:
            result = condition(state)
            if result is not None:
                updated: RouteDecision = result.model_copy(
                    update={"metadata": {**result.metadata, "custom_condition": name}}
                )
                return updated
        except Exception:  # noqa: BLE001
            # Deliberately tolerant: one faulty condition must not break
            # routing. Record it and let the remaining conditions run.
            logger.warning(
                "Custom routing condition %r failed; skipping it",
                name,
                exc_info=True,
            )
            continue

    # Fall back to default routing
    return super().route(current_node, state)

route_from_think

route_from_think(state: AgentState) -> RouteDecision

Route from the Think node.

After thinking, the checks run in this order:

- If a termination condition is met -> Terminate
- If tool calls exist -> Execute
- If no tool calls -> Reflect (if enabled and not skipped without tools) or Terminate

Source code in src/locus/loop/router.py
def route_from_think(self, state: AgentState) -> RouteDecision:
    """
    Decide where to go after the Think node.

    Checks, in order:
    - Termination condition met -> Terminate
    - Tool calls requested -> Execute
    - No tool calls -> Reflect (when reflection is enabled and not
      skipped for tool-less responses), otherwise Terminate
    """
    should_stop, reason = state.should_terminate
    if should_stop:
        return RouteDecision(
            next_node=NodeType.TERMINATE,
            reason=f"Termination condition met: {reason}",
            metadata={"termination_reason": reason},
        )

    requested = state.last_tool_calls
    if requested:
        return RouteDecision(
            next_node=NodeType.EXECUTE,
            reason=f"Executing {len(requested)} tool call(s)",
            metadata={"tool_count": len(requested)},
        )

    # The agent answered without asking for any action: finish here
    # unless a reflection pass on the plain response is wanted.
    if not self.enable_reflection or self.skip_reflect_without_tools:
        return RouteDecision(
            next_node=NodeType.TERMINATE,
            reason="No tool calls - completing",
            metadata={"termination_reason": "no_tools"},
        )

    return RouteDecision(
        next_node=NodeType.REFLECT,
        reason="No tool calls - reflecting on response",
    )

route_from_execute

route_from_execute(state: AgentState) -> RouteDecision

Route from the Execute node.

After executing tools: - If should terminate -> Terminate - If reflection enabled and interval met -> Reflect - Otherwise -> Think

Source code in src/locus/loop/router.py
def route_from_execute(self, state: AgentState) -> RouteDecision:
    """
    Decide where to go after the Execute node.

    Checks, in order:
    - Termination condition met -> Terminate
    - ``_should_reflect(state)`` is true -> Reflect
    - Otherwise -> Think
    """
    should_stop, reason = state.should_terminate

    if should_stop:
        decision = RouteDecision(
            next_node=NodeType.TERMINATE,
            reason=f"Termination condition met: {reason}",
            metadata={"termination_reason": reason},
        )
    elif self._should_reflect(state):
        decision = RouteDecision(
            next_node=NodeType.REFLECT,
            reason="Reflecting on progress",
            metadata={"iteration": state.iteration},
        )
    else:
        decision = RouteDecision(
            next_node=NodeType.THINK,
            reason="Continuing to next thought",
        )

    return decision

route_from_reflect

route_from_reflect(state: AgentState) -> RouteDecision

Route from the Reflect node.

After reflecting: - If should terminate -> Terminate - Otherwise -> Think (with new iteration)

Source code in src/locus/loop/router.py
def route_from_reflect(self, state: AgentState) -> RouteDecision:
    """
    Decide where to go after the Reflect node.

    - Termination condition met -> Terminate
    - Otherwise -> Think (start of the next iteration)
    """
    should_stop, reason = state.should_terminate

    if not should_stop:
        return RouteDecision(
            next_node=NodeType.THINK,
            reason="Starting next iteration",
        )

    return RouteDecision(
        next_node=NodeType.TERMINATE,
        reason=f"Termination condition met: {reason}",
        metadata={"termination_reason": reason},
    )

NodeType

Bases: StrEnum

Types of nodes in the ReAct loop.

RouteDecision

Bases: BaseModel

Result of a routing decision.

Runner

LoopRunner

Bases: BaseModel

High-level executor for ReAct loops.

Provides additional features:

- Event callbacks/hooks
- Error handling and retries
- Timeout management
- Progress tracking

events property

events: list[LoopEvent]

Get events from the last run.

final_state property

final_state: AgentState | None

Get final state from the last run.

run async

run(prompt: str, initial_state: AgentState | None = None, **state_kwargs: Any) -> AsyncIterator[LoopEvent]

Run the loop with callbacks and error handling.

Parameters:

Name Type Description Default
prompt str

User prompt

required
initial_state AgentState | None

Optional initial state

None
**state_kwargs Any

Additional state configuration

{}

Yields:

Type Description
AsyncIterator[LoopEvent]

Loop events

Source code in src/locus/loop/runner.py
async def run(
    self,
    prompt: str,
    initial_state: AgentState | None = None,
    **state_kwargs: Any,
) -> AsyncIterator[LoopEvent]:
    """
    Run the loop with callbacks and error handling.

    Streams the events of ``_run_with_timeout`` to the caller. Every
    event is recorded in ``_events``, handed to ``on_event`` (if set) and
    then yielded. If an exception escapes, ``on_error`` (if set) is
    called and the whole run is attempted again after a backoff, as long
    as ``retry_on_error`` is on and ``max_retries`` is not exceeded;
    otherwise the exception is re-raised.

    Args:
        prompt: User prompt
        initial_state: Optional initial state
        **state_kwargs: Additional state configuration

    Yields:
        Loop events

    Raises:
        Exception: Whatever the underlying run raised, once retries are
            disabled or exhausted
    """
    # Start from a clean slate for this run.
    self._events = []
    self._final_state = None
    retries = 0

    while True:
        try:
            async for event in self._run_with_timeout(prompt, initial_state, **state_kwargs):
                self._events.append(event)

                # Call event callback
                if self.on_event:
                    self.on_event(event)

                yield event

                # A TerminateEvent ends the stream; stop consuming here.
                # NOTE(review): no state is captured at this point, and
                # nothing in this method assigns ``_final_state``.
                if isinstance(event, TerminateEvent):
                    break

            # Success - exit retry loop
            break

        except Exception as e:
            retries += 1

            if self.on_error:
                # The callback gets the caller's initial state, or a
                # default AgentState when none was supplied (or it is falsy).
                error_state = initial_state or AgentState()
                self.on_error(e, error_state)

            # Give up when retrying is disabled or the budget is spent.
            if not self.retry_on_error or retries > self.max_retries:
                raise

            # Wait before retry (exponential backoff, capped at 30 seconds).
            # NOTE(review): ``_events`` is not cleared here and events of
            # the failed attempt were already yielded, so a retry
            # presumably makes consumers see earlier events again - confirm.
            await asyncio.sleep(min(2**retries, 30))

    # Call completion callback (skipped while ``_final_state`` is unset).
    # NOTE(review): presumably ``_run_with_timeout`` is what sets
    # ``_final_state``; if nothing does, ``on_complete`` never fires - confirm.
    if self.on_complete and self._final_state:
        self.on_complete(self._final_state, self._events)

run_to_completion async

run_to_completion(prompt: str, initial_state: AgentState | None = None, **state_kwargs: Any) -> tuple[AgentState, list[LoopEvent]]

Run to completion and return final state with events.

Parameters:

Name Type Description Default
prompt str

User prompt

required
initial_state AgentState | None

Optional initial state

None
**state_kwargs Any

Additional state configuration

{}

Returns:

Type Description
tuple[AgentState, list[LoopEvent]]

Tuple of (final_state, events)

Source code in src/locus/loop/runner.py
async def run_to_completion(
    self,
    prompt: str,
    initial_state: AgentState | None = None,
    **state_kwargs: Any,
) -> tuple[AgentState, list[LoopEvent]]:
    """
    Run to completion and return final state with events.

    The events come from ``self.run`` (so callbacks, retries and timeout
    handling all apply). If that run recorded a final state in
    ``_final_state``, it is returned directly. The loop used to be
    executed a second time unconditionally just to obtain a state, which
    doubled every model and tool call and returned a state that did not
    belong to the returned events.

    Only when no final state was recorded does the method fall back to
    the previous behaviour of asking the underlying loop for one.

    Args:
        prompt: User prompt
        initial_state: Optional initial state
        **state_kwargs: Additional state configuration

    Returns:
        Tuple of (final_state, events)
    """
    events: list[LoopEvent] = [
        event async for event in self.run(prompt, initial_state, **state_kwargs)
    ]

    # Prefer the state recorded by the run that produced ``events``.
    final_state = self._final_state
    if final_state is not None:
        return final_state, events

    # Fallback: nothing was recorded, so obtain a state from the loop.
    # NOTE(review): this executes the whole loop a second time; have the
    # streamed run record its final state to retire this path.
    state, _ = await self.loop.run_to_completion(prompt, initial_state, **state_kwargs)

    return state, events

BatchRunner

Bases: BaseModel

Run multiple prompts through the loop.

Supports parallel execution with concurrency limits.

run_batch async

run_batch(prompts: list[str], on_result: Callable[[str, AgentState, list[LoopEvent]], None] | None = None) -> list[tuple[str, AgentState, list[LoopEvent]]]

Run multiple prompts in parallel.

Parameters:

Name Type Description Default
prompts list[str]

List of prompts to process

required
on_result Callable[[str, AgentState, list[LoopEvent]], None] | None

Optional callback for each result

None

Returns:

Type Description
list[tuple[str, AgentState, list[LoopEvent]]]

List of (prompt, final_state, events) tuples

Source code in src/locus/loop/runner.py
async def run_batch(
    self,
    prompts: list[str],
    on_result: Callable[[str, AgentState, list[LoopEvent]], None] | None = None,
) -> list[tuple[str, AgentState, list[LoopEvent]]]:
    """
    Run every prompt through the loop concurrently.

    At most ``self.max_concurrency`` prompts are inside the loop at any
    one time; the remaining ones wait on a semaphore.

    Args:
        prompts: List of prompts to process
        on_result: Optional callback invoked with (prompt, final_state, events)
            as soon as an individual prompt finishes

    Returns:
        List of (prompt, final_state, events) tuples, in the same order
        as ``prompts``
    """
    gate = asyncio.Semaphore(self.max_concurrency)

    async def _bounded(text: str) -> tuple[str, AgentState, list[LoopEvent]]:
        # Hold a semaphore slot for the whole run, including the callback.
        async with gate:
            final_state, emitted = await self.loop.run_to_completion(text)
            if on_result:
                on_result(text, final_state, emitted)
            return (text, final_state, emitted)

    # gather() preserves argument order, so results line up with prompts.
    gathered = await asyncio.gather(*[_bounded(text) for text in prompts])

    return list(gathered)

StreamingCollector

Bases: BaseModel

Collect events from a streaming loop run.

Useful for capturing events while also processing them.

is_complete property

is_complete: bool

Check if the loop has completed.

iterations property

iterations: int

Get number of iterations completed.

final_confidence property

final_confidence: float

Get final confidence score.

collect

collect(event: LoopEvent) -> None

Add an event to the collection.

Source code in src/locus/loop/runner.py
def collect(self, event: LoopEvent) -> None:
    """Add an event to the collection."""
    self.events.append(event)

    if event.event_type == "think":
        self.think_events.append(event)
    elif event.event_type in ("tool_start", "tool_complete"):
        self.tool_events.append(event)
    elif event.event_type == "reflect":
        self.reflect_events.append(event)
    elif event.event_type == "terminate":
        self.terminate_event = event

reset

reset() -> None

Reset the collector for reuse.

Source code in src/locus/loop/runner.py
def reset(self) -> None:
    """
    Clear all collected events so the collector can be reused.

    Each event list is replaced with a new, independent empty list and
    the stored terminate event is cleared.
    """
    # Four separate list objects -- the categories must not share storage.
    (
        self.events,
        self.think_events,
        self.tool_events,
        self.reflect_events,
    ) = [], [], [], []
    self.terminate_event = None

create_runner

create_runner(model: ModelProtocol, registry: ToolRegistry, *, max_iterations: int = 20, confidence_threshold: float = 0.85, enable_reflection: bool = True, system_prompt: str | None = None, timeout: float | None = None, on_event: Callable[[LoopEvent], None] | None = None) -> LoopRunner

Factory function to create a configured LoopRunner.

Parameters:

Name Type Description Default
model ModelProtocol

LLM model for reasoning

required
registry ToolRegistry

Tool registry

required
max_iterations int

Maximum iterations

20
confidence_threshold float

Confidence for completion

0.85
enable_reflection bool

Enable reflection step

True
system_prompt str | None

System prompt

None
timeout float | None

Optional timeout in seconds

None
on_event Callable[[LoopEvent], None] | None

Optional event callback

None

Returns:

Type Description
LoopRunner

Configured LoopRunner

Source code in src/locus/loop/runner.py
def create_runner(
    model: ModelProtocol,
    registry: ToolRegistry,
    *,
    max_iterations: int = 20,
    confidence_threshold: float = 0.85,
    enable_reflection: bool = True,
    system_prompt: str | None = None,
    timeout: float | None = None,
    on_event: Callable[[LoopEvent], None] | None = None,
) -> LoopRunner:
    """
    Build a LoopRunner around a freshly configured ReActLoop.

    Args:
        model: LLM model for reasoning
        registry: Tool registry
        max_iterations: Maximum iterations
        confidence_threshold: Confidence for completion
        enable_reflection: Enable reflection step
        system_prompt: System prompt
        timeout: Optional timeout in seconds
        on_event: Optional event callback

    Returns:
        Configured LoopRunner
    """
    # Loop-level settings go into the config; timeout and on_event are
    # runner-level and are passed to the LoopRunner itself.
    loop_settings = {
        "max_iterations": max_iterations,
        "confidence_threshold": confidence_threshold,
        "enable_reflection": enable_reflection,
        "system_prompt": system_prompt,
    }
    react_loop = ReActLoop(
        model=model,
        registry=registry,
        config=ReActLoopConfig(**loop_settings),
    )

    return LoopRunner(loop=react_loop, timeout=timeout, on_event=on_event)