A2A Protocol¶
A2A (Agent-to-Agent) is the public cross-framework protocol at
a2aproject.github.io/A2A. Locus
implements both sides; this notebook spins up a real Agent behind
A2AServer, drives every spec endpoint from A2AClient, and inspects
the typed task lifecycle.
This notebook covers:
- Agent Card at
/.well-known/agent-card.jsonwith typedAgentSkillentries — enough for any A2A client to discover and call the agent. - JSON-RPC 2.0 endpoints:
message/send,tasks/get,tasks/cancel, andmessage/stream(SSE lifecycle events). TaskNotCancelable(-32002) surfaced as aRuntimeErrorwhen you try to cancel a terminal task.A2AClient.invoke— backwards-compatible flat shape for non-spec peers.A2AClient.as_tool(...)— wrap a remote agent as a Locus@toolso a local agent can delegate to it.
Prerequisites¶
pip install fastapi uvicornfor the server side.- Notebook 08 (Agent basics). The wire format is provider-agnostic.
Run¶
The default provider is OCI Generative AI. With ~/.oci/config
present the agent talks to a live OCI model; canonical picks are
openai.gpt-4.1 or meta.llama-3.3-70b-instruct. Set
LOCUS_MODEL_PROVIDER=mock for offline runs.
The notebook starts an in-process uvicorn server and drives a client
against it; expect a few seconds of warm-up before the first
message/send returns.
Source¶
# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 29: A2A — publishing a Locus Agent as a spec-compliant peer.
A2A (Agent-to-Agent) is the public cross-framework protocol at
https://a2aproject.github.io/A2A/. Locus implements the server and
client sides; this notebook spins up a real Agent behind ``A2AServer``,
drives every spec endpoint from ``A2AClient``, and inspects the typed
task lifecycle.
- Agent Card published at ``/.well-known/agent-card.json`` carries
capabilities, typed ``AgentSkill`` entries, and provider metadata —
enough for any A2A client to discover and call the agent.
- JSON-RPC 2.0 endpoints: ``message/send`` (synchronous round-trip),
``tasks/get`` (poll by id), ``tasks/cancel`` (terminal-state errors
surface as the spec's ``TaskNotCancelable`` code), and
``message/stream`` (SSE lifecycle events).
- ``A2AClient.invoke`` is the backwards-compatible flat shape for peers
that haven't picked up the spec yet.
- ``A2AClient.as_tool(...)`` wraps a remote agent as a Locus ``@tool``
so a local agent can delegate to it transparently.
Run it:
.venv/bin/python examples/notebook_34_a2a_protocol.py
The default provider is OCI Generative AI. With ``~/.oci/config``
present the agent talks to a live OCI model (canonical pick:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). Set
``LOCUS_MODEL_PROVIDER=mock`` for offline runs.
Prerequisites:
- ``pip install fastapi uvicorn`` for the server side.
- Notebook 08 (Agent basics). The wire format is provider-agnostic.
"""
from __future__ import annotations
import asyncio
import socket
import threading
import time
from config import get_model
from locus.a2a import (
A2AClient,
A2AServer,
AgentSkill,
Message,
TaskState,
TextPart,
)
from locus.agent import Agent, AgentConfig
def _free_port() -> int:
# Bind-and-release to grab an unused port. Small TOCTOU window, fine
# for a notebook.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
port = int(s.getsockname()[1])
s.close()
return port
def _start_server(server: A2AServer, port: int) -> threading.Thread:
"""Run uvicorn in a daemon thread so the client below can drive it."""
import uvicorn
config = uvicorn.Config(
app=server.app, host="127.0.0.1", port=port, log_level="warning", access_log=False
)
uv = uvicorn.Server(config)
thread = threading.Thread(target=uv.run, daemon=True, name="a2a-server")
thread.start()
deadline = time.monotonic() + 10.0
while time.monotonic() < deadline and not uv.started:
time.sleep(0.05)
if not uv.started:
msg = "uvicorn did not start within deadline"
raise RuntimeError(msg)
return thread
async def main() -> None:
print("=" * 60)
print("Notebook 29: A2A — Locus Agent as a spec-compliant peer")
print("=" * 60)
# ---------------------------------------------------------------
# Part 1: stand up a real agent behind A2A
# ---------------------------------------------------------------
print("\n=== Part 1: A2AServer with typed skills ===\n")
model = get_model()
research = Agent(
config=AgentConfig(
system_prompt=("You are a research assistant. Reply in one short sentence."),
max_iterations=2,
model=model,
)
)
port = _free_port()
# Demo bearer token, not a real credential.
api_key = "notebook-secret" # noqa: S105
server = A2AServer(
agent=research,
name="research",
description="Answers research questions with concise summaries.",
url=f"http://127.0.0.1:{port}",
skills=[
AgentSkill(
id="research",
name="Research",
description="Look up facts and summarise.",
tags=["search", "summarise"],
examples=["What is quantum computing?"],
),
],
api_key=api_key,
)
_start_server(server, port)
print(f" A2AServer listening on http://127.0.0.1:{port}")
base_url = f"http://127.0.0.1:{port}"
client = A2AClient(url=base_url, api_key=api_key)
# ---------------------------------------------------------------
# Part 2: discover via /.well-known/agent-card.json
# ---------------------------------------------------------------
print("\n=== Part 2: Agent Card discovery ===\n")
card = await client.get_agent_card()
print(f" name: {card.name}")
print(f" description: {card.description}")
print(f" url: {card.url}")
print(
f" capabilities: streaming={card.capabilities.streaming} "
f"push={card.capabilities.pushNotifications}"
)
for skill in card.skills:
print(f" skill: {skill.id} — {skill.name} (tags={skill.tags})")
# ---------------------------------------------------------------
# Part 3: message/send returns a typed Task
# ---------------------------------------------------------------
print("\n=== Part 3: message/send → Task ===\n")
task = await client.send_message(
Message(
role="user",
parts=[TextPart(text="What is quantum computing?")],
messageId="m-1",
)
)
print(f" task.id: {task.id}")
print(f" task.contextId: {task.contextId}")
print(f" task.status.state: {task.status.state.value}")
if task.artifacts:
first_part = task.artifacts[-1].parts[0]
text = getattr(first_part, "text", "")
print(f" reply artifact: {text[:120]}")
# ---------------------------------------------------------------
# Part 4: tasks/get — poll the task by id
# ---------------------------------------------------------------
print("\n=== Part 4: tasks/get ===\n")
refetched = await client.get_task(task.id)
print(
f" re-fetched task is in {refetched.status.state.value} state "
f"(== completed: {refetched.status.state == TaskState.completed})"
)
# ---------------------------------------------------------------
# Part 5: tasks/cancel on a terminal task surfaces TaskNotCancelable
# ---------------------------------------------------------------
print("\n=== Part 5: tasks/cancel on a terminal task ===\n")
try:
await client.cancel_task(task.id)
except RuntimeError as e:
print(f" spec error surfaced: {e}")
# ---------------------------------------------------------------
# Part 6: message/stream — SSE lifecycle events
# ---------------------------------------------------------------
print("\n=== Part 6: message/stream ===\n")
seen: list[str] = []
async for event in client.send_message_streaming(
Message(
role="user",
parts=[TextPart(text="Stream a one-sentence answer about LLMs.")],
messageId="m-2",
)
):
kind = event.get("kind") or "?"
seen.append(kind)
if kind == "task":
print(f" initial task envelope: id={event.get('id')}")
elif kind == "status-update":
state = event.get("status", {}).get("state")
print(f" status-update: state={state}")
elif kind == "artifact-update":
artifact = event.get("artifact", {})
parts = artifact.get("parts", [])
text = parts[0].get("text", "") if parts else ""
print(f" artifact-update: {text[:120]}")
print(f" total events: {len(seen)}")
# ---------------------------------------------------------------
# Part 7: backwards-compat flat invoke for non-spec peers
# ---------------------------------------------------------------
print("\n=== Part 7: legacy /a2a/invoke (backwards-compat) ===\n")
text = await client.invoke("Give me a one-line summary of A2A.")
print(f" flat reply: {text[:120]}")
# ---------------------------------------------------------------
# Part 8: wrap a remote agent as a local @tool
# ---------------------------------------------------------------
print("\n=== Part 8: A2AClient.as_tool ===\n")
tool = client.as_tool(name="ask_research", description="ask the research agent")
print(f" tool.name = {tool.name}, tool.description = {tool.description}")
# tool.fn calls asyncio.run() internally, so it can only be invoked
# from sync code. We're already inside an async main, so just
# inspect the tool object instead of calling it.
print("\n" + "=" * 60)
print("Next: Notebook 30 — DeepAgent")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())