Streaming Agent
A multi-agent streaming comparison pipeline that runs OpenAI and Anthropic in streaming mode side by side, capturing per-chunk token counts and comparing response quality with reasoning and scoring.
Environment variables
This example requires OPENAI_API_KEY, ANTHROPIC_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls.
Architecture
Key Code
OpenAI Streaming with Chunk Counting
The openai-streamer child agent streams a response from OpenAI, counting chunks and tokens as they arrive.
@waxell.observe(agent_name="openai-streamer", workflow_name="openai-streaming", capture_io=True)
async def run_openai_stream(query: str, client, *, dry_run: bool = False, waxell_ctx=None) -> dict:
    waxell.tag("provider", "openai")
    waxell.tag("mode", "streaming")
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant. Be concise."},
            {"role": "user", "content": query},
        ],
        stream=True,
        stream_options={"include_usage": True},  # ask OpenAI to append a final usage chunk
    )
    content = ""
    chunk_count = 0
    tokens_in = tokens_out = 0
    async for chunk in stream:
        chunk_count += 1
        if chunk.choices and chunk.choices[0].delta.content:
            content += chunk.choices[0].delta.content
        if chunk.usage:  # only the final chunk carries usage
            tokens_in = chunk.usage.prompt_tokens
            tokens_out = chunk.usage.completion_tokens
    waxell.score("stream_quality", 0.85, comment="OpenAI streaming quality")
    return {"content": content, "chunks": chunk_count, "total_tokens": tokens_in + tokens_out}
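The anthropic-streamer child agent isn't shown in this excerpt. A minimal sketch of what it could look like, assuming the AsyncAnthropic SDK's `messages.stream` helper (the model name and `run_anthropic_stream` are illustrative, not this demo's actual code):

```python
import asyncio

async def run_anthropic_stream(query: str, client) -> dict:
    """Hypothetical anthropic-streamer: stream a response, counting chunks and tokens."""
    content = ""
    chunk_count = 0
    # AsyncAnthropic exposes streaming as an async context manager.
    async with client.messages.stream(
        model="claude-3-5-haiku-latest",
        max_tokens=512,
        messages=[{"role": "user", "content": query}],
    ) as stream:
        async for text in stream.text_stream:  # yields text deltas as they arrive
            chunk_count += 1
            content += text
        final = await stream.get_final_message()  # carries token usage totals
    return {
        "content": content,
        "chunks": chunk_count,
        "total_tokens": final.usage.input_tokens + final.usage.output_tokens,
    }
```

The return shape mirrors `run_openai_stream`, so the orchestrator can compare the two results field for field.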
Streaming Comparison with @reasoning
After both providers stream their responses, the @reasoning decorator documents the comparison logic.
@waxell.reasoning_dec(step="compare_streaming_results")
def compare_streaming_results(openai_result: dict, anthropic_result: dict) -> dict:
    winner = "openai" if openai_result["content_length"] > anthropic_result["content_length"] else "anthropic"
    return {
        "thought": f"OpenAI: {openai_result['content_length']} chars, Anthropic: {anthropic_result['content_length']} chars",
        "evidence": [
            f"OpenAI tokens: {openai_result['total_tokens']}",
            f"Anthropic tokens: {anthropic_result['total_tokens']}",
        ],
        "conclusion": f"{winner} produced the longer response",
    }
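Note that the streamers return a raw `content` string while the comparator reads `content_length`, so the orchestrator (not shown) presumably derives the comparison fields first. A hedged sketch of that glue step, with an illustrative helper name:

```python
def to_comparable(streamer_result: dict) -> dict:
    # Hypothetical glue: derive the fields compare_streaming_results expects
    # from a streamer's raw return value.
    return {
        "content_length": len(streamer_result["content"]),
        "total_tokens": streamer_result["total_tokens"],
    }
```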
What this demonstrates
- Streaming LLM instrumentation -- both OpenAI and Anthropic streaming calls are auto-captured with per-chunk token accounting.
- Multi-agent comparison -- an orchestrator coordinates two child agents (openai-streamer, anthropic-streamer) with automatic parent-child traces.
- @decision decorator -- selects comparison mode (sequential, side-by-side, race) before executing the pipeline.
- @reasoning decorator -- documents the comparison logic between the two streaming results.
- waxell.score() -- attaches stream quality scores to each child agent's trace.
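The three comparison modes the @decision decorator chooses between can be sketched with plain asyncio; `run_comparison` and its signature are illustrative, not the demo's actual orchestrator:

```python
import asyncio

async def run_comparison(query, streamers, mode="side-by-side"):
    """Illustrative dispatch over the three comparison modes."""
    if mode == "sequential":
        # One provider at a time, in declaration order.
        return [await fn(query) for fn in streamers]
    if mode == "side-by-side":
        # Both providers stream concurrently; wait for both.
        return await asyncio.gather(*(fn(query) for fn in streamers))
    if mode == "race":
        # First provider to finish wins; cancel the rest.
        tasks = [asyncio.ensure_future(fn(query)) for fn in streamers]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        return [task.result() for task in done]
    raise ValueError(f"unknown mode: {mode}")
```

Side-by-side preserves declaration order in its results, while race returns only whichever streamer finished first.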
Run it
# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.streaming_agent --dry-run
# Live mode
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
python -m app.demos.streaming_agent