Multi-Agent Integration

Trace complex multi-agent systems where a coordinator dispatches tasks to specialized sub-agents. Use shared session IDs to correlate all agents in a single trace.

The Pattern

  1. Generate a single session_id for the entire workflow
  2. The coordinator is an @observe-decorated function that calls the sub-agents
  3. Sub-agents are functions decorated with @waxell_agent (an alias for @observe) that share the same session
  4. All agents appear in the same trace, linked by session
  5. LLM calls inside any agent are auto-captured by init() -- no manual record_llm_call is needed

Complete Example

A coordinator dispatching to planner, researcher, and executor agents:

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

# Import AFTER init() so OpenAI is auto-instrumented
from openai import AsyncOpenAI
from waxell_observe import waxell_agent, generate_session_id
from waxell_observe.errors import PolicyViolationError

client = AsyncOpenAI()


# --- Sub-agent: Planner ---
@waxell_agent(agent_name="planner", workflow_name="plan-task")
async def plan_task(task_description: str) -> dict:
    """Break a task into research queries."""
    waxell.step("analyze_task", output={"task": task_description[:100]})

    # LLM call auto-captured
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Break this into 3 research queries."},
            {"role": "user", "content": task_description},
        ],
    )
    content = response.choices[0].message.content
    queries = [line.strip() for line in content.splitlines() if line.strip()][:3]

    waxell.step("generate_plan", output={"num_queries": len(queries)})
    return {"queries": queries}


# --- Sub-agent: Researcher ---
@waxell_agent(agent_name="researcher", workflow_name="research-query")
async def research_query(query: str, query_index: int = 0) -> str:
    """Research a single query."""
    waxell.tag("query_index", str(query_index))
    waxell.step("search", output={"query": query[:100]})

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Provide a concise finding."},
            {"role": "user", "content": f"Research: {query}"},
        ],
    )
    finding = response.choices[0].message.content
    waxell.step("compile_findings", output={"length": len(finding)})
    return finding


# --- Sub-agent: Executor ---
@waxell_agent(agent_name="executor", workflow_name="synthesize-findings")
async def synthesize_findings(findings: list[str], original_task: str) -> str:
    """Synthesize findings into a final answer."""
    waxell.metadata("num_findings", len(findings))
    waxell.step("evaluate_findings", output={"count": len(findings)})

    findings_text = "\n".join(f"- {f}" for f in findings)
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Synthesize these findings."},
            {"role": "user", "content": f"Task: {original_task}\n\nFindings:\n{findings_text}"},
        ],
    )
    answer = response.choices[0].message.content
    waxell.step("produce_output", output={"length": len(answer)})
    return answer


# --- Coordinator ---
@waxell.observe(agent_name="coordinator", workflow_name="multi-agent-task")
async def run_multi_agent_task(task: str) -> str:
    waxell.tag("demo", "multi-agent")
    waxell.tag("num_agents", "3")

    # Phase 1: Planning
    waxell.step("delegate_to_planner")
    plan_result = await plan_task(task)
    queries = plan_result["queries"]

    # Phase 2: Research (could be parallel; see below)
    waxell.step("delegate_to_researchers")
    findings = []
    for i, query in enumerate(queries):
        finding = await research_query(query, query_index=i)
        findings.append(finding)

    # Phase 3: Synthesis
    waxell.step("delegate_to_executor")
    final_answer = await synthesize_findings(findings, task)

    return final_answer


# Run it -- one session_id flows to every sub-agent via call-time kwargs
async def main():
    session = generate_session_id()
    try:
        answer = await run_multi_agent_task(
            "What are the key considerations for deploying AI agents in production?",
            session_id=session,
            user_id="user_123",
        )
        print(answer)
    except PolicyViolationError as e:
        print(f"Policy violation: {e}")


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Session propagation

Pass session_id once at the coordinator's call site. Sub-agent calls inherit the active session automatically when invoked inside the coordinator's @observe run. You can also pass session_id=... explicitly to any sub-agent call for full control.

Parallel Research

For independent queries, run researchers in parallel:

import asyncio

@waxell.observe(agent_name="coordinator")
async def run_parallel(task: str) -> str:
    plan_result = await plan_task(task)
    queries = plan_result["queries"]

    waxell.step("delegate_to_researchers_parallel")
    findings = await asyncio.gather(*[
        research_query(query, query_index=i)
        for i, query in enumerate(queries)
    ])

    return await synthesize_findings(findings, task)
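
asyncio.gather returns results in the order the awaitables were passed in, not the order they finish, so findings stay aligned with query_index even when researchers complete out of order. A small standalone sketch of that property follows; fake_research is a hypothetical stand-in that only sleeps and makes no LLM or SDK calls.

```python
import asyncio

completion_order = []  # query indexes in the order the coroutines finished


async def fake_research(query: str, query_index: int) -> str:
    # Earlier queries sleep longer, so they finish last.
    await asyncio.sleep(0.05 * (3 - query_index))
    completion_order.append(query_index)
    return f"finding {query_index}: {query}"


async def run_all(queries: list) -> list:
    return await asyncio.gather(*[
        fake_research(q, query_index=i) for i, q in enumerate(queries)
    ])


findings = asyncio.run(run_all(["cost", "latency", "safety"]))
print(findings)          # results in input order
print(completion_order)  # indexes in finish order
```

With the default return_exceptions=False, the first exception raised by any gathered coroutine propagates to the caller of gather, which matters for the error handling discussed later on this page.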

Session Correlation

All agents with the same session_id appear together in the UI:

Session: sess_a1b2c3d4e5f6
├── coordinator (multi-agent-task)
│   ├── Step: delegate_to_planner
│   ├── Step: delegate_to_researchers
│   └── Step: delegate_to_executor
├── planner (plan-task)
│   ├── Step: analyze_task
│   ├── Step: generate_plan
│   └── LLM: gpt-4o-mini (auto-captured)
├── researcher (research-query) [query_index=0]
│   ├── Step: search
│   ├── Step: compile_findings
│   └── LLM: gpt-4o-mini (auto-captured)
├── researcher (research-query) [query_index=1]
│   └── ...
├── researcher (research-query) [query_index=2]
│   └── ...
└── executor (synthesize-findings)
    ├── Step: evaluate_findings
    ├── Step: produce_output
    └── LLM: gpt-4o-mini (auto-captured)

Tagging Sub-Agents

Use tags to identify specific invocations:

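import hashlib

@waxell_agent(agent_name="researcher")
async def research_query(query: str, query_index: int = 0) -> str:
    waxell.tag("query_index", str(query_index))
    # Built-in hash() is salted per process for strings, so use a stable digest
    # that gives the same tag value on every run and every worker.
    waxell.tag("query_hash", hashlib.sha256(query.encode()).hexdigest()[:8])
    # ...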

Error Propagation

An exception raised in a sub-agent propagates to the coordinator, and the failing sub-agent's run is still recorded with status="error".

@waxell.observe(agent_name="coordinator")
async def coordinator(task: str) -> str:
    try:
        findings = await research_query("...")
    except PolicyViolationError as e:
        # Tag the coordinator's run, then re-raise so the caller sees the failure
        waxell.tag("error", "policy_violation")
        raise
    return findings
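
One way to picture "recorded, then propagated" is a wrapper that marks the run as failed and re-raises. The standalone sketch below assumes nothing about the SDK's internals: record_run, RUNS, FakePolicyViolation, and the "running"/"success" status names are hypothetical; only status="error" comes from the text above.

```python
import asyncio
import functools

RUNS = []  # hypothetical in-memory run log


class FakePolicyViolation(Exception):
    """Stand-in for a policy error raised inside a sub-agent."""


def record_run(agent_name: str):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            run = {"agent": agent_name, "status": "running"}
            RUNS.append(run)
            try:
                result = await fn(*args, **kwargs)
            except Exception as exc:
                run["status"] = "error"
                run["error"] = type(exc).__name__
                raise  # re-raise so the caller still sees the failure
            run["status"] = "success"
            return result
        return wrapper
    return decorator


@record_run("researcher")
async def research(query: str) -> str:
    raise FakePolicyViolation(f"blocked: {query}")


@record_run("coordinator")
async def coordinator(task: str) -> str:
    return await research(task)


try:
    asyncio.run(coordinator("demo"))
    outcome = "ok"
except FakePolicyViolation:
    outcome = "policy_violation"

print(outcome, RUNS)
```

Because the coordinator in this sketch does not catch the exception, both runs end with status "error" and the caller still receives the original exception type.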

Metrics Aggregation

The session view shows aggregated metrics:

  • Total LLM calls: sum across all agents
  • Total tokens: sum of input + output tokens
  • Total cost: sum of all LLM costs
  • Duration: wall-clock time for the entire session
  • Agent count: number of distinct agents
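
The arithmetic behind these aggregates can be sketched as follows. The RunRecord fields and the sample numbers are illustrative assumptions, not the SDK's data model or real measurements; the point is that duration is measured from the first start to the last end rather than summed per run.

```python
from dataclasses import dataclass


@dataclass
class RunRecord:
    # Hypothetical per-run record; field names are assumptions for this sketch.
    agent: str
    llm_calls: int
    input_tokens: int
    output_tokens: int
    cost_usd: float
    started_at: float  # seconds since the session started
    ended_at: float


def aggregate(runs: list) -> dict:
    return {
        "total_llm_calls": sum(r.llm_calls for r in runs),
        "total_tokens": sum(r.input_tokens + r.output_tokens for r in runs),
        "total_cost_usd": sum(r.cost_usd for r in runs),
        # Wall-clock: first start to last end, so overlapping runs
        # (parallel researchers) are not double-counted.
        "duration_s": max(r.ended_at for r in runs) - min(r.started_at for r in runs),
        "agent_count": len({r.agent for r in runs}),
    }


runs = [
    RunRecord("coordinator", 0, 0, 0, 0.0, 0.0, 5.0),
    RunRecord("planner", 1, 120, 60, 0.25, 0.0, 1.0),
    RunRecord("researcher", 1, 80, 200, 0.5, 1.0, 3.0),
    RunRecord("researcher", 1, 90, 210, 0.5, 1.0, 2.5),
    RunRecord("executor", 1, 500, 300, 1.0, 3.0, 5.0),
]
summary = aggregate(runs)
print(summary)
```

Here the two researcher runs count as one distinct agent, and the summed per-run durations (12.5 s) differ from the session's wall-clock duration (5.0 s).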

Parent-Child Run Lineage

Beyond session correlation, the SDK supports explicit parent-child relationships between runs. When a coordinator spawns a sub-agent on a different worker (or you need a true causality graph rather than session-grouping), pass the parent's run_id to create the hierarchy:

from waxell_observe import WaxellObserveClient, WaxellContext

client = WaxellObserveClient()

# `session` is the ID returned by generate_session_id(); run this inside an async function
async with WaxellContext(agent_name="coordinator", session_id=session) as parent_ctx:
    # Sub-agent on a different worker creates a child run linked to the parent
    run_info = await client.start_run(
        agent_name="researcher",
        session_id=session,
        parent_workflow_id=parent_ctx.run_id,
        root_workflow_id=parent_ctx.run_id,
    )
    # ... sub-agent work that produces `findings` ...
    await client.complete_run(run_info.run_id, result={"findings": findings})

Parent-child lineage appears in the dashboard as a tree view, distinct from session-level grouping. This is one of the few places where dropping to WaxellContext is justified -- most multi-agent setups should use the @observe / @waxell_agent pattern above. For full causality graphs (spawn chains, signals, retries), see Lineage.
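
To illustrate how a tree differs from flat session grouping, the standalone sketch below rebuilds a hierarchy from parent links alone. The record shape and run IDs are hypothetical and only borrow the parent_workflow_id name from the snippet above; this is not how the dashboard is implemented.

```python
from collections import defaultdict

# Hypothetical run records; only the parent link matters for the tree.
runs = [
    {"run_id": "run_root", "agent": "coordinator", "parent_workflow_id": None},
    {"run_id": "run_r0", "agent": "researcher", "parent_workflow_id": "run_root"},
    {"run_id": "run_r0a", "agent": "researcher", "parent_workflow_id": "run_r0"},
    {"run_id": "run_r1", "agent": "researcher", "parent_workflow_id": "run_root"},
    {"run_id": "run_x", "agent": "executor", "parent_workflow_id": "run_root"},
]


def render_tree(runs: list) -> list:
    # Index runs by their parent so each node's children can be looked up.
    children = defaultdict(list)
    for run in runs:
        children[run["parent_workflow_id"]].append(run)

    lines = []

    def walk(parent_id, depth):
        # Depth-first traversal: indent each run under the run that spawned it.
        for run in children.get(parent_id, []):
            lines.append("  " * depth + f'{run["agent"]} ({run["run_id"]})')
            walk(run["run_id"], depth + 1)

    walk(None, 0)
    return lines


tree = render_tree(runs)
print("\n".join(tree))
```

Session grouping would list all five runs side by side; the parent links are what place run_r0a under run_r0 rather than directly under the coordinator.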

Best Practices

  1. Single session_id -- generate once, pass to the coordinator's call; sub-agents inherit it
  2. Descriptive agent names -- planner, researcher, executor not agent1, agent2
  3. Use tags for differentiation -- query_index, model_tier, etc.
  4. Record delegation steps -- waxell.step("delegate_to_planner") shows orchestration flow
  5. Handle errors at the coordinator -- centralized error handling and logging
  6. Run independent sub-tasks in parallel -- asyncio.gather works with decorated agents

Next Steps