Skip to content

Commit 94170b9

Browse files
committed
feat: add StreamReservation context manager for streaming DX
Add StreamReservation and AsyncStreamReservation context managers that automate the reserve → commit/release lifecycle for streaming use cases, reducing boilerplate from ~50 lines to ~15. - StreamUsage dataclass for accumulating tokens/cost during streaming - Auto-commit on successful exit, auto-release on exception - Heartbeat-based TTL extension for long-running streams - Commit retry via existing CommitRetryEngine - Cost resolution: explicit actual_cost > cost_fn > estimate fallback - Respects user-set ctx.metrics during streaming - Full spec validation (TTL, grace_period, subject constraints) - Handles IDEMPOTENCY_MISMATCH correctly (no release) - Client convenience: CyclesClient.stream_reservation() - 64 tests, 97% module coverage, 99.38% total coverage - Version bump: 0.2.0 → 0.3.0
1 parent 20b3c3b commit 94170b9

8 files changed

Lines changed: 2037 additions & 112 deletions

File tree

AUDIT.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,23 @@ Automated contract tests validate sample request/response payloads against the O
182182
- **Negative tests:** missing required fields, extra fields (additionalProperties), invalid enum values
183183
- **Enum value tests:** UnitEnum, ErrorCode, DecisionEnum, ReservationStatus, CommitOveragePolicy
184184
- **Spec fixture:** `tests/fixtures/cycles-protocol-v0.yaml` (copy of canonical spec)
185+
186+
---
187+
188+
## Streaming Convenience Module (added 2026-04-08)
189+
190+
**Module:** `runcycles/streaming.py`
191+
**Test file:** `tests/test_streaming.py` (57 tests, all passing)
192+
193+
Added `StreamReservation` and `AsyncStreamReservation` context managers that automate the reserve → commit/release lifecycle for streaming use cases. This is a DX convenience layer — no protocol changes.
194+
195+
- **`StreamReservation`** — sync context manager: reserves on `__enter__`, auto-commits on successful `__exit__`, auto-releases on exception
196+
- **`AsyncStreamReservation`** — async equivalent using `__aenter__`/`__aexit__`
197+
- **`StreamUsage`** — mutable accumulator for token counts and cost during streaming
198+
- **Client convenience methods:** `CyclesClient.stream_reservation()` and `AsyncCyclesClient.stream_reservation()` — thin factories that build Subject from config defaults
199+
- **Cost resolution:** explicit `usage.actual_cost` > `cost_fn(usage)` > estimate fallback
200+
- **Heartbeat:** automatic TTL extension, same interval formula as decorator lifecycle
201+
- **Commit retry:** uses existing `CommitRetryEngine`/`AsyncCommitRetryEngine`
202+
- **Context propagation:** sets/clears `CyclesContext` via `ContextVar`, accessible via `get_cycles_context()`
203+
204+
Protocol conformance: No new endpoints or protocol changes. All reservation, commit, release, and extend calls use the same client methods and body formats as the decorator path. Verified by 57 unit tests covering success, deny, error, retry, heartbeat, cost resolution, and context propagation.

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,34 @@ async def call_llm(prompt: str) -> str:
128128
result = await call_llm("Hello")
129129
```
130130
131+
### Streaming
132+
133+
For streaming LLM responses, use the `stream_reservation()` context manager. It reserves budget on enter, auto-commits on successful exit, and auto-releases on exception:
134+
135+
```python
136+
from runcycles import CyclesClient, CyclesConfig, Action, Amount, Unit
137+
138+
config = CyclesConfig(base_url="http://localhost:7878", api_key="your-api-key", tenant="acme")
139+
client = CyclesClient(config)
140+
141+
with client.stream_reservation(
142+
action=Action(kind="llm.completion", name="gpt-4o"),
143+
estimate=Amount(unit=Unit.USD_MICROCENTS, amount=max_tokens * 1000),
144+
cost_fn=lambda u: u.tokens_input * 250 + u.tokens_output * 1000,
145+
) as reservation:
146+
# Caps available immediately
147+
if reservation.caps and reservation.caps.max_tokens:
148+
max_tokens = min(max_tokens, reservation.caps.max_tokens)
149+
150+
for chunk in openai_stream:
151+
if chunk.usage:
152+
reservation.usage.tokens_input = chunk.usage.prompt_tokens
153+
reservation.usage.tokens_output = chunk.usage.completion_tokens
154+
# Committed automatically with actual cost from cost_fn
155+
```
156+
157+
Also available as `async with client.stream_reservation(...)` for async clients. See [streaming_usage.py](examples/streaming_usage.py) for a complete example.
158+
131159
## Configuration
132160
133161
### From environment variables

examples/streaming_usage.py

Lines changed: 23 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Budget-managed streaming with Cycles.
22
3-
Demonstrates the programmatic reserve → stream → commit pattern where the
4-
actual cost is only known after the stream completes.
3+
Demonstrates the StreamReservation context manager: reserve on enter,
4+
auto-commit on success, auto-release on exception.
55
66
Requirements:
77
pip install runcycles openai
@@ -14,23 +14,15 @@
1414
"""
1515

1616
import os
17-
import time
18-
import uuid
1917

2018
from openai import OpenAI
2119

2220
from runcycles import (
2321
Action,
2422
Amount,
2523
BudgetExceededError,
26-
CommitRequest,
2724
CyclesClient,
2825
CyclesConfig,
29-
CyclesMetrics,
30-
CyclesProtocolError,
31-
ReleaseRequest,
32-
ReservationCreateRequest,
33-
Subject,
3426
Unit,
3527
)
3628

@@ -50,7 +42,7 @@
5042

5143

5244
# ---------------------------------------------------------------------------
53-
# 2. Streaming with budget management
45+
# 2. Streaming with budget management (context manager API)
5446
# ---------------------------------------------------------------------------
5547
def stream_with_budget(
5648
prompt: str,
@@ -59,65 +51,25 @@ def stream_with_budget(
5951
) -> str:
6052
"""Stream an OpenAI response with Cycles budget protection.
6153
62-
The pattern:
63-
1. Reserve budget based on max_tokens (worst case)
64-
2. Stream the response, accumulating output
65-
3. Commit the actual cost after the stream completes
66-
4. Release the reservation if streaming fails
54+
The StreamReservation context manager handles:
55+
- Creating a reservation on enter
56+
- Auto-committing actual cost on successful exit
57+
- Auto-releasing the reservation on exception
58+
- Heartbeat-based TTL extension for long streams
6759
"""
6860
estimated_input_tokens = len(prompt.split()) * 2
69-
estimated_cost = (
70-
estimated_input_tokens * PRICE_PER_INPUT_TOKEN
71-
+ max_tokens * PRICE_PER_OUTPUT_TOKEN
72-
)
73-
74-
idempotency_key = str(uuid.uuid4())
75-
76-
# Step 1: Reserve budget
77-
reserve_response = cycles_client.create_reservation(
78-
ReservationCreateRequest(
79-
idempotency_key=idempotency_key,
80-
subject=Subject(tenant=config.tenant, agent="streaming-agent"),
81-
action=Action(kind="llm.completion", name=model),
82-
estimate=Amount(unit=Unit.USD_MICROCENTS, amount=estimated_cost),
83-
ttl_ms=120_000, # longer TTL for streaming
84-
)
85-
)
86-
87-
if not reserve_response.is_success:
88-
error = reserve_response.get_error_response()
89-
if error and error.error == "BUDGET_EXCEEDED":
90-
raise BudgetExceededError(
91-
error.message,
92-
status=reserve_response.status,
93-
error_code=error.error,
94-
request_id=error.request_id,
95-
details=error.details,
96-
)
97-
msg = error.message if error else (reserve_response.error_message or "Reservation failed")
98-
raise CyclesProtocolError(
99-
msg,
100-
status=reserve_response.status,
101-
error_code=error.error if error else None,
102-
request_id=error.request_id if error else None,
103-
details=error.details if error else None,
104-
)
105-
106-
reservation_id = reserve_response.get_body_attribute("reservation_id")
107-
decision = reserve_response.get_body_attribute("decision")
108-
109-
# Check for caps
110-
caps = reserve_response.get_body_attribute("caps")
111-
if caps and caps.get("max_tokens"):
112-
max_tokens = min(max_tokens, caps["max_tokens"])
113-
print(f" Budget authority capped max_tokens to {max_tokens}")
61+
estimated_cost = estimated_input_tokens * PRICE_PER_INPUT_TOKEN + max_tokens * PRICE_PER_OUTPUT_TOKEN
62+
63+
with cycles_client.stream_reservation(
64+
action=Action(kind="llm.completion", name=model),
65+
estimate=Amount(unit=Unit.USD_MICROCENTS, amount=estimated_cost),
66+
cost_fn=lambda u: u.tokens_input * PRICE_PER_INPUT_TOKEN + u.tokens_output * PRICE_PER_OUTPUT_TOKEN,
67+
) as reservation:
68+
# Caps are available immediately after entering the context
69+
if reservation.caps and reservation.caps.max_tokens:
70+
max_tokens = min(max_tokens, reservation.caps.max_tokens)
71+
print(f" Budget authority capped max_tokens to {max_tokens}")
11472

115-
# Step 2: Stream the response
116-
start_time = time.time()
117-
chunks: list[str] = []
118-
completion_tokens = 0
119-
120-
try:
12173
stream = openai_client.chat.completions.create(
12274
model=model,
12375
messages=[{"role": "user", "content": prompt}],
@@ -126,6 +78,7 @@ def stream_with_budget(
12678
stream_options={"include_usage": True},
12779
)
12880

81+
chunks: list[str] = []
12982
for chunk in stream:
13083
if chunk.choices and chunk.choices[0].delta.content:
13184
text = chunk.choices[0].delta.content
@@ -134,48 +87,12 @@ def stream_with_budget(
13487

13588
# The final chunk includes usage stats
13689
if chunk.usage:
137-
input_tokens = chunk.usage.prompt_tokens
138-
completion_tokens = chunk.usage.completion_tokens
90+
reservation.usage.tokens_input = chunk.usage.prompt_tokens
91+
reservation.usage.tokens_output = chunk.usage.completion_tokens
13992

14093
print() # newline after streaming
14194

142-
except Exception:
143-
# If streaming fails, release the reservation to free budget
144-
cycles_client.release_reservation(
145-
reservation_id,
146-
ReleaseRequest(idempotency_key=f"release-{idempotency_key}"),
147-
)
148-
raise
149-
150-
# Step 3: Commit actual cost
151-
elapsed_ms = int((time.time() - start_time) * 1000)
152-
actual_cost = (
153-
input_tokens * PRICE_PER_INPUT_TOKEN
154-
+ completion_tokens * PRICE_PER_OUTPUT_TOKEN
155-
)
156-
157-
commit_response = cycles_client.commit_reservation(
158-
reservation_id,
159-
CommitRequest(
160-
idempotency_key=f"commit-{idempotency_key}",
161-
actual=Amount(unit=Unit.USD_MICROCENTS, amount=actual_cost),
162-
metrics=CyclesMetrics(
163-
tokens_input=input_tokens,
164-
tokens_output=completion_tokens,
165-
latency_ms=elapsed_ms,
166-
model_version=model,
167-
custom={"streamed": True, "decision": decision},
168-
),
169-
),
170-
)
171-
172-
if not commit_response.is_success:
173-
print(f" Warning: commit failed: {commit_response.error_message}")
174-
175-
savings = estimated_cost - actual_cost
176-
print(f" Estimated: {estimated_cost} microcents, Actual: {actual_cost} microcents")
177-
print(f" Budget saved by accurate commit: {savings} microcents")
178-
95+
# Auto-committed on exit with actual cost computed by cost_fn
17996
return "".join(chunks)
18097

18198

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "runcycles"
7-
version = "0.2.0"
7+
version = "0.3.0"
88
description = "Python client for the Cycles budget-management protocol"
99
readme = "README.md"
1010
license = "Apache-2.0"

runcycles/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
Unit,
5252
)
5353
from runcycles.response import CyclesResponse
54+
from runcycles.streaming import AsyncStreamReservation, StreamReservation, StreamUsage
5455

5556
__all__ = [
5657
# Client
@@ -67,6 +68,10 @@
6768
"get_cycles_context",
6869
# Response
6970
"CyclesResponse",
71+
# Streaming
72+
"StreamReservation",
73+
"AsyncStreamReservation",
74+
"StreamUsage",
7075
# Exceptions
7176
"CyclesError",
7277
"CyclesProtocolError",

0 commit comments

Comments
 (0)