Multi-agent

Composition

SequentialPipeline

Bases: BaseModel

Run agents in order, passing each output as the next agent's prompt.

Each agent receives either the original task (first agent) or the previous agent's output (subsequent agents). A prompt_template can customize how the previous output is passed.

Example

pipeline = SequentialPipeline(
    agents=[researcher, writer, editor],
    prompt_template="Based on the following:\n{previous_output}\n\nOriginal task: {task}",
)
result = await pipeline.run("Write about quantum computing")

run async

run(task: str) -> PipelineResult

Execute agents sequentially, chaining outputs.

Source code in src/locus/agent/composition.py
async def run(self, task: str) -> PipelineResult:
    """Execute agents sequentially, chaining outputs."""
    # Local import — observability is optional; emit is a no-op
    # outside an active run_context.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_PIPELINE_STAGE_COMPLETED,
        EV_PIPELINE_STAGE_STARTED,
        emit,
    )

    start_time = time.perf_counter()
    outputs: list[str] = []
    current_input = task

    try:
        for i, agent in enumerate(self.agents):
            if i > 0 and outputs:
                # Format prompt with previous output
                current_input = self.prompt_template.format(
                    previous_output=outputs[-1],
                    task=task,
                )

            stage_started = time.perf_counter()
            await emit(
                EV_PIPELINE_STAGE_STARTED,
                pipeline_kind="sequential",
                stage=i,
                stage_count=len(self.agents),
            )
            result = agent.run_sync(current_input)
            output = result.message or ""
            outputs.append(output)
            await emit(
                EV_PIPELINE_STAGE_COMPLETED,
                pipeline_kind="sequential",
                stage=i,
                output_length=len(output),
                duration_ms=(time.perf_counter() - stage_started) * 1000,
                success=True,
            )

        duration_ms = (time.perf_counter() - start_time) * 1000
        return PipelineResult(
            success=True,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return PipelineResult(
            success=False,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
            error=str(e),
        )

ParallelPipeline

Bases: BaseModel

Run agents concurrently and merge their results.

All agents receive the same task (or custom prompts via task_map). Results are collected and merged using the merge_strategy.

Example

pipeline = ParallelPipeline(
    agents=[fact_checker, analyst, summarizer],
    merge_strategy="concatenate",
)
result = await pipeline.run("Analyze climate change impacts")

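The task_map parameter of run (documented below) overrides the prompt for individual agents by index. A minimal sketch, assuming the agents from the example above and default merge settings:

# Agent 0 gets a custom prompt; the others receive the default task.
result = await pipeline.run(
    "Analyze climate change impacts",
    task_map={0: "List three verifiable claims about climate change impacts"},
)
print(result.outputs)       # one entry per agent, in registration order
print(result.final_output)  # merged according to merge_strategy
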
run async

run(task: str, task_map: dict[int, str] | None = None) -> PipelineResult

Execute agents in parallel and merge results.

Parameters:

- task (str): Default task for all agents. Required.
- task_map (dict[int, str] | None): Optional mapping of agent index to custom task. Default: None.
Source code in src/locus/agent/composition.py
async def run(self, task: str, task_map: dict[int, str] | None = None) -> PipelineResult:
    """Execute agents in parallel and merge results.

    Args:
        task: Default task for all agents
        task_map: Optional mapping of agent index to custom task
    """
    from locus.observability.emit import (  # noqa: PLC0415
        EV_PIPELINE_FANOUT_COMPLETED,
        EV_PIPELINE_FANOUT_STARTED,
        emit,
    )

    start_time = time.perf_counter()
    await emit(
        EV_PIPELINE_FANOUT_STARTED,
        agent_count=len(self.agents),
        merge_strategy=self.merge_strategy,
    )

    async def run_agent(index: int, agent: Any) -> str:
        prompt = task_map.get(index, task) if task_map else task
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, agent.run_sync, prompt)
        return result.message or ""

    # ``return_exceptions=True`` so one stuck/failed agent doesn't
    # collapse the whole result into an empty ``outputs=[]`` (which
    # forced every caller into defensive ``if result.success`` and
    # ate which-agent-failed context). Now each slot in ``outputs``
    # is either the agent's reply text or the stringified exception,
    # and ``error`` summarises the per-agent failures.
    tasks = [run_agent(i, agent) for i, agent in enumerate(self.agents)]
    gathered = await asyncio.gather(*tasks, return_exceptions=True)

    outputs: list[str] = []
    agent_errors: list[str] = []
    for i, item in enumerate(gathered):
        if isinstance(item, BaseException):
            outputs.append("")
            agent_errors.append(f"agent[{i}] {type(item).__name__}: {item}")
        else:
            outputs.append(item)

    if self.merge_strategy == "last":
        final = outputs[-1] if outputs else ""
    else:
        final = self.separator.join(o for o in outputs if o)

    duration_ms = (time.perf_counter() - start_time) * 1000
    await emit(
        EV_PIPELINE_FANOUT_COMPLETED,
        agents_succeeded=len(self.agents) - len(agent_errors),
        agents_failed=len(agent_errors),
        duration_ms=duration_ms,
    )
    return PipelineResult(
        success=not agent_errors,
        outputs=outputs,
        final_output=final,
        duration_ms=duration_ms,
        error="; ".join(agent_errors) if agent_errors else None,
    )

LoopAgent

Bases: BaseModel

Run an agent repeatedly until a condition is met.

The agent is called in a loop. After each iteration, the condition function is called with the latest output. If it returns True, the loop stops. A max_loops limit prevents infinite execution.

Example

loop = LoopAgent(
    agent=editor,
    condition=lambda output: "APPROVED" in output,
    max_loops=5,
    loop_prompt="Review and improve:\n{previous_output}\n\nSay APPROVED if quality is good.",
)
result = await loop.run("Write a haiku about Python")

run async

run(task: str) -> PipelineResult

Execute agent in a loop until condition is met or max_loops reached.

Source code in src/locus/agent/composition.py
async def run(self, task: str) -> PipelineResult:
    """Execute agent in a loop until condition is met or max_loops reached."""
    from locus.observability.emit import (  # noqa: PLC0415
        EV_LOOP_ITERATION_COMPLETED,
        EV_LOOP_ITERATION_STARTED,
        EV_LOOP_TERMINATED,
        emit,
    )

    start_time = time.perf_counter()
    outputs: list[str] = []
    current_input = task
    terminated_by = "max_loops"
    try:
        for i in range(self.max_loops):
            await emit(
                EV_LOOP_ITERATION_STARTED,
                iteration=i,
                max_loops=self.max_loops,
            )
            result = self.agent.run_sync(current_input)
            output = result.message or ""
            outputs.append(output)
            stopped = self.condition(output)
            await emit(
                EV_LOOP_ITERATION_COMPLETED,
                iteration=i,
                output_length=len(output),
                condition_met=stopped,
            )

            # Check termination condition
            if stopped:
                terminated_by = "condition"
                break

            # Prepare next iteration prompt
            if i < self.max_loops - 1:
                current_input = self.loop_prompt.format(
                    previous_output=output,
                    task=task,
                )

        duration_ms = (time.perf_counter() - start_time) * 1000
        await emit(
            EV_LOOP_TERMINATED,
            terminated_by=terminated_by,
            iterations=len(outputs),
            duration_ms=duration_ms,
        )
        return PipelineResult(
            success=True,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return PipelineResult(
            success=False,
            outputs=outputs,
            final_output=outputs[-1] if outputs else "",
            duration_ms=duration_ms,
            error=str(e),
        )

Orchestrator + Specialists

Orchestrator

Bases: BaseModel

Orchestrator for coordinating specialist agents.

Features:

- Selects which specialists to invoke based on the task
- Routes tasks to appropriate specialists
- Correlates findings from multiple specialists
- Summarizes results into a coherent response

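A minimal usage sketch, assuming two already-constructed Specialist instances (network_specialist, crypto_specialist, and model are placeholders) and using the create_orchestrator factory documented further down:

orchestrator = create_orchestrator(
    name="SecurityReview",
    specialists=[network_specialist, crypto_specialist],
    model=model,  # model used for routing, correlation, and summarising
)
result = await orchestrator.execute("Audit the login flow for weaknesses")
if result.success:
    print(result.summary)
    for sid, sres in result.specialist_results.items():
        print(sid, sres.confidence)
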
register_specialist

register_specialist(specialist: Specialist) -> None

Register a specialist with the orchestrator.

Source code in src/locus/multiagent/orchestrator.py
def register_specialist(self, specialist: Specialist) -> None:
    """Register a specialist with the orchestrator."""
    self.specialists[specialist.id] = specialist

register_specialists

register_specialists(specialists: list[Specialist]) -> None

Register multiple specialists.

Source code in src/locus/multiagent/orchestrator.py
def register_specialists(self, specialists: list[Specialist]) -> None:
    """Register multiple specialists."""
    for specialist in specialists:
        self.register_specialist(specialist)

execute async

execute(task: str, context: dict[str, Any] | None = None) -> OrchestratorResult

Execute the orchestration workflow.

Parameters:

- task (str): The task to process. Required.
- context (dict[str, Any] | None): Optional additional context. Default: None.

Returns: OrchestratorResult with summary and all findings.

Source code in src/locus/multiagent/orchestrator.py
async def execute(
    self,
    task: str,
    context: dict[str, Any] | None = None,
) -> OrchestratorResult:
    """
    Execute the orchestration workflow.

    Args:
        task: The task to process
        context: Optional additional context

    Returns:
        OrchestratorResult with summary and all findings
    """
    # Local import — keeps observability optional. If the user
    # never enters a run_context, ``emit`` is a no-op and the
    # bus singleton is never instantiated.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_ORCHESTRATOR_DECISION,
        EV_ORCHESTRATOR_ROUTING,
        EV_ORCHESTRATOR_SPECIALISTS_INVOKED,
        EV_ORCHESTRATOR_SUMMARY,
        emit,
    )

    start_time = time.perf_counter()
    decisions: list[RoutingDecision] = []

    try:
        await emit(
            EV_ORCHESTRATOR_ROUTING,
            orchestrator_id=self.id,
            task_preview=task[:160],
            specialist_count=len(self.specialists),
        )

        # Step 1: Make routing decision
        routing_decision = await self._make_routing_decision(task)
        decisions.append(routing_decision)

        await emit(
            EV_ORCHESTRATOR_DECISION,
            orchestrator_id=self.id,
            decision="invoke_specialist",
            specialists_selected=routing_decision.specialists,
            reasoning=routing_decision.reasoning,
        )

        # Step 2: Invoke specialists
        specialist_results = await self._invoke_specialists(task, routing_decision)

        await emit(
            EV_ORCHESTRATOR_SPECIALISTS_INVOKED,
            orchestrator_id=self.id,
            specialists_invoked=list(specialist_results.keys()),
            specialists_succeeded=[sid for sid, r in specialist_results.items() if r.success],
            specialists_failed=[sid for sid, r in specialist_results.items() if not r.success],
        )

        # Step 3: Correlate findings
        correlation_decision = RoutingDecision(
            decision_type="correlate",
            reasoning="Correlating findings from specialists",
        )
        decisions.append(correlation_decision)

        await emit(
            EV_ORCHESTRATOR_DECISION,
            orchestrator_id=self.id,
            decision="correlate",
            reasoning="Correlating specialist findings",
        )

        correlation = await self._correlate_findings(task, specialist_results)

        # Step 4: Summarize
        summary_decision = RoutingDecision(
            decision_type="summarize",
            reasoning="Generating final summary",
        )
        decisions.append(summary_decision)

        await emit(
            EV_ORCHESTRATOR_DECISION,
            orchestrator_id=self.id,
            decision="summarize",
            reasoning="Generating final summary",
        )

        summary = await self._summarize(task, correlation, specialist_results)
        await emit(
            EV_ORCHESTRATOR_SUMMARY,
            orchestrator_id=self.id,
            summary_length=len(summary or ""),
        )

        duration_ms = (time.perf_counter() - start_time) * 1000

        return OrchestratorResult(
            orchestrator_id=self.id,
            success=True,
            summary=summary,
            specialist_results=specialist_results,
            decisions=decisions,
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return OrchestratorResult(
            orchestrator_id=self.id,
            success=False,
            decisions=decisions,
            duration_ms=duration_ms,
            error=str(e),
        )

with_model

with_model(model: Any) -> Orchestrator

Return a copy of this orchestrator with the given model.

Source code in src/locus/multiagent/orchestrator.py
def with_model(self, model: Any) -> Orchestrator:
    """Return a copy of this orchestrator with the given model."""
    # Also update specialists with the model
    updated_specialists = {
        spec_id: spec.with_model(model) for spec_id, spec in self.specialists.items()
    }
    return self.model_copy(
        update={
            "model": model,
            "specialists": updated_specialists,
        }
    )

RoutingDecision

Bases: BaseModel

A decision made by the orchestrator.

create_orchestrator

create_orchestrator(name: str = 'Orchestrator', specialists: list[Specialist] | None = None, model: Any = None) -> Orchestrator

Create an orchestrator with the given specialists.

Parameters:

- name (str): Orchestrator name. Default: 'Orchestrator'.
- specialists (list[Specialist] | None): List of specialists to register. Default: None.
- model (Any): Model for decision making. Default: None.

Returns: Configured Orchestrator instance.

Source code in src/locus/multiagent/orchestrator.py
def create_orchestrator(
    name: str = "Orchestrator",
    specialists: list[Specialist] | None = None,
    model: Any = None,
) -> Orchestrator:
    """
    Create an orchestrator with the given specialists.

    Args:
        name: Orchestrator name
        specialists: List of specialists to register
        model: Model for decision making

    Returns:
        Configured Orchestrator instance
    """
    orchestrator = Orchestrator(name=name, model=model)

    if specialists:
        orchestrator.register_specialists(specialists)

    return orchestrator

Specialist

Bases: BaseModel

A specialist agent focused on a specific domain.

Features:

- Domain-specific system prompt
- Focused tool set
- Optional playbook integration
- Confidence-based execution

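A minimal execution sketch, assuming an already-constructed Specialist (base_specialist and model are placeholders); execute and with_model are documented below:

specialist = base_specialist.with_model(model)
result = await specialist.execute(
    "Check the TLS configuration of the gateway",
    context={"host": "gateway.internal"},  # optional context dict
)
if result.error is None:
    print(result.output, result.confidence)
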
select_playbook

select_playbook(task: str) -> Playbook | None

Select the most appropriate playbook for a task.

Parameters:

- task (str): The task description. Required.

Returns: Best matching playbook, or None.

Source code in src/locus/multiagent/specialist.py
def select_playbook(self, task: str) -> Playbook | None:
    """
    Select the most appropriate playbook for a task.

    Args:
        task: The task description

    Returns:
        Best matching playbook or None
    """
    # Simple keyword matching - could be enhanced with embeddings
    task_lower = task.lower()

    best_match: Playbook | None = None
    best_score = 0

    for playbook in self.playbooks:
        # Count matching keywords
        score = 0
        playbook_words = set(playbook.name.lower().split())
        playbook_words.update(playbook.description.lower().split())

        for word in task_lower.split():
            if word in playbook_words:
                score += 1

        if score > best_score:
            best_score = score
            best_match = playbook

    return best_match

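To illustrate the keyword overlap, a sketch assuming Playbook exposes the name and description fields the matcher reads (the constructor shown is an assumption):

pb = Playbook(name="TLS audit", description="Inspect certificate chains")
specialist.playbooks.append(pb)
# "audit the tls setup" shares the words "audit" and "tls" with the
# playbook's name, so this playbook scores 2 and is returned.
chosen = specialist.select_playbook("audit the tls setup")
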
execute async

execute(task: str, context: dict[str, Any] | None = None, registry: ToolRegistry | None = None) -> SpecialistResult

Execute the specialist on a task.

Parameters:

- task (str): The task to perform. Required.
- context (dict[str, Any] | None): Optional context from orchestrator or other specialists. Default: None.
- registry (ToolRegistry | None): Tool registry (uses self.tools if not provided). Default: None.

Returns: SpecialistResult with output and confidence.

Source code in src/locus/multiagent/specialist.py
async def execute(
    self,
    task: str,
    context: dict[str, Any] | None = None,
    registry: ToolRegistry | None = None,
) -> SpecialistResult:
    """
    Execute the specialist on a task.

    Args:
        task: The task to perform
        context: Optional context from orchestrator or other specialists
        registry: Tool registry (uses self.tools if not provided)

    Returns:
        SpecialistResult with output and confidence
    """
    if self.model is None:
        return SpecialistResult(
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            error="No model configured for specialist",
        )

    # Local import — observability is optional; this is a no-op
    # outside a run_context.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_SPECIALIST_COMPLETED,
        EV_SPECIALIST_STARTED,
        emit,
    )

    start_time = time.perf_counter()

    await emit(
        EV_SPECIALIST_STARTED,
        specialist_id=self.id,
        specialist_type=self.specialist_type,
        task_preview=task[:160],
    )

    # Construct the typed event for back-compat consumers that iterate
    # the agent loop's event stream. Stored under a leading underscore
    # because nothing currently consumes the local — the observability
    # emit above is the live publication path.
    _start_event = SpecialistStartEvent(
        specialist_id=self.id,
        specialist_type=self.specialist_type,
        task=task,
    )

    # Select appropriate playbook
    playbook = self.select_playbook(task)

    # Build system prompt
    system_prompt = self._build_system_prompt(task, playbook)

    # Initialize state
    state = AgentState(
        agent_id=self.id,
        max_iterations=self.max_iterations,
        confidence_threshold=self.confidence_threshold,
    )

    # Add system message
    state = state.with_message(Message.system(system_prompt))

    # Add context if provided
    if context:
        context_str = self._format_context(context)
        state = state.with_message(Message.user(context_str))

    # Add task message
    state = state.with_message(Message.user(task))

    # Get tool schemas
    tool_schemas = None
    if self.tools:
        tool_schemas = [tool.to_openai_schema() for tool in self.tools]

    try:
        # Simple single-turn execution for now
        # Full agentic loop would integrate with the main loop system
        response = await self.model.complete(
            messages=list(state.messages),
            tools=tool_schemas,
        )

        # Update state with response
        state = state.with_message(response.message)

        # Extract confidence from response (simple heuristic)
        confidence = self._estimate_confidence(response.message.content or "")

        duration_ms = (time.perf_counter() - start_time) * 1000

        await emit(
            EV_SPECIALIST_COMPLETED,
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            output_preview=(response.message.content or "")[:200],
            confidence=confidence,
            duration_ms=duration_ms,
            success=True,
        )

        # Local construction kept for parity with other typed-event sites.
        complete_event = SpecialistCompleteEvent(  # noqa: F841
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            result=response.message.content,
            confidence=confidence,
            duration_ms=duration_ms,
        )

        return SpecialistResult(
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            output=response.message.content,
            confidence=confidence,
            duration_ms=duration_ms,
            state=state,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        await emit(
            EV_SPECIALIST_COMPLETED,
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            error=str(e),
            duration_ms=duration_ms,
            success=False,
        )
        return SpecialistResult(
            specialist_id=self.id,
            specialist_type=self.specialist_type,
            error=str(e),
            duration_ms=duration_ms,
            state=state,
        )

with_model

with_model(model: Any) -> Specialist

Return a copy of this specialist with the given model.

Source code in src/locus/multiagent/specialist.py
def with_model(self, model: Any) -> Specialist:
    """Return a copy of this specialist with the given model."""
    return self.model_copy(update={"model": model})

Swarm

Swarm

Bases: BaseModel

A self-organizing swarm of agents.

Features:

- Agents coordinate autonomously
- Shared context/memory for communication
- Dynamic task allocation based on capabilities
- Parallel execution with coordination

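A minimal sketch. SwarmAgent construction is assumed (capabilities is the field read by can_handle below; name is an assumption), as is a model field on the swarm that add_agent propagates:

swarm = Swarm(model=model)  # model is a placeholder
swarm.add_agent(SwarmAgent(name="scanner", capabilities=["network"]))
swarm.add_agent(SwarmAgent(name="writer", capabilities=[]))  # generalist: claims any task
swarm.add_task("Map the network perimeter", priority=10)
result = await swarm.execute(decompose_tasks=False)
print(result.summary)
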
add_agent

add_agent(agent: SwarmAgent) -> Swarm

Add an agent to the swarm.

If the swarm has a model configured and the incoming agent does not, the agent inherits the swarm's model. Without this, the agent's first work_on_task would fail with "No model configured for agent" — which used to be the most common silent-failure mode for new users of the swarm API.

Source code in src/locus/multiagent/swarm.py
def add_agent(self, agent: SwarmAgent) -> Swarm:
    """Add an agent to the swarm.

    If the swarm has a model configured and the incoming agent does
    not, the agent inherits the swarm's model. Without this, the
    agent's first ``work_on_task`` would fail with
    "No model configured for agent" — which used to be the most
    common silent-failure mode for new users of the swarm API.
    """
    if self.model is not None and agent.model is None:
        agent = agent.with_model(self.model)
    self.agents.append(agent)
    return self

add_task

add_task(description: str, priority: int = 0, parent_task_id: str | None = None, metadata: dict[str, Any] | None = None) -> SwarmTask

Add a task to the queue.

Source code in src/locus/multiagent/swarm.py
def add_task(
    self,
    description: str,
    priority: int = 0,
    parent_task_id: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> SwarmTask:
    """Add a task to the queue."""
    task = SwarmTask(
        description=description,
        priority=priority,
        parent_task_id=parent_task_id,
        metadata=metadata or {},
    )
    self.task_queue.append(task)
    # Sort by priority (higher first)
    self.task_queue.sort(key=lambda t: -t.priority)
    return task

execute async

execute(initial_task: str | None = None, decompose_tasks: bool = True) -> SwarmResult

Execute the swarm.

Parameters:

- initial_task (str | None): Optional initial task to add. Default: None.
- decompose_tasks (bool): Whether to decompose tasks into subtasks. Default: True.

Returns: SwarmResult with all completed work.

Source code in src/locus/multiagent/swarm.py
async def execute(
    self,
    initial_task: str | None = None,
    decompose_tasks: bool = True,
) -> SwarmResult:
    """
    Execute the swarm.

    Args:
        initial_task: Optional initial task to add
        decompose_tasks: Whether to decompose tasks into subtasks

    Returns:
        SwarmResult with all completed work
    """
    start_time = time.perf_counter()

    # Add initial task if provided
    if initial_task:
        main_task = self.add_task(initial_task, priority=10)

        # Optionally decompose into subtasks
        if decompose_tasks:
            await self._generate_subtasks(main_task)

    try:
        semaphore = asyncio.Semaphore(self.max_parallel_agents)

        async def run_with_limit(agent: SwarmAgent) -> list[SwarmTask]:
            async with semaphore:
                return await self._run_agent_loop(agent)

        # Group tasks by priority and run in waves (high priority first).
        # Tasks within the same priority wave run in parallel.
        # Lower-priority waves wait for higher-priority waves to complete,
        # so they can see the earlier findings in SharedContext.
        iteration = 0
        all_completed: list[SwarmTask] = []

        # Get unique priority levels (sorted descending = highest first)
        priority_levels = sorted({t.priority for t in self.task_queue}, reverse=True)

        for priority in priority_levels:
            if iteration >= self.max_iterations:
                break

            # Check if there are pending tasks at this priority
            pending_at_level = [
                t
                for t in self.task_queue
                if t.status == TaskStatus.PENDING and t.priority == priority
            ]
            if not pending_at_level:
                continue

            # Run agents for this priority wave.
            # Agents run sequentially to avoid concurrent API issues
            # (some providers return empty responses under parallel load).
            # Each agent claims one task, completes it, then the next agent goes.
            results = []
            for agent in self.agents:
                agent_results = await self._run_agent_loop(agent)
                results.append(agent_results)

            for completed_list in results:
                all_completed.extend(completed_list)

            iteration += 1

        # Handle any remaining pending tasks (fallback)
        remaining = [t for t in self.task_queue if t.status == TaskStatus.PENDING]
        while remaining and iteration < self.max_iterations:
            tasks = [run_with_limit(agent) for agent in self.agents]
            results = await asyncio.gather(*tasks)
            for completed_list in results:
                all_completed.extend(completed_list)
            remaining = [t for t in self.task_queue if t.status == TaskStatus.PENDING]
            iteration += 1

        # Collect results
        completed = [t for t in self.task_queue if t.status == TaskStatus.COMPLETED]
        failed = [t for t in self.task_queue if t.status == TaskStatus.FAILED]

        # Generate summary
        summary = await self._generate_summary()

        duration_ms = (time.perf_counter() - start_time) * 1000

        return SwarmResult(
            swarm_id=self.id,
            success=len(failed) == 0,
            completed_tasks=completed,
            failed_tasks=failed,
            context=self.context,
            summary=summary,
            duration_ms=duration_ms,
        )

    except Exception as e:  # noqa: BLE001
        duration_ms = (time.perf_counter() - start_time) * 1000
        return SwarmResult(
            swarm_id=self.id,
            success=False,
            duration_ms=duration_ms,
            error=str(e),
        )

with_model

with_model(model: Any) -> Swarm

Return a copy with the given model for all agents.

Source code in src/locus/multiagent/swarm.py
def with_model(self, model: Any) -> Swarm:
    """Return a copy with the given model for all agents."""
    updated_agents = [agent.with_model(model) for agent in self.agents]
    return self.model_copy(
        update={
            "model": model,
            "agents": updated_agents,
        }
    )

SharedContext

Bases: BaseModel

Shared context/memory for swarm agents.

All agents can read from and write to this shared state.

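A minimal sketch of the three write paths and the summary read path, assuming default construction:

context = SharedContext()
await context.add_finding("open_ports", [22, 443], agent_id="scanner")
await context.post_to_blackboard("help_wanted", "Who can review the TLS cert?", agent_id="scanner")
await context.record_task_result("task-1", "Perimeter scan complete.")
print(context.get_summary())  # markdown-style digest of findings, results, blackboard
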
add_finding async

add_finding(key: str, value: Any, agent_id: str) -> None

Add a finding to the shared context.

Source code in src/locus/multiagent/swarm.py
async def add_finding(self, key: str, value: Any, agent_id: str) -> None:
    """Add a finding to the shared context."""
    async with self._lock:
        self.findings[key] = value
        self.discovery_log.append(
            {
                "type": "finding",
                "key": key,
                "value": value,
                "agent_id": agent_id,
                "timestamp": datetime.now(UTC).isoformat(),
            }
        )

post_to_blackboard async

post_to_blackboard(key: str, message: str, agent_id: str) -> None

Post a message to the blackboard.

Source code in src/locus/multiagent/swarm.py
async def post_to_blackboard(self, key: str, message: str, agent_id: str) -> None:
    """Post a message to the blackboard."""
    async with self._lock:
        self.blackboard[key] = message
        self.discovery_log.append(
            {
                "type": "blackboard",
                "key": key,
                "message": message,
                "agent_id": agent_id,
                "timestamp": datetime.now(UTC).isoformat(),
            }
        )

record_task_result async

record_task_result(task_id: str, result: str) -> None

Record a task result.

Source code in src/locus/multiagent/swarm.py
async def record_task_result(self, task_id: str, result: str) -> None:
    """Record a task result."""
    async with self._lock:
        self.task_results[task_id] = result

get_summary

get_summary() -> str

Get a summary of the current context state.

Source code in src/locus/multiagent/swarm.py
def get_summary(self) -> str:
    """Get a summary of the current context state."""
    lines = ["## Shared Context Summary"]

    if self.findings:
        lines.append("\n### Findings:")
        for key, value in self.findings.items():
            lines.append(f"- **{key}**: {value}")

    if self.task_results:
        lines.append("\n### Task Results:")
        for tid, result in self.task_results.items():
            snippet = result if len(result) <= 600 else result[:600].rstrip() + "…"
            lines.append(f"- **{tid}**:\n{snippet}")

    if self.blackboard:
        lines.append("\n### Blackboard Messages:")
        for key, msg in self.blackboard.items():
            lines.append(f"- **{key}**: {msg}")

    if len(self.discovery_log) > 5:
        lines.append(f"\n### Recent Activity: ({len(self.discovery_log)} total entries)")
        for entry in self.discovery_log[-5:]:
            lines.append(f"- [{entry['type']}] {entry.get('key', 'unknown')}")

    return "\n".join(lines)

SwarmAgent

Bases: BaseModel

An agent in the swarm.

Autonomously claims and works on tasks from the shared queue.

can_handle

can_handle(task: SwarmTask) -> bool

Decide whether this agent is eligible to claim the given task.

Resolution order:

1. Tag set-membership: if the task declares required_tags, every required tag must appear in capabilities. This is the deterministic path and the swarm's primary discovery mechanism.
2. Generalist: if neither side declares anything, the agent claims the task (capabilities=[] means generalist).
3. Substring fallback: kept for backwards compatibility with pre-tag swarms. If the agent has capabilities but the task has no tags, match keywords against the description.
Source code in src/locus/multiagent/swarm.py
def can_handle(self, task: SwarmTask) -> bool:
    """Decide whether this agent is eligible to claim ``task``.

    Resolution order:

    1. **Tag set-membership** — if the task declares
       ``required_tags``, every required tag must appear in
       :attr:`capabilities`. This is the deterministic path —
       the swarm's primary discovery mechanism.
    2. **Generalist** — if neither side declares anything,
       the agent claims the task (``capabilities=[]`` ⇒
       generalist).
    3. **Substring fallback** — kept for backwards compatibility
       with pre-tag swarms: if the agent has capabilities but the
       task has no tags, match keywords against the description.
    """
    if task.required_tags:
        agent_tags = {c.lower() for c in self.capabilities}
        return all(req.lower() in agent_tags for req in task.required_tags)
    if not self.capabilities:
        return True
    task_lower = task.description.lower()
    return any(cap.lower() in task_lower for cap in self.capabilities)

priority_for_task

priority_for_task(task: SwarmTask) -> float

Score this agent's fit for the given task in the range [0, 1].

Tag-driven scoring (when the task declares tags):

- Each required tag the agent advertises adds 1.0 weight.
- Each preferred tag the agent advertises adds 0.5 weight.
- Score = (sum of matched weights) / (maximum possible weight), clamped to 1.0.

Falls through to the legacy substring score for tag-less tasks so pre-tag swarms keep their old behaviour.

Source code in src/locus/multiagent/swarm.py
def priority_for_task(self, task: SwarmTask) -> float:
    """Score this agent's fit for ``task`` in the range ``[0, 1]``.

    Tag-driven scoring (when the task declares tags):

    - Each required tag the agent advertises adds 1.0 weight.
    - Each preferred tag the agent advertises adds 0.5 weight.
    - Score = (sum of weights) / (max possible weights), clamped.

    Falls through to the legacy substring score for tag-less
    tasks so pre-tag swarms keep their old behaviour.
    """
    if task.required_tags or task.preferred_tags:
        agent_tags = {c.lower() for c in self.capabilities}
        req_hits = sum(1 for t in task.required_tags if t.lower() in agent_tags)
        pref_hits = sum(1 for t in task.preferred_tags if t.lower() in agent_tags)
        max_weight = float(len(task.required_tags)) + 0.5 * len(task.preferred_tags)
        if max_weight == 0:
            return 0.5
        return min(1.0, (req_hits + 0.5 * pref_hits) / max_weight)
    if not self.capabilities:
        return 0.5
    task_lower = task.description.lower()
    matches = sum(1 for cap in self.capabilities if cap.lower() in task_lower)
    return min(1.0, matches / len(self.capabilities))

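A worked example of the tag scoring. SwarmTask is constructed directly, since add_task does not take tags; description, required_tags, and preferred_tags follow the SwarmTask docs below, while the SwarmAgent fields shown are assumptions:

agent = SwarmAgent(name="sec", capabilities=["python", "security"])
task = SwarmTask(
    description="Harden the API",
    required_tags=["python"],
    preferred_tags=["security", "devops"],
)
# max weight = 1 required * 1.0 + 2 preferred * 0.5 = 2.0
# hits       = python (1.0) + security (0.5)       = 1.5
assert agent.priority_for_task(task) == 0.75
assert agent.can_handle(task)  # "python" is in the agent's capabilities
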
work_on_task async

work_on_task(task: SwarmTask, context: SharedContext) -> tuple[str | None, str | None]

Work on a task using the shared context.

Parameters:

- task (SwarmTask): The task to work on. Required.
- context (SharedContext): Shared context with other agents. Required.

Returns: Tuple of (result, error).

Source code in src/locus/multiagent/swarm.py
    async def work_on_task(
        self,
        task: SwarmTask,
        context: SharedContext,
    ) -> tuple[str | None, str | None]:
        """
        Work on a task using the shared context.

        Args:
            task: The task to work on
            context: Shared context with other agents

        Returns:
            Tuple of (result, error)
        """
        if self.model is None:
            return None, "No model configured for agent"

        # Build prompt with context
        context_summary = context.get_summary()

        prompt = f"""## Your Role
{self.system_prompt}

## Shared Context
{context_summary}

## Task
{task.description}

## Instructions
1. Analyze the task and shared context
2. If you discover new findings, note them clearly
3. If you need information from other agents, post a request to the blackboard
4. Complete the task to the best of your ability
5. Report your findings clearly

Format your response as:
### Findings
(Any new discoveries)

### Analysis
(Your analysis and conclusions)

### Blackboard (optional)
(Any messages for other agents)"""

        messages = [
            Message.system("You are a collaborative agent in a swarm."),
            Message.user(prompt),
        ]

        try:
            response = await self.model.complete(messages=messages)
            content = response.message.content or ""

            # Extract findings and update context
            await self._extract_and_share(content, context, task.id)

            return content, None

        except Exception as e:  # noqa: BLE001
            return None, str(e)

with_model

with_model(model: Any) -> SwarmAgent

Return a copy with the given model.

Source code in src/locus/multiagent/swarm.py
def with_model(self, model: Any) -> SwarmAgent:
    """Return a copy with the given model."""
    return self.model_copy(update={"model": model})

SwarmTask

Bases: BaseModel

A task in the swarm task queue.

required_tags declares the capability tags an agent must advertise to claim this task; matching is set-membership against SwarmAgent.capabilities. Empty means any agent may claim it; preferred_tags boost an agent's priority score without being a hard requirement.

Backwards-compat: agents that pre-date this change still work — if no tags are set, the prior substring-match path runs as a fallback (see SwarmAgent.can_handle).

SwarmResult

Bases: BaseModel

Result from swarm execution.

Handoff

Handoff

Bases: BaseModel

Manages handoffs between agents.

Features:

- Context transfer between agents
- State preservation
- Handoff event emission
- Chain of custody tracking

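A minimal sketch using the factories documented below (triage and billing are placeholder HandoffAgent instances):

manager = create_handoff_manager(agents=[triage, billing], max_chain=3)
result = await manager.execute_handoff(
    source_agent=triage,
    target_agent_id=billing.id,
    task="Resolve the duplicate charge",
    reason=HandoffReason.DELEGATION,
    findings={"charge_status": "posted twice"},
)
# Or walk a fixed chain of agents, feeding each output forward as findings:
results = await manager.chain_handoff([triage.id, billing.id], task="Resolve the duplicate charge")
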
register_agent

register_agent(agent: HandoffAgent) -> None

Register an agent for handoffs.

Source code in src/locus/multiagent/handoff.py
def register_agent(self, agent: HandoffAgent) -> None:
    """Register an agent for handoffs."""
    self.agents[agent.id] = agent

register_agents

register_agents(agents: list[HandoffAgent]) -> None

Register multiple agents.

Source code in src/locus/multiagent/handoff.py
def register_agents(self, agents: list[HandoffAgent]) -> None:
    """Register multiple agents."""
    for agent in agents:
        self.register_agent(agent)

create_handoff async

create_handoff(source_agent: HandoffAgent, target_agent_id: str, task: str, reason: HandoffReason, state: AgentState | None = None, findings: dict[str, Any] | None = None, instructions: str | None = None) -> HandoffContext

Create a handoff context.

Parameters:

- source_agent (HandoffAgent): The agent initiating the handoff. Required.
- target_agent_id (str): ID of the target agent. Required.
- task (str): The original task. Required.
- reason (HandoffReason): Reason for the handoff. Required.
- state (AgentState | None): Current agent state. Default: None.
- findings (dict[str, Any] | None): Findings to transfer. Default: None.
- instructions (str | None): Specific instructions for target. Default: None.

Returns: HandoffContext for the target agent.

Source code in src/locus/multiagent/handoff.py
async def create_handoff(
    self,
    source_agent: HandoffAgent,
    target_agent_id: str,
    task: str,
    reason: HandoffReason,
    state: AgentState | None = None,
    findings: dict[str, Any] | None = None,
    instructions: str | None = None,
) -> HandoffContext:
    """
    Create a handoff context.

    Args:
        source_agent: The agent initiating the handoff
        target_agent_id: ID of the target agent
        task: The original task
        reason: Reason for the handoff
        state: Current agent state
        findings: Findings to transfer
        instructions: Specific instructions for target

    Returns:
        HandoffContext for the target agent
    """
    # Extract information from state
    key_messages: list[Message] = []
    state_snapshot: dict[str, Any] = {}
    conversation_summary: str | None = None
    confidence = 0.0

    if state:
        key_messages = self._extract_key_messages(state)
        conversation_summary = self._summarize_conversation(list(state.messages))
        state_snapshot = {
            "iteration": state.iteration,
            "tool_history": list(state.tool_history[-5:]),
            "errors": list(state.errors[-3:]),
        }
        confidence = state.confidence

    # Build handoff chain
    handoff_chain = [source_agent.id]
    if self.history:
        last_context = self.history[-1]
        if last_context.target_agent_id == source_agent.id:
            handoff_chain = last_context.handoff_chain + [source_agent.id]

    context = HandoffContext(
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        reason=reason,
        original_task=task,
        conversation_summary=conversation_summary,
        key_messages=key_messages if self.preserve_full_history else [],
        state_snapshot=state_snapshot,
        findings=findings or {},
        confidence=confidence,
        instructions=instructions,
        handoff_chain=handoff_chain,
    )

    self.history.append(context)

    return context

execute_handoff async

execute_handoff(source_agent: HandoffAgent, target_agent_id: str, task: str, reason: HandoffReason, state: AgentState | None = None, findings: dict[str, Any] | None = None, instructions: str | None = None) -> HandoffResult

Execute a complete handoff.

Parameters:

Name Type Description Default
source_agent HandoffAgent

The agent initiating the handoff

required
target_agent_id str

ID of the target agent

required
task str

The original task

required
reason HandoffReason

Reason for the handoff

required
state AgentState | None

Current agent state

None
findings dict[str, Any] | None

Findings to transfer

None
instructions str | None

Specific instructions for target

None

Returns:

Type Description
HandoffResult

HandoffResult from the target agent

Source code in src/locus/multiagent/handoff.py
async def execute_handoff(
    self,
    source_agent: HandoffAgent,
    target_agent_id: str,
    task: str,
    reason: HandoffReason,
    state: AgentState | None = None,
    findings: dict[str, Any] | None = None,
    instructions: str | None = None,
) -> HandoffResult:
    """
    Execute a complete handoff.

    Args:
        source_agent: The agent initiating the handoff
        target_agent_id: ID of the target agent
        task: The original task
        reason: Reason for the handoff
        state: Current agent state
        findings: Findings to transfer
        instructions: Specific instructions for target

    Returns:
        HandoffResult from the target agent
    """
    # Validate target exists
    target_agent = self.agents.get(target_agent_id)
    if target_agent is None:
        return HandoffResult(
            handoff_id="",
            success=False,
            source_agent_id=source_agent.id,
            target_agent_id=target_agent_id,
            error=f"Target agent not found: {target_agent_id}",
        )

    # Check handoff chain limit
    chain_length = len(self.history)
    if chain_length >= self.max_handoff_chain:
        return HandoffResult(
            handoff_id="",
            success=False,
            source_agent_id=source_agent.id,
            target_agent_id=target_agent_id,
            error=f"Maximum handoff chain length ({self.max_handoff_chain}) exceeded",
        )

    # Create handoff context
    context = await self.create_handoff(
        source_agent=source_agent,
        target_agent_id=target_agent_id,
        task=task,
        reason=reason,
        state=state,
        findings=findings,
        instructions=instructions,
    )

    # Local import — observability is optional. The emit calls
    # below are no-ops when there is no run_context active.
    from locus.observability.emit import (  # noqa: PLC0415
        EV_HANDOFF_COMPLETED,
        EV_HANDOFF_INITIATED,
        emit,
    )

    await emit(
        EV_HANDOFF_INITIATED,
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        reason=getattr(reason, "value", str(reason)),
        context_summary=context.progress_summary,
    )

    # Construct the typed event for back-compat consumers — emit
    # above is the live publication path.
    _handoff_event = HandoffEvent(  # noqa: F841
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        reason=reason,
        context_summary=context.progress_summary,
    )

    # Execute handoff
    result = await target_agent.receive_handoff(context)
    result.returned_context = context

    await emit(
        EV_HANDOFF_COMPLETED,
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        success=result.success,
        output_length=len(result.output or ""),
    )

    return result

chain_handoff async

chain_handoff(agent_chain: list[str], task: str, initial_state: AgentState | None = None) -> list[HandoffResult]

Execute a chain of handoffs through multiple agents.

Parameters:

- agent_chain (list[str]): List of agent IDs to process through. Required.
- task (str): The task to process. Required.
- initial_state (AgentState | None): Initial state. Default: None.

Returns: List of results from each handoff.

Source code in src/locus/multiagent/handoff.py
async def chain_handoff(
    self,
    agent_chain: list[str],
    task: str,
    initial_state: AgentState | None = None,
) -> list[HandoffResult]:
    """
    Execute a chain of handoffs through multiple agents.

    Args:
        agent_chain: List of agent IDs to process through
        task: The task to process
        initial_state: Initial state

    Returns:
        List of results from each handoff
    """
    results: list[HandoffResult] = []
    current_state = initial_state
    current_findings: dict[str, Any] = {}

    for i in range(len(agent_chain) - 1):
        source_id = agent_chain[i]
        target_id = agent_chain[i + 1]

        source_agent = self.agents.get(source_id)
        if source_agent is None:
            results.append(
                HandoffResult(
                    handoff_id="",
                    success=False,
                    source_agent_id=source_id,
                    target_agent_id=target_id,
                    error=f"Source agent not found: {source_id}",
                )
            )
            break

        result = await self.execute_handoff(
            source_agent=source_agent,
            target_agent_id=target_id,
            task=task,
            reason=HandoffReason.DELEGATION,
            state=current_state,
            findings=current_findings,
        )

        results.append(result)

        if not result.success:
            break

        # Update findings for next handoff
        if result.output:
            current_findings[f"from_{source_id}"] = result.output

    return results

HandoffContext

Bases: BaseModel

Context transferred during a handoff.

Contains all information needed for the target agent to continue.

to_prompt

to_prompt() -> str

Convert handoff context to a prompt for the target agent.

Source code in src/locus/multiagent/handoff.py
def to_prompt(self) -> str:
    """Convert handoff context to a prompt for the target agent."""
    lines = [
        "## Handoff Context",
        "",
        f"**Reason:** {self.reason.value}",
        f"**From:** {self.source_agent_id}",
        f"**Confidence so far:** {self.confidence:.2f}",
        "",
        "### Original Task",
        self.original_task,
        "",
    ]

    if self.progress_summary:
        lines.extend(
            [
                "### Progress So Far",
                self.progress_summary,
                "",
            ]
        )

    if self.findings:
        lines.append("### Findings")
        for key, value in self.findings.items():
            lines.append(f"- **{key}:** {value}")
        lines.append("")

    if self.conversation_summary:
        lines.extend(
            [
                "### Conversation Summary",
                self.conversation_summary,
                "",
            ]
        )

    if self.instructions:
        lines.extend(
            [
                "### Instructions",
                self.instructions,
                "",
            ]
        )

    if self.handoff_chain:
        lines.extend(
            [
                "### Handoff Chain",
                " -> ".join(self.handoff_chain + [self.target_agent_id]),
                "",
            ]
        )

    return "\n".join(lines)

HandoffReason

Bases: StrEnum

Reason for a handoff between agents.

create_handoff_agent

create_handoff_agent(name: str, description: str = '', system_prompt: str = '', tools: list[Tool] | None = None, model: Any = None) -> HandoffAgent

Create a handoff-capable agent.

Parameters:

- name (str): Agent name. Required.
- description (str): Agent description. Default: ''.
- system_prompt (str): System prompt. Default: ''.
- tools (list[Tool] | None): Available tools. Default: None.
- model (Any): Model for the agent. Default: None.

Returns: Configured HandoffAgent.

Source code in src/locus/multiagent/handoff.py
def create_handoff_agent(
    name: str,
    description: str = "",
    system_prompt: str = "",
    tools: list[Tool] | None = None,
    model: Any = None,
) -> HandoffAgent:
    """
    Create a handoff-capable agent.

    Args:
        name: Agent name
        description: Agent description
        system_prompt: System prompt
        tools: Available tools
        model: Model for the agent

    Returns:
        Configured HandoffAgent
    """
    return HandoffAgent(
        name=name,
        description=description,
        system_prompt=system_prompt,
        tools=tools or [],
        model=model,
    )

create_handoff_manager

create_handoff_manager(agents: list[HandoffAgent] | None = None, max_chain: int = 5) -> Handoff

Create a handoff manager.

Parameters:

- agents (list[HandoffAgent] | None): Agents to register. Default: None.
- max_chain (int): Maximum handoff chain length. Default: 5.

Returns: Configured Handoff manager.

Source code in src/locus/multiagent/handoff.py
def create_handoff_manager(
    agents: list[HandoffAgent] | None = None,
    max_chain: int = 5,
) -> Handoff:
    """
    Create a handoff manager.

    Args:
        agents: Agents to register
        max_chain: Maximum handoff chain length

    Returns:
        Configured Handoff manager
    """
    manager = Handoff(max_handoff_chain=max_chain)

    if agents:
        manager.register_agents(agents)

    return manager

StateGraph

StateGraph

Bases: BaseModel

A stateful graph for workflow execution.

Supports:

- Conditional edges with dynamic routing
- Cycles (optional, with max iteration limit)
- Human-in-the-loop interrupts
- Map-reduce via Send
- Subgraph composition
- State reducers for composable updates

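A minimal build-and-run sketch, assuming default construction and that execute takes the initial state dict (only add_node, add_edge, set_entry_point, and set_finish_point below are documented; the execute() call is an assumption):

def plan(state: dict) -> dict:
    return {**state, "plan": "outline"}

def write(state: dict) -> dict:
    return {**state, "draft": "draft from " + state["plan"]}

graph = StateGraph()
graph.add_node("plan", plan).add_node("write", write)
graph.set_entry_point("plan")    # also wires START -> "plan"
graph.add_edge("plan", "write")
graph.set_finish_point("write")  # wires "write" -> END
result = await graph.execute({"topic": "swarms"})
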
model_post_init

model_post_init(__context: Any) -> None

Initialize after model creation.

Source code in src/locus/multiagent/graph.py
def model_post_init(self, __context: Any) -> None:
    """Initialize after model creation."""
    # Add virtual START and END nodes
    if START not in self.nodes:
        self.nodes[START] = Node(
            id=START,
            name="START",
            executor=lambda x: x,  # Pass-through
        )
    if END not in self.nodes:
        self.nodes[END] = Node(
            id=END,
            name="END",
            executor=lambda x: x,  # Pass-through
        )

    # Extract reducers from state schema
    if self.state_schema:
        from locus.core.reducers import extract_reducers_from_model

        self._reducers = extract_reducers_from_model(self.state_schema)

    self._rebuild_adjacency()

set_entry_point

set_entry_point(node_id: str) -> StateGraph

Set the entry point node (after START).

Source code in src/locus/multiagent/graph.py
def set_entry_point(self, node_id: str) -> StateGraph:
    """Set the entry point node (after START)."""
    if node_id not in self.nodes:
        raise ValueError(f"Node not found: {node_id}")
    self._entry_point = node_id
    # Add edge from START to entry point
    self.add_edge(START, node_id)
    return self

set_finish_point

set_finish_point(node_id: str) -> StateGraph

Set a finish point node (before END).

Source code in src/locus/multiagent/graph.py
def set_finish_point(self, node_id: str) -> StateGraph:
    """Set a finish point node (before END)."""
    if node_id not in self.nodes:
        raise ValueError(f"Node not found: {node_id}")
    self.add_edge(node_id, END)
    return self

add_node

add_node(node_id: str | Node, executor: Callable[..., Any] | StateGraph | None = None, *, description: str = '', condition: Callable[[dict[str, Any]], bool] | None = None, max_retries: int = 0, timeout_ms: float | None = None, retry_policy: RetryPolicy | None = None, cache_policy: CachePolicy | None = None, defer: bool = False) -> StateGraph

Add a node to the graph.

Parameters:

- node_id (str | Node): Unique identifier for the node, or a Node object (for backward compatibility). Required.
- executor (Callable[..., Any] | StateGraph | None): Function or subgraph to execute (optional if node_id is a Node). Default: None.
- description (str): Node description. Default: ''.
- condition (Callable[[dict[str, Any]], bool] | None): Optional condition for execution. Default: None.
- max_retries (int): Retry attempts on failure. Default: 0.
- timeout_ms (float | None): Execution timeout. Default: None.

Returns: Self for chaining.

Source code in src/locus/multiagent/graph.py
def add_node(
    self,
    node_id: str | Node,
    executor: Callable[..., Any] | StateGraph | None = None,
    *,
    description: str = "",
    condition: Callable[[dict[str, Any]], bool] | None = None,
    max_retries: int = 0,
    timeout_ms: float | None = None,
    retry_policy: RetryPolicy | None = None,
    cache_policy: CachePolicy | None = None,
    defer: bool = False,
) -> StateGraph:
    """
    Add a node to the graph.

    Args:
        node_id: Unique identifier for the node, or a Node object (for backward compatibility)
        executor: Function or subgraph to execute (optional if node_id is a Node)
        description: Node description
        condition: Optional condition for execution
        max_retries: Retry attempts on failure
        timeout_ms: Execution timeout

    Returns:
        Self for chaining
    """
    # Support old API: add_node(Node)
    if isinstance(node_id, Node):
        node = node_id
        if node.id in self.nodes:
            raise ValueError(f"Node already exists: {node.id}")
        self.nodes[node.id] = node
        self._rebuild_adjacency()
        return self

    # New API: add_node(node_id, executor)
    if executor is None:
        raise TypeError("add_node() missing 1 required positional argument: 'executor'")

    if node_id in self.nodes:
        raise ValueError(f"Node already exists: {node_id}")

    # Check if executor is a subgraph
    is_subgraph = isinstance(executor, StateGraph)

    # When ``is_subgraph`` is True, ``executor`` is a StateGraph
    # instance; mypy can't narrow the union without an isinstance
    # check, but the precondition is enforced upstream.
    node_executor: Callable[..., Any] = (
        executor.execute  # type: ignore[union-attr]
        if is_subgraph
        else executor  # type: ignore[assignment]
    )
    node = Node(
        id=node_id,
        name=node_id,
        description=description,
        executor=node_executor,
        condition=condition,
        max_retries=max_retries,
        timeout_ms=timeout_ms,
        retry_policy=retry_policy,
        cache_policy=cache_policy,
        defer=defer,
        is_subgraph=is_subgraph,
        subgraph=executor if is_subgraph else None,
    )
    self.nodes[node_id] = node
    self._rebuild_adjacency()
    return self

add_edge

add_edge(source: str | Node, target: str | Node, key_mapping: dict[str, str] | None = None, transform: Callable[[Any], Any] | None = None) -> StateGraph

Add a directed edge between nodes.

Parameters:

- source (str | Node): Source node ID or Node object. Required.
- target (str | Node): Target node ID or Node object. Required.
- key_mapping (dict[str, str] | None): Optional key mapping for data transformation. Default: None.
- transform (Callable[[Any], Any] | None): Optional transform function. Default: None.

Returns: Self for chaining.

Source code in src/locus/multiagent/graph.py
def add_edge(
    self,
    source: str | Node,
    target: str | Node,
    key_mapping: dict[str, str] | None = None,
    transform: Callable[[Any], Any] | None = None,
) -> StateGraph:
    """
    Add a directed edge between nodes.

    Args:
        source: Source node ID or Node object
        target: Target node ID or Node object
        key_mapping: Optional key mapping for data transformation
        transform: Optional transform function

    Returns:
        Self for chaining
    """
    # Support old API: add_edge(Node, Node)
    source_id = source.id if isinstance(source, Node) else source
    target_id = target.id if isinstance(target, Node) else target

    # Allow START and END as valid nodes
    valid_sources = set(self.nodes.keys()) | {START}
    valid_targets = set(self.nodes.keys()) | {END}

    if source_id not in valid_sources:
        raise ValueError(f"Source node not found: {source_id}")
    if target_id not in valid_targets:
        raise ValueError(f"Target node not found: {target_id}")

    edge = Edge(
        source_id=source_id,
        target_id=target_id,
        key_mapping=key_mapping,
        transform=transform,
    )
    self.edges.append(edge)
    self._rebuild_adjacency()

    # Validate no cycles (unless allowed)
    if not self.config.allow_cycles and self._has_cycle():
        self.edges.pop()
        self._rebuild_adjacency()
        raise ValueError(f"Adding edge {source_id} -> {target_id} would create a cycle")

    return self
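
Continuing the sketch above (START and END are the sentinel endpoints the code validates against):

graph.add_edge(START, "fetch")
# key_mapping renames state keys as data flows between nodes:
graph.add_edge("fetch", "analyze", key_mapping={"raw": "input"})
graph.add_edge("analyze", END)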

add_conditional_edges

add_conditional_edges(source: str, router: Callable[[dict[str, Any]], str | list[str]], targets: dict[str, str] | None = None, default: str | None = None) -> StateGraph

Add conditional edges with dynamic routing.

Parameters:

    source (str, required): Source node ID.
    router (Callable[[dict[str, Any]], str | list[str]], required): Function that returns target node ID(s) based on state.
    targets (dict[str, str] | None, default None): Optional mapping from router return values to node IDs.
    default (str | None, default None): Default target if the router returns an unmapped value.

Returns:

    StateGraph: Self for chaining.

Example:

    def route_by_type(state):
        return "error" if state["has_error"] else "success"

    graph.add_conditional_edges("check", route_by_type, {
        "error": "handle_error",
        "success": "continue",
    })

Source code in src/locus/multiagent/graph.py
def add_conditional_edges(
    self,
    source: str,
    router: Callable[[dict[str, Any]], str | list[str]],
    targets: dict[str, str] | None = None,
    default: str | None = None,
) -> StateGraph:
    """
    Add conditional edges with dynamic routing.

    Args:
        source: Source node ID
        router: Function that returns target node ID(s) based on state
        targets: Optional mapping from router return values to node IDs
        default: Default target if router returns unmapped value

    Returns:
        Self for chaining

    Example:
        def route_by_type(state):
            return "error" if state["has_error"] else "success"

        graph.add_conditional_edges("check", route_by_type, {
            "error": "handle_error",
            "success": "continue"
        })
    """
    if source not in self.nodes:
        raise ValueError(f"Source node not found: {source}")

    cond_edge = ConditionalEdge(
        source_id=source,
        router=router,
        targets=targets or {},
        default_target=default,
    )
    self.conditional_edges.append(cond_edge)
    return self
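
The docstring example maps every router value explicitly; the default parameter catches anything unmapped (sketch, node names hypothetical):

graph.add_conditional_edges(
    "triage",
    route_by_type,
    targets={"error": "handle_error"},
    default="continue",  # any other router value routes here
)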

execute async

execute(inputs: dict[str, Any] | Command | None = None, *, config: GraphConfig | None = None, _event_sink: Any = None) -> GraphResult

Execute the graph.

Parameters:

    inputs (dict[str, Any] | Command | None, default None): Initial state or Command (for resume).
    config (GraphConfig | None, default None): Optional execution configuration.
    _event_sink (Any, default None): Internal use only. When provided, an async callable (StreamEvent) -> None invoked after each node completes so stream() can yield intermediate events. Public callers should use stream() instead of touching this.

Returns:

    GraphResult: Final state and outputs of the run.

Source code in src/locus/multiagent/graph.py
async def execute(
    self,
    inputs: dict[str, Any] | Command | None = None,
    *,
    config: GraphConfig | None = None,
    _event_sink: Any = None,
) -> GraphResult:
    """
    Execute the graph.

    Args:
        inputs: Initial state or Command (for resume)
        config: Optional execution configuration
        _event_sink: Internal-use only. When provided, an async callable
            ``(StreamEvent) -> None`` invoked after each node completes
            so :meth:`stream` can yield intermediate events. Public
            callers should use :meth:`stream` instead of touching this.

    Returns:
        GraphResult with final state and outputs
    """
    start_time = datetime.now(UTC)
    cfg = config or self.config

    # Handle resume from interrupt
    resume_value = None
    resume_node = None
    if isinstance(inputs, Command) and inputs.has_resume:
        resume_value = inputs.resume
        # Load state from checkpointer if available
        if cfg.checkpointer and cfg.thread_id:
            saved_state = await cfg.checkpointer.load(cfg.thread_id)
            if saved_state:
                inputs = saved_state.metadata.get("graph_state", {})
                resume_node = saved_state.metadata.get("interrupted_node")
                from locus.observability.emit import (  # noqa: PLC0415
                    EV_CHECKPOINT_LOADED,
                    emit,
                )

                await emit(
                    EV_CHECKPOINT_LOADED,
                    thread_id=cfg.thread_id,
                    backend=type(cfg.checkpointer).__name__,
                    resume_node=resume_node,
                )
        else:
            # Without checkpointer, get resume node from state
            state_data = inputs.update or {}
            resume_node = state_data.pop("__resume_node__", None)
            inputs = state_data
    elif isinstance(inputs, Command):
        inputs = inputs.update

    # Initialize state
    state: dict[str, Any] = dict(inputs or {})
    node_results: dict[str, NodeResult] = {}
    execution_order: list[str] = []
    iterations = 0

    # Determine starting node(s)
    if resume_node:
        current_nodes = [resume_node]
    else:
        current_nodes = self._adjacency.get(START, [])
        if not current_nodes and self._entry_point:
            current_nodes = [self._entry_point]

    # Main execution loop
    while current_nodes and iterations < cfg.max_iterations:
        iterations += 1
        next_nodes: list[str] = []

        # Check for interrupt_before
        for node_id in current_nodes:
            if node_id in cfg.interrupt_before:
                # Create a placeholder interrupt value for interrupt_before
                from locus.core.interrupt import InterruptValue

                placeholder_interrupt = InterruptValue(
                    payload={"type": "interrupt_before", "node": node_id},
                    node_id=node_id,
                    graph_id=self.id,
                )
                interrupt_state = InterruptState(
                    interrupt=placeholder_interrupt,
                    node_id=node_id,
                    pending_nodes=current_nodes,
                    state_snapshot=state,
                )
                # Include resume node in final state
                final_state_with_resume = {**state, "__resume_node__": node_id}
                return GraphResult(
                    graph_id=self.id,
                    success=False,
                    node_results=node_results,
                    final_state=final_state_with_resume,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

        # Lazy import — observability is opt-in.
        from locus.observability.emit import (  # noqa: PLC0415
            EV_GRAPH_NODE_COMPLETED,
            EV_GRAPH_NODE_STARTED,
            emit,
        )

        # Execute current nodes (parallel if enabled)
        if cfg.parallel and len(current_nodes) > 1:
            tasks = []
            node_span_ids: dict[str, str] = {}
            for node_id in current_nodes:
                if node_id == END:
                    continue
                node = self.nodes[node_id]
                node_inputs = self._gather_inputs(node_id, state)
                is_resume = node_id == resume_node
                span_id = uuid4().hex[:8]
                node_span_ids[node_id] = span_id
                await emit(
                    EV_GRAPH_NODE_STARTED,
                    graph_id=self.id,
                    node_id=node_id,
                    iteration=iterations,
                    span_id=span_id,
                    parallel=True,
                    is_resuming=is_resume,
                )
                tasks.append(
                    node.execute(
                        node_inputs,
                        resume_value=resume_value if is_resume else None,
                        is_resuming=is_resume,
                    )
                )

            if tasks:
                parallel_started = time.perf_counter()
                results = await asyncio.gather(*tasks)
                for node_id, result in zip(
                    [n for n in current_nodes if n != END],
                    results,
                    strict=True,
                ):
                    node_results[node_id] = result
                    execution_order.append(node_id)
                    await emit(
                        EV_GRAPH_NODE_COMPLETED,
                        graph_id=self.id,
                        node_id=node_id,
                        span_id=node_span_ids.get(node_id),
                        status=str(result.status),
                        duration_ms=(time.perf_counter() - parallel_started) * 1000,
                        parallel=True,
                    )
                    await _emit_node_events(
                        _event_sink, node_id, result, state, cfg.stream_mode
                    )
        else:
            # Sequential execution
            for node_id in current_nodes:
                if node_id == END:
                    continue

                node = self.nodes[node_id]
                node_inputs = self._gather_inputs(node_id, state)
                is_resume = node_id == resume_node
                span_id = uuid4().hex[:8]
                started_at = time.perf_counter()
                await emit(
                    EV_GRAPH_NODE_STARTED,
                    graph_id=self.id,
                    node_id=node_id,
                    iteration=iterations,
                    span_id=span_id,
                    parallel=False,
                    is_resuming=is_resume,
                )
                result = await node.execute(
                    node_inputs,
                    resume_value=resume_value if is_resume else None,
                    is_resuming=is_resume,
                )
                node_results[node_id] = result
                execution_order.append(node_id)
                await emit(
                    EV_GRAPH_NODE_COMPLETED,
                    graph_id=self.id,
                    node_id=node_id,
                    span_id=span_id,
                    status=str(result.status),
                    duration_ms=(time.perf_counter() - started_at) * 1000,
                    parallel=False,
                )
                await _emit_node_events(_event_sink, node_id, result, state, cfg.stream_mode)

        # Clear resume context after first node
        resume_node = None
        resume_value = None

        # Process results and determine next nodes
        for node_id in [n for n in current_nodes if n != END]:
            # Use a different name from the earlier ``result`` so mypy
            # doesn't widen the previously narrowed type.
            node_result = node_results.get(node_id)
            if not node_result:
                continue
            result = node_result

            # Handle interrupt
            if result.status == NodeStatus.INTERRUPTED:
                interrupt_state = InterruptState(
                    interrupt=result.output,
                    node_id=node_id,
                    pending_nodes=[n for n in current_nodes if n != node_id],
                    state_snapshot=state,
                )

                # Save to checkpointer if available
                if cfg.checkpointer and cfg.thread_id:
                    await cfg.checkpointer.save(
                        state=None,
                        thread_id=cfg.thread_id,
                        metadata={
                            "graph_state": state,
                            "interrupted_node": node_id,
                            "interrupt": interrupt_state.model_dump(),
                        },
                    )
                    from locus.observability.emit import (  # noqa: PLC0415
                        EV_CHECKPOINT_SAVED,
                        emit,
                    )

                    await emit(
                        EV_CHECKPOINT_SAVED,
                        thread_id=cfg.thread_id,
                        backend=type(cfg.checkpointer).__name__,
                        trigger="graph_interrupt",
                        interrupted_node=node_id,
                    )

                # Store resume node in state for checkpointer-less resumption
                final_state_with_resume = {**state, "__resume_node__": node_id}

                return GraphResult(
                    graph_id=self.id,
                    success=False,
                    node_results=node_results,
                    final_state=final_state_with_resume,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

            # Handle successful execution
            if result.success:
                # Apply state updates
                update, command = normalize_node_output(result.output)
                if update:
                    state = self._apply_state_update(state, update)
                    # Store raw output under namespaced key to avoid conflicts
                    state[f"_node_{node_id}"] = result.output

                # Handle Send (map-reduce)
                if result.sends:
                    send_results = await self._execute_sends(result.sends, state)
                    for sr in send_results:
                        if sr.success:
                            state[sr.send_id] = sr.result

                # Determine next nodes
                node_next = self._get_next_nodes(node_id, state, command)
                next_nodes.extend(node_next)

            # Check for interrupt_after
            if node_id in cfg.interrupt_after:
                interrupt_state = InterruptState(
                    interrupt=None,  # type: ignore
                    node_id=node_id,
                    pending_nodes=next_nodes,
                    state_snapshot=state,
                )
                return GraphResult(
                    graph_id=self.id,
                    success=True,
                    node_results=node_results,
                    final_state=state,
                    execution_order=execution_order,
                    duration_ms=(datetime.now(UTC) - start_time).total_seconds() * 1000,
                    interrupt=interrupt_state,
                    iterations=iterations,
                )

        # Check if we've reached END
        if END in current_nodes or END in next_nodes:
            break

        # Move to next nodes (deduplicate)
        current_nodes = list(dict.fromkeys(next_nodes))

    # Calculate duration
    end_time = datetime.now(UTC)
    duration_ms = (end_time - start_time).total_seconds() * 1000

    # Determine final outputs (from last executed nodes)
    final_outputs = {}
    for node_id in reversed(execution_order):
        if node_id in node_results and node_results[node_id].success:
            final_outputs[node_id] = node_results[node_id].output

    # Check success
    success = all(
        r.status in (NodeStatus.COMPLETED, NodeStatus.SKIPPED) for r in node_results.values()
    )

    return GraphResult(
        graph_id=self.id,
        success=success,
        node_results=node_results,
        final_state=state,
        final_outputs=final_outputs,
        execution_order=execution_order,
        duration_ms=duration_ms,
        iterations=iterations,
    )
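
A resume sketch. It assumes Command can be constructed with the resume and update fields the code reads above; without a checkpointer, the saved final_state (which carries __resume_node__) must be passed back in via update:

result = await graph.execute({"query": "..."}, config=cfg)
if result.interrupt:
    # Feed the human decision back in and continue from the interrupted node.
    result = await graph.execute(
        Command(resume={"approved": True}, update=result.final_state),
        config=cfg,
    )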

stream async

stream(inputs: dict[str, Any] | Command | None = None, *, config: GraphConfig | None = None, mode: StreamMode | None = None) -> AsyncIterator[StreamEvent]

Stream graph execution events as nodes complete.

Drives execute() on a background task with an async-queue sink wired in, then yields each node-completion event in real time (instead of collecting them at the end). The final state arrives as the last yielded event in VALUES mode.

Parameters:

    inputs (dict[str, Any] | Command | None, default None): Initial state or Command (for resume).
    config (GraphConfig | None, default None): Execution configuration.
    mode (StreamMode | None, default None): Stream mode override (VALUES / UPDATES / NODES / DEBUG / CUSTOM). Defaults to config.stream_mode.

Yields:

    StreamEvent: One event per node, then a terminal VALUES event with the final state when mode == VALUES. Re-raises any exception the underlying execute() raised so callers can react.

Source code in src/locus/multiagent/graph.py
async def stream(
    self,
    inputs: dict[str, Any] | Command | None = None,
    *,
    config: GraphConfig | None = None,
    mode: StreamMode | None = None,
) -> AsyncIterator[StreamEvent]:
    """Stream graph execution events as nodes complete.

    Drives :meth:`execute` on a background task with an async-queue
    sink wired in, then yields each node-completion event in real time
    (instead of collecting them at the end). The final state arrives as
    the last yielded event in ``VALUES`` mode.

    Args:
        inputs: Initial state or Command (for resume).
        config: Execution configuration.
        mode: Stream mode override (``VALUES`` / ``UPDATES`` / ``NODES``
            / ``DEBUG`` / ``CUSTOM``). Defaults to ``config.stream_mode``.

    Yields:
        ``StreamEvent`` per node, then a terminal ``VALUES`` event with
        the final state when ``mode == VALUES``. Re-raises any
        exception the underlying execute raised so callers can react.
    """
    cfg = config or self.config
    # The mode argument is per-call; reflect it in cfg so the sink in
    # execute() emits the right events. We don't mutate the caller's
    # cfg — clone it.
    stream_mode = mode or cfg.stream_mode
    if mode is not None and mode != cfg.stream_mode:
        cfg = cfg.model_copy(update={"stream_mode": mode})

    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
    sentinel = None  # marks normal completion in the queue

    async def _sink(event: StreamEvent) -> None:
        await queue.put(event)

    async def _drive() -> GraphResult:
        # Make the sink visible to ``emit_custom`` calls inside node
        # bodies via the module-level ContextVar. Reset on exit so we
        # don't leak the sink into unrelated tasks.
        token = _active_stream_sink.set(_sink)
        try:
            return await self.execute(inputs, config=cfg, _event_sink=_sink)
        finally:
            _active_stream_sink.reset(token)
            # Signal end-of-stream regardless of success/error so the
            # consumer never deadlocks.
            await queue.put(sentinel)

    task = asyncio.create_task(_drive())

    consumer_broke_early = False
    try:
        while True:
            event = await queue.get()
            if event is sentinel:
                break
            yield event
    except (GeneratorExit, asyncio.CancelledError):
        consumer_broke_early = True
        raise
    finally:
        # If the consumer broke out early, cancel the driver so the
        # background task doesn't leak.
        if consumer_broke_early and not task.done():
            task.cancel()

    # task is done (or was cancelled). Surface any exception execute
    # raised; otherwise emit the terminal final-state event in
    # VALUES mode.
    result = task.result()
    if stream_mode == StreamMode.VALUES:
        yield StreamEvent(mode=stream_mode, data=result.final_state)
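
Consumption sketch (the event fields match the StreamEvent constructed above):

async for event in graph.stream({"query": "..."}, mode=StreamMode.VALUES):
    print(event.mode, event.data)  # the last VALUES event carries the final state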

ainvoke async

ainvoke(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> dict[str, Any]

LangChain/LangGraph-compatible alias for execute() returning final_state.

Source code in src/locus/multiagent/graph.py
async def ainvoke(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> dict[str, Any]:
    """LangChain/LangGraph-compatible alias for execute() returning final_state."""
    result = await self.execute(inputs or {}, config=config)
    return result.final_state

astream async

astream(inputs: dict[str, Any] | None = None, config: GraphConfig | None = None, **kwargs: Any) -> AsyncIterator[StreamEvent]

LangChain/LangGraph-compatible alias for stream().

Source code in src/locus/multiagent/graph.py
async def astream(
    self,
    inputs: dict[str, Any] | None = None,
    config: GraphConfig | None = None,
    **kwargs: Any,
) -> AsyncIterator[StreamEvent]:
    """LangChain/LangGraph-compatible alias for stream()."""
    async for event in self.stream(inputs, config=config):
        yield event

get_graph

get_graph() -> StateGraph

LangGraph-compatible alias — returns self (graph exposes .nodes dict).

Source code in src/locus/multiagent/graph.py
def get_graph(self) -> StateGraph:
    """LangGraph-compatible alias — returns self (graph exposes .nodes dict)."""
    return self

aget_state async

aget_state(config: Any = None) -> None

LangGraph-compatible stub — locus uses checkpointer.load directly.

Source code in src/locus/multiagent/graph.py
async def aget_state(self, config: Any = None) -> None:
    """LangGraph-compatible stub — locus uses checkpointer.load directly."""
    return

compile

compile(*, checkpointer: Any | None = None, interrupt_before: list[str] | None = None, interrupt_after: list[str] | None = None, store: Any | None = None) -> StateGraph

Compile the graph with configuration.

Parameters:

    checkpointer (Any | None, default None): Checkpointer for state persistence.
    interrupt_before (list[str] | None, default None): Nodes to pause before.
    interrupt_after (list[str] | None, default None): Nodes to pause after.
    store (Any | None, default None): Store for cross-thread memory.

Returns:

    StateGraph: Configured graph (self).

Source code in src/locus/multiagent/graph.py
def compile(
    self,
    *,
    checkpointer: Any | None = None,
    interrupt_before: list[str] | None = None,
    interrupt_after: list[str] | None = None,
    store: Any | None = None,
) -> StateGraph:
    """
    Compile the graph with configuration.

    Args:
        checkpointer: Checkpointer for state persistence
        interrupt_before: Nodes to pause before
        interrupt_after: Nodes to pause after
        store: Store for cross-thread memory

    Returns:
        Configured graph (self)
    """
    if checkpointer:
        self.config.checkpointer = checkpointer
    if interrupt_before:
        self.config.interrupt_before = interrupt_before
    if interrupt_after:
        self.config.interrupt_after = interrupt_after
    if store:
        self.config.store = store
    return self
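
A LangGraph-style wiring sketch (the checkpointer object and node name are hypothetical):

app = graph.compile(checkpointer=my_checkpointer, interrupt_before=["human_review"])
final_state = await app.ainvoke({"query": "..."})  # compile() returns the graph itself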

GraphConfig

Bases: BaseModel

Configuration for graph execution.

Functional API

task

task(fn: Callable | None = None, *, name: str | None = None, retry_attempts: int = 1, cache: bool = False) -> Any

Decorator that marks a function as a parallelizable task.

Tasks are tracked within an entrypoint for monitoring and can be configured with retry and caching.

Parameters:

    fn (Callable | None, default None): The function to decorate.
    name (str | None, default None): Task name (defaults to function name).
    retry_attempts (int, default 1): Total number of attempts; 1 means no retries.
    cache (bool, default False): If True, cache results for identical arguments.

Example:

    @task
    async def fetch(url: str) -> dict:
        async with httpx.AsyncClient() as client:
            return (await client.get(url)).json()

    @task(retry_attempts=3)
    async def unreliable_api(query: str) -> str:
        return await call_api(query)

Source code in src/locus/multiagent/functional.py
def task(
    fn: Callable | None = None,
    *,
    name: str | None = None,
    retry_attempts: int = 1,
    cache: bool = False,
) -> Any:
    """Decorator that marks a function as a parallelizable task.

    Tasks are tracked within an entrypoint for monitoring and can be
    configured with retry and caching.

    Args:
        fn: The function to decorate.
        name: Task name (defaults to function name).
        retry_attempts: Total number of attempts; 1 means no retries.
        cache: If True, cache results for identical arguments.

    Example:
        @task
        async def fetch(url: str) -> dict:
            async with httpx.AsyncClient() as client:
                return (await client.get(url)).json()

        @task(retry_attempts=3)
        async def unreliable_api(query: str) -> str:
            return await call_api(query)
    """

    def decorator(func: Callable) -> Callable:
        task_name = name or func.__name__
        _cache: dict[str, Any] = {}

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()

            # Check cache
            if cache:
                cache_key = f"{args}:{kwargs}"
                if cache_key in _cache:
                    return _cache[cache_key]

            last_error = None
            for attempt in range(retry_attempts):
                try:
                    if asyncio.iscoroutinefunction(func):
                        result = await func(*args, **kwargs)
                    else:
                        result = func(*args, **kwargs)

                    duration = (time.perf_counter() - start) * 1000
                    _current_tasks.append(
                        TaskResult(
                            value=result,
                            duration_ms=duration,
                            task_name=task_name,
                        )
                    )

                    if cache:
                        _cache[cache_key] = result

                    return result

                except Exception as e:  # noqa: BLE001 — retry-any-failure semantics for user task bodies
                    last_error = e
                    if attempt < retry_attempts - 1:
                        await asyncio.sleep(0.1 * (attempt + 1))

            duration = (time.perf_counter() - start) * 1000
            _current_tasks.append(
                TaskResult(
                    duration_ms=duration,
                    task_name=task_name,
                    error=str(last_error),
                )
            )
            raise last_error  # type: ignore[misc]

        wrapper._is_task = True  # type: ignore[attr-defined]  # noqa: SLF001
        wrapper._task_name = task_name  # type: ignore[attr-defined]  # noqa: SLF001
        return wrapper

    if fn is not None:
        return decorator(fn)
    return decorator
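
The cache is an in-memory dict keyed on the repr of the call's arguments, as the source above shows; a sketch (fetch_rate_from_api is hypothetical):

@task(cache=True)
async def fetch_rate(currency: str) -> float:
    # Identical arguments hit the cache; new arguments recompute.
    return await fetch_rate_from_api(currency)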

entrypoint

entrypoint(fn: Callable | None = None, *, name: str | None = None) -> Any

Decorator that marks a function as a workflow entrypoint.

The entrypoint is the top-level function that orchestrates tasks. It tracks all task executions and records an EntrypointResult (retrievable via get_result()); the call itself returns the function's raw return value.

Parameters:

    fn (Callable | None, default None): The function to decorate.
    name (str | None, default None): Entrypoint name (defaults to function name).

Example:

    @entrypoint
    async def my_workflow(input_data: str) -> str:
        step1 = await fetch(input_data)
        step2 = await process(step1)
        return step2

    result = await my_workflow("hello")
    # result is the raw return value
    # Access metadata via my_workflow.get_result()

Source code in src/locus/multiagent/functional.py
def entrypoint(
    fn: Callable | None = None,
    *,
    name: str | None = None,
) -> Any:
    """Decorator that marks a function as a workflow entrypoint.

    The entrypoint is the top-level function that orchestrates tasks.
    It tracks all task executions and records an ``EntrypointResult``
    retrievable via ``get_result()``; the call itself returns the
    function's raw return value.

    Args:
        fn: The function to decorate.
        name: Entrypoint name (defaults to function name).

    Example:
        @entrypoint
        async def my_workflow(input_data: str) -> str:
            step1 = await fetch(input_data)
            step2 = await process(step1)
            return step2

        result = await my_workflow("hello")
        # result is the raw return value
        # Access metadata via my_workflow.get_result()
    """

    def decorator(func: Callable) -> Callable:
        ep_name = name or func.__name__
        last_result: list[EntrypointResult] = [None]  # type: ignore[list-item]

        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            global _current_tasks
            _current_tasks = []

            start = time.perf_counter()

            try:
                if asyncio.iscoroutinefunction(func):
                    result = await func(*args, **kwargs)
                else:
                    result = func(*args, **kwargs)

                duration = (time.perf_counter() - start) * 1000
                last_result[0] = EntrypointResult(
                    value=result,
                    tasks=list(_current_tasks),
                    duration_ms=duration,
                )
                return result

            except Exception as e:
                duration = (time.perf_counter() - start) * 1000
                last_result[0] = EntrypointResult(
                    tasks=list(_current_tasks),
                    duration_ms=duration,
                    error=str(e),
                )
                raise

        wrapper.last_result = property(lambda self: last_result[0])  # type: ignore[attr-defined]  # noqa: ARG005
        wrapper._last_result = last_result  # type: ignore[attr-defined]  # noqa: SLF001
        wrapper._is_entrypoint = True  # type: ignore[attr-defined]  # noqa: SLF001
        wrapper._entrypoint_name = ep_name  # type: ignore[attr-defined]  # noqa: SLF001

        def get_result() -> EntrypointResult | None:
            return last_result[0]

        wrapper.get_result = get_result  # type: ignore[attr-defined]
        return wrapper

    if fn is not None:
        return decorator(fn)
    return decorator
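
Reading the recorded metadata after a run (my_workflow as in the docstring example):

result = await my_workflow("hello")  # raw return value
meta = my_workflow.get_result()      # EntrypointResult with per-task records
for t in meta.tasks:
    print(t.task_name, t.duration_ms, t.error)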

A2A protocol

A2AServer

A2AServer(agent: Any, name: str = 'Locus Agent', description: str = '', skills: list[AgentSkill] | list[str] | None = None, url: str = '', provider: AgentProvider | None = None, version: str = '0.1.0', api_key: str | None = None, allow_unauthenticated: bool = False)

Expose a Locus Agent as a spec-compliant A2A endpoint.

Parameters:

    agent (Any, required): A Locus Agent (or anything with run(prompt) -> AsyncIterator).
    name (str, default 'Locus Agent'): Display name for the Agent Card.
    description (str, default ''): One-line description for the Agent Card.
    skills (list[AgentSkill] | list[str] | None, default None): List of AgentSkill (preferred) or plain strings (legacy; auto-promoted to skills with id == name).
    url (str, default ''): Public URL the agent is reachable at. Set this for cross-process / cross-host deployments so the card's url field is correct; defaults to a placeholder.
    provider (AgentProvider | None, default None): Optional AgentProvider (e.g. Oracle).
    version (str, default '0.1.0'): Agent semver; useful for capability negotiation.
    api_key (str | None, default None): Bearer token required on every route; if None, falls back to LOCUS_A2A_API_KEY.
    allow_unauthenticated (bool, default False): Bind to non-loopback without a key. Use only behind an upstream proxy that terminates auth.

Example:

from locus import Agent
from locus.a2a import A2AServer
from locus.a2a.spec import AgentSkill

server = A2AServer(
    agent=my_agent,
    name="Research Agent",
    description="Open-web research with citations.",
    skills=[
        AgentSkill(
            id="research",
            name="Research",
            description="Answer with cited sources.",
            tags=["search", "summarise"],
        ),
    ],
    url="https://research.example.com",
    api_key="secret",
)
server.run(port=8001)
Source code in src/locus/a2a/protocol.py
def __init__(
    self,
    agent: Any,
    name: str = "Locus Agent",
    description: str = "",
    skills: list[AgentSkill] | list[str] | None = None,
    url: str = "",
    provider: AgentProvider | None = None,
    version: str = "0.1.0",
    api_key: str | None = None,
    allow_unauthenticated: bool = False,
) -> None:
    self._agent = agent
    self._name = name
    self._description = description or f"A2A-compatible {name}"
    self._skills = self._normalise_skills(skills)
    self._url = url
    self._provider = provider
    self._version = version
    self._api_key = api_key or os.environ.get("LOCUS_A2A_API_KEY") or None
    self._allow_unauthenticated = allow_unauthenticated
    self._app: Any = None
    self._store = _TaskStore()

run

run(host: str = '127.0.0.1', port: int = 8001, **kwargs: Any) -> None

Run the A2A server.

Defaults to loopback binding. Non-loopback bindings require either api_key to be set or allow_unauthenticated=True.

Source code in src/locus/a2a/protocol.py
def run(self, host: str = "127.0.0.1", port: int = 8001, **kwargs: Any) -> None:
    """Run the A2A server.

    Defaults to loopback binding. Non-loopback bindings require
    either ``api_key`` to be set or ``allow_unauthenticated=True``.
    """
    if self._api_key is None and not self._allow_unauthenticated and not _is_loopback(host):
        msg = (
            f"Refusing to bind A2AServer to {host!r} without an API "
            "key. Set LOCUS_A2A_API_KEY, pass api_key=... to "
            "A2AServer, or pass allow_unauthenticated=True if an "
            "upstream proxy terminates auth."
        )
        raise RuntimeError(msg)

    try:
        import uvicorn
    except ImportError as e:
        msg = "uvicorn required. Install with: pip install uvicorn"
        raise ImportError(msg) from e
    uvicorn.run(self.app, host=host, port=port, **kwargs)
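
A quick sketch of the auth guard (my_agent hypothetical):

A2AServer(agent=my_agent).run()                # loopback: fine without a key
A2AServer(agent=my_agent).run(host="0.0.0.0")  # raises RuntimeError
A2AServer(agent=my_agent, api_key="secret").run(host="0.0.0.0")  # OK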

A2AClient

A2AClient(url: str, api_key: str | None = None)

Call a remote A2A agent from Locus.

Spec-compliant methods:

  • get_agent_card(): fetches /.well-known/agent-card.json, falling back to the legacy /agent-card endpoint.
  • send_message(): JSON-RPC message/send; returns a Task you can poll with get_task().
  • send_message_streaming(): JSON-RPC message/stream; yields events from the SSE stream.
  • get_task(), cancel_task(): task lifecycle.

Plus the legacy convenience APIs preserved from the pre-spec implementation:

  • invoke(): flat string-in / string-out over /a2a/invoke.
  • as_tool(): wrap a remote agent as a Locus @tool.
Source code in src/locus/a2a/protocol.py
def __init__(self, url: str, api_key: str | None = None) -> None:
    self._url = url.rstrip("/")
    self._api_key = api_key

get_agent_card async

get_agent_card() -> AgentCard

Fetch the remote agent's capability card.

Tries the spec well-known URL first, falls back to the legacy /agent-card endpoint for older peers.

Source code in src/locus/a2a/protocol.py
async def get_agent_card(self) -> AgentCard:
    """Fetch the remote agent's capability card.

    Tries the spec well-known URL first, falls back to the legacy
    ``/agent-card`` endpoint for older peers.
    """
    import httpx

    async with httpx.AsyncClient() as client:
        for path in ("/.well-known/agent-card.json", "/agent-card"):
            try:
                resp = await client.get(f"{self._url}{path}", headers=self._auth_headers())
                if resp.status_code == 200:
                    data = resp.json()
                    # Legacy peers serve flat string skills — promote.
                    if data.get("skills") and isinstance(data["skills"][0], str):
                        data["skills"] = [
                            {"id": s, "name": s, "description": s} for s in data["skills"]
                        ]
                    # Legacy peers also omit url/capabilities.
                    data.setdefault("url", self._url)
                    return AgentCard.model_validate(data)
            except httpx.HTTPError:
                continue
    msg = f"Could not fetch Agent Card from {self._url}"
    raise RuntimeError(msg)

send_message async

send_message(message: Message) -> Task

Send a message via JSON-RPC message/send and return the Task.

Source code in src/locus/a2a/protocol.py
async def send_message(self, message: Message) -> Task:
    """Send a message via JSON-RPC ``message/send`` and return the Task."""
    result = await self._rpc("message/send", {"message": message.model_dump(exclude_none=True)})
    return Task.model_validate(result)

send_message_streaming async

send_message_streaming(message: Message) -> AsyncIterator[dict[str, Any]]

Send a message via JSON-RPC message/stream and yield events.

Source code in src/locus/a2a/protocol.py
async def send_message_streaming(self, message: Message) -> AsyncIterator[dict[str, Any]]:
    """Send a message via JSON-RPC ``message/stream`` and yield events."""
    import httpx

    body = {
        "jsonrpc": "2.0",
        "id": uuid.uuid4().hex,
        "method": "message/stream",
        "params": {"message": message.model_dump(exclude_none=True)},
    }
    headers = self._auth_headers() | {"Accept": "text/event-stream"}
    async with (
        httpx.AsyncClient(timeout=120.0) as client,
        client.stream("POST", f"{self._url}/", json=body, headers=headers) as resp,
    ):
        resp.raise_for_status()
        async for raw in resp.aiter_lines():
            if not raw or not raw.startswith("data: "):
                continue
            payload = raw[len("data: ") :]
            if payload.strip() == "[DONE]":
                break
            try:
                env = json.loads(payload)
            except json.JSONDecodeError:
                continue
            if "error" in env:
                yield env  # surface error envelope to caller
                break
            yield env.get("result", env)

get_task async

get_task(task_id: str, history_length: int | None = None) -> Task

Fetch a task by id (JSON-RPC tasks/get).

Source code in src/locus/a2a/protocol.py
async def get_task(self, task_id: str, history_length: int | None = None) -> Task:
    """Fetch a task by id (JSON-RPC ``tasks/get``)."""
    params: dict[str, Any] = {"id": task_id}
    if history_length is not None:
        params["historyLength"] = history_length
    result = await self._rpc("tasks/get", params)
    return Task.model_validate(result)
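
A send-then-poll sketch. The Message construction is an assumption here (its exact fields live in locus.a2a.spec), as is the Task.id attribute:

msg = Message(role="user", parts=[{"kind": "text", "text": "Summarise this."}])  # hypothetical fields
task = await client.send_message(msg)
task = await client.get_task(task.id, history_length=5)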

cancel_task async

cancel_task(task_id: str) -> Task

Cancel a task (JSON-RPC tasks/cancel).

Source code in src/locus/a2a/protocol.py
async def cancel_task(self, task_id: str) -> Task:
    """Cancel a task (JSON-RPC ``tasks/cancel``)."""
    result = await self._rpc("tasks/cancel", {"id": task_id})
    return Task.model_validate(result)

invoke async

invoke(prompt: str) -> str

Send a flat text prompt over the legacy /a2a/invoke.

Useful when you control both ends of the wire and want a one-line round-trip; spec-compliant peers should prefer send_message() so they can read the full Task.

Source code in src/locus/a2a/protocol.py
async def invoke(self, prompt: str) -> str:
    """Send a flat text prompt over the legacy ``/a2a/invoke``.

    Useful when you control both ends of the wire and want a one-line
    round-trip; spec-compliant peers should prefer
    :meth:`send_message` so they can read the full :class:`Task`.
    """
    import httpx

    request = A2ARequest(messages=[A2AMessage(role="user", content=prompt)])
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(
            f"{self._url}/a2a/invoke",
            json=request.model_dump(),
            headers=self._auth_headers(),
        )
        resp.raise_for_status()
        response = A2AResponse.model_validate(resp.json())
    agent_msgs = [m for m in response.messages if m.role == "agent"]
    return agent_msgs[-1].content if agent_msgs else ""

as_tool

as_tool(name: str | None = None, description: str | None = None) -> Any

Wrap this remote agent as a Locus @tool.

Source code in src/locus/a2a/protocol.py
def as_tool(self, name: str | None = None, description: str | None = None) -> Any:
    """Wrap this remote agent as a Locus ``@tool``."""
    from locus.tools.decorator import tool as tool_decorator

    client = self
    tool_name = name or "remote_agent"
    tool_desc = description or "Call a remote A2A agent"

    @tool_decorator(name=tool_name, description=tool_desc)
    def call_remote(prompt: str) -> str:
        """Send a request to a remote agent."""
        import asyncio

        return asyncio.run(client.invoke(prompt))

    return call_remote
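
Wiring sketch (the remote URL is hypothetical):

client = A2AClient("https://research.example.com", api_key="secret")
researcher = client.as_tool(name="researcher", description="Delegate research questions")
# researcher is now an ordinary Locus tool; each call blocks on
# asyncio.run(client.invoke(prompt)) under the hood.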