Skip to main content

Streaming Integration

Streaming responses require special handling -- accumulate chunks, extract tokens from the final event, and record after completion.

OpenAI Streaming

OpenAI streams response chunks with an optional final usage event:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...")

from waxell_observe import WaxellContext
from openai import AsyncOpenAI

client = AsyncOpenAI()

async with WaxellContext(agent_name="streaming-agent") as ctx:
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Be concise."},
{"role": "user", "content": "Explain quantum computing"},
],
stream=True,
)

content = ""
tokens_in = 0
tokens_out = 0
chunk_count = 0

async for chunk in stream:
chunk_count += 1

# Accumulate content
if chunk.choices and chunk.choices[0].delta.content:
text = chunk.choices[0].delta.content
content += text
print(text, end="", flush=True) # Stream to user

# Extract tokens from final chunk (if available)
if chunk.usage:
tokens_in = chunk.usage.prompt_tokens
tokens_out = chunk.usage.completion_tokens

# Fallback token estimation if not provided
if not tokens_out:
tokens_in = 150 # Estimate based on prompt
tokens_out = max(len(content.split()) * 2, 80)

print() # Newline after stream

# Record after stream completes
ctx.record_step("stream_response", output={
"chunks": chunk_count,
"content_length": len(content),
"tokens_in": tokens_in,
"tokens_out": tokens_out,
})

ctx.record_llm_call(
model="gpt-4o-mini",
tokens_in=tokens_in,
tokens_out=tokens_out,
task="stream_response",
response_preview=content[:200],
)

ctx.set_result({"response": content})

Anthropic Streaming

Anthropic uses typed events instead of chunks:

from waxell_observe import WaxellContext
import anthropic

client = anthropic.AsyncAnthropic()

async with WaxellContext(agent_name="streaming-claude") as ctx:
stream = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": "Explain quantum computing"}],
stream=True,
)

content = ""
tokens_in = 0
tokens_out = 0
event_count = 0

async for event in stream:
event_count += 1

# Content comes in content_block_delta events
if event.type == "content_block_delta" and hasattr(event.delta, "text"):
text = event.delta.text
content += text
print(text, end="", flush=True)

# Input tokens in message_start
elif event.type == "message_start" and hasattr(event, "message"):
if hasattr(event.message, "usage"):
tokens_in = event.message.usage.input_tokens

# Output tokens in message_delta
elif event.type == "message_delta" and hasattr(event, "usage"):
if event.usage:
tokens_out = event.usage.output_tokens

# Fallback estimation
if not tokens_out:
tokens_in = 150
tokens_out = max(len(content.split()) * 2, 80)

print()

ctx.record_step("stream_response", output={
"events": event_count,
"content_length": len(content),
"tokens_in": tokens_in,
"tokens_out": tokens_out,
})

ctx.record_llm_call(
model="claude-sonnet-4",
tokens_in=tokens_in,
tokens_out=tokens_out,
task="stream_response",
response_preview=content[:200],
)

Streaming Comparison Example

Compare streaming behavior across providers:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...")

from waxell_observe import WaxellContext, generate_session_id
from openai import AsyncOpenAI
import anthropic

openai_client = AsyncOpenAI()
anthropic_client = anthropic.AsyncAnthropic()

async def compare_streaming(query: str):
session = generate_session_id()

async with WaxellContext(
agent_name="streaming-demo",
workflow_name="streaming-comparison",
inputs={"query": query},
session_id=session,
) as ctx:
ctx.set_tag("demo", "streaming")
ctx.set_tag("providers", "openai,anthropic")

# --- OpenAI Streaming ---
print("OpenAI: ", end="", flush=True)

openai_stream = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
stream=True,
)

openai_content = ""
openai_chunks = 0
openai_tokens_in, openai_tokens_out = 0, 0

async for chunk in openai_stream:
openai_chunks += 1
if chunk.choices and chunk.choices[0].delta.content:
openai_content += chunk.choices[0].delta.content
print(chunk.choices[0].delta.content, end="", flush=True)
if chunk.usage:
openai_tokens_in = chunk.usage.prompt_tokens
openai_tokens_out = chunk.usage.completion_tokens

