Multi-Agent Coordination
A planner-researcher-synthesizer pipeline in which each agent is independently traced via `@waxell.observe`, producing separate tracked runs that can be correlated in the dashboard.
Environment variables
This example requires the `OPENAI_API_KEY`, `WAXELL_API_KEY`, and `WAXELL_API_URL` environment variables.
```python
import waxell_observe as waxell

# CRITICAL: call init() BEFORE importing LLM SDKs so auto-instrumentors can patch them
waxell.init()

from openai import OpenAI  # patched automatically by waxell.init()
import asyncio

client = OpenAI()


# --- Agent 1: Planner ---
@waxell.observe(agent_name="planner")
async def plan(task: str) -> list[str]:
    waxell.tag("agent_role", "planner")
    response = client.chat.completions.create(  # auto-instrumented
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Break into 3 research queries: {task}"}],
    )
    queries = response.choices[0].message.content.strip().split("\n")[:3]
    waxell.decide(
        "task_decomposition",
        chosen="3-query split",
        options=["single-query", "3-query split", "parallel-search"],
        reasoning=f"Decomposed into {len(queries)} sub-queries",
    )
    return queries


# --- Agent 2: Researcher ---
@waxell.retrieval(source="web")
def search_web(query: str) -> list[dict]:
    """Auto-records the retrieval with its source and results."""
    return [{"title": f"Result for: {query}", "snippet": "..."}]


@waxell.observe(agent_name="researcher")
async def research(query: str) -> str:
    waxell.tag("agent_role", "researcher")
    search_web(query)  # @retrieval auto-records this call
    response = client.chat.completions.create(  # auto-instrumented
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Research briefly: {query}"}],
    )
    return response.choices[0].message.content


# --- Agent 3: Synthesizer ---
@waxell.observe(agent_name="synthesizer")
async def synthesize(findings: list[str]) -> str:
    waxell.tag("agent_role", "synthesizer")
    combined = "\n---\n".join(findings)
    response = client.chat.completions.create(  # auto-instrumented
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Synthesize:\n{combined}"}],
    )
    summary = response.choices[0].message.content
    waxell.score("synthesis_quality", 0.9)
    return summary


# --- Orchestrator (parent run -- child agents auto-link via WaxellContext lineage) ---
@waxell.observe(agent_name="orchestrator", workflow_name="research-pipeline")
async def coordinate(task: str):
    waxell.tag("pipeline", "multi-agent")
    # Each nested @observe call creates a child run linked to this parent automatically
    queries = await plan(task)
    findings = [await research(q) for q in queries]
    return await synthesize(findings)


asyncio.run(coordinate("What are the key trends in AI for 2025?"))
```
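The orchestrator above awaits each `research` call sequentially. Because the sub-queries are independent, the fan-out could instead run concurrently with `asyncio.gather`, which returns results in input order, so downstream synthesis is unchanged. A minimal sketch with a stubbed `research` (the real one calls the LLM, and whether child runs still link correctly under concurrency depends on the SDK propagating context across tasks, which contextvars-based tracing typically does):

```python
import asyncio

async def research(query: str) -> str:
    # Stand-in for the traced researcher agent; the real one calls the LLM.
    await asyncio.sleep(0)  # simulate I/O
    return f"findings for: {query}"

async def coordinate(queries: list[str]) -> list[str]:
    # gather() runs the research coroutines concurrently and returns their
    # results in the same order as the input queries.
    return await asyncio.gather(*(research(q) for q in queries))

findings = asyncio.run(coordinate(["q1", "q2", "q3"]))
```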
What this demonstrates
- Nested `@waxell.observe` -- the orchestrator's run is the parent; `plan`, `research`, and `synthesize` each create child runs with automatic parent-child lineage via `WaxellContext`.
- Per-agent tags and scores -- each agent sets its own `waxell.tag()` and `waxell.score()` values, scoped to its own tracked run.
- Top-level convenience functions -- `waxell.tag()`, `waxell.decide()`, and `waxell.score()` work within each agent's context without needing a `ctx` reference.
- `@waxell.retrieval` helper -- the researcher's web search is auto-recorded as a retrieval span attributed to the researcher's run.
- Auto-instrumented LLM calls -- every `client.chat.completions.create()` call is captured and attributed to the correct agent's run via `waxell.init()`.
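The parent-child lineage described above is easy to picture with Python's `contextvars`: each decorated call records the currently active run as its parent, then installs itself as the active run for anything nested inside it. This is only an illustrative sketch of the general mechanism, not `WaxellContext`'s actual implementation:

```python
import contextvars
import functools
import itertools

_current_run = contextvars.ContextVar("current_run", default=None)
_ids = itertools.count(1)
runs = []  # stand-in for runs sent to the backend

def observe(agent_name):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            run = {"id": next(_ids), "agent": agent_name,
                   "parent": _current_run.get()}  # link to the enclosing run, if any
            runs.append(run)
            token = _current_run.set(run["id"])  # nested calls see this run as parent
            try:
                return fn(*args, **kwargs)
            finally:
                _current_run.reset(token)  # restore the outer run on exit
        return wrapper
    return decorator

@observe("planner")
def plan():
    pass

@observe("orchestrator")
def coordinate():
    plan()  # child run: its parent is the orchestrator's run id

coordinate()
```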
Run it
```shell
export OPENAI_API_KEY="sk-..."
export WAXELL_API_KEY="your-waxell-api-key"
export WAXELL_API_URL="https://api.waxell.ai"
python multi_agent.py
```
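A missing credential otherwise only surfaces when the first API call fails mid-pipeline. A small preflight check at the top of the script catches it at startup; `missing_env` below is a hypothetical helper, not part of the SDK:

```python
import os

# The three variables this example requires.
REQUIRED_ENV = ["OPENAI_API_KEY", "WAXELL_API_KEY", "WAXELL_API_URL"]

def missing_env(required=REQUIRED_ENV):
    # Treat unset and empty-string values the same.
    return [name for name in required if not os.environ.get(name)]

# Usage at startup, e.g.:
#   if missing := missing_env():
#       raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```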