# Haystack Agent

A Haystack-style RAG pipeline demonstrating waxell-observe decorator patterns across a parent orchestrator and two child agents. The pipeline models Haystack components (`QueryEmbedder`, `DocumentRetriever`, `PromptBuilder`, `OpenAIGenerator`) as individually traced steps, including retriever selection decisions and pipeline quality evaluation.

This example requires `OPENAI_API_KEY`, `WAXELL_API_KEY`, and `WAXELL_API_URL`. Use `--dry-run` to run without any API keys.
## Architecture
## Key Code
### Pipeline runner -- Haystack components as traced steps

Each Haystack component (`QueryEmbedder`, `Retriever`, `PromptBuilder`, `Generator`) is modeled as a separate traced operation using the appropriate decorator.
```python
@waxell.observe(agent_name="haystack-runner", workflow_name="haystack-component-execution")
async def run_pipeline_components(query: str, client, waxell_ctx=None):
    waxell.tag("agent_role", "runner")
    waxell.tag("framework", "haystack")
    waxell.metadata("components", ["QueryEmbedder", "DocumentRetriever",
                                   "PromptBuilder", "OpenAIGenerator"])

    embedder_result = await run_query_embedder(query=query)       # @step
    retriever_choice = await select_retriever(query=query,        # @decision
                                              pipeline_type="rag")
    retrieval_result = retrieve_from_store(query=query, top_k=3)  # @tool
    ranked_docs = rank_documents(query=query,                     # @retrieval
                                 docs=retrieval_result["docs"])

    context_text = "\n".join(d["content"] for d in retrieval_result["docs"])
    builder_result = await run_prompt_builder(query=query,        # @step
                                              context_text=context_text)

    response = await client.chat.completions.create(              # auto-instrumented
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"Answer based on this context:\n{context_text}"},
            {"role": "user", "content": query},
        ],
    )
    return {"answer": response.choices[0].message.content,
            "docs": retrieval_result["docs"],
            "docs_count": len(retrieval_result["docs"])}
```
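The runner and evaluator child agents are invoked in sequence by the parent orchestrator. A minimal sketch of that control flow, with the waxell decorators and real component calls replaced by stubs so it runs standalone (the stub bodies here are illustrative assumptions, not the demo's actual logic):

```python
import asyncio

# Stand-ins for the decorated child agents; in the demo these carry
# @waxell.observe and are auto-linked to the parent span.
async def run_pipeline_components(query: str) -> dict:
    return {"answer": f"stub answer for: {query}", "docs_count": 3}

async def run_evaluator(query: str, answer: str, docs_count: int) -> dict:
    return {"evaluation": "ok", "components_evaluated": 4}

async def run_parent(query: str) -> dict:
    # Parent orchestrator: run the pipeline, then evaluate its output.
    result = await run_pipeline_components(query)
    evaluation = await run_evaluator(query=query,
                                     answer=result["answer"],
                                     docs_count=result["docs_count"])
    return {"result": result, "evaluation": evaluation}

print(asyncio.run(run_parent("What is RAG?")))
```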
### Component decorators -- step, tool, retrieval, and decision
Each Haystack component type maps to a waxell-observe decorator that auto-records typed spans.
```python
@waxell.step_dec(name="component_query_embedder")
async def run_query_embedder(query: str) -> dict:
    return {"component": "QueryEmbedder", "embedding_dim": 1536,
            "model": "text-embedding-3-small"}

@waxell.tool(tool_type="document_store")
def retrieve_from_store(query: str, top_k: int = 3) -> dict:
    docs = retrieve_documents(query, top_k=top_k)
    return {"docs": docs, "docs_retrieved": len(docs),
            "top_doc": docs[0]["title"] if docs else "none"}

@waxell.retrieval(source="haystack-document-store")
def rank_documents(query: str, docs: list) -> list[dict]:
    return [{"id": d["id"], "text": d["content"][:100],
             "score": 0.9 - i * 0.1, "title": d["title"]}
            for i, d in enumerate(docs)]

@waxell.decision(name="select_retriever",
                 options=["InMemoryBM25", "InMemoryEmbedding", "ElasticsearchBM25"])
async def select_retriever(query: str, pipeline_type: str) -> dict:
    chosen = "InMemoryEmbedding" if pipeline_type == "rag" else "InMemoryBM25"
    return {"chosen": chosen,
            "reasoning": f"Pipeline type '{pipeline_type}' best served by {chosen}"}
```
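`retrieve_from_store` delegates to a `retrieve_documents` helper that is not shown above. A hedged sketch of what such an in-memory store might look like (the corpus and the keyword-overlap scoring are illustrative assumptions, not the demo's actual data or ranking):

```python
# Hypothetical in-memory corpus backing retrieve_documents.
DOCS = [
    {"id": "d1", "title": "RAG Basics",
     "content": "Retrieval augmented generation combines search with LLMs."},
    {"id": "d2", "title": "Haystack Pipelines",
     "content": "Haystack pipelines chain components like retrievers and generators."},
    {"id": "d3", "title": "Embeddings",
     "content": "Embeddings map text to vectors for similarity search."},
]

def retrieve_documents(query: str, top_k: int = 3) -> list[dict]:
    """Rank documents by naive keyword overlap with the query."""
    terms = set(query.lower().split())
    scored = sorted(DOCS,
                    key=lambda d: len(terms & set(d["content"].lower().split())),
                    reverse=True)
    return scored[:top_k]

retrieve_documents("haystack retrievers", top_k=2)
```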
### Pipeline evaluator -- reasoning and scores

The evaluator child agent uses `@reasoning` to assess pipeline output quality and attaches scores visible in the dashboard.
```python
@waxell.observe(agent_name="haystack-evaluator", workflow_name="haystack-evaluation")
async def run_evaluator(query: str, answer: str, docs_count: int, waxell_ctx=None):
    waxell.tag("agent_role", "evaluator")
    waxell.tag("framework", "haystack")

    evaluation = await evaluate_pipeline(  # @reasoning
        query=query, docs_retrieved=docs_count, answer=answer)

    waxell.score("pipeline_quality", 0.87,
                 comment="Haystack RAG pipeline quality assessment")
    waxell.score("retrieval_relevance", 0.91,
                 comment="Document retrieval relevance score")
    waxell.score("answer_grounding", True, data_type="boolean",
                 comment="Answer is grounded in retrieved documents")
    return {"evaluation": evaluation, "components_evaluated": 4}
```
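The `answer_grounding` boolean could be computed rather than hard-coded, for instance with a token-overlap heuristic. A minimal sketch under that assumption (the `is_grounded` helper and its threshold are hypothetical, not the demo's actual `evaluate_pipeline` logic):

```python
def is_grounded(answer: str, docs: list[dict], threshold: float = 0.3) -> bool:
    """Treat the answer as grounded if enough of its content words
    appear somewhere in the retrieved documents."""
    answer_terms = {w for w in answer.lower().split() if len(w) > 3}
    if not answer_terms:
        return False
    doc_terms: set[str] = set()
    for d in docs:
        doc_terms |= set(d["content"].lower().split())
    overlap = len(answer_terms & doc_terms) / len(answer_terms)
    return overlap >= threshold

docs = [{"content": "Retrieval augmented generation grounds answers in documents."}]
print(is_grounded("Generation grounds answers in retrieved documents.", docs))  # True
```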
## What this demonstrates

- `@waxell.observe` -- parent orchestrator with runner and evaluator child agents, auto-linked lineage
- `@waxell.step_dec` -- models Haystack components (QueryEmbedder, PromptBuilder, init) as traced workflow steps
- `@waxell.tool(tool_type="document_store")` -- records document store retrieval as a tool call span
- `@waxell.retrieval(source="haystack-document-store")` -- records document ranking with source attribution
- `@waxell.decision` -- records retriever selection with named options (BM25 vs Embedding vs Elasticsearch)
- `@waxell.reasoning_dec` -- captures pipeline quality evaluation (thought, evidence, conclusion)
- `waxell.score()` -- attaches typed scores: numeric (`pipeline_quality`, `retrieval_relevance`) and boolean (`answer_grounding`)
- `waxell.tag()` / `waxell.metadata()` -- enriches spans with framework identification and component lists
- Auto-instrumented LLM call -- `gpt-4o-mini` generation call captured automatically via `waxell.init()`
## Run it
```bash
# Dry-run (no API keys needed)
cd dev/waxell-dev
python -m app.demos.haystack_agent --dry-run

# Live (requires OpenAI API key)
python -m app.demos.haystack_agent

# Custom query
python -m app.demos.haystack_agent --query "Best practices for RAG pipelines?"
```