Skip to main content

Full RAG Pipeline Agent

A 12-step end-to-end RAG pipeline stress test that chains 7 integration categories together: web scraping (Firecrawl), embeddings (Voyage AI), dual vector DB indexing (Pinecone + ChromaDB), dual retrieval, reranking (FlashRank), safety checks (Presidio + LLM Guard), dual LLM synthesis (OpenAI + Anthropic), and evaluation (DeepEval). Designed to exercise the maximum number of observability features in a single execution.

Environment variables

This example requires OPENAI_API_KEY, ANTHROPIC_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls. All external services are mocked.

Architecture

Key Code

Multi-Category Tool Operations

The pipeline chains 12 steps across 7 integration categories, each recorded with the appropriate tool type.

# Web scraping
@waxell.tool(tool_type="web_scraping")
def firecrawl_scrape(url: str) -> dict:
"""Scrape a URL using Firecrawl."""
return {"url": url, "content": scraped_content, "word_count": len(words)}

# Embedding
@waxell.tool(tool_type="embedding")
def voyage_embed(texts: list, model: str = "voyage-3") -> dict:
"""Embed texts using Voyage AI."""
return {"embeddings": [...], "model": model, "tokens": total_tokens}

# Dual vector DB indexing
@waxell.tool(tool_type="vector_db")
def pinecone_upsert(vectors: list, namespace: str) -> dict:
return {"upserted": len(vectors)}

@waxell.tool(tool_type="vector_db")
def chroma_add(documents: list, collection: str) -> dict:
return {"added": len(documents)}

# Reranking
@waxell.tool(tool_type="reranking")
def flashrank_rerank(query: str, documents: list) -> dict:
return {"reranked": len(documents), "top_score": 0.95}

# Safety checks
@waxell.tool(tool_type="safety")
def presidio_check(text: str) -> dict:
return {"pii_detected": False, "entities": []}

@waxell.tool(tool_type="safety")
def llm_guard_check(text: str) -> dict:
return {"safe": True, "flags": []}

Evaluation with DeepEval

@waxell.tool(tool_type="evaluation")
def deepeval_evaluate(query: str, answer: str, context: list) -> dict:
"""Evaluate response with DeepEval metrics."""
return {
"faithfulness": 0.92,
"relevancy": 0.88,
"hallucination": 0.05,
"overall": "pass",
}

What this demonstrates

  • @waxell.observe -- single agent with 12-step lifecycle tracing
  • @waxell.tool -- 7 tool types in one pipeline: web_scraping, embedding, vector_db, reranking, safety, evaluation
  • Dual vector DB -- Pinecone and ChromaDB indexed and queried in the same pipeline
  • Dual LLM -- OpenAI for answer generation, Anthropic for summary/critique
  • Safety pipeline -- Presidio PII detection on input, LLM Guard on output
  • Evaluation -- DeepEval faithfulness, relevancy, and hallucination metrics
  • waxell.score() -- multiple quality scores (faithfulness, relevancy, hallucination)
  • waxell.tag() / waxell.metadata() -- integration categories, providers, and pipeline metadata
  • Auto-instrumented LLM calls -- both OpenAI and Anthropic calls captured automatically
  • Stress test design -- exercises maximum observability features in a single execution

Run it

# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.full_rag_pipeline_agent --dry-run

# Live mode
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
python -m app.demos.full_rag_pipeline_agent

# Custom query
python -m app.demos.full_rag_pipeline_agent --dry-run --query "How do I deploy agents safely?"

Source

dev/waxell-dev/app/demos/full_rag_pipeline_agent.py