Functional API¶
The functional API is locus's "agent as a task" shape — @task and
@entrypoint decorators that bring agent runs into the regular
asyncio universe.
What it is¶
Two decorators:
| Decorator | What it does |
|---|---|
@task |
Wraps a coroutine that calls an Agent. Returns a Task you can await, gather, retry, time-out — anything asyncio gives you. |
@entrypoint |
Marks the top-level coroutine of a workflow. Adds .run / .run_sync so you can invoke the workflow synchronously from non-async code. |
These are not a new orchestration runtime. They're a thin shim
that lets agents participate in plain asyncio. The point is to
compose with asyncio.gather, asyncio.wait_for, asyncio.Queue,
or anything else you already use.
When to use it¶
- ✅ You think in
async defandasyncio.gatheralready. - ✅ The flow is map/reduce over agents (vet N vendors in parallel).
- ✅ You want to mix agents with non-agent code — DB writes, HTTP calls, file I/O — in the same coroutine.
- ✅ Tooling like
tenacityretries,asyncio.timeout, or aasyncio.Queuescheduler already gives you the orchestration you need.
When NOT to use it¶
- ❌ You want inspectable, named control-flow with cycles or conditional branches → use StateGraph.
- ❌ You need per-node retry / cache policies as data → StateGraph.
- ❌ Different agents should decide who runs → use Orchestrator.
Code¶
import asyncio
from locus.multiagent.functional import task, entrypoint
@task
async def vet_vendor(vendor: dict) -> dict:
"""Run the compliance agent against one vendor."""
return await compliance_agent.run(f"Vet {vendor['name']}.")
@entrypoint
async def vet_all(vendors: list[dict]) -> list[dict]:
"""Vet every vendor in parallel; gather the results."""
return await asyncio.gather(*[vet_vendor(v) for v in vendors])
scored = vet_all.run_sync(catalogue)
Map/reduce with retries and timeouts¶
Because tasks are plain coroutines, you compose with whatever the asyncio ecosystem provides:
from tenacity import retry, stop_after_attempt, wait_exponential
@task
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5))
async def vet_vendor(vendor: dict) -> dict:
return await compliance_agent.run(f"Vet {vendor['name']}.")
@entrypoint
async def vet_all_with_deadline(vendors: list[dict]) -> list[dict]:
async with asyncio.timeout(60): # 60s wall-clock cap
return await asyncio.gather(*[vet_vendor(v) for v in vendors])
Tasks calling tasks¶
Tasks compose. An @entrypoint workflow can call other @tasks
including parallel batches inside sequential phases:
@task
async def shortlist_vendors(catalogue: list[dict]) -> list[dict]:
return await procurement_agent.run(f"Shortlist 5 from {len(catalogue)}.")
@task
async def vet(vendor: dict) -> dict:
return await compliance_agent.run(f"Vet {vendor['name']}.")
@entrypoint
async def end_to_end(catalogue: list[dict]) -> dict:
shortlisted = await shortlist_vendors(catalogue) # phase 1
scored = await asyncio.gather(*[vet(v) for v in shortlisted]) # phase 2 (parallel)
final = await approval_agent.run(f"Approve from: {scored}") # phase 3
return final
Tutorials¶
tutorial_36_functional_api.py—@taskand@entrypointend-to-end.tutorial_42_map_reduce_code_review.py— same map/reduce shape, written as a graph withSendinstead. Useful as the "graph version" comparison when you're choosing between functional and StateGraph for a fan-out workload.
Source¶
multiagent/functional.py
— task, entrypoint, TaskResult, EntrypointResult.
See also¶
- Multi-agent overview — pick a shape.
- StateGraph — for the same fan-out as data with inspectable retry/cache policies.
- Composition — for the same shapes via
ParallelPipeline.