Streaming Integration
If you're using waxell.init(), streaming responses from OpenAI, Anthropic, Gemini, Cohere, and Bedrock are captured automatically -- the SDK wraps stream iterators to aggregate tokens, timing, and cost without any manual code. The examples below show the simple decorator pattern. See Advanced: Custom Stream Processing for cases where you need manual control.
When auto-instrumentation is active, streaming Just Works: you iterate the stream as you normally would, and the SDK records content, model, tokens (input and output), latency, and cost behind the scenes.
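To make "wraps stream iterators" concrete, here is a minimal sketch of the general technique. An async generator re-yields every chunk unchanged and accumulates text and timing on the side. This is not waxell's actual implementation. `traced_stream`, `extract_text`, the `stats` dict and `fake_provider_stream` are hypothetical names used only for illustration.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Callable, Optional


async def traced_stream(
    stream: AsyncIterator[Any],
    extract_text: Callable[[Any], Optional[str]],
    stats: dict,
) -> AsyncIterator[Any]:
    """Re-yield every chunk unchanged while recording content and timing.

    Illustrative sketch only; not the waxell SDK's internal code.
    """
    start = time.perf_counter()
    parts: list[str] = []
    try:
        async for chunk in stream:
            text = extract_text(chunk)
            if text:
                if "first_token_s" not in stats:
                    stats["first_token_s"] = time.perf_counter() - start
                parts.append(text)
            yield chunk  # the caller sees exactly what the provider sent
    finally:
        # Runs when the stream ends, raises, or the generator is closed
        stats["content"] = "".join(parts)
        stats["latency_s"] = time.perf_counter() - start


async def fake_provider_stream() -> AsyncIterator[str]:
    """Hypothetical stand-in for a provider stream that yields text chunks."""
    for piece in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)
        yield piece


async def main() -> dict:
    stats: dict = {}
    async for chunk in traced_stream(fake_provider_stream(), lambda c: c, stats):
        print(chunk, end="", flush=True)
    print()
    return stats


stats = asyncio.run(main())
```

The consumer's loop is identical to iterating the raw stream. That property lets auto-instrumentation stay invisible to your code.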
OpenAI Streaming
```python
import asyncio

import waxell_observe as waxell

waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

# Import AFTER init() so OpenAI is auto-instrumented
from openai import AsyncOpenAI

client = AsyncOpenAI()


@waxell.observe(agent_name="streaming-agent")
async def stream_openai(query: str) -> str:
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Be concise."},
            {"role": "user", "content": query},
        ],
        stream=True,
    )
    content = ""
    async for chunk in stream:
        # Guard on choices: some chunks (e.g. a final usage chunk) carry none
        if chunk.choices and chunk.choices[0].delta.content:
            text = chunk.choices[0].delta.content
            content += text
            print(text, end="", flush=True)  # stream to user
    return content


# Run it (outside a notebook, drive the coroutine with asyncio.run)
answer = asyncio.run(
    stream_openai(
        "Explain quantum computing",
        session_id="sess_001",
        user_id="user_alice",
    )
)
```
That's it. The trace contains the model, full content, prompt + completion tokens, latency, and cost -- all captured by auto-instrumentation.
Anthropic Streaming
Anthropic streams typed events instead of chunks, but auto-instrumentation handles both -- you iterate normally:
```python
import waxell_observe as waxell

waxell.init(api_key="wax_sk_...")

# Import AFTER init() so Anthropic is auto-instrumented
import anthropic

client = anthropic.AsyncAnthropic()


@waxell.observe(agent_name="streaming-claude")
async def stream_claude(query: str) -> str:
    stream = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        messages=[{"role": "user", "content": query}],
        stream=True,
    )
    content = ""
    async for event in stream:
        # Only text deltas carry .text; other events carry metadata such as usage
        if event.type == "content_block_delta" and hasattr(event.delta, "text"):
            text = event.delta.text
            content += text
            print(text, end="", flush=True)
    return content
```
Input tokens (from message_start) and output tokens (from message_delta) are extracted automatically.
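The sketch below illustrates that extraction. It folds a sequence of Anthropic stream events into content plus token counts. It does not call the API: `SimpleNamespace` objects stand in for the SDK's event classes, the token numbers are made up, and `summarize_anthropic_events` is a hypothetical helper, not part of waxell. The output count on `message_delta` is cumulative, so the sketch overwrites it rather than summing.

```python
from types import SimpleNamespace as NS


def summarize_anthropic_events(events) -> dict:
    """Fold Anthropic stream events into content plus token counts.

    Illustrative sketch of the extraction described above, not waxell's code.
    """
    summary = {"content": "", "input_tokens": 0, "output_tokens": 0}
    for event in events:
        if event.type == "message_start":
            # The opening event carries the prompt (input) token count
            summary["input_tokens"] = event.message.usage.input_tokens
        elif event.type == "content_block_delta" and hasattr(event.delta, "text"):
            summary["content"] += event.delta.text
        elif event.type == "message_delta":
            # output_tokens here is cumulative, so overwrite rather than add
            summary["output_tokens"] = event.usage.output_tokens
    return summary


# Hand-built stand-ins shaped like the SDK's events (token numbers are made up)
fake_events = [
    NS(type="message_start", message=NS(usage=NS(input_tokens=12, output_tokens=1))),
    NS(type="content_block_start", index=0),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Qubits ")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="superpose.")),
    NS(type="content_block_stop", index=0),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"), usage=NS(output_tokens=5)),
    NS(type="message_stop"),
]

print(summarize_anthropic_events(fake_events))
```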
Streaming Comparison Example
Compare streaming across providers in a single trace. Each call is auto-captured:
```python
import asyncio

import waxell_observe as waxell

waxell.init(api_key="wax_sk_...")

# Import AFTER init() so both clients are auto-instrumented
from openai import AsyncOpenAI
import anthropic

openai_client = AsyncOpenAI()
anthropic_client = anthropic.AsyncAnthropic()


@waxell.observe(agent_name="streaming-demo", workflow_name="streaming-comparison")
async def compare_streaming(query: str) -> dict:
    waxell.tag("demo", "streaming")
    waxell.tag("providers", "openai,anthropic")

    # --- OpenAI Streaming (auto-captured) ---
    print("OpenAI: ", end="", flush=True)
    openai_content = ""
    async for chunk in await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}],
        stream=True,
    ):
        if chunk.choices and chunk.choices[0].delta.content:
            openai_content += chunk.choices[0].delta.content
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()
    waxell.step("openai_stream", output={"chars": len(openai_content)})

    # --- Anthropic Streaming (auto-captured) ---
    print("Anthropic: ", end="", flush=True)
    anthropic_content = ""
    async for event in await anthropic_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=500,
        messages=[{"role": "user", "content": query}],
        stream=True,
    ):
        if event.type == "content_block_delta" and hasattr(event.delta, "text"):
            anthropic_content += event.delta.text
            print(event.delta.text, end="", flush=True)
    print()
    waxell.step("anthropic_stream", output={"chars": len(anthropic_content)})

    return {
        "openai_response": openai_content[:500],
        "anthropic_response": anthropic_content[:500],
    }


# Run it (outside a notebook, drive the coroutine with asyncio.run)
results = asyncio.run(
    compare_streaming(
        "Explain quantum computing in simple terms",
        session_id="sess_compare_001",
    )
)
```
Key Differences
| Aspect | OpenAI | Anthropic |
|---|---|---|
| Iterator | `async for chunk in stream` | `async for event in stream` |
| Content location | `chunk.choices[0].delta.content` | `event.delta.text` (when `event.type == "content_block_delta"`) |
| Token availability | Final chunk, only when the request sets `stream_options={"include_usage": True}`; that chunk has an empty `choices` list | Separate `message_start` (input tokens) and `message_delta` (output tokens) events |
For both providers, auto-instrumentation extracts model, content, tokens, and cost without any manual code.
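The sketch below illustrates the OpenAI side of that table. When usage is included, it arrives on a last chunk whose `choices` list is empty, which is why the loops above guard with `if chunk.choices`. It uses hand-built `SimpleNamespace` stand-ins with made-up token numbers instead of calling the API. `summarize_openai_chunks` and `text_chunk` are hypothetical helpers, and the sketch says nothing about how waxell itself obtains these counts.

```python
from types import SimpleNamespace as NS


def summarize_openai_chunks(chunks) -> dict:
    """Collect text and, if present, the usage block from OpenAI stream chunks."""
    summary = {"content": "", "prompt_tokens": None, "completion_tokens": None}
    for chunk in chunks:
        # Guard on choices: the usage-only final chunk has an empty list
        if chunk.choices and chunk.choices[0].delta.content:
            summary["content"] += chunk.choices[0].delta.content
        if getattr(chunk, "usage", None) is not None:
            summary["prompt_tokens"] = chunk.usage.prompt_tokens
            summary["completion_tokens"] = chunk.usage.completion_tokens
    return summary


def text_chunk(text: str) -> NS:
    """Hypothetical stand-in for a content chunk (usage is None on these)."""
    return NS(choices=[NS(delta=NS(content=text))], usage=None)


# Stand-ins shaped like chunks from a request made with
# stream_options={"include_usage": True}; token numbers are made up.
fake_chunks = [
    text_chunk("Superposition "),
    text_chunk("and entanglement."),
    NS(choices=[], usage=NS(prompt_tokens=9, completion_tokens=4)),
]

print(summarize_openai_chunks(fake_chunks))
```

Without the final usage chunk, the two token fields stay `None`. Code that reads them should handle that case.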
Advanced: Custom Stream Processing
If you need to process the stream in ways the SDK can't infer (e.g. an unsupported provider, custom event handling, or you want to record specific per-chunk telemetry), drop down to the context manager and call record_llm_call yourself after the stream completes:
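```python
import asyncio

import waxell_observe as waxell
from waxell_observe import WaxellContext

waxell.init(api_key="wax_sk_...")


async def run_custom_stream() -> str:
    # `async with` must run inside a coroutine (or a notebook's running loop)
    async with WaxellContext(agent_name="custom-stream") as ctx:
        content, tokens_in, tokens_out = "", 0, 0
        # ... your custom streaming logic: append chunks to `content` and
        # accumulate tokens_in / tokens_out ...
        ctx.record_llm_call(
            model="my-custom-model",
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            response_preview=content[:200],
        )
    return content


asyncio.run(run_custom_stream())
```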
See the Context Manager page for the full API.
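For the "per-chunk telemetry" case, here is a sketch of the accumulation step that would sit inside the `async with` block. It measures time to first chunk and a chunk count while building the content. `my_provider_stream` is a hypothetical stand-in for an unsupported provider. The token counts use the rough ~1.3 tokens-per-word heuristic from Token Estimation below, since such a provider may not report usage. `llm_fields` uses the same keyword names as the `record_llm_call` call above, so in real code you could pass it as `ctx.record_llm_call(**llm_fields)`. The extra telemetry could be recorded with `waxell.step(...)`, as the comparison example does for character counts.

```python
import asyncio
import time
from typing import AsyncIterator


async def my_provider_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical unsupported provider that yields raw text chunks."""
    for word in ["Quantum", " bits", " can", " be", " 0", " and", " 1."]:
        await asyncio.sleep(0.01)
        yield word


async def consume_with_telemetry(prompt: str) -> tuple[dict, dict]:
    start = time.perf_counter()
    parts: list[str] = []
    first_chunk_s = None
    async for text in my_provider_stream(prompt):
        if first_chunk_s is None:
            first_chunk_s = time.perf_counter() - start  # time to first chunk
        parts.append(text)
        print(text, end="", flush=True)
    print()
    content = "".join(parts)
    # Same keyword names as ctx.record_llm_call(...); tokens are rough estimates
    llm_fields = {
        "model": "my-custom-model",
        "tokens_in": int(len(prompt.split()) * 1.3),
        "tokens_out": int(len(content.split()) * 1.3),
        "response_preview": content[:200],
    }
    telemetry = {
        "chunks": len(parts),
        "first_chunk_s": first_chunk_s,
        "total_s": time.perf_counter() - start,
    }
    return llm_fields, telemetry


llm_fields, telemetry = asyncio.run(consume_with_telemetry("Explain qubits"))
```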
Token Estimation
If you do need to estimate tokens in a custom-streaming scenario:
```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: ~1.3 tokens per word for English."""
    return int(len(text.split()) * 1.3)
```
For more accurate estimation with OpenAI models:
```python
import tiktoken


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))
```
Best Practices
- Trust auto-instrumentation -- iterate the stream as normal; the SDK handles capture
- Wrap streaming in `@waxell.observe` -- groups the stream into a tracked run
- Use `print(..., flush=True)` for real-time UX -- streams to the user while the SDK records
- Drop to a context manager only when auto-instrumentation can't cover the case (an unsupported provider, custom event handling, or per-chunk telemetry) -- most streaming code doesn't need it
Next Steps
- OpenAI Integration -- Non-streaming patterns
- Anthropic Integration -- Non-streaming patterns
- Multi-Agent -- Streaming in multi-agent systems