Tutorial 36: Functional API — @entrypoint and @task Decorators¶
This tutorial covers:
- @task: define parallelizable units with retry and caching
- @entrypoint: orchestrate tasks with built-in tracking
- TaskResult and EntrypointResult for metadata
- Alternative to StateGraph for imperative workflows
Prerequisites:
- No model needed for this tutorial
Difficulty: Intermediate
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Tutorial 36: Functional API — @entrypoint and @task Decorators
This tutorial covers:
- @task: define parallelizable units with retry and caching
- @entrypoint: orchestrate tasks with automatic tracking
- TaskResult and EntrypointResult for metadata
- Alternative to StateGraph for imperative workflows
Prerequisites:
- No model needed for this tutorial
Difficulty: Intermediate
"""
import asyncio
import time
from config import get_model
from locus.agent import Agent
from locus.multiagent.functional import entrypoint, task
def _llm_call(
prompt: str, *, system: str = "Reply in one short sentence.", max_tokens: int = 80
) -> str:
"""Helper: real model call with timing/token banner — used by every Part."""
agent = Agent(model=get_model(max_tokens=max_tokens), system_prompt=system)
t0 = time.perf_counter()
res = agent.run_sync(prompt)
dt = time.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {res.metrics.prompt_tokens}→{res.metrics.completion_tokens} tokens]"
)
return res.message.strip()
# =============================================================================
# Part 1: Basic Pipeline
# =============================================================================
async def example_basic():
"""Simple task chain with automatic tracking."""
print("=== Part 1: Basic Pipeline ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when is the Locus functional API a better choice than StateGraph?')}"
)
@task
async def fetch(url: str) -> dict:
return {"data": f"fetched from {url}", "status": 200}
@task
async def process(data: dict) -> str:
return f"processed: {data['data']}"
@entrypoint
async def pipeline(url: str) -> str:
data = await fetch(url)
result = await process(data)
return result
result = await pipeline("https://api.example.com/data")
print(f"Result: {result}")
# Access metadata
ep = pipeline.get_result()
print(f"Tasks executed: {len(ep.tasks)}")
for t in ep.tasks:
print(f" {t.task_name}: {t.duration_ms:.1f}ms")
print(f"Total: {ep.duration_ms:.1f}ms")
# =============================================================================
# Part 2: Task with Retry
# =============================================================================
async def example_retry():
"""Tasks can retry on failure."""
print("\n=== Part 2: Task with Retry ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, why does @task(retry_attempts=3) belong on the task and not in caller code?')}"
)
attempt = 0
@task(retry_attempts=3)
async def unreliable_api(query: str) -> str:
nonlocal attempt
attempt += 1
if attempt < 3:
raise ConnectionError("API timeout")
return f"result for: {query}"
@entrypoint
async def retry_pipeline() -> str:
return await unreliable_api("test")
result = await retry_pipeline()
print(f"Result: {result}")
print(f"Attempts needed: {attempt}")
# =============================================================================
# Part 3: Task with Caching
# =============================================================================
async def example_cache():
"""Cache task results for identical arguments."""
print("\n=== Part 3: Task with Caching ===\n")
print(
f"AI rationale: {_llm_call('In one sentence, when should you turn @task(cache=True) ON for an LLM-heavy pipeline?')}"
)
call_count = 0
@task(cache=True)
async def expensive_compute(key: str) -> str:
nonlocal call_count
call_count += 1
return f"computed_{call_count}"
@entrypoint
async def cache_pipeline() -> tuple:
r1 = await expensive_compute("same_key")
r2 = await expensive_compute("same_key") # Cache hit!
r3 = await expensive_compute("diff_key") # Different key
return (r1, r2, r3)
r1, r2, r3 = await cache_pipeline()
print(f"r1={r1}, r2={r2}, r3={r3}")
print(f"Actual calls: {call_count}") # 2, not 3
async def example_with_llm():
"""A functional pipeline whose inner task delegates to a real Agent."""
print("\n=== Part 4: @task with real LLM ===\n")
@task
async def fetch_topic(seed: str) -> str:
return f"Tell me about {seed}."
@task
async def think(prompt: str) -> str:
import time as _t
agent = Agent(
model=get_model(max_tokens=80),
system_prompt="Answer in one factual sentence.",
)
t0 = _t.perf_counter()
result = agent.run_sync(prompt)
dt = _t.perf_counter() - t0
print(
f" [model call: {dt:.2f}s · {result.metrics.prompt_tokens}→{result.metrics.completion_tokens} tokens]"
)
return result.message.strip()
@entrypoint
async def pipeline(seed: str) -> str:
question = await fetch_topic(seed)
return await think(question)
answer = await pipeline("Oracle Cloud Infrastructure")
print(f"Answer: {answer}")
if __name__ == "__main__":
asyncio.run(example_basic())
asyncio.run(example_retry())
asyncio.run(example_cache())
asyncio.run(example_with_llm())