Reference implementation of ARCP v1.1 for Python. The wire is defined
in ../spec/docs/draft-arcp-02.1.md;
this SDK is one of eleven implementations tracked in the workspace's
per-SDK conformance pages.
uv add arcppip install arcpThe Python SDK ships as a single distribution. Subpackages:
| Subpackage | What it contains |
|---|---|
arcp |
Top-level re-exports (ARCPClient, ARCPRuntime, MemoryTransport, pair_memory_transports, errors, version constants). |
arcp.client |
ARCPClient, JobHandle, JobSubscription, AutoAckOptions. |
arcp.runtime |
ARCPRuntime, JobContext, SessionContext, agent registry, lease helpers. |
arcp.transport |
Transport protocol, MemoryTransport, WebSocketTransport, StdioTransport. |
arcp.middleware.asgi |
arcp_asgi_app(runtime, *, allowed_hosts) for Starlette / FastAPI / Litestar / Quart. |
arcp.middleware.aiohttp |
arcp_aiohttp_handler(runtime) and serve_arcp_aiohttp(...) for aiohttp.web.Application. |
arcp.middleware.otel |
with_tracing(inner, *, tracer) — W3C trace context + v1.1 span attrs (§11). |
All of the above ship in one wheel; no à-la-carte install.
import asyncio
from arcp import (
ARCPClient, ClientInfo, RuntimeInfo, pair_memory_transports,
)
from arcp.runtime import ARCPRuntime, StaticBearerVerifier
TOKEN = "demo"
async def echo(input_value, ctx):
await ctx.log("info", "echo started")
return {"echoed": input_value}
async def main() -> None:
runtime = ARCPRuntime(
runtime=RuntimeInfo(name="quickstart", version="1.1.0"),
bearer=StaticBearerVerifier({TOKEN: "me@example.com"}),
)
runtime.register_agent("echo", echo)
client_t, server_t = pair_memory_transports()
async with asyncio.TaskGroup() as tg:
tg.create_task(runtime.accept(server_t))
client = ARCPClient(
client=ClientInfo(name="quickstart-client", version="1.0.0"),
token=TOKEN,
)
await client.connect(client_t)
handle = await client.submit(agent="echo", input={"hi": 1})
result = await handle.done
print(result.result)
await client.close()
asyncio.run(main())The runnable two-process WebSocket variant is at
examples/submit_and_stream/.
| Piece | Spec | Module |
|---|---|---|
| Envelope | §5 | arcp.Envelope |
| Transport | §4 | arcp.transport |
| Session | §6 | arcp.runtime.SessionContext, arcp.client.ARCPClient |
| Job | §7 | arcp.runtime.Job, arcp.client.JobHandle |
| Event | §8 | arcp.runtime.JobContext (emitters) |
| Lease | §9 | arcp.runtime.validate_lease_op, LeaseConstraints |
| Delegation | §10 | JobContext.delegate(...) (recursive submit) |
Cancellation: client.cancel_job(job_id) propagates as
asyncio.CancelledError into the agent body; cleanup runs before the
terminal job.error (code: "CANCELLED") ships. Event payloads are
discriminated unions on payload.kind, validated with pydantic v2.
The nine v1.1 features negotiate per spec §6.2 (intersection of the
client's capabilities.features with the runtime's, in client
order): heartbeat,
ack,
list_jobs,
subscribe,
agent_versions,
lease_expires_at,
cost.budget,
progress,
result_chunk. Features absent
from the negotiated set behave as if unsupported on both sides; see
docs/03-features/capability-negotiation.md.
import asyncio
from arcp import RuntimeInfo, serve_websocket
from arcp.runtime import ARCPRuntime, StaticBearerVerifier
runtime = ARCPRuntime(
runtime=RuntimeInfo(name="my-runtime", version="1.1.0"),
bearer=StaticBearerVerifier({"tok": "me@example.com"}),
)
runtime.register_agent("echo", lambda inp, ctx: {"echoed": inp})
async def main() -> None:
server = await serve_websocket(
runtime.accept, host="127.0.0.1", port=7777, path="/arcp",
)
async with server:
await server.serve_forever()
asyncio.run(main())To mount inside an existing ASGI app, use
arcp.middleware.asgi.arcp_asgi_app(runtime, allowed_hosts=[...])
on a WebSocket route.
uv run arcp serve --host 127.0.0.1 --port 7777 \
--token tok --principal me@example.com
uv run arcp submit --url ws://127.0.0.1:7777/arcp \
--token tok --agent echo --input '{"hi":1}'
uv run arcp tail --url ws://127.0.0.1:7777/arcp \
--token tok --job-id job_01J...
uv run arcp replay --db arcp.db --session sess_XYZ --after-seq 0Commands are implemented in src/arcp/cli.py.
import os
import asyncio
from arcp import ARCPClient, ClientInfo, WebSocketTransport
async def main() -> None:
transport = await WebSocketTransport.connect("wss://runtime.example.com/arcp")
client = ARCPClient(
client=ClientInfo(name="my-client", version="1.0.0"),
token=os.environ["TOKEN"],
)
await client.connect(transport)
try:
handle = await client.submit(
agent="weekly-report",
input={"week": "2026-W19"},
lease_request={"net.fetch": ["s3://example/**"]},
idempotency_key="weekly-report-2026-W19",
)
async for event in handle.events():
# elided: render event to stdout / UI
pass
result = await handle.done
print(result.result)
finally:
await client.close()
asyncio.run(main())client.connect(transport) performs the handshake and starts the
read pump; client.close() sends session.bye (spec §6.7) and
closes the transport. Cancellation propagates asyncio.CancelledError
into handle.done.
Per-section status, with source citations, lives at
docs/06-conformance.md. All v1.1
normative requirements are implemented; the deferred items listed at
the bottom of that page are intentional, not bugs.
Core (v1.0):
| Example | What it shows |
|---|---|
submit-and-stream |
Connect, submit, iterate events, close. |
delegate |
Parent agent submits a child job. |
resume |
Reattach to a session and replay. |
idempotent-retry |
Same key → same job_id. |
lease-violation |
Authorize outside the granted lease. |
cancel |
job.cancel → terminal CANCELLED. |
stdio |
Child-process subagent over pipes. |
vendor-extensions |
x-vendor.* passthrough. |
custom-auth |
Custom BearerVerifier. |
v1.1 features:
| Example | Feature |
|---|---|
heartbeat |
heartbeat |
ack-backpressure |
ack |
list-jobs |
list_jobs |
subscribe |
subscribe |
agent-versions |
agent_versions |
lease-expires-at |
lease_expires_at |
cost-budget |
cost.budget |
progress |
progress |
result-chunk |
result_chunk |
Host integrations:
| Example | Host |
|---|---|
host-tracing |
OpenTelemetry |
host-asgi |
Starlette / FastAPI |
host-aiohttp |
aiohttp |
uv sync # install incl. dev extras
uv run pytest # tests/
uv run ruff check # lint + format check
uv run pyright # strict type check