Multi-Agent Integration
Trace complex multi-agent systems where a coordinator dispatches tasks to specialized sub-agents. Use shared session IDs to correlate all agents in a single trace.
The Pattern
- Generate a single
session_idfor the entire workflow - Coordinator is an
@observe-decorated function that calls sub-agents - Sub-agents are
@waxell_agent(alias for@observe) decorated functions that share the same session - All agents appear in the same trace, linked by session
- LLM calls inside any agent are auto-captured by
init()-- no manualrecord_llm_call
Complete Example
A coordinator dispatching to planner, researcher, and executor agents:
import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")
# Import AFTER init() so OpenAI is auto-instrumented
from openai import AsyncOpenAI
from waxell_observe import waxell_agent, generate_session_id
from waxell_observe.errors import PolicyViolationError
client = AsyncOpenAI()
# --- Sub-agent: Planner ---
@waxell_agent(agent_name="planner", workflow_name="plan-task")
async def plan_task(task_description: str) -> dict:
"""Break a task into research queries."""
waxell.step("analyze_task", output={"task": task_description[:100]})
# LLM call auto-captured
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Break this into 3 research queries."},
{"role": "user", "content": task_description},
],
)
content = response.choices[0].message.content
queries = [line.strip() for line in content.splitlines() if line.strip()][:3]
waxell.step("generate_plan", output={"num_queries": len(queries)})
return {"queries": queries}
# --- Sub-agent: Researcher ---
@waxell_agent(agent_name="researcher", workflow_name="research-query")
async def research_query(query: str, query_index: int = 0) -> str:
"""Research a single query."""
waxell.tag("query_index", str(query_index))
waxell.step("search", output={"query": query[:100]})
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Provide a concise finding."},
{"role": "user", "content": f"Research: {query}"},
],
)
finding = response.choices[0].message.content
waxell.step("compile_findings", output={"length": len(finding)})
return finding
# --- Sub-agent: Executor ---
@waxell_agent(agent_name="executor", workflow_name="synthesize-findings")
async def synthesize_findings(findings: list[str], original_task: str) -> str:
"""Synthesize findings into a final answer."""
waxell.metadata("num_findings", len(findings))
waxell.step("evaluate_findings", output={"count": len(findings)})
findings_text = "\n".join(f"- {f}" for f in findings)
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Synthesize these findings."},
{"role": "user", "content": f"Task: {original_task}\n\nFindings:\n{findings_text}"},
],
)
answer = response.choices[0].message.content
waxell.step("produce_output", output={"length": len(answer)})
return answer
# --- Coordinator ---
@waxell.observe(agent_name="coordinator", workflow_name="multi-agent-task")
async def run_multi_agent_task(task: str) -> str:
waxell.tag("demo", "multi-agent")
waxell.tag("num_agents", "3")
# Phase 1: Planning
waxell.step("delegate_to_planner")
plan_result = await plan_task(task)
queries = plan_result["queries"]
# Phase 2: Research (could be parallel; see below)
waxell.step("delegate_to_researchers")
findings = []
for i, query in enumerate(queries):
finding = await research_query(query, query_index=i)
findings.append(finding)
# Phase 3: Synthesis
waxell.step("delegate_to_executor")
final_answer = await synthesize_findings(findings, task)
return final_answer
# Run it -- one session_id flows to every sub-agent via call-time kwargs
async def main():
session = generate_session_id()
try:
answer = await run_multi_agent_task(
"What are the key considerations for deploying AI agents in production?",
session_id=session,
user_id="user_123",
)
print(answer)
except PolicyViolationError as e:
print(f"Policy violation: {e}")
Pass session_id once at the coordinator's call site. Sub-agent calls inherit the active session automatically when invoked inside the coordinator's @observe run. You can also pass session_id=... explicitly to any sub-agent call for full control.
Parallel Research
For independent queries, run researchers in parallel:
import asyncio
@waxell.observe(agent_name="coordinator")
async def run_parallel(task: str) -> str:
plan_result = await plan_task(task)
queries = plan_result["queries"]
waxell.step("delegate_to_researchers_parallel")
findings = await asyncio.gather(*[
research_query(query, query_index=i)
for i, query in enumerate(queries)
])
return await synthesize_findings(findings, task)
Session Correlation
All agents with the same session_id appear together in the UI:
Session: sess_a1b2c3d4e5f6
├── coordinator (multi-agent-task)
│ ├── Step: delegate_to_planner
│ ├── Step: delegate_to_researchers
│ └── Step: delegate_to_executor
├── planner (plan-task)
│ ├── Step: analyze_task
│ ├── Step: generate_plan
│ └── LLM: gpt-4o-mini (auto-captured)
├── researcher (research-query) [query_index=0]
│ ├── Step: search
│ ├── Step: compile_findings
│ └── LLM: gpt-4o-mini (auto-captured)
├── researcher (research-query) [query_index=1]
│ └── ...
├── researcher (research-query) [query_index=2]
│ └── ...
└── executor (synthesize-findings)
├── Step: evaluate_findings
├── Step: produce_output
└── LLM: gpt-4o-mini (auto-captured)
Tagging Sub-Agents
Use tags to identify specific invocations:
@waxell_agent(agent_name="researcher")
async def research_query(query: str, query_index: int = 0) -> str:
waxell.tag("query_index", str(query_index))
waxell.tag("query_hash", hash(query) % 10000)
# ...
Error Propagation
Errors in sub-agents bubble up to the coordinator. Each sub-agent's run is still recorded with status="error".
@waxell.observe(agent_name="coordinator")
async def coordinator(task: str) -> str:
try:
findings = await research_query("...")
except PolicyViolationError as e:
waxell.tag("error", "policy_violation")
raise
Metrics Aggregation
The session view shows aggregated metrics:
- Total LLM calls: sum across all agents
- Total tokens: sum of input + output tokens
- Total cost: sum of all LLM costs
- Duration: wall-clock time for the entire session
- Agent count: number of distinct agents
Parent-Child Run Lineage
Beyond session correlation, the SDK supports explicit parent-child relationships between runs. When a coordinator spawns a sub-agent on a different worker (or you need a true causality graph rather than session-grouping), pass the parent's run_id to create the hierarchy:
from waxell_observe import WaxellObserveClient, WaxellContext
client = WaxellObserveClient()
async with WaxellContext(agent_name="coordinator", session_id=session) as parent_ctx:
# Sub-agent on a different worker creates a child run linked to the parent
run_info = await client.start_run(
agent_name="researcher",
session_id=session,
parent_workflow_id=parent_ctx.run_id,
root_workflow_id=parent_ctx.run_id,
)
# ... sub-agent work ...
await client.complete_run(run_info.run_id, result={"findings": findings})
Parent-child lineage appears in the dashboard as a tree view, distinct from session-level grouping. This is one of the few places where dropping to WaxellContext is justified -- most multi-agent setups should use the @observe / @waxell_agent pattern above. For full causality graphs (spawn chains, signals, retries), see Lineage.
Best Practices
- Single session_id -- generate once, pass to the coordinator's call; sub-agents inherit it
- Descriptive agent names --
planner,researcher,executornotagent1,agent2 - Use tags for differentiation --
query_index,model_tier, etc. - Record delegation steps --
waxell.step("delegate_to_planner")shows orchestration flow - Handle errors at the coordinator -- centralized error handling and logging
- Run independent sub-tasks in parallel --
asyncio.gatherworks with decorated agents
Next Steps
- Decorator Pattern --
@observe/@waxell_agentdetails - Context Manager --
WaxellContextfor advanced lineage scenarios - Sessions -- Session analytics and grouping