Sync Pipeline Agent

A batch support-ticket pipeline that runs four steps (classify, extract entities, route, respond) across 3 tickets. Each ticket produces its own trace with two child agents: sync-runner (classification + routing) and sync-evaluator (response generation + scoring).

Environment variables

This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls.
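
A minimal sketch of how a startup check for these variables might look. The helper name `missing_env_vars` is hypothetical, and the sketch assumes that dry-run mode needs none of the three variables; the demo itself may behave differently.

```python
import os

# Variable names taken from this page.
REQUIRED_VARS = ("OPENAI_API_KEY", "WAXELL_API_KEY", "WAXELL_API_URL")


def missing_env_vars(env, dry_run: bool) -> list:
    """Return required variables that are unset or empty.

    Assumption: in dry-run mode no real API calls are made, so nothing is required.
    """
    if dry_run:
        return []
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example: report what the current shell is missing for a live run.
missing = missing_env_vars(os.environ, dry_run=False)
if missing:
    print("Missing for live mode:", ", ".join(missing))
```

Passing the environment as an argument keeps the check easy to exercise with a plain dict instead of the real process environment.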

Architecture

Key Code

Ticket Classification Pipeline

The runner classifies each ticket, extracts entities, reasons about routing, and dispatches to the correct team.

@waxell.observe(agent_name="sync-runner", workflow_name="ticket-classification")
async def run_sync_runner(ticket: dict, openai_client, waxell_ctx=None) -> dict:
    waxell.tag("ticket_id", ticket["ticket_id"])
    waxell.tag("customer_tier", ticket["customer_tier"])

    # Classify via LLM (auto-instrumented)
    classify_response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Classify: account_access, billing, ..."},
            {"role": "user", "content": f"{ticket['subject']}\n{ticket['body']}"},
        ],
    )
    category = _derive_category(ticket)
    await classify_step(category)

    # Reasoning chain for entity analysis
    await analyze_entities(ticket, category)

    # Route ticket via @decision
    route_config = CATEGORIES.get(category, CATEGORIES["general"])
    await decide_route_ticket(category, route_config)
    team_availability_check(team=route_config["team"])
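
The routing lookup above falls back to the "general" entry when a category is not in the table. A minimal sketch of that fallback, assuming a routing table shaped like the one below; the team names, the `priority` field, and the helper `resolve_route` are hypothetical, since the real contents of CATEGORIES are not shown on this page.

```python
# Hypothetical routing table; only the "team" key and the "general" fallback
# are implied by the snippet above.
CATEGORIES = {
    "account_access": {"team": "identity", "priority": "high"},
    "billing": {"team": "payments", "priority": "medium"},
    "general": {"team": "support", "priority": "low"},
}


def resolve_route(category: str) -> dict:
    """Return the route for a category, falling back to the "general" entry."""
    return CATEGORIES.get(category, CATEGORIES["general"])


print(resolve_route("billing")["team"])        # known category
print(resolve_route("shipping_delay")["team"])  # unknown category uses the fallback
```

The fallback means an unexpected classifier output still yields a valid team rather than a KeyError.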

Batch Processing with Per-Ticket Traces

Each ticket creates its own parent trace with child agents.

for i, ticket in enumerate(TICKETS, 1):
    result = await process_ticket(
        ticket=ticket, dry_run=args.dry_run,
        session_id=session, user_id=user_id,
        enforce_policy=observe_active,
        client=get_observe_client(),
    )
    results.append(result)
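
The per-ticket trace idea can be sketched without the SDK. In the sketch below, `process_ticket` is a stand-in that only generates a fresh trace ID per call; the `trace_id` field and the `run_batch` helper are assumptions for illustration, not the demo's actual implementation.

```python
import asyncio
import uuid


async def process_ticket(ticket: dict, session_id: str) -> dict:
    """Stand-in for the real pipeline: each call gets its own trace ID."""
    trace_id = uuid.uuid4().hex  # hypothetical: one independent trace per ticket
    await asyncio.sleep(0)       # placeholder for the classify/route/respond work
    return {
        "ticket_id": ticket["ticket_id"],
        "session_id": session_id,
        "trace_id": trace_id,
    }


async def run_batch(tickets: list, session_id: str) -> list:
    """Process tickets one after another, sharing a session but not a trace."""
    results = []
    for ticket in tickets:
        results.append(await process_ticket(ticket, session_id))
    return results


TICKETS = [{"ticket_id": f"T-{n}"} for n in range(1, 4)]
results = asyncio.run(run_batch(TICKETS, session_id="demo-session"))
for r in results:
    print(r["ticket_id"], r["trace_id"])
```

All three results share one session ID while carrying distinct trace IDs, which mirrors the "one trace per ticket" behavior described above.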

What this demonstrates

  • Batch processing -- 3 tickets each produce independent traces with full parent-child lineage.
  • 4-step pipeline -- classify, extract entities, route, and respond, each recorded as a @step span.
  • @reasoning for entity analysis -- chain-of-thought assessment of ticket content and priority.
  • @decision for routing -- records which team receives the ticket with reasoning.
  • @tool for availability check -- verifies team capacity before routing.
  • Policy violation handling -- wraps pipeline in try/except PolicyViolationError.
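
The policy violation handling in the last bullet can be sketched as follows. `PolicyViolationError` is defined locally here as a stand-in so the example runs on its own, and the `blocked` flag, `run_pipeline`, and `safe_run` are hypothetical names; the demo's own exception comes from the SDK and may carry different details.

```python
class PolicyViolationError(Exception):
    """Local stand-in for the SDK exception, defined so the sketch is self-contained."""


def run_pipeline(ticket: dict) -> dict:
    """Hypothetical pipeline body that raises when a policy blocks the ticket."""
    if ticket.get("blocked"):
        raise PolicyViolationError(f"ticket {ticket['ticket_id']} blocked by policy")
    return {"ticket_id": ticket["ticket_id"], "status": "ok"}


def safe_run(ticket: dict) -> dict:
    """Wrap the pipeline so one violation does not abort the rest of the batch."""
    try:
        return run_pipeline(ticket)
    except PolicyViolationError as exc:
        return {
            "ticket_id": ticket["ticket_id"],
            "status": "policy_violation",
            "error": str(exc),
        }


print(safe_run({"ticket_id": "T-1"}))
print(safe_run({"ticket_id": "T-2", "blocked": True}))
```

Catching only the policy exception lets unrelated errors still surface, while a blocked ticket is recorded as a result instead of stopping the batch.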

Run it

# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.sync_pipeline_agent --dry-run

# Live mode (requires all three environment variables)
export OPENAI_API_KEY="sk-..."
export WAXELL_API_KEY="..."
export WAXELL_API_URL="..."
python -m app.demos.sync_pipeline_agent

Source

dev/waxell-dev/app/demos/sync_pipeline_agent.py