
Multi-Agent Integration

Trace complex multi-agent systems where a coordinator dispatches tasks to specialized sub-agents. Use shared session IDs to correlate all agents in a single trace.

The Pattern

  1. Generate a single session_id for the entire workflow
  2. Coordinator uses WaxellContext with that session
  3. Sub-agents use @waxell_agent decorator with the same session
  4. All agents appear in the same trace, linked by session

Complete Example

A coordinator dispatching to planner, researcher, and executor agents:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from waxell_observe import WaxellContext, waxell_agent, generate_session_id
from waxell_observe.errors import PolicyViolationError
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def run_multi_agent_task(task: str, user_id: str = ""):
    # Single session ID for all agents
    session = generate_session_id()

    # --- Sub-agent: Planner ---
    @waxell_agent(
        agent_name="planner",
        workflow_name="plan-task",
        session_id=session,  # Same session!
        user_id=user_id,
    )
    async def plan_task(task_description: str, waxell_ctx=None) -> dict:
        """Break a task into research queries."""
        waxell_ctx.record_step("analyze_task", output={"task": task_description[:100]})

        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Break this into 3 research queries."},
                {"role": "user", "content": task_description},
            ],
        )
        content = response.choices[0].message.content

        # Parse queries
        queries = [line.strip() for line in content.splitlines() if line.strip()][:3]

        waxell_ctx.record_step("generate_plan", output={"num_queries": len(queries)})
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="plan_task",
        )

        return {"queries": queries}

    # --- Sub-agent: Researcher ---
    @waxell_agent(
        agent_name="researcher",
        workflow_name="research-query",
        session_id=session,  # Same session!
        user_id=user_id,
    )
    async def research_query(query: str, query_index: int = 0, waxell_ctx=None) -> str:
        """Research a single query."""
        waxell_ctx.set_tag("query_index", str(query_index))
        waxell_ctx.record_step("search", output={"query": query[:100]})

        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Provide a concise finding."},
                {"role": "user", "content": f"Research: {query}"},
            ],
        )
        finding = response.choices[0].message.content

        waxell_ctx.record_step("compile_findings", output={"length": len(finding)})
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="research_query",
        )

        return finding

    # --- Sub-agent: Executor ---
    @waxell_agent(
        agent_name="executor",
        workflow_name="synthesize-findings",
        session_id=session,  # Same session!
        user_id=user_id,
    )
    async def synthesize_findings(findings: list[str], original_task: str, waxell_ctx=None) -> str:
        """Synthesize findings into a final answer."""
        waxell_ctx.set_metadata("num_findings", len(findings))
        waxell_ctx.record_step("evaluate_findings", output={"count": len(findings)})

        findings_text = "\n".join(f"- {f}" for f in findings)
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Synthesize these findings."},
                {"role": "user", "content": f"Task: {original_task}\n\nFindings:\n{findings_text}"},
            ],
        )
        answer = response.choices[0].message.content

        waxell_ctx.record_step("produce_output", output={"length": len(answer)})
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="synthesize_findings",
        )

        return answer

    # --- Coordinator ---
    async with WaxellContext(
        agent_name="coordinator",
        workflow_name="multi-agent-task",
        inputs={"task": task},
        session_id=session,  # Same session!
        user_id=user_id,
    ) as ctx:
        ctx.set_tag("demo", "multi-agent")
        ctx.set_tag("num_agents", "3")

        try:
            # Phase 1: Planning
            ctx.record_step("delegate_to_planner")
            plan_result = await plan_task(task)
            queries = plan_result["queries"]

            # Phase 2: Research (could be parallel)
            ctx.record_step("delegate_to_researchers")
            findings = []
            for i, query in enumerate(queries):
                finding = await research_query(query, query_index=i)
                findings.append(finding)

            # Phase 3: Synthesis
            ctx.record_step("delegate_to_executor")
            final_answer = await synthesize_findings(findings, task)

            ctx.set_result({
                "answer": final_answer,
                "num_queries": len(queries),
                "num_findings": len(findings),
            })

            return final_answer

        except PolicyViolationError as e:
            print(f"Policy violation: {e}")
            raise
# Run it (from a script; in a notebook or async context you can await it directly)
import asyncio

answer = asyncio.run(run_multi_agent_task(
    "What are the key considerations for deploying AI agents in production?",
    user_id="user_123",
))

Context Injection

The @waxell_agent decorator injects a waxell_ctx parameter if your function accepts it:

@waxell_agent(agent_name="my-agent")
async def my_function(input: str, waxell_ctx=None) -> str:
    # waxell_ctx is automatically injected
    if waxell_ctx:
        waxell_ctx.set_tag("input_type", "text")
        waxell_ctx.record_step("process", output={"length": len(input)})

    return f"Processed: {input}"

The parameter:

  • Must be named waxell_ctx
  • Must have a default value (typically None)
  • Will be a WaxellContext instance when the function runs
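Conditional injection like this is typically implemented by inspecting the wrapped function's signature. The following is a minimal, self-contained sketch of that mechanism, not waxell_observe's actual implementation; `DummyContext` and `inject_ctx` are hypothetical stand-ins:

```python
import asyncio
import functools
import inspect

class DummyContext:
    """Stand-in for WaxellContext, used only to illustrate the mechanism."""
    def __init__(self):
        self.tags = {}

    def set_tag(self, key, value):
        self.tags[key] = value

def inject_ctx(func):
    """Pass a context as `waxell_ctx` only if the function declares that parameter."""
    accepts_ctx = "waxell_ctx" in inspect.signature(func).parameters

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if accepts_ctx:
            # Respect an explicitly supplied context; otherwise inject one
            kwargs.setdefault("waxell_ctx", DummyContext())
        return await func(*args, **kwargs)

    return wrapper

@inject_ctx
async def my_function(text: str, waxell_ctx=None) -> str:
    waxell_ctx.set_tag("input_type", "text")
    return f"Processed: {text}"

print(asyncio.run(my_function("hello")))  # Processed: hello
```

Because the signature check happens once at decoration time, functions that do not declare `waxell_ctx` are called unchanged.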

Parallel Research

For independent queries, run researchers in parallel:

import asyncio

# Phase 2: Parallel research
ctx.record_step("delegate_to_researchers")

# Create tasks for parallel execution
# Create tasks for parallel execution
tasks = [
    research_query(query, query_index=i)
    for i, query in enumerate(queries)
]

# Run all in parallel
findings = await asyncio.gather(*tasks)
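With many queries you may want to cap how many researchers run at once (for rate limits, for example). One common pattern is a semaphore-guarded gather; this sketch uses a hypothetical `fake_research` coroutine in place of `research_query`:

```python
import asyncio

async def bounded_gather(coros, limit=3):
    """Run coroutines concurrently, at most `limit` at a time; results keep input order."""
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def fake_research(query):
    # Stand-in for research_query
    await asyncio.sleep(0)
    return f"finding for {query}"

findings = asyncio.run(
    bounded_gather([fake_research(q) for q in ["a", "b", "c", "d"]], limit=2)
)
print(findings)  # ['finding for a', 'finding for b', 'finding for c', 'finding for d']
```

`asyncio.gather` preserves the order of its inputs, so `findings[i]` still corresponds to `queries[i]` regardless of completion order.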

Session Correlation

All agents with the same session_id appear together in the UI:

Session: sess_a1b2c3d4e5f6
├── coordinator (multi-agent-task)
│   ├── Step: delegate_to_planner
│   ├── Step: delegate_to_researchers
│   └── Step: delegate_to_executor
├── planner (plan-task)
│   ├── Step: analyze_task
│   ├── Step: generate_plan
│   └── LLM: gpt-4o-mini (plan_task)
├── researcher (research-query) [query_index=0]
│   ├── Step: search
│   ├── Step: compile_findings
│   └── LLM: gpt-4o-mini (research_query)
├── researcher (research-query) [query_index=1]
│   └── ...
├── researcher (research-query) [query_index=2]
│   └── ...
└── executor (synthesize-findings)
    ├── Step: evaluate_findings
    ├── Step: produce_output
    └── LLM: gpt-4o-mini (synthesize_findings)

Tagging Sub-Agents

Use tags to identify specific invocations:

@waxell_agent(agent_name="researcher", session_id=session)
async def research_query(query: str, query_index: int = 0, waxell_ctx=None) -> str:
    # Tag with index for filtering (tag values are strings)
    waxell_ctx.set_tag("query_index", str(query_index))
    waxell_ctx.set_tag("query_hash", str(hash(query) % 10000))
    # ...

Error Propagation

Errors in sub-agents bubble up to the coordinator:

async with WaxellContext(agent_name="coordinator", session_id=session) as ctx:
    try:
        # If researcher throws PolicyViolationError...
        findings = await research_query(query)
    except PolicyViolationError as e:
        # ...coordinator catches it
        ctx.set_tag("error", "policy_violation")
        ctx.set_result({"error": str(e)})
        raise
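If one failing researcher should not abort the whole session, `asyncio.gather(..., return_exceptions=True)` lets the coordinator collect the failures alongside the successes and decide what to do. A self-contained sketch, using a hypothetical `flaky_research` in place of a real sub-agent:

```python
import asyncio

async def flaky_research(query):
    # Stand-in sub-agent: one query fails, the rest succeed
    if query == "bad":
        raise ValueError("blocked query")
    return f"finding for {query}"

async def run():
    results = await asyncio.gather(
        *(flaky_research(q) for q in ["a", "bad", "c"]),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    # Keep successes; record failures instead of aborting the whole run
    findings = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return findings, errors

findings, errors = asyncio.run(run())
print(len(findings), len(errors))  # 2 1
```

The coordinator can then tag the session with the error count and synthesize from the partial findings, rather than re-raising.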

Metrics Aggregation

The session view shows aggregated metrics:

  • Total LLM calls: Sum across all agents
  • Total tokens: Sum of all input + output tokens
  • Total cost: Sum of all LLM costs
  • Duration: Wall-clock time for entire session
  • Agent count: Number of distinct agents
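The platform computes these server-side from the recorded LLM calls. Purely as an illustration of what the session view is summing, here is a local accumulator with the same shape; `SessionMetrics` is hypothetical and not part of waxell_observe:

```python
from dataclasses import dataclass, field

@dataclass
class SessionMetrics:
    """Local mirror of the session-level aggregates (illustrative only)."""
    llm_calls: int = 0
    tokens_in: int = 0
    tokens_out: int = 0
    agents: set = field(default_factory=set)

    def record(self, agent: str, tokens_in: int, tokens_out: int):
        # One entry per recorded LLM call, keyed by the agent that made it
        self.llm_calls += 1
        self.tokens_in += tokens_in
        self.tokens_out += tokens_out
        self.agents.add(agent)

    @property
    def total_tokens(self) -> int:
        return self.tokens_in + self.tokens_out

m = SessionMetrics()
m.record("planner", 120, 45)
m.record("researcher", 200, 80)
print(m.llm_calls, m.total_tokens, len(m.agents))  # 2 445 2
```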

Best Practices

  1. Single session_id -- Generate once, pass to all agents
  2. Descriptive agent names -- planner, researcher, executor, not agent1, agent2
  3. Use tags for differentiation -- query_index, model_tier, etc.
  4. Record delegation steps -- delegate_to_planner shows orchestration flow
  5. Handle errors at coordinator -- Centralized error handling and logging
  6. Consider parallel execution -- Independent sub-tasks can run concurrently

Next Steps