23 changes: 23 additions & 0 deletions AUDIT.md
@@ -182,3 +182,26 @@ Automated contract tests validate sample request/response payloads against the O
- **Negative tests:** missing required fields, extra fields (additionalProperties), invalid enum values
- **Enum value tests:** UnitEnum, ErrorCode, DecisionEnum, ReservationStatus, CommitOveragePolicy
- **Spec fixture:** `tests/fixtures/cycles-protocol-v0.yaml` (copy of canonical spec)

---

## Streaming Convenience Module (added 2026-04-08)

**Module:** `runcycles/streaming.py`
**Test file:** `tests/test_streaming.py` (64 tests, all passing)
**Version:** 0.3.0

Added `StreamReservation` and `AsyncStreamReservation` context managers that automate the reserve → commit/release lifecycle for streaming use cases. This is a DX convenience layer — no protocol changes.

- **`StreamReservation`** — sync context manager: reserves on `__enter__`, auto-commits on successful `__exit__`, auto-releases on exception
- **`AsyncStreamReservation`** — async equivalent using `__aenter__`/`__aexit__`
- **`StreamUsage`** — mutable accumulator for token counts and cost during streaming
- **Client convenience methods:** `CyclesClient.stream_reservation()` and `AsyncCyclesClient.stream_reservation()` — thin factories that build Subject from config defaults
- **Cost resolution:** explicit `usage.actual_cost` > `cost_fn(usage)` > estimate fallback (illustrated in the sketch after this list)
- **Heartbeat:** automatic TTL extension, same interval formula as decorator lifecycle (`max(ttl_ms / 2, 1000)` ms)
- **Commit retry:** uses existing `CommitRetryEngine`/`AsyncCommitRetryEngine`
- **Context propagation:** sets/clears `CyclesContext` via `ContextVar`, accessible via `get_cycles_context()`; respects user-set `ctx.metrics` during streaming
- **Spec validation:** `validate_ttl_ms()` (1000–86400000), `validate_grace_period_ms()` (0–60000), `validate_subject()` (at least one standard field) — matches lifecycle.py
- **Error handling:** `RESERVATION_FINALIZED`, `RESERVATION_EXPIRED`, and `IDEMPOTENCY_MISMATCH` do not trigger release; other 4xx client errors do trigger release — matches lifecycle.py behavior exactly
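
The cost-resolution, heartbeat, and release rules above can be summarized in a short sketch. This is illustrative only: the helper names and the `_Usage` stand-in are hypothetical, and the real logic lives in `runcycles/streaming.py`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class _Usage:
    """Hypothetical stand-in for StreamUsage: a mutable accumulator."""
    tokens_input: int = 0
    tokens_output: int = 0
    actual_cost: Optional[int] = None


# Error codes that do not trigger release (matches lifecycle.py).
_RELEASE_EXEMPT = {"RESERVATION_FINALIZED", "RESERVATION_EXPIRED", "IDEMPOTENCY_MISMATCH"}


def _resolve_cost(usage: _Usage, cost_fn: Optional[Callable[[_Usage], int]], estimate: int) -> int:
    # Resolution order: explicit actual cost, then cost_fn, then the estimate.
    if usage.actual_cost is not None:
        return usage.actual_cost
    if cost_fn is not None:
        return cost_fn(usage)
    return estimate


def _heartbeat_interval_ms(ttl_ms: int) -> int:
    # Same interval formula as the decorator lifecycle: max(ttl_ms / 2, 1000) ms.
    return max(ttl_ms // 2, 1000)  # integer division assumed here


def _should_release(status: int, error_code: str) -> bool:
    # RESERVATION_FINALIZED, RESERVATION_EXPIRED, and IDEMPOTENCY_MISMATCH skip
    # release; other 4xx client errors trigger it.
    return 400 <= status < 500 and error_code not in _RELEASE_EXEMPT
```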

Protocol conformance: No new endpoints or protocol changes. All reservation, commit, release, and extend calls use the same client methods and body formats as the decorator path. Verified by 64 unit tests covering success, deny, error, retry, heartbeat, cost resolution, context propagation, spec validation, and all commit error-code branches.
43 changes: 42 additions & 1 deletion README.md
@@ -128,6 +128,47 @@ async def call_llm(prompt: str) -> str:
result = await call_llm("Hello")
```

### Streaming

For streaming LLM responses, use the `stream_reservation()` context manager. It reserves budget on enter, auto-commits on successful exit, and auto-releases on exception:

```python
from openai import OpenAI
from runcycles import CyclesClient, CyclesConfig, Action, Amount, Unit

config = CyclesConfig(base_url="http://localhost:7878", api_key="your-api-key", tenant="acme")
cycles_client = CyclesClient(config)
openai_client = OpenAI()
max_tokens = 1024

with cycles_client.stream_reservation(
    action=Action(kind="llm.completion", name="gpt-4o"),
    estimate=Amount(unit=Unit.USD_MICROCENTS, amount=max_tokens * 1000),
    cost_fn=lambda u: u.tokens_input * 250 + u.tokens_output * 1000,
) as reservation:
    # Caps available immediately after entering the context
    if reservation.caps and reservation.caps.max_tokens:
        max_tokens = min(max_tokens, reservation.caps.max_tokens)

    stream = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        max_tokens=max_tokens,
        stream=True,
        stream_options={"include_usage": True},
    )

    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
        if chunk.usage:
            reservation.usage.tokens_input = chunk.usage.prompt_tokens
            reservation.usage.tokens_output = chunk.usage.completion_tokens
# Committed automatically with actual cost computed by cost_fn
```

Also available as `async with client.stream_reservation(...)` for async clients. See [streaming_usage.py](examples/streaming_usage.py) for a complete example.
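
A minimal async sketch, assuming `AsyncCyclesClient` takes the same config and exposes the same `stream_reservation()` signature, and using `AsyncOpenAI` from the `openai` package:

```python
import asyncio

from openai import AsyncOpenAI
from runcycles import Action, Amount, AsyncCyclesClient, CyclesConfig, Unit

config = CyclesConfig(base_url="http://localhost:7878", api_key="your-api-key", tenant="acme")
cycles_client = AsyncCyclesClient(config)
openai_client = AsyncOpenAI()


async def main() -> None:
    max_tokens = 1024
    async with cycles_client.stream_reservation(
        action=Action(kind="llm.completion", name="gpt-4o"),
        estimate=Amount(unit=Unit.USD_MICROCENTS, amount=max_tokens * 1000),
        cost_fn=lambda u: u.tokens_input * 250 + u.tokens_output * 1000,
    ) as reservation:
        stream = await openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=max_tokens,
            stream=True,
            stream_options={"include_usage": True},
        )
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
            if chunk.usage:
                reservation.usage.tokens_input = chunk.usage.prompt_tokens
                reservation.usage.tokens_output = chunk.usage.completion_tokens
    # Auto-committed on exit; auto-released if the stream raised


asyncio.run(main())
```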

## Configuration

### From environment variables
@@ -374,7 +415,7 @@ The [`examples/`](examples/) directory contains runnable integration examples:
| [async_usage.py](examples/async_usage.py) | Async client and async decorator |
| [openai_integration.py](examples/openai_integration.py) | Guard OpenAI chat completions with budget checks |
| [anthropic_integration.py](examples/anthropic_integration.py) | Guard Anthropic messages with per-tool budget tracking |
| [streaming_usage.py](examples/streaming_usage.py) | Budget-managed streaming with token accumulation |
| [streaming_usage.py](examples/streaming_usage.py) | `stream_reservation()` context manager with auto-commit |
| [fastapi_integration.py](examples/fastapi_integration.py) | FastAPI middleware, dependency injection, per-tenant budgets |
| [langchain_integration.py](examples/langchain_integration.py) | LangChain callback handler for budget-aware agents |

2 changes: 1 addition & 1 deletion examples/README.md
@@ -28,7 +28,7 @@ pip install runcycles
| [async_usage.py](async_usage.py) | Async client and async decorator | — |
| [openai_integration.py](openai_integration.py) | Guard OpenAI chat completions with budget checks | `openai` |
| [anthropic_integration.py](anthropic_integration.py) | Guard Anthropic messages with per-tool budget tracking | `anthropic` |
| [streaming_usage.py](streaming_usage.py) | Budget-managed streaming with token accumulation | `openai` |
| [streaming_usage.py](streaming_usage.py) | `stream_reservation()` context manager with auto-commit | `openai` |
| [fastapi_integration.py](fastapi_integration.py) | FastAPI middleware, dependency injection, per-tenant budgets | `fastapi`, `uvicorn` |
| [langchain_integration.py](langchain_integration.py) | LangChain callback handler for budget-aware agents | `langchain`, `langchain-openai` |

129 changes: 23 additions & 106 deletions examples/streaming_usage.py
@@ -1,7 +1,7 @@
"""Budget-managed streaming with Cycles.

Demonstrates the programmatic reserve → stream → commit pattern where the
actual cost is only known after the stream completes.
Demonstrates the StreamReservation context manager: reserve on enter,
auto-commit on success, auto-release on exception.

Requirements:
pip install runcycles openai
@@ -14,23 +14,15 @@
"""

import os
import time
import uuid

from openai import OpenAI

from runcycles import (
Action,
Amount,
BudgetExceededError,
CommitRequest,
CyclesClient,
CyclesConfig,
CyclesMetrics,
CyclesProtocolError,
ReleaseRequest,
ReservationCreateRequest,
Subject,
Unit,
)

@@ -50,7 +42,7 @@


# ---------------------------------------------------------------------------
# 2. Streaming with budget management
# 2. Streaming with budget management (context manager API)
# ---------------------------------------------------------------------------
def stream_with_budget(
prompt: str,
@@ -59,65 +51,25 @@ def stream_with_budget(
) -> str:
"""Stream an OpenAI response with Cycles budget protection.

The pattern:
1. Reserve budget based on max_tokens (worst case)
2. Stream the response, accumulating output
3. Commit the actual cost after the stream completes
4. Release the reservation if streaming fails
The StreamReservation context manager handles:
- Creating a reservation on enter
- Auto-committing actual cost on successful exit
- Auto-releasing the reservation on exception
- Heartbeat-based TTL extension for long streams
"""
estimated_input_tokens = len(prompt.split()) * 2
estimated_cost = (
estimated_input_tokens * PRICE_PER_INPUT_TOKEN
+ max_tokens * PRICE_PER_OUTPUT_TOKEN
)

idempotency_key = str(uuid.uuid4())

# Step 1: Reserve budget
reserve_response = cycles_client.create_reservation(
ReservationCreateRequest(
idempotency_key=idempotency_key,
subject=Subject(tenant=config.tenant, agent="streaming-agent"),
action=Action(kind="llm.completion", name=model),
estimate=Amount(unit=Unit.USD_MICROCENTS, amount=estimated_cost),
ttl_ms=120_000, # longer TTL for streaming
)
)

if not reserve_response.is_success:
error = reserve_response.get_error_response()
if error and error.error == "BUDGET_EXCEEDED":
raise BudgetExceededError(
error.message,
status=reserve_response.status,
error_code=error.error,
request_id=error.request_id,
details=error.details,
)
msg = error.message if error else (reserve_response.error_message or "Reservation failed")
raise CyclesProtocolError(
msg,
status=reserve_response.status,
error_code=error.error if error else None,
request_id=error.request_id if error else None,
details=error.details if error else None,
)

reservation_id = reserve_response.get_body_attribute("reservation_id")
decision = reserve_response.get_body_attribute("decision")

# Check for caps
caps = reserve_response.get_body_attribute("caps")
if caps and caps.get("max_tokens"):
max_tokens = min(max_tokens, caps["max_tokens"])
print(f" Budget authority capped max_tokens to {max_tokens}")
estimated_cost = estimated_input_tokens * PRICE_PER_INPUT_TOKEN + max_tokens * PRICE_PER_OUTPUT_TOKEN

with cycles_client.stream_reservation(
action=Action(kind="llm.completion", name=model),
estimate=Amount(unit=Unit.USD_MICROCENTS, amount=estimated_cost),
cost_fn=lambda u: u.tokens_input * PRICE_PER_INPUT_TOKEN + u.tokens_output * PRICE_PER_OUTPUT_TOKEN,
) as reservation:
# Caps are available immediately after entering the context
if reservation.caps and reservation.caps.max_tokens:
max_tokens = min(max_tokens, reservation.caps.max_tokens)
print(f" Budget authority capped max_tokens to {max_tokens}")

# Step 2: Stream the response
start_time = time.time()
chunks: list[str] = []
completion_tokens = 0

try:
stream = openai_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
@@ -126,6 +78,7 @@ def stream_with_budget(
stream_options={"include_usage": True},
)

chunks: list[str] = []
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
text = chunk.choices[0].delta.content
@@ -134,48 +87,12 @@

# The final chunk includes usage stats
if chunk.usage:
input_tokens = chunk.usage.prompt_tokens
completion_tokens = chunk.usage.completion_tokens
reservation.usage.tokens_input = chunk.usage.prompt_tokens
reservation.usage.tokens_output = chunk.usage.completion_tokens

print() # newline after streaming

except Exception:
# If streaming fails, release the reservation to free budget
cycles_client.release_reservation(
reservation_id,
ReleaseRequest(idempotency_key=f"release-{idempotency_key}"),
)
raise

# Step 3: Commit actual cost
elapsed_ms = int((time.time() - start_time) * 1000)
actual_cost = (
input_tokens * PRICE_PER_INPUT_TOKEN
+ completion_tokens * PRICE_PER_OUTPUT_TOKEN
)

commit_response = cycles_client.commit_reservation(
reservation_id,
CommitRequest(
idempotency_key=f"commit-{idempotency_key}",
actual=Amount(unit=Unit.USD_MICROCENTS, amount=actual_cost),
metrics=CyclesMetrics(
tokens_input=input_tokens,
tokens_output=completion_tokens,
latency_ms=elapsed_ms,
model_version=model,
custom={"streamed": True, "decision": decision},
),
),
)

if not commit_response.is_success:
print(f" Warning: commit failed: {commit_response.error_message}")

savings = estimated_cost - actual_cost
print(f" Estimated: {estimated_cost} microcents, Actual: {actual_cost} microcents")
print(f" Budget saved by accurate commit: {savings} microcents")

# Auto-committed on exit with actual cost computed by cost_fn
return "".join(chunks)


2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "runcycles"
version = "0.2.0"
version = "0.3.0"
description = "Python client for the Cycles budget-management protocol"
readme = "README.md"
license = "Apache-2.0"
5 changes: 5 additions & 0 deletions runcycles/__init__.py
@@ -51,6 +51,7 @@
Unit,
)
from runcycles.response import CyclesResponse
from runcycles.streaming import AsyncStreamReservation, StreamReservation, StreamUsage

__all__ = [
# Client
@@ -67,6 +68,10 @@
"get_cycles_context",
# Response
"CyclesResponse",
# Streaming
"StreamReservation",
"AsyncStreamReservation",
"StreamUsage",
# Exceptions
"CyclesError",
"CyclesProtocolError",