Functional API¶
Express a workflow as decorated async functions instead of a graph. If
StateGraph feels like overkill for a straight-line pipeline, the
functional API lets you write the same workflow as ordinary Python:
decorate the units of work with @task, decorate the orchestrator
with @entrypoint, and Locus tracks timing, retries, and caching
behind the scenes.
What you'll see:
@task— a unit of work; can declareretry_attemptsandcache.@entrypoint— the top-level coroutine; tracks every task it awaits.pipeline.get_result()returns anEntrypointResultwith per-task metadata.- Same execution semantics as
StateGraph, written imperatively.
Runs on the same OCI GenAI default as the rest of the notebooks:
LOCUS_MODEL_ID=openai.gpt-4.1 python examples/notebook_29_functional_api.py
# or, fully offline:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_29_functional_api.py
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Express a workflow as decorated async functions instead of a graph.
If `StateGraph` feels like overkill for a straight-line pipeline, the
functional API lets you write the same workflow as ordinary Python:
decorate the units of work with `@task`, decorate the orchestrator
with `@entrypoint`, and Locus tracks timing, retries, and caching for
you behind the scenes.
- @task — a unit of work; can declare retry_attempts and cache.
- @entrypoint — the top-level coroutine; tracks every task it awaits.
- pipeline.get_result() returns an EntrypointResult with per-task metadata.
- Same execution semantics as StateGraph, written imperatively.
Run it:
LOCUS_MODEL_PROVIDER=mock python examples/notebook_29_functional_api.py
The default provider is OCI GenAI — auto-detected from ~/.oci/config.
Set LOCUS_MODEL_PROVIDER=mock for offline runs. Pick an OCI model with
LOCUS_MODEL_ID=openai.gpt-4.1 (or meta.llama-3.3-70b-instruct, etc.).
"""
import asyncio
import time
from config import get_model
from locus.agent import Agent
from locus.multiagent.functional import entrypoint, task
def _llm_call(
prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
"""Run a one-shot Agent and print a timing/token banner. Used by every part."""
agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
t0 = time.perf_counter()
res = agent.run_sync(prompt)
dt = time.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {res.metrics.prompt_tokens}→{res.metrics.completion_tokens} tokens]"
)
return res.message.strip()
# =============================================================================
# Part 1: A two-task pipeline
# =============================================================================
async def example_basic():
"""Two @task functions wired together inside an @entrypoint."""
print("=== Part 1: A two-task pipeline ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when is the Locus functional API a better choice than StateGraph?')}"
)
@task
async def fetch(url: str) -> dict:
return {"data": f"fetched from {url}", "status": 200}
@task
async def process(data: dict) -> str:
return f"processed: {data['data']}"
@entrypoint
async def pipeline(url: str) -> str:
data = await fetch(url)
result = await process(data)
return result
result = await pipeline("https://api.example.com/data")
print(f"Result: {result}")
ep = pipeline.get_result()
print(f"Tasks executed: {len(ep.tasks)}")
for t in ep.tasks:
print(f" {t.task_name}: {t.duration_ms:.1f}ms")
print(f"Total: {ep.duration_ms:.1f}ms")
# =============================================================================
# Part 2: @task(retry_attempts=3)
# =============================================================================
async def example_retry():
"""retry_attempts on the decorator handles transient failures."""
print("\n=== Part 2: @task(retry_attempts=3) ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why does @task(retry_attempts=3) belong on the task and not in caller code?')}"
)
attempt = 0
@task(retry_attempts=3)
async def unreliable_api(query: str) -> str:
nonlocal attempt
attempt += 1
if attempt < 3:
raise ConnectionError("API timeout")
return f"result for: {query}"
@entrypoint
async def retry_pipeline() -> str:
return await unreliable_api("test")
result = await retry_pipeline()
print(f"Result: {result}")
print(f"Attempts needed: {attempt}")
# =============================================================================
# Part 3: @task(cache=True)
# =============================================================================
async def example_cache():
"""Same arguments return the cached result without re-running the task."""
print("\n=== Part 3: @task(cache=True) ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when should you turn @task(cache=True) ON for an LLM-heavy pipeline?')}"
)
call_count = 0
@task(cache=True)
async def expensive_compute(key: str) -> str:
nonlocal call_count
call_count += 1
return f"computed_{call_count}"
@entrypoint
async def cache_pipeline() -> tuple:
r1 = await expensive_compute("same_key")
r2 = await expensive_compute("same_key") # cache hit
r3 = await expensive_compute("diff_key")
return (r1, r2, r3)
r1, r2, r3 = await cache_pipeline()
print(f"r1={r1}, r2={r2}, r3={r3}")
print(f"Actual calls: {call_count}") # 2, not 3
async def example_with_llm():
"""A @task can wrap a full Agent call just like a node would."""
print("\n=== Part 4: @task wrapping an LLM call ===\n")
@task
async def fetch_topic(seed: str) -> str:
return f"Tell me about {seed}."
@task
async def think(prompt: str) -> str:
import time as _t
agent = Agent(
model=get_model(max_tokens=80),
system_prompt="Answer in one factual sentence.",
)
t0 = _t.perf_counter()
result = agent.run_sync(prompt)
dt = _t.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {result.metrics.prompt_tokens}→{result.metrics.completion_tokens} tokens]"
)
return result.message.strip()
@entrypoint
async def pipeline(seed: str) -> str:
question = await fetch_topic(seed)
return await think(question)
answer = await pipeline("Oracle Cloud Infrastructure")
print(f"Answer: {answer}")
if __name__ == "__main__":
asyncio.run(example_basic())
asyncio.run(example_retry())
asyncio.run(example_cache())
asyncio.run(example_with_llm())