Skip to content

A2A Protocol

A2A (Agent-to-Agent) is the public cross-framework protocol at a2aproject.github.io/A2A. Locus implements both sides; this notebook spins up a real Agent behind A2AServer, drives every spec endpoint from A2AClient, and inspects the typed task lifecycle.

This notebook covers:

  • Agent Card at /.well-known/agent-card.json with typed AgentSkill entries — enough for any A2A client to discover and call the agent.
  • JSON-RPC 2.0 endpoints: message/send, tasks/get, tasks/cancel, and message/stream (SSE lifecycle events).
  • TaskNotCancelable (-32002) surfaced as a RuntimeError when you try to cancel a terminal task.
  • A2AClient.invoke — backwards-compatible flat shape for non-spec peers.
  • A2AClient.as_tool(...) — wrap a remote agent as a Locus @tool so a local agent can delegate to it.

Prerequisites

  • pip install fastapi uvicorn for the server side.
  • Notebook 08 (Agent basics). The wire format is provider-agnostic.

Run

python examples/notebook_34_a2a_protocol.py

The default provider is OCI Generative AI. With ~/.oci/config present the agent talks to a live OCI model; canonical picks are openai.gpt-4.1 or meta.llama-3.3-70b-instruct. Set LOCUS_MODEL_PROVIDER=mock for offline runs.

The notebook starts an in-process uvicorn server and drives a client against it; expect a few seconds of warm-up before the first message/send returns.

Source

# Copyright (c) 2025, 2026 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v1.0 as shown at
# https://oss.oracle.com/licenses/upl/
"""
Notebook 29: A2A — publishing a Locus Agent as a spec-compliant peer.

A2A (Agent-to-Agent) is the public cross-framework protocol at
https://a2aproject.github.io/A2A/. Locus implements the server and
client sides; this notebook spins up a real Agent behind ``A2AServer``,
drives every spec endpoint from ``A2AClient``, and inspects the typed
task lifecycle.

- Agent Card published at ``/.well-known/agent-card.json`` carries
  capabilities, typed ``AgentSkill`` entries, and provider metadata —
  enough for any A2A client to discover and call the agent.
- JSON-RPC 2.0 endpoints: ``message/send`` (synchronous round-trip),
  ``tasks/get`` (poll by id), ``tasks/cancel`` (terminal-state errors
  surface as the spec's ``TaskNotCancelable`` code), and
  ``message/stream`` (SSE lifecycle events).
- ``A2AClient.invoke`` is the backwards-compatible flat shape for peers
  that haven't picked up the spec yet.
- ``A2AClient.as_tool(...)`` wraps a remote agent as a Locus ``@tool``
  so a local agent can delegate to it transparently.

Run it:
    .venv/bin/python examples/notebook_34_a2a_protocol.py

The default provider is OCI Generative AI. With ``~/.oci/config``
present the agent talks to a live OCI model (canonical pick:
``openai.gpt-4.1`` or ``meta.llama-3.3-70b-instruct``). Set
``LOCUS_MODEL_PROVIDER=mock`` for offline runs.

Prerequisites:
- ``pip install fastapi uvicorn`` for the server side.
- Notebook 08 (Agent basics). The wire format is provider-agnostic.
"""

from __future__ import annotations

import asyncio
import socket
import threading
import time

from config import get_model

from locus.a2a import (
    A2AClient,
    A2AServer,
    AgentSkill,
    Message,
    TaskState,
    TextPart,
)
from locus.agent import Agent, AgentConfig


def _free_port() -> int:
    # Bind-and-release to grab an unused port. Small TOCTOU window, fine
    # for a notebook.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("127.0.0.1", 0))
    port = int(s.getsockname()[1])
    s.close()
    return port


def _start_server(server: A2AServer, port: int) -> threading.Thread:
    """Run uvicorn in a daemon thread so the client below can drive it."""
    import uvicorn

    config = uvicorn.Config(
        app=server.app, host="127.0.0.1", port=port, log_level="warning", access_log=False
    )
    uv = uvicorn.Server(config)
    thread = threading.Thread(target=uv.run, daemon=True, name="a2a-server")
    thread.start()
    deadline = time.monotonic() + 10.0
    while time.monotonic() < deadline and not uv.started:
        time.sleep(0.05)
    if not uv.started:
        msg = "uvicorn did not start within deadline"
        raise RuntimeError(msg)
    return thread