if not openai_tokens_out:
openai_tokens_in = 150
openai_tokens_out = len(openai_content.split()) * 2

print(f"\n({openai_chunks} chunks, {openai_tokens_in}+{openai_tokens_out} tokens)\n")

ctx.record_step("openai_stream", output={
"chunks": openai_chunks,
"tokens": openai_tokens_in + openai_tokens_out,
})
ctx.record_llm_call(
model="gpt-4o-mini",
tokens_in=openai_tokens_in,
tokens_out=openai_tokens_out,
task="openai_stream",
)

# --- Anthropic Streaming ---
print("Anthropic: ", end="", flush=True)

anthropic_stream = await anthropic_client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
stream=True,
)

anthropic_content = ""
anthropic_events = 0
anthropic_tokens_in, anthropic_tokens_out = 0, 0

async for event in anthropic_stream:
anthropic_events += 1
if event.type == "content_block_delta" and hasattr(event.delta, "text"):
anthropic_content += event.delta.text
print(event.delta.text, end="", flush=True)
elif event.type == "message_start" and hasattr(event.message, "usage"):
anthropic_tokens_in = event.message.usage.input_tokens
elif event.type == "message_delta" and event.usage:
anthropic_tokens_out = event.usage.output_tokens

if not anthropic_tokens_out:
anthropic_tokens_in = 150
anthropic_tokens_out = len(anthropic_content.split()) * 2

print(f"\n({anthropic_events} events, {anthropic_tokens_in}+{anthropic_tokens_out} tokens)\n")

ctx.record_step("anthropic_stream", output={
"events": anthropic_events,
"tokens": anthropic_tokens_in + anthropic_tokens_out,
})
ctx.record_llm_call(
model="claude-sonnet-4",
tokens_in=anthropic_tokens_in,
tokens_out=anthropic_tokens_out,
task="anthropic_stream",
)

# --- Comparison ---
ctx.record_step("compare_results", output={
"openai": {"chars": len(openai_content), "tokens": openai_tokens_in + openai_tokens_out},
"anthropic": {"chars": len(anthropic_content), "tokens": anthropic_tokens_in + anthropic_tokens_out},
})

ctx.set_result({
"openai_response": openai_content[:500],
"anthropic_response": anthropic_content[:500],
})

# Run it
await compare_streaming("Explain quantum computing in simple terms")

Key Differences

| Aspect             | OpenAI                              | Anthropic                                                       |
|--------------------|-------------------------------------|-----------------------------------------------------------------|
| Iterator           | `async for chunk in stream`         | `async for event in stream`                                     |
| Content location   | `chunk.choices[0].delta.content`    | `event.delta.text` (when `event.type == "content_block_delta"`) |
| Input tokens       | `chunk.usage.prompt_tokens`         | `event.message.usage.input_tokens` (in `message_start`)         |
| Output tokens      | `chunk.usage.completion_tokens`     | `event.usage.output_tokens` (in `message_delta`)                |
| Token availability | Final chunk (if requested)          | Separate events                                                 |

Token Estimation

When tokens aren't provided in the stream:

def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~1.3 tokens per word for English text.

    Returns 0 for an empty string. Use tiktoken for exact counts.
    """
    return int(len(text.split()) * 1.3)

# Usage: substitute the estimate only when the stream reported no usage.
if not tokens_out:
    tokens_out = estimate_tokens(content)

For more accurate estimation, use tiktoken for OpenAI models:

import tiktoken


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens exactly using the model's tiktoken encoding.

    tiktoken.encoding_for_model raises KeyError for model names it does
    not recognize (e.g. very new models), so fall back to the o200k_base
    encoding used by the gpt-4o family in that case.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("o200k_base")
    return len(encoding.encode(text))

Best Practices

  1. Always accumulate content -- Don't rely on token counts alone
  2. Record after stream completes -- Call record_llm_call once at the end
  3. Include chunk/event counts -- Useful for debugging latency issues
  4. Provide fallback estimates -- Not all streams include usage data
  5. Use flush for real-time output -- print(..., flush=True) for immediate display
  6. Handle different event types -- Anthropic has multiple event types

Next Steps