
Streaming Integration

Auto-Instrumented Streaming

If you're using waxell.init(), streaming responses from OpenAI, Anthropic, Gemini, Cohere, and Bedrock are captured automatically -- the SDK wraps stream iterators to aggregate tokens, timing, and cost without any manual code. The examples below show the simple decorator pattern. See Advanced: Custom Stream Processing for cases where you need manual control.

When auto-instrumentation is active, streaming Just Works: you iterate the stream as you normally would, and the SDK records content, model, tokens (input and output), latency, and cost behind the scenes.
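To make the idea of wrapping a stream iterator concrete, here is a minimal sketch of a pass-through wrapper that aggregates content and timing while the caller iterates as usual. It is an illustration only, not the SDK's actual implementation: `StreamRecorder`, `fake_stream`, and the recorded fields are hypothetical names, and the chunks are plain strings rather than provider objects.

```python
import asyncio
import time
from typing import AsyncIterator, Optional


class StreamRecorder:
    """Hypothetical pass-through wrapper: yields chunks unchanged while recording them."""

    def __init__(self, stream: AsyncIterator[str]) -> None:
        self._stream = stream
        self._started = time.perf_counter()
        self.content = ""
        self.chunk_count = 0
        self.first_chunk_latency: Optional[float] = None
        self.total_latency: Optional[float] = None

    def __aiter__(self) -> "StreamRecorder":
        return self

    async def __anext__(self) -> str:
        try:
            chunk = await self._stream.__anext__()
        except StopAsyncIteration:
            # Stream exhausted: finalize timing, then propagate the stop signal.
            self.total_latency = time.perf_counter() - self._started
            raise
        if self.first_chunk_latency is None:
            self.first_chunk_latency = time.perf_counter() - self._started
        self.content += chunk
        self.chunk_count += 1
        return chunk


async def fake_stream() -> AsyncIterator[str]:
    """Stand-in for a provider stream (assumption: chunks are plain strings)."""
    for piece in ["Hello", ", ", "world"]:
        await asyncio.sleep(0)
        yield piece


async def demo() -> StreamRecorder:
    recorder = StreamRecorder(fake_stream())
    async for chunk in recorder:  # the caller iterates exactly as before
        print(chunk, end="", flush=True)
    print()
    return recorder
```

Calling `asyncio.run(demo())` prints the text as it arrives and returns the recorder with the aggregated content and timings. The point of the design is that recording happens inside `__anext__`, so the consuming loop needs no changes.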

OpenAI Streaming

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

# Import AFTER init() so OpenAI is auto-instrumented
from openai import AsyncOpenAI

client = AsyncOpenAI()


@waxell.observe(agent_name="streaming-agent")
async def stream_openai(query: str) -> str:
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Be concise."},
            {"role": "user", "content": query},
        ],
        stream=True,
    )

    content = ""
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            text = chunk.choices[0].delta.content
            content += text
            print(text, end="", flush=True)  # stream to user

    return content


# Run it from an async context (e.g. a notebook cell);
# in a script, wrap the call in asyncio.run(...)
answer = await stream_openai(
    "Explain quantum computing",
    session_id="sess_001",
    user_id="user_alice",
)

That's it. The trace contains the model, full content, prompt + completion tokens, latency, and cost -- all captured by auto-instrumentation.

Anthropic Streaming

Anthropic streams typed events instead of chunks, but auto-instrumentation handles both -- you iterate normally:

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...")

import anthropic

client = anthropic.AsyncAnthropic()


@waxell.observe(agent_name="streaming-claude")
async def stream_claude(query: str) -> str:
    stream = await client.messages.create(
        model="claude-sonnet-4",
        max_tokens=500,
        messages=[{"role": "user", "content": query}],
        stream=True,
    )

    content = ""
    async for event in stream:
        if event.type == "content_block_delta" and hasattr(event.delta, "text"):
            text = event.delta.text
            content += text
            print(text, end="", flush=True)

    return content

Input tokens (from message_start) and output tokens (from message_delta) are extracted automatically.
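If you ever need these counts yourself, the sketch below shows where they sit in the event stream. It uses `SimpleNamespace` stand-ins shaped like Anthropic's streaming events so that it runs without an API key. `extract_usage` and `fake_events` are hypothetical names for illustration, not part of the SDK.

```python
from types import SimpleNamespace
from typing import Iterable, Tuple


def extract_usage(events: Iterable) -> Tuple[int, int]:
    """Return (input_tokens, output_tokens) read from a sequence of stream events."""
    input_tokens = 0
    output_tokens = 0
    for event in events:
        if event.type == "message_start":
            # Input tokens arrive once, on the opening event.
            input_tokens = event.message.usage.input_tokens
        elif event.type == "message_delta":
            # Output counts are cumulative, so keep the latest value.
            output_tokens = event.usage.output_tokens
    return input_tokens, output_tokens


# Stand-in events (assumption: only the attributes read above are modelled).
fake_events = [
    SimpleNamespace(
        type="message_start",
        message=SimpleNamespace(usage=SimpleNamespace(input_tokens=12)),
    ),
    SimpleNamespace(type="content_block_delta", delta=SimpleNamespace(text="Hi")),
    SimpleNamespace(type="message_delta", usage=SimpleNamespace(output_tokens=7)),
    SimpleNamespace(type="message_stop"),
]

print(extract_usage(fake_events))  # → (12, 7)
```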

Streaming Comparison Example

Compare streaming across providers in a single trace. Each call is auto-captured:

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...")

from openai import AsyncOpenAI
import anthropic

openai_client = AsyncOpenAI()
anthropic_client = anthropic.AsyncAnthropic()


@waxell.observe(agent_name="streaming-demo", workflow_name="streaming-comparison")
async def compare_streaming(query: str) -> dict:
    waxell.tag("demo", "streaming")
    waxell.tag("providers", "openai,anthropic")

    # --- OpenAI Streaming (auto-captured) ---
    print("OpenAI: ", end="", flush=True)
    openai_content = ""
    async for chunk in await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}],
        stream=True,
    ):
        if chunk.choices and chunk.choices[0].delta.content:
            openai_content += chunk.choices[0].delta.content
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()
    waxell.step("openai_stream", output={"chars": len(openai_content)})

    # --- Anthropic Streaming (auto-captured) ---
    print("Anthropic: ", end="", flush=True)
    anthropic_content = ""
    async for event in await anthropic_client.messages.create(
        model="claude-sonnet-4",
        max_tokens=500,
        messages=[{"role": "user", "content": query}],
        stream=True,
    ):
        if event.type == "content_block_delta" and hasattr(event.delta, "text"):
            anthropic_content += event.delta.text
            print(event.delta.text, end="", flush=True)
    print()
    waxell.step("anthropic_stream", output={"chars": len(anthropic_content)})

    return {
        "openai_response": openai_content[:500],
        "anthropic_response": anthropic_content[:500],
    }


# Run it from an async context (e.g. a notebook cell);
# in a script, wrap the call in asyncio.run(...)
await compare_streaming(
    "Explain quantum computing in simple terms",
    session_id="sess_compare_001",
)

Key Differences

| Aspect | OpenAI | Anthropic |
| --- | --- | --- |
| Iterator | async for chunk in stream | async for event in stream |
| Content location | chunk.choices[0].delta.content | event.delta.text (when event.type == "content_block_delta") |
| Token availability | Final chunk (if requested) | Separate message_start / message_delta events |

For both providers, auto-instrumentation extracts model, content, tokens, and cost without any manual code.
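If your own code has to handle both shapes, for example in a shared display loop, the content-location difference can be hidden behind one small helper. The sketch below is a hypothetical illustration (`extract_text` is not an SDK function) and uses `SimpleNamespace` stand-ins in place of real provider objects.

```python
from types import SimpleNamespace
from typing import Optional


def extract_text(item) -> Optional[str]:
    """Return the text carried by an OpenAI chunk or an Anthropic event, else None."""
    # OpenAI-style chunk: text lives at choices[0].delta.content
    choices = getattr(item, "choices", None)
    if choices:
        return choices[0].delta.content or None
    # Anthropic-style event: text lives at delta.text on content_block_delta
    if getattr(item, "type", None) == "content_block_delta":
        return getattr(item.delta, "text", None)
    return None


# Stand-ins shaped like the objects each provider yields (assumption).
openai_chunk = SimpleNamespace(
    choices=[SimpleNamespace(delta=SimpleNamespace(content="Hello"))]
)
openai_usage_chunk = SimpleNamespace(choices=[])  # no text to extract
anthropic_event = SimpleNamespace(
    type="content_block_delta", delta=SimpleNamespace(text="Hi")
)
anthropic_stop = SimpleNamespace(type="message_stop")

for item in (openai_chunk, openai_usage_chunk, anthropic_event, anthropic_stop):
    print(extract_text(item))
```

When calling the OpenAI API directly, usage is reported on the final chunk only if the request sets `stream_options={"include_usage": True}`. That chunk arrives with an empty `choices` list, which is why the examples check `chunk.choices` before indexing into it.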

Advanced: Custom Stream Processing

If you need to process the stream in ways the SDK can't infer (e.g. an unsupported provider, custom event handling, or you want to record specific per-chunk telemetry), drop down to the context manager and call record_llm_call yourself after the stream completes:

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...")

from waxell_observe import WaxellContext

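async def run_custom_stream() -> str:
    async with WaxellContext(agent_name="custom-stream") as ctx:
        content = ""
        tokens_in = 0
        tokens_out = 0
        # ... your custom streaming logic: accumulate content and
        # update tokens_in / tokens_out as the stream reports them ...

        # Record the call once the stream has completed
        ctx.record_llm_call(
            model="my-custom-model",
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            response_preview=content[:200],
        )
    return content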

See the Context Manager page for the full API.
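As a rough illustration of the "accumulate content + tokens" step, the sketch below consumes a stream from a made-up provider and collects the values that `record_llm_call` expects, plus simple per-chunk timing. Everything here is an assumption for demonstration: `custom_provider_stream`, its dict-shaped chunks, and the `usage` keys are hypothetical, and the Waxell call itself is left out so the example runs on its own.

```python
import asyncio
import time
from typing import AsyncIterator, Dict, List


async def custom_provider_stream() -> AsyncIterator[Dict]:
    """Hypothetical unsupported provider: text chunks, then a final usage record."""
    yield {"text": "Streaming "}
    yield {"text": "works."}
    yield {"usage": {"prompt_tokens": 9, "completion_tokens": 3}}


async def consume_custom_stream() -> Dict:
    content = ""
    tokens_in = 0
    tokens_out = 0
    chunk_gaps: List[float] = []  # per-chunk telemetry: seconds since previous chunk
    last = time.perf_counter()

    async for item in custom_provider_stream():
        now = time.perf_counter()
        chunk_gaps.append(now - last)
        last = now
        if "text" in item:
            content += item["text"]
        if "usage" in item:
            tokens_in = item["usage"]["prompt_tokens"]
            tokens_out = item["usage"]["completion_tokens"]

    # Keys mirror the record_llm_call keyword arguments.
    return {
        "model": "my-custom-model",
        "tokens_in": tokens_in,
        "tokens_out": tokens_out,
        "response_preview": content[:200],
        "chunk_gaps": chunk_gaps,
    }


result = asyncio.run(consume_custom_stream())
print(result["response_preview"], result["tokens_in"], result["tokens_out"])
```

Inside a `WaxellContext` block, the first four values would be passed to `ctx.record_llm_call(...)` after the loop finishes.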

Token Estimation

If you do need to estimate tokens in a custom-streaming scenario:

def estimate_tokens(text: str) -> int:
    """Rough estimate: ~1.3 tokens per word for English."""
    return int(len(text.split()) * 1.3)

For more accurate estimation with OpenAI models:

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))
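The two approaches can be combined so that code keeps working when tiktoken is not installed or does not recognize the model. The sketch below is one possible arrangement, not a recommendation from the SDK; `count_tokens_with_fallback` is a hypothetical helper name.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate: ~1.3 tokens per word for English."""
    return int(len(text.split()) * 1.3)


def count_tokens_with_fallback(text: str, model: str = "gpt-4o") -> int:
    """Use tiktoken when it is usable; otherwise fall back to the word heuristic."""
    try:
        import tiktoken  # optional dependency, imported lazily

        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception:
        # Deliberately broad: tiktoken missing, model name unknown,
        # or encoding files unavailable (e.g. offline).
        return estimate_tokens(text)


print(count_tokens_with_fallback("Explain quantum computing in simple terms"))
```

The exact number printed depends on which path is taken, so treat the heuristic result as an approximation rather than a billing-grade count.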

Best Practices

  1. Trust auto-instrumentation -- iterate the stream as normal and let the SDK handle capture
  2. Wrap streaming in @observe -- groups the stream into a tracked run
  3. Use print(..., flush=True) for real-time UX -- streams to the user while the SDK records
  4. Drop to a context manager only when auto-instrumentation can't cover your case (unsupported providers, custom event handling, per-chunk telemetry) -- most cases don't need it

Next Steps