async def main() -> None:
    print("=" * 60)
    print("Notebook 29: A2A — Locus Agent as a spec-compliant peer")
    print("=" * 60)

    # ---------------------------------------------------------------
    # Part 1: stand up a real agent behind A2A
    # ---------------------------------------------------------------
    print("\n=== Part 1: A2AServer with typed skills ===\n")

    model = get_model()
    research = Agent(
        config=AgentConfig(
            system_prompt=("You are a research assistant. Reply in one short sentence."),
            max_iterations=2,
            model=model,
        )
    )

    port = _free_port()
    # Demo bearer token, not a real credential.
    api_key = "notebook-secret"  # noqa: S105

    server = A2AServer(
        agent=research,
        name="research",
        description="Answers research questions with concise summaries.",
        url=f"http://127.0.0.1:{port}",
        skills=[
            AgentSkill(
                id="research",
                name="Research",
                description="Look up facts and summarise.",
                tags=["search", "summarise"],
                examples=["What is quantum computing?"],
            ),
        ],
        api_key=api_key,
    )
    _start_server(server, port)
    print(f"  A2AServer listening on http://127.0.0.1:{port}")

    base_url = f"http://127.0.0.1:{port}"
    client = A2AClient(url=base_url, api_key=api_key)

    # ---------------------------------------------------------------
    # Part 2: discover via /.well-known/agent-card.json
    # ---------------------------------------------------------------
    print("\n=== Part 2: Agent Card discovery ===\n")
    card = await client.get_agent_card()
    print(f"  name:         {card.name}")
    print(f"  description:  {card.description}")
    print(f"  url:          {card.url}")
    print(
        f"  capabilities: streaming={card.capabilities.streaming} "
        f"push={card.capabilities.pushNotifications}"
    )
    for skill in card.skills:
        print(f"  skill:        {skill.id}{skill.name} (tags={skill.tags})")

    # ---------------------------------------------------------------
    # Part 3: message/send returns a typed Task
    # ---------------------------------------------------------------
    print("\n=== Part 3: message/send → Task ===\n")
    task = await client.send_message(
        Message(
            role="user",
            parts=[TextPart(text="What is quantum computing?")],
            messageId="m-1",
        )
    )
    print(f"  task.id:           {task.id}")
    print(f"  task.contextId:    {task.contextId}")
    print(f"  task.status.state: {task.status.state.value}")
    if task.artifacts:
        first_part = task.artifacts[-1].parts[0]
        text = getattr(first_part, "text", "")
        print(f"  reply artifact:    {text[:120]}")

    # ---------------------------------------------------------------
    # Part 4: tasks/get — poll the task by id
    # ---------------------------------------------------------------
    print("\n=== Part 4: tasks/get ===\n")
    refetched = await client.get_task(task.id)
    print(
        f"  re-fetched task is in {refetched.status.state.value} state "
        f"(== completed: {refetched.status.state == TaskState.completed})"
    )

    # ---------------------------------------------------------------
    # Part 5: tasks/cancel on a terminal task surfaces TaskNotCancelable
    # ---------------------------------------------------------------
    print("\n=== Part 5: tasks/cancel on a terminal task ===\n")
    try:
        await client.cancel_task(task.id)
    except RuntimeError as e:
        print(f"  spec error surfaced: {e}")

    # ---------------------------------------------------------------
    # Part 6: message/stream — SSE lifecycle events
    # ---------------------------------------------------------------
    print("\n=== Part 6: message/stream ===\n")
    seen: list[str] = []
    async for event in client.send_message_streaming(
        Message(
            role="user",
            parts=[TextPart(text="Stream a one-sentence answer about LLMs.")],
            messageId="m-2",
        )
    ):
        kind = event.get("kind") or "?"
        seen.append(kind)
        if kind == "task":
            print(f"  initial task envelope: id={event.get('id')}")
        elif kind == "status-update":
            state = event.get("status", {}).get("state")
            print(f"  status-update: state={state}")
        elif kind == "artifact-update":
            artifact = event.get("artifact", {})
            parts = artifact.get("parts", [])
            text = parts[0].get("text", "") if parts else ""
            print(f"  artifact-update: {text[:120]}")
    print(f"  total events: {len(seen)}")

    # ---------------------------------------------------------------
    # Part 7: backwards-compat flat invoke for non-spec peers
    # ---------------------------------------------------------------
    print("\n=== Part 7: legacy /a2a/invoke (backwards-compat) ===\n")
    text = await client.invoke("Give me a one-line summary of A2A.")
    print(f"  flat reply: {text[:120]}")

    # ---------------------------------------------------------------
    # Part 8: wrap a remote agent as a local @tool
    # ---------------------------------------------------------------
    print("\n=== Part 8: A2AClient.as_tool ===\n")
    tool = client.as_tool(name="ask_research", description="ask the research agent")
    print(f"  tool.name = {tool.name}, tool.description = {tool.description}")
    # tool.fn calls asyncio.run() internally, so it can only be invoked
    # from sync code. We're already inside an async main, so just
    # inspect the tool object instead of calling it.

    print("\n" + "=" * 60)
    print("Next: Notebook 30 — DeepAgent")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())