Streaming Agent

A multi-agent streaming comparison pipeline that runs OpenAI and Anthropic in streaming mode side-by-side, capturing per-chunk token counts and comparing response quality with reasoning and scoring.

Environment variables

This example requires OPENAI_API_KEY, ANTHROPIC_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls.
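A minimal sketch of how a --dry-run branch typically short-circuits the network call (the function body here is illustrative, not the demo's actual code):

```python
import asyncio

async def run_stream(query: str, client=None, *, dry_run: bool = False) -> dict:
    # In dry-run mode, return a canned result so no credentials are needed.
    if dry_run:
        return {"content": f"[dry-run] {query}", "chunks": 0, "total_tokens": 0}
    # ...the real streaming call would go here...
    raise RuntimeError("live mode requires an API client")

result = asyncio.run(run_stream("hello", dry_run=True))
print(result["content"])  # → [dry-run] hello
```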

Architecture

Key Code

OpenAI Streaming with Chunk Counting

The openai-streamer child agent streams a response from OpenAI, counting chunks and tokens as they arrive.

```python
@waxell.observe(agent_name="openai-streamer", workflow_name="openai-streaming", capture_io=True)
async def run_openai_stream(query: str, client, *, dry_run: bool = False, waxell_ctx=None) -> dict:
    waxell.tag("provider", "openai")
    waxell.tag("mode", "streaming")

    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant. Be concise."},
            {"role": "user", "content": query},
        ],
        stream=True,
        stream_options={"include_usage": True},  # final chunk carries token usage
    )

    content = ""
    chunk_count = 0
    total_tokens = 0
    async for chunk in stream:
        chunk_count += 1
        if chunk.choices and chunk.choices[0].delta.content:
            content += chunk.choices[0].delta.content
        if chunk.usage:  # only set on the final usage chunk
            total_tokens = chunk.usage.total_tokens

    waxell.score("stream_quality", 0.85, comment="OpenAI streaming quality")
    return {
        "content": content,
        "content_length": len(content),
        "chunks": chunk_count,
        "total_tokens": total_tokens,
    }
```
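The chunk-accumulation loop itself is independent of the SDK. A self-contained simulation (fake chunk objects mimicking the OpenAI `chunk.choices[0].delta.content` shape) shows the accounting logic in isolation:

```python
import asyncio
from types import SimpleNamespace

def fake_chunk(text):
    # Mimics the shape of an OpenAI streaming chunk: chunk.choices[0].delta.content
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)], usage=None)

async def fake_stream():
    for piece in ["Hello", ", ", "world"]:
        yield fake_chunk(piece)

async def consume():
    content = ""
    chunk_count = 0
    async for chunk in fake_stream():
        chunk_count += 1
        if chunk.choices and chunk.choices[0].delta.content:
            content += chunk.choices[0].delta.content
    return content, chunk_count

content, chunks = asyncio.run(consume())
# content == "Hello, world", chunks == 3
```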

Streaming Comparison with @reasoning

After both providers stream their responses, @reasoning documents the comparison logic.

```python
@waxell.reasoning_dec(step="compare_streaming_results")
def compare_streaming_results(openai_result: dict, anthropic_result: dict) -> dict:
    winner = "openai" if openai_result["content_length"] > anthropic_result["content_length"] else "anthropic"
    return {
        "thought": f"OpenAI: {openai_result['content_length']} chars, Anthropic: {anthropic_result['content_length']} chars",
        "evidence": [
            f"OpenAI tokens: {openai_result['total_tokens']}",
            f"Anthropic tokens: {anthropic_result['total_tokens']}",
        ],
        "conclusion": f"{winner} produced the longer response",
    }
```
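Stripped of the decorator, the comparison logic can be exercised in isolation (the sample numbers below are illustrative, not real output):

```python
def compare_streaming_results(openai_result: dict, anthropic_result: dict) -> dict:
    # Same length-based tie-break as the decorated version above.
    winner = ("openai"
              if openai_result["content_length"] > anthropic_result["content_length"]
              else "anthropic")
    return {"conclusion": f"{winner} produced the longer response"}

out = compare_streaming_results(
    {"content_length": 480, "total_tokens": 130},
    {"content_length": 512, "total_tokens": 140},
)
print(out["conclusion"])  # → anthropic produced the longer response
```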

What this demonstrates

  • Streaming LLM instrumentation -- both OpenAI and Anthropic streaming calls are auto-captured with per-chunk token accounting.
  • Multi-agent comparison -- an orchestrator coordinates two child agents (openai-streamer, anthropic-streamer) with automatic parent-child traces.
  • @decision decorator -- selects comparison mode (sequential, side-by-side, race) before executing the pipeline.
  • @reasoning decorator -- documents the comparison logic between the two streaming results.
  • waxell.score() -- attaches stream quality scores to each child agent's trace.
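The side-by-side mode boils down to running both child streamers concurrently and comparing their results. A minimal sketch with `asyncio.gather` and stand-in streamers (all names and numbers here are illustrative, not the demo's actual orchestrator):

```python
import asyncio

async def fake_streamer(provider: str, delay: float, text: str) -> dict:
    # Stand-in for a child streamer agent; simulates streaming latency.
    await asyncio.sleep(delay)
    return {"provider": provider, "content_length": len(text), "total_tokens": len(text) // 4}

async def side_by_side(query: str) -> dict:
    # Run both providers concurrently, then pick the longer response.
    openai_res, anthropic_res = await asyncio.gather(
        fake_streamer("openai", 0.01, "a" * 120),
        fake_streamer("anthropic", 0.01, "b" * 90),
    )
    winner = "openai" if openai_res["content_length"] > anthropic_res["content_length"] else "anthropic"
    return {"winner": winner, "openai": openai_res, "anthropic": anthropic_res}

result = asyncio.run(side_by_side("demo query"))
```

A race mode would swap `asyncio.gather` for `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)`.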

Run it

```bash
# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.streaming_agent --dry-run

# Live mode
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
python -m app.demos.streaming_agent
```

Source

dev/waxell-dev/app/demos/streaming_agent.py