Sync Pipeline Agent
A batch support-ticket pipeline that runs four steps (classify, extract entities, route, respond) across 3 tickets. Each ticket produces its own trace with two child agents: sync-runner (classification + routing) and sync-evaluator (response generation + scoring).
Environment variables
This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls.
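A pre-flight check along these lines can surface missing configuration before any API call is made. This is a minimal sketch, not code from the demo: `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, and skipping the check in dry-run mode is an assumption based on the dry-run note above.

```python
import os

# The three variables named above.
REQUIRED_VARS = ("OPENAI_API_KEY", "WAXELL_API_KEY", "WAXELL_API_URL")


def missing_env_vars(env=None, dry_run=False):
    """Return the required variables that are unset or empty.

    Hypothetical helper, not part of the demo. Assumes dry-run mode
    makes no real API calls and therefore needs no keys.
    """
    if dry_run:
        return []
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_env_vars()` with no arguments inspects the current process environment; passing a dict makes the check easy to exercise in isolation.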
Architecture
Key Code
Ticket Classification Pipeline
The runner classifies each ticket, extracts entities, reasons about routing, and dispatches to the correct team.
@waxell.observe(agent_name="sync-runner", workflow_name="ticket-classification")
async def run_sync_runner(ticket: dict, openai_client, waxell_ctx=None) -> dict:
    waxell.tag("ticket_id", ticket["ticket_id"])
    waxell.tag("customer_tier", ticket["customer_tier"])
    # Classify via LLM (auto-instrumented)
    classify_response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": "Classify: account_access, billing, ..."},
                  {"role": "user", "content": f"{ticket['subject']}\n{ticket['body']}"}],
    )
    category = _derive_category(ticket)
    await classify_step(category)
    # Reasoning chain for entity analysis
    await analyze_entities(ticket, category)
    # Route ticket via @decision
    route_config = CATEGORIES.get(category, CATEGORIES["general"])
    await decide_route_ticket(category, route_config)
    team_availability_check(team=route_config["team"])
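The routing lookup in the excerpt falls back to the "general" entry whenever the derived category is not in the table. The sketch below isolates that behaviour. The contents of `CATEGORIES` (team names, priorities) and the helper `resolve_route` are hypothetical; the demo's real table may look different.

```python
# Hypothetical routing table; the demo's real CATEGORIES may differ.
CATEGORIES = {
    "account_access": {"team": "identity", "priority": "high"},
    "billing": {"team": "payments", "priority": "medium"},
    "general": {"team": "support", "priority": "low"},
}


def resolve_route(category):
    """Look up the route config, falling back to 'general' for unknown categories."""
    return CATEGORIES.get(category, CATEGORIES["general"])
```

With this table, `resolve_route("billing")` selects the payments entry, while an unrecognised category such as `"shipping"` resolves to the general support entry rather than raising `KeyError`.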
Batch Processing with Per-Ticket Traces
Each ticket creates its own parent trace with child agents.
for i, ticket in enumerate(TICKETS, 1):
    result = await process_ticket(
        ticket=ticket, dry_run=args.dry_run,
        session_id=session, user_id=user_id,
        enforce_policy=observe_active,
        client=get_observe_client(),
    )
    results.append(result)
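The loop above awaits each ticket in turn, so tickets are processed sequentially and results keep their input order. A self-contained sketch of that shape follows. The `process_ticket` stub, the placeholder `TICKETS`, and `run_batch` are stand-ins for illustration only; the real function takes more parameters and creates a trace per call.

```python
import asyncio

# Placeholder tickets; the demo's TICKETS list holds its own 3 entries.
TICKETS = [{"ticket_id": f"T-{n}"} for n in range(1, 4)]


async def process_ticket(ticket, dry_run):
    """Stub standing in for the demo's process_ticket."""
    await asyncio.sleep(0)  # yield control, as a real awaitable call would
    return {"ticket_id": ticket["ticket_id"], "dry_run": dry_run}


async def run_batch(tickets, dry_run=True):
    """Process tickets one at a time and collect results in input order."""
    results = []
    for ticket in tickets:  # each ticket finishes before the next starts
        results.append(await process_ticket(ticket, dry_run=dry_run))
    return results


results = asyncio.run(run_batch(TICKETS))
```

Sequential awaiting keeps each ticket's work separate, which fits the one-trace-per-ticket design described above; whether the demo ever runs tickets concurrently is not stated in the source.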
What this demonstrates
- Batch processing -- 3 tickets each produce independent traces with full parent-child lineage.
- 4-step pipeline -- classify, extract entities, route, respond -- each recorded as @step spans.
- @reasoning for entity analysis -- chain-of-thought assessment of ticket content and priority.
- @decision for routing -- records which team receives the ticket with reasoning.
- @tool for availability check -- verifies team capacity before routing.
- Policy violation handling -- wraps pipeline in try/except PolicyViolationError.
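The policy-violation handling in the last bullet can be sketched as follows. Everything here is a stand-in: `PolicyViolationError` is redefined locally because the source does not show its import path, and `run_pipeline`, `safe_run`, and the `blocked` flag are hypothetical names used only to trigger the error.

```python
class PolicyViolationError(Exception):
    """Local stand-in for the exception of the same name used by the demo."""


def run_pipeline(ticket):
    """Stub pipeline that rejects tickets carrying a hypothetical 'blocked' flag."""
    if ticket.get("blocked"):
        raise PolicyViolationError("ticket blocked by policy")
    return {"ticket_id": ticket["ticket_id"], "status": "ok"}


def safe_run(ticket):
    """Run the pipeline, turning a policy violation into a result record."""
    try:
        return run_pipeline(ticket)
    except PolicyViolationError as exc:
        return {
            "ticket_id": ticket["ticket_id"],
            "status": "policy_violation",
            "reason": str(exc),
        }
```

Catching the error per ticket means one blocked ticket does not abort the rest of the batch. This is one plausible design, not necessarily what the demo does.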
Run it
# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.sync_pipeline_agent --dry-run
# Live mode
export OPENAI_API_KEY="sk-..."
python -m app.demos.sync_pipeline_agent