Streaming Integration
Streaming responses require special handling -- accumulate chunks, extract tokens from the final event, and record after completion.
OpenAI Streaming
OpenAI streams response chunks with an optional final usage event:
import waxell_observe
waxell_observe.init(api_key="wax_sk_...")
from waxell_observe import WaxellContext
from openai import AsyncOpenAI
client = AsyncOpenAI()
async with WaxellContext(agent_name="streaming-agent") as ctx:
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Be concise."},
{"role": "user", "content": "Explain quantum computing"},
],
stream=True,
)
content = ""
tokens_in = 0
tokens_out = 0
chunk_count = 0
async for chunk in stream:
chunk_count += 1
# Accumulate content
if chunk.choices and chunk.choices[0].delta.content:
text = chunk.choices[0].delta.content
content += text
print(text, end="", flush=True) # Stream to user
# Extract tokens from final chunk (if available)
if chunk.usage:
tokens_in = chunk.usage.prompt_tokens
tokens_out = chunk.usage.completion_tokens
# Fallback token estimation if not provided
if not tokens_out:
tokens_in = 150 # Estimate based on prompt
tokens_out = max(len(content.split()) * 2, 80)
print() # Newline after stream
# Record after stream completes
ctx.record_step("stream_response", output={
"chunks": chunk_count,
"content_length": len(content),
"tokens_in": tokens_in,
"tokens_out": tokens_out,
})
ctx.record_llm_call(
model="gpt-4o-mini",
tokens_in=tokens_in,
tokens_out=tokens_out,
task="stream_response",
response_preview=content[:200],
)
ctx.set_result({"response": content})
Anthropic Streaming
Anthropic uses typed events instead of chunks:
from waxell_observe import WaxellContext
import anthropic
client = anthropic.AsyncAnthropic()
async with WaxellContext(agent_name="streaming-claude") as ctx:
stream = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": "Explain quantum computing"}],
stream=True,
)
content = ""
tokens_in = 0
tokens_out = 0
event_count = 0
async for event in stream:
event_count += 1
# Content comes in content_block_delta events
if event.type == "content_block_delta" and hasattr(event.delta, "text"):
text = event.delta.text
content += text
print(text, end="", flush=True)
# Input tokens in message_start
elif event.type == "message_start" and hasattr(event, "message"):
if hasattr(event.message, "usage"):
tokens_in = event.message.usage.input_tokens
# Output tokens in message_delta
elif event.type == "message_delta" and hasattr(event, "usage"):
if event.usage:
tokens_out = event.usage.output_tokens
# Fallback estimation
if not tokens_out:
tokens_in = 150
tokens_out = max(len(content.split()) * 2, 80)
print()
ctx.record_step("stream_response", output={
"events": event_count,
"content_length": len(content),
"tokens_in": tokens_in,
"tokens_out": tokens_out,
})
ctx.record_llm_call(
model="claude-sonnet-4",
tokens_in=tokens_in,
tokens_out=tokens_out,
task="stream_response",
response_preview=content[:200],
)
Streaming Comparison Example
Compare streaming behavior across providers:
import waxell_observe
waxell_observe.init(api_key="wax_sk_...")
from waxell_observe import WaxellContext, generate_session_id
from openai import AsyncOpenAI
import anthropic
openai_client = AsyncOpenAI()
anthropic_client = anthropic.AsyncAnthropic()
async def compare_streaming(query: str):
session = generate_session_id()
async with WaxellContext(
agent_name="streaming-demo",
workflow_name="streaming-comparison",
inputs={"query": query},
session_id=session,
) as ctx:
ctx.set_tag("demo", "streaming")
ctx.set_tag("providers", "openai,anthropic")
# --- OpenAI Streaming ---
print("OpenAI: ", end="", flush=True)
openai_stream = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
stream=True,
)
openai_content = ""
openai_chunks = 0
openai_tokens_in, openai_tokens_out = 0, 0
async for chunk in openai_stream:
openai_chunks += 1
if chunk.choices and chunk.choices[0].delta.content:
openai_content += chunk.choices[0].delta.content
print(chunk.choices[0].delta.content, end="", flush=True)
if chunk.usage:
openai_tokens_in = chunk.usage.prompt_tokens
openai_tokens_out = chunk.usage.completion_tokens
if not openai_tokens_out:
openai_tokens_in = 150
openai_tokens_out = len(openai_content.split()) * 2
print(f"\n({openai_chunks} chunks, {openai_tokens_in}+{openai_tokens_out} tokens)\n")
ctx.record_step("openai_stream", output={
"chunks": openai_chunks,
"tokens": openai_tokens_in + openai_tokens_out,
})
ctx.record_llm_call(
model="gpt-4o-mini",
tokens_in=openai_tokens_in,
tokens_out=openai_tokens_out,
task="openai_stream",
)
# --- Anthropic Streaming ---
print("Anthropic: ", end="", flush=True)
anthropic_stream = await anthropic_client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
stream=True,
)
anthropic_content = ""
anthropic_events = 0
anthropic_tokens_in, anthropic_tokens_out = 0, 0
async for event in anthropic_stream:
anthropic_events += 1
if event.type == "content_block_delta" and hasattr(event.delta, "text"):
anthropic_content += event.delta.text
print(event.delta.text, end="", flush=True)
elif event.type == "message_start" and hasattr(event.message, "usage"):
anthropic_tokens_in = event.message.usage.input_tokens
elif event.type == "message_delta" and event.usage:
anthropic_tokens_out = event.usage.output_tokens
if not anthropic_tokens_out:
anthropic_tokens_in = 150
anthropic_tokens_out = len(anthropic_content.split()) * 2
print(f"\n({anthropic_events} events, {anthropic_tokens_in}+{anthropic_tokens_out} tokens)\n")
ctx.record_step("anthropic_stream", output={
"events": anthropic_events,
"tokens": anthropic_tokens_in + anthropic_tokens_out,
})
ctx.record_llm_call(
model="claude-sonnet-4",
tokens_in=anthropic_tokens_in,
tokens_out=anthropic_tokens_out,
task="anthropic_stream",
)
# --- Comparison ---
ctx.record_step("compare_results", output={
"openai": {"chars": len(openai_content), "tokens": openai_tokens_in + openai_tokens_out},
"anthropic": {"chars": len(anthropic_content), "tokens": anthropic_tokens_in + anthropic_tokens_out},
})
ctx.set_result({
"openai_response": openai_content[:500],
"anthropic_response": anthropic_content[:500],
})
# Run it
await compare_streaming("Explain quantum computing in simple terms")
Key Differences
| Aspect | OpenAI | Anthropic |
|---|---|---|
| Iterator | async for chunk in stream | async for event in stream |
| Content location | chunk.choices[0].delta.content | event.delta.text (when event.type == "content_block_delta") |
| Input tokens | chunk.usage.prompt_tokens | event.message.usage.input_tokens (in message_start) |
| Output tokens | chunk.usage.completion_tokens | event.usage.output_tokens (in message_delta) |
| Token availability | Final chunk (only if stream_options={"include_usage": True} is passed) | Separate events |
Token Estimation
When tokens aren't provided in the stream:
def estimate_tokens(text: str) -> int:
    """Rough estimate: ~1.3 tokens per word for English."""
    return int(len(text.split()) * 1.3)
# Usage: fall back to the estimate only when the stream reported no usage
if not tokens_out:
    tokens_out = estimate_tokens(content)
For more accurate estimation, use tiktoken for OpenAI models:
import tiktoken


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens exactly using the model's tiktoken encoding."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))
Best Practices
- Always accumulate content -- Don't rely on token counts alone
- Record after stream completes -- Call record_llm_call once at the end
- Include chunk/event counts -- Useful for debugging latency issues
- Provide fallback estimates -- Not all streams include usage data
- Use flush for real-time output -- print(..., flush=True) for immediate display
- Handle different event types -- Anthropic has multiple event types
Next Steps
- OpenAI Integration -- Non-streaming patterns
- Anthropic Integration -- Non-streaming patterns
- Multi-Agent -- Streaming in multi-agent systems