
RAG Pipeline Agent

A multi-agent RAG pipeline with governance integration. A parent orchestrator coordinates two child agents: a retriever that searches a document store, ranks results, filters for relevance, and evaluates retrieval quality; and a synthesizer that generates answers with quality assessment and factual-grounding scores. A --policy-triggers mode intentionally crosses budget, safety, and latency policy thresholds to exercise governance.

Environment variables

This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls. Use --policy-triggers to exercise governance policies.

Architecture

Key Code

Document Retrieval with @tool and @retrieval

The retriever searches the document store and ranks results by relevance score.

@waxell.tool(tool_type="vector_db")
def search_documents(query: str) -> list:
    """Search the document store for relevant documents."""
    return retrieve_documents(query)

@waxell.retrieval(source="rag")
def retrieve_and_rank(query: str, documents: list) -> list[dict]:
    """Rank retrieved documents by relevance."""
    ranked = []
    for i, doc in enumerate(documents):
        score = round(0.95 - (i * 0.08), 2)
        ranked.append({"id": doc["id"], "title": doc["title"],
                       "content": doc["content"], "score": score})
    return ranked
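The position-based scoring in retrieve_and_rank can be exercised on its own. A minimal sketch with the @waxell.retrieval decorator omitted so it runs without the waxell SDK; the sample documents are hypothetical:

```python
# Standalone sketch of the ranking logic above; the @waxell.retrieval
# decorator is omitted so this runs without the waxell SDK installed.
def retrieve_and_rank(query: str, documents: list) -> list[dict]:
    """Rank retrieved documents by relevance (position-based score)."""
    ranked = []
    for i, doc in enumerate(documents):
        # The first document scores 0.95; each later one loses 0.08.
        score = round(0.95 - (i * 0.08), 2)
        ranked.append({"id": doc["id"], "title": doc["title"],
                       "content": doc["content"], "score": score})
    return ranked

# Hypothetical sample corpus for illustration only.
docs = [
    {"id": "d1", "title": "Agent Monitoring", "content": "..."},
    {"id": "d2", "title": "RAG Basics", "content": "..."},
]
ranked = retrieve_and_rank("How do I monitor AI agents?", docs)
print([d["score"] for d in ranked])  # → [0.95, 0.87]
```

Because the score depends only on list position, the retriever is assumed to return documents already ordered by the underlying vector search.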

Retrieval Evaluation and Answer Quality Assessment

@waxell.reasoning_dec(step="evaluate_retrieval")
async def evaluate_retrieval(documents: list, query: str) -> dict:
    avg_score = sum(d.get("score", 0) for d in documents) / max(len(documents), 1)
    return {
        "thought": f"Retrieved {len(documents)} documents with avg score {avg_score:.2f}.",
        "evidence": [f"Doc '{d['title']}': score={d.get('score', 'N/A')}" for d in documents],
        "conclusion": "Good retrieval quality" if avg_score > 0.7 else "May need expanded search",
    }

@waxell.reasoning_dec(step="quality_assessment")
async def assess_answer_quality(answer: str, documents: list) -> dict:
    doc_titles = [d.get("title", "unknown") for d in documents]
    coverage = len([t for t in doc_titles if t.lower() in answer.lower()])
    return {
        "thought": f"Generated answer references {coverage}/{len(documents)} source documents.",
        "evidence": [f"Source: {t}" for t in doc_titles],
        "conclusion": "Answer adequately covers source material",
    }

waxell.score("answer_quality", 0.91, comment="Good synthesis from retrieved documents")
waxell.score("factual_grounding", True, data_type="boolean")
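The evaluation step is async, so calling it standalone needs an event loop. A minimal sketch with the @waxell.reasoning_dec decorator omitted so it runs without the waxell SDK; the sample scores are hypothetical:

```python
import asyncio

# Standalone sketch of evaluate_retrieval above; the @waxell.reasoning_dec
# decorator is omitted so this runs without the waxell SDK installed.
async def evaluate_retrieval(documents: list, query: str) -> dict:
    # Guard against division by zero when no documents were retrieved.
    avg_score = sum(d.get("score", 0) for d in documents) / max(len(documents), 1)
    return {
        "thought": f"Retrieved {len(documents)} documents with avg score {avg_score:.2f}.",
        "evidence": [f"Doc '{d['title']}': score={d.get('score', 'N/A')}" for d in documents],
        "conclusion": "Good retrieval quality" if avg_score > 0.7 else "May need expanded search",
    }

# Hypothetical ranked documents for illustration only.
docs = [{"title": "Agent Monitoring", "score": 0.95},
        {"title": "RAG Basics", "score": 0.87}]
result = asyncio.run(evaluate_retrieval(docs, "How do I monitor AI agents?"))
print(result["conclusion"])  # → Good retrieval quality
```

The 0.7 threshold splits the two conclusions: an average score at or below it yields "May need expanded search".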

What this demonstrates

  • @waxell.observe -- parent-child agent hierarchy (orchestrator + 2 child agents) with automatic lineage via WaxellContext
  • @waxell.tool(tool_type="vector_db") -- document search recorded as vector DB tool span
  • @waxell.retrieval(source="rag") -- document ranking recorded with RAG as the source
  • @waxell.decision -- query classification via OpenAI (factual, analytical, creative) and output format
  • waxell.decide() -- retrieval strategy selection (semantic_search, keyword_search, hybrid_search)
  • @waxell.reasoning_dec -- retrieval quality evaluation and answer quality assessment
  • @waxell.step_dec -- query preprocessing and document filtering
  • waxell.score() -- answer quality (float) and factual grounding (boolean) scores
  • waxell.tag() / waxell.metadata() -- demo type, query type, document corpus size
  • Auto-instrumented LLM calls -- OpenAI calls in query analysis, filtering, and synthesis captured automatically
  • Policy trigger mode -- --policy-triggers flag exercises budget, safety, and latency policies
  • PolicyViolationError handling -- graceful agent halt on governance policy violation
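The graceful-halt pattern in the last bullet might look roughly like this. PolicyViolationError is stubbed locally so the sketch runs without the waxell SDK, and the run_pipeline entry point is hypothetical:

```python
# Hypothetical sketch of graceful-halt handling. PolicyViolationError is
# stubbed here; in the demo it would come from the waxell SDK.
class PolicyViolationError(Exception):
    """Raised when a governance policy (budget, safety, latency) is crossed."""

def run_pipeline(policy_triggers: bool = False) -> str:
    # Hypothetical orchestrator entry point for illustration only.
    if policy_triggers:
        raise PolicyViolationError("budget threshold exceeded")
    return "answer synthesized"

def main(policy_triggers: bool) -> str:
    try:
        return run_pipeline(policy_triggers)
    except PolicyViolationError as exc:
        # Halt the agent gracefully instead of crashing mid-run.
        return f"halted: {exc}"

print(main(policy_triggers=True))  # → halted: budget threshold exceeded
```

Catching the policy exception at the orchestrator boundary lets the parent agent record the halt and exit cleanly rather than leaving child spans dangling.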

Run it

# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.rag_agent --dry-run

# Live mode
export OPENAI_API_KEY="sk-..."
python -m app.demos.rag_agent

# With policy triggers (tests governance)
python -m app.demos.rag_agent --dry-run --policy-triggers

# Custom query
python -m app.demos.rag_agent --dry-run --query "How do I monitor AI agents?"

Source

dev/waxell-dev/app/demos/rag_agent.py