Multi-Agent Integration
Trace complex multi-agent systems where a coordinator dispatches tasks to specialized sub-agents. Use shared session IDs to correlate all agents in a single trace.
The Pattern
- Generate a single session_id for the entire workflow
- Coordinator uses WaxellContext with that session
- Sub-agents use the @waxell_agent decorator with the same session
- All agents appear in the same trace, linked by session
Complete Example
A coordinator dispatching to planner, researcher, and executor agents:
import waxell_observe

waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from waxell_observe import WaxellContext, waxell_agent, generate_session_id
from waxell_observe.errors import PolicyViolationError
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def run_multi_agent_task(task: str, user_id: str = ""):
    # Single session ID for all agents
    session = generate_session_id()

    # --- Sub-agent: Planner ---
    @waxell_agent(
        agent_name="planner",
        workflow_name="plan-task",
        session_id=session,  # Same session!
        user_id=user_id,
    )
    async def plan_task(task_description: str, waxell_ctx=None) -> dict:
        """Break a task into research queries."""
        waxell_ctx.record_step("analyze_task", output={"task": task_description[:100]})
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Break this into 3 research queries."},
                {"role": "user", "content": task_description},
            ],
        )
        content = response.choices[0].message.content
        # Parse queries
        queries = [line.strip() for line in content.splitlines() if line.strip()][:3]
        waxell_ctx.record_step("generate_plan", output={"num_queries": len(queries)})
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="plan_task",
        )
        return {"queries": queries}

    # --- Sub-agent: Researcher ---
    @waxell_agent(
        agent_name="researcher",
        workflow_name="research-query",
        session_id=session,  # Same session!
        user_id=user_id,
    )
    async def research_query(query: str, query_index: int = 0, waxell_ctx=None) -> str:
        """Research a single query."""
        waxell_ctx.set_tag("query_index", str(query_index))
        waxell_ctx.record_step("search", output={"query": query[:100]})
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Provide a concise finding."},
                {"role": "user", "content": f"Research: {query}"},
            ],
        )
        finding = response.choices[0].message.content
        waxell_ctx.record_step("compile_findings", output={"length": len(finding)})
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="research_query",
        )
        return finding

    # --- Sub-agent: Executor ---
    @waxell_agent(
        agent_name="executor",
        workflow_name="synthesize-findings",
        session_id=session,  # Same session!
        user_id=user_id,
    )
    async def synthesize_findings(findings: list[str], original_task: str, waxell_ctx=None) -> str:
        """Synthesize findings into a final answer."""
        waxell_ctx.set_metadata("num_findings", len(findings))
        waxell_ctx.record_step("evaluate_findings", output={"count": len(findings)})
        findings_text = "\n".join(f"- {f}" for f in findings)
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Synthesize these findings."},
                {"role": "user", "content": f"Task: {original_task}\n\nFindings:\n{findings_text}"},
            ],
        )
        answer = response.choices[0].message.content
        waxell_ctx.record_step("produce_output", output={"length": len(answer)})
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="synthesize_findings",
        )
        return answer

    # --- Coordinator ---
    async with WaxellContext(
        agent_name="coordinator",
        workflow_name="multi-agent-task",
        inputs={"task": task},
        session_id=session,  # Same session!
        user_id=user_id,
    ) as ctx:
        ctx.set_tag("demo", "multi-agent")
        ctx.set_tag("num_agents", "3")
        try:
            # Phase 1: Planning
            ctx.record_step("delegate_to_planner")
            plan_result = await plan_task(task)
            queries = plan_result["queries"]

            # Phase 2: Research (could be parallel)
            ctx.record_step("delegate_to_researchers")
            findings = []
            for i, query in enumerate(queries):
                finding = await research_query(query, query_index=i)
                findings.append(finding)

            # Phase 3: Synthesis
            ctx.record_step("delegate_to_executor")
            final_answer = await synthesize_findings(findings, task)

            ctx.set_result({
                "answer": final_answer,
                "num_queries": len(queries),
                "num_findings": len(findings),
            })
            return final_answer
        except PolicyViolationError as e:
            print(f"Policy violation: {e}")
            raise
# Run it (from sync code; inside an existing event loop, just `await` it)
import asyncio

answer = asyncio.run(run_multi_agent_task(
    "What are the key considerations for deploying AI agents in production?",
    user_id="user_123",
))
Context Injection
The @waxell_agent decorator injects a waxell_ctx parameter if your function accepts it:
@waxell_agent(agent_name="my-agent")
async def my_function(input: str, waxell_ctx=None) -> str:
    # waxell_ctx is automatically injected
    if waxell_ctx:
        waxell_ctx.set_tag("input_type", "text")
        waxell_ctx.record_step("process", output={"length": len(input)})
    return f"Processed: {input}"
The parameter:
- Must be named waxell_ctx
- Must have a default value (typically None)
- Will be a WaxellContext instance when the function runs
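Detecting an optional context parameter like this is usually done by inspecting the function's signature. A minimal sketch of the general technique (illustrative only, not the library's actual implementation):

```python
import functools
import inspect

def inject_ctx(func):
    """Toy decorator: pass a context object only if the wrapped
    function declares a `waxell_ctx` parameter."""
    accepts_ctx = "waxell_ctx" in inspect.signature(func).parameters

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if accepts_ctx:
            # Stand-in for a real context instance
            kwargs.setdefault("waxell_ctx", {"tags": {}})
        return func(*args, **kwargs)

    return wrapper

@inject_ctx
def with_ctx(x, waxell_ctx=None):
    return waxell_ctx is not None  # True: context was injected

@inject_ctx
def without_ctx(x):
    return x * 2  # no waxell_ctx parameter, nothing injected
```

Because injection is keyed off the signature, functions that don't declare the parameter are left untouched, which is why the default value matters: it keeps the function callable without the decorator too.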
Parallel Research
For independent queries, run researchers in parallel:
import asyncio

# Phase 2: Parallel research
ctx.record_step("delegate_to_researchers")

# Create tasks for parallel execution
tasks = [
    research_query(query, query_index=i)
    for i, query in enumerate(queries)
]

# Run all in parallel
findings = await asyncio.gather(*tasks)
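asyncio.gather preserves input order, so findings line up with queries. If you need to cap how many researchers run at once (for example, to respect provider rate limits), a semaphore is a common pattern. A sketch in plain asyncio, with a dummy coroutine standing in for research_query:

```python
import asyncio

async def bounded_gather(coros, limit=2):
    """Run coroutines concurrently, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

async def fake_research(query: str) -> str:
    # Stand-in for research_query; a real call would hit the LLM
    await asyncio.sleep(0)
    return f"finding for {query}"

findings = asyncio.run(
    bounded_gather([fake_research(q) for q in ["a", "b", "c"]])
)
```

Results still come back in input order; only the degree of concurrency changes.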
Session Correlation
All agents with the same session_id appear together in the UI:
Session: sess_a1b2c3d4e5f6
├── coordinator (multi-agent-task)
│ ├── Step: delegate_to_planner
│ ├── Step: delegate_to_researchers
│ └── Step: delegate_to_executor
├── planner (plan-task)
│ ├── Step: analyze_task
│ ├── Step: generate_plan
│ └── LLM: gpt-4o-mini (plan_task)
├── researcher (research-query) [query_index=0]
│ ├── Step: search
│ ├── Step: compile_findings
│ └── LLM: gpt-4o-mini (research_query)
├── researcher (research-query) [query_index=1]
│ └── ...
├── researcher (research-query) [query_index=2]
│ └── ...
└── executor (synthesize-findings)
├── Step: evaluate_findings
├── Step: produce_output
└── LLM: gpt-4o-mini (synthesize_findings)
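Conceptually, this grouping is just a partition of flat trace records by session_id. A minimal illustration in plain Python (hypothetical record shapes, not the Waxell API or storage format):

```python
from collections import defaultdict

# Hypothetical flat trace records arriving from several agents
records = [
    {"session_id": "sess_a1b2", "agent": "coordinator"},
    {"session_id": "sess_a1b2", "agent": "planner"},
    {"session_id": "sess_ffff", "agent": "coordinator"},
    {"session_id": "sess_a1b2", "agent": "researcher"},
]

# Group by session to reconstruct the per-session view
by_session = defaultdict(list)
for rec in records:
    by_session[rec["session_id"]].append(rec["agent"])
```

Any agent that reports a different session_id (like sess_ffff above) lands in its own session, which is why generating the ID once and passing it everywhere is essential.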
Tagging Sub-Agents
Use tags to identify specific invocations:
@waxell_agent(agent_name="researcher", session_id=session)
async def research_query(query: str, query_index: int = 0, waxell_ctx=None) -> str:
    # Tag with index for filtering (tag values are strings)
    waxell_ctx.set_tag("query_index", str(query_index))
    waxell_ctx.set_tag("query_hash", str(hash(query) % 10000))
    # ...
Error Propagation
Errors in sub-agents bubble up to the coordinator:
async with WaxellContext(agent_name="coordinator", session_id=session) as ctx:
    try:
        # If researcher throws PolicyViolationError...
        findings = await research_query(query)
    except PolicyViolationError as e:
        # ...coordinator catches it
        ctx.set_tag("error", "policy_violation")
        ctx.set_result({"error": str(e)})
        raise
Metrics Aggregation
The session view shows aggregated metrics:
- Total LLM calls: Sum across all agents
- Total tokens: Sum of all input + output tokens
- Total cost: Sum of all LLM costs
- Duration: Wall-clock time for entire session
- Agent count: Number of distinct agents
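As a rough sketch of how these roll-ups work, assuming flat per-call records like the ones record_llm_call emits (hypothetical shapes, not the actual storage format):

```python
# Hypothetical per-call records collected across all agents in one session
llm_calls = [
    {"agent": "planner", "tokens_in": 120, "tokens_out": 45},
    {"agent": "researcher", "tokens_in": 80, "tokens_out": 60},
    {"agent": "researcher", "tokens_in": 95, "tokens_out": 70},
    {"agent": "executor", "tokens_in": 300, "tokens_out": 150},
]

# Session-level aggregates
total_calls = len(llm_calls)
total_tokens = sum(c["tokens_in"] + c["tokens_out"] for c in llm_calls)
agent_count = len({c["agent"] for c in llm_calls})
```

Because every record carries the same session_id, these sums naturally span all three agents rather than any single one.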
Best Practices
- Single session_id -- Generate once, pass to all agents
- Descriptive agent names -- planner, researcher, executor, not agent1, agent2
- Use tags for differentiation -- query_index, model_tier, etc.
- Record delegation steps -- delegate_to_planner shows orchestration flow
- Handle errors at coordinator -- Centralized error handling and logging
- Consider parallel execution -- Independent sub-tasks can run concurrently
Next Steps
- Decorator Pattern -- @waxell_agent details
- Context Manager -- WaxellContext details
- Sessions -- Session analytics and grouping