LlamaIndex Agent
A multi-agent RAG pipeline demonstrating waxell-observe decorator patterns across a parent orchestrator and two child agents. The pipeline preprocesses queries, embeds and retrieves documents, generates RAG answers, and evaluates/synthesizes a final response -- all with full observability.
This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to run without any API keys.
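For a live run, the three environment variables can be exported first. The values below are placeholders only; substitute your own keys and endpoint.

```shell
# Required for live runs; --dry-run skips all of these.
export OPENAI_API_KEY="sk-..."       # OpenAI credentials for the two LLM calls
export WAXELL_API_KEY="..."          # your Waxell API key
export WAXELL_API_URL="https://..."  # your Waxell ingest endpoint
```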
Architecture
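A sketch of the flow, reconstructed from the five-phase pipeline described below (decorator annotations match the code excerpts):

```
run_pipeline (llamaindex-orchestrator)
  ├─ 1. preprocess_query          @step
  ├─ 2. embed_documents           @tool(embedding)
  ├─ 3. retrieve_rag_documents    @retrieval(llamaindex)
  ├─ 4. run_rag_generation        child @observe
  └─ 5. run_evaluation            child @observe (llamaindex-evaluator)
```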
Key Code
Orchestrator -- coordinating the RAG pipeline
The parent agent connects preprocessing, embedding, retrieval, and two child agents in a five-phase pipeline. Child agents auto-link to this parent via WaxellContext lineage.
@waxell.observe(agent_name="llamaindex-orchestrator", workflow_name="rag-pipeline")
async def run_pipeline(query: str, dry_run: bool = False, waxell_ctx=None):
    waxell.tag("demo", "llamaindex")
    waxell.tag("pipeline", "rag")
    waxell.metadata("corpus_size", len(_RAG_DOCUMENTS))

    openai_client = get_openai_client(dry_run=dry_run)

    preprocessed = await preprocess_query(query)        # @step
    embed_documents(documents=_RAG_DOCUMENTS)           # @tool(embedding)
    retrieved = retrieve_rag_documents(query=query,     # @retrieval(llamaindex)
                                       corpus=_RAG_DOCUMENTS, top_k=3)

    rag_result = await run_rag_generation(              # child @observe
        query=query, documents=_RAG_DOCUMENTS,
        openai_client=openai_client, dry_run=dry_run)

    eval_result = await run_evaluation(                 # child @observe
        query=query, rag_answer=rag_result["answer"],
        documents=_RAG_DOCUMENTS, openai_client=openai_client)

    return {"answer": eval_result["answer"],
            "synthesis_strategy": eval_result["strategy"]}
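The control flow above can be sketched without any of the waxell or OpenAI machinery. This is a minimal stand-in, not the demo's actual code: the three stub functions below are hypothetical simplifications of `preprocess_query`, `run_rag_generation`, and `run_evaluation`, kept only to show the phase ordering.

```python
import asyncio

# Hypothetical stubs for the real phases (no waxell/OpenAI needed).
async def preprocess_query(q): return q.strip().lower()
async def run_rag_generation(query, documents): return {"answer": f"answer to: {query}"}
async def run_evaluation(query, rag_answer): return {"answer": rag_answer, "strategy": "hybrid"}

async def run_pipeline(query: str) -> dict:
    query = await preprocess_query(query)                # phase 1: preprocessing
    # phases 2-3 (embedding/retrieval) are synchronous in the demo; elided here
    rag = await run_rag_generation(query, documents=[])  # phase 4: child agent
    final = await run_evaluation(query, rag["answer"])   # phase 5: child agent
    return {"answer": final["answer"], "synthesis_strategy": final["strategy"]}

result = asyncio.run(run_pipeline("  How do I optimize chunking?  "))
```

The `await` chain is what lets the parent's `WaxellContext` propagate into the child agents in the real pipeline.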
Decorator patterns -- retrieval, reasoning, and decisions
Each decorator auto-records a typed span in the trace without any manual instrumentation code.
@waxell.retrieval(source="llamaindex")
def retrieve_rag_documents(query: str, corpus: list, top_k: int = 3) -> list[dict]:
    sorted_docs = sorted(corpus, key=lambda d: d.get("score", 0), reverse=True)[:top_k]
    return [{"id": d["id"], "title": d["title"], "score": d["score"],
             "snippet": d["content"][:80] + "..."} for d in sorted_docs]

@waxell.reasoning_dec(step="context_evaluation")
async def evaluate_context(query: str, documents: list) -> dict:
    avg_score = sum(d.get("score", 0) for d in documents) / max(len(documents), 1)
    return {"thought": f"Retrieved {len(documents)} docs with avg relevance {avg_score:.2f}.",
            "conclusion": "Context is sufficient" if avg_score > 0.8 else "Needs augmentation"}

@waxell.decision(name="synthesis_strategy",
                 options=["extractive", "abstractive", "hybrid"])
async def choose_synthesis_strategy(query: str, documents: list) -> dict:
    avg_score = sum(d.get("score", 0) for d in documents) / max(len(documents), 1)
    if avg_score > 0.9:
        return {"chosen": "extractive", "reasoning": "High relevance -- extractive preserves accuracy"}
    return {"chosen": "hybrid", "reasoning": "Moderate relevance -- hybrid balances extraction with gap-filling"}
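The "typed span" idea behind these decorators can be approximated in a few lines. The sketch below is an illustration, not waxell's actual implementation: a decorator that appends a span record to an in-memory list, applied to the same score-ranked retrieval logic as the demo (`TRACE` and the simplified document shape are assumptions).

```python
import functools

TRACE: list[dict] = []  # stand-in for waxell's trace sink (illustrative only)

def retrieval(source: str):
    """Record each call as a 'retrieval' span, loosely like @waxell.retrieval."""
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            result = fn(*args, **kwargs)
            TRACE.append({"type": "retrieval", "source": source,
                          "name": fn.__name__, "n_results": len(result)})
            return result
        return inner
    return wrap

@retrieval(source="llamaindex")
def retrieve_rag_documents(query: str, corpus: list, top_k: int = 3) -> list[dict]:
    # Same ranking as the demo: highest-scored documents first.
    ranked = sorted(corpus, key=lambda d: d.get("score", 0), reverse=True)[:top_k]
    return [{"id": d["id"], "score": d["score"]} for d in ranked]

corpus = [{"id": "a", "score": 0.95}, {"id": "b", "score": 0.70}, {"id": "c", "score": 0.88}]
docs = retrieve_rag_documents("chunking", corpus, top_k=2)
# docs is ["a", "c"] by id, and TRACE now holds one retrieval span
```

Because the wrapper records after the function returns, it can capture result-derived fields such as `n_results` without the function body containing any instrumentation.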
Scoring and metadata enrichment
The evaluator child agent attaches quality scores and metadata that appear in the Waxell dashboard.
@waxell.observe(agent_name="llamaindex-evaluator", workflow_name="answer-evaluation")
async def run_evaluation(query, rag_answer, documents, openai_client, waxell_ctx=None):
    waxell.tag("agent_role", "evaluator")

    strategy = await choose_synthesis_strategy(query=query, documents=documents)

    waxell.decide("refinement_depth",
                  chosen="deep" if len(documents) > 2 else "light",
                  options=["light", "deep"],
                  reasoning=f"{len(documents)} docs -- deep refinement appropriate",
                  confidence=0.85)

    response = await openai_client.chat.completions.create(...)  # auto-instrumented

    waxell.score("answer_quality", 0.88, comment="Based on document coverage and coherence")
    waxell.score("context_relevance", 0.91, comment="Average retrieval score across documents")
    waxell.metadata("synthesis_strategy", strategy.get("chosen"))

    return {"answer": response.choices[0].message.content, "strategy": strategy.get("chosen")}
What this demonstrates
- @waxell.observe -- parent orchestrator with two child agents, auto-linked via WaxellContext lineage
- @waxell.step_dec -- records preprocessing as a workflow step span
- @waxell.tool(tool_type="embedding") -- records embedding operations as tool call spans
- @waxell.retrieval(source="llamaindex") -- records document retrieval with source attribution
- @waxell.reasoning_dec -- captures chain-of-thought evaluation (thought, evidence, conclusion)
- @waxell.decision -- records named decisions with options, chosen value, and reasoning
- waxell.decide() -- manual inline decisions (refinement depth)
- waxell.score() -- attaches numeric quality scores (answer_quality, context_relevance)
- waxell.tag() / waxell.metadata() -- enriches spans with searchable tags and structured metadata
- Auto-instrumented LLM calls -- two gpt-4o-mini calls captured automatically via waxell.init()
Run it
# Dry-run (no API keys needed)
cd dev/waxell-dev
python -m app.demos.llamaindex_agent --dry-run
# Live (requires OpenAI API key)
python -m app.demos.llamaindex_agent
# Custom query
python -m app.demos.llamaindex_agent --query "How do I optimize chunking?"