RAG Pipeline

A complete retrieval-augmented generation pipeline that showcases the core behavior-tracking methods in waxell-observe: retrieval, decisions, reasoning, tool calls, scores, tags, and metadata.

Environment variables

This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL.

import waxell_observe as waxell

# CRITICAL: init() BEFORE importing LLM SDKs so auto-instrumentors can patch them
waxell.init()

from openai import OpenAI  # patched automatically by waxell.init()
import asyncio

client = OpenAI()


# --- Decorated helpers (auto-record behavior on every call) ---

@waxell.decision(name="classify_query", options=["factual", "analytical"])
def classify_query(query: str) -> dict:
    """Auto-records the classification decision."""
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Classify as 'factual' or 'analytical': {query}"}],
    )
    category = resp.choices[0].message.content.strip().lower()
    return {"chosen": category, "reasoning": f"Query classified as {category}"}
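
In practice the model's reply may include extra words or punctuation ("This is factual.") rather than a bare option, so it helps to normalize the output before treating it as the chosen value. A minimal, framework-agnostic sketch; the `normalize_choice` helper is an illustration, not part of waxell-observe:

```python
def normalize_choice(raw: str, options: list[str], default: str) -> str:
    """Map a free-form model reply onto one of the allowed options."""
    cleaned = raw.strip().lower()
    for option in options:
        if option in cleaned:  # tolerate replies like "This is factual."
            return option
    return default  # fall back when the reply matches nothing

print(normalize_choice("Factual.", ["factual", "analytical"], "factual"))      # factual
print(normalize_choice("hmm, unclear", ["factual", "analytical"], "factual"))  # factual
```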


@waxell.retrieval(source="vector_store")
def search_documents(query: str) -> list[dict]:
    """Auto-records the retrieval with source and returned documents."""
    return [  # Replace with your actual vector store
        {"id": "doc1", "title": "AI Safety Overview", "score": 0.95},
        {"id": "doc2", "title": "Alignment Research", "score": 0.87},
        {"id": "doc3", "title": "Deployment Best Practices", "score": 0.82},
    ]
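
A real `search_documents` would embed the query and rank documents by similarity against stored vectors. A minimal in-memory sketch of that ranking step, using cosine similarity over toy 3-dimensional "embeddings" (the corpus and vectors are made up for illustration; swap in your embedding model and vector store):

```python
import math

# Toy corpus with precomputed 3-d "embeddings" (stand-ins for real vectors).
CORPUS = {
    "doc1": {"title": "AI Safety Overview",        "vec": [0.9, 0.1, 0.0]},
    "doc2": {"title": "Alignment Research",        "vec": [0.7, 0.6, 0.1]},
    "doc3": {"title": "Deployment Best Practices", "vec": [0.2, 0.1, 0.9]},
}

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def rank_documents(query_vec: list[float], top_k: int = 3) -> list[dict]:
    scored = [
        {"id": doc_id, "title": doc["title"], "score": round(cosine(query_vec, doc["vec"]), 2)}
        for doc_id, doc in CORPUS.items()
    ]
    return sorted(scored, key=lambda d: d["score"], reverse=True)[:top_k]

print(rank_documents([1.0, 0.0, 0.0]))
```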


@waxell.tool(tool_type="api")
def web_search(query: str) -> dict:
    """Auto-records the tool call with timing."""
    return {"result_count": 5, "query": query}


@waxell.reasoning_dec(step="quality_assessment")
def assess_quality(answer: str, docs: list) -> dict:
    """Auto-records the reasoning chain."""
    return {
        "thought": f"Answer references {len(docs)} sources",
        "evidence": [d["title"] for d in docs],
        "conclusion": "Answer adequately covers source material",
    }
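
The reasoning step above returns fixed strings; a slightly more substantive heuristic checks which retrieved titles the answer actually mentions. A hedged sketch (the overlap check is an illustration of what such a step might compute, not a waxell-observe feature):

```python
def assess_overlap(answer: str, docs: list[dict]) -> dict:
    """Count which retrieved document titles are echoed in the answer."""
    mentioned = [d["title"] for d in docs if d["title"].lower() in answer.lower()]
    coverage = len(mentioned) / len(docs) if docs else 0.0
    return {
        "thought": f"Answer mentions {len(mentioned)} of {len(docs)} sources",
        "evidence": mentioned,
        "conclusion": "adequate" if coverage >= 0.5 else "needs more grounding",
    }

docs = [{"title": "AI Safety Overview"}, {"title": "Alignment Research"}]
print(assess_overlap("See the AI Safety Overview for details.", docs))
```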


# --- Main pipeline (decorator creates a tracked run) ---

@waxell.observe(agent_name="rag-agent")
async def rag_pipeline(query: str):
    waxell.tag("pipeline", "rag")

    category = classify_query(query)        # @decision auto-recorded
    docs = search_documents(query)          # @retrieval auto-recorded
    web_search(f"{query} latest research")  # @tool auto-recorded

    # Manual routing decision via convenience function
    waxell.decide(
        "retrieval_strategy",
        chosen="semantic",
        options=["semantic", "keyword", "hybrid"],
        reasoning=f"Query is {category['chosen']}",
    )

    context = "\n".join(d["title"] for d in docs)
    response = client.chat.completions.create(  # auto-instrumented LLM call
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Answer using:\n{context}"},
            {"role": "user", "content": query},
        ],
    )
    answer = response.choices[0].message.content

    assess_quality(answer, docs)  # @reasoning_dec auto-recorded
    waxell.score("answer_quality", 0.92)
    waxell.metadata("sources", [d["title"] for d in docs])
    return answer


asyncio.run(rag_pipeline("What are the latest developments in AI safety?"))
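
Note that `rag_pipeline` is declared `async` but the synchronous OpenAI client blocks the event loop while each request is in flight. To actually run several queries concurrently, one option is to off-load blocking calls with `asyncio.to_thread`. A self-contained sketch, with a dummy blocking function standing in for the LLM call:

```python
import asyncio
import time

def blocking_llm_call(query: str) -> str:
    time.sleep(0.1)  # stand-in for a synchronous network call
    return f"answer to: {query}"

async def pipeline(query: str) -> str:
    # Off-load the blocking call so other pipelines can progress meanwhile.
    return await asyncio.to_thread(blocking_llm_call, query)

async def main() -> list[str]:
    # gather() runs the pipelines concurrently and preserves argument order.
    return await asyncio.gather(pipeline("q1"), pipeline("q2"), pipeline("q3"))

answers = asyncio.run(main())
print(answers)
```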

What this demonstrates

  • @waxell.decision -- auto-records the classification decision with options, chosen value, and reasoning.
  • @waxell.retrieval -- auto-records document retrieval with the source and returned documents.
  • @waxell.tool -- auto-records tool calls (web search) with inputs, outputs, and timing.
  • @waxell.reasoning_dec -- auto-records a reasoning chain with thought, evidence, and conclusion.
  • waxell.decide() -- records a manual routing decision inline (no decorator needed).
  • waxell.score(), waxell.tag(), waxell.metadata() -- top-level convenience functions that attach data to the current run without needing a context reference.
  • waxell.init() -- auto-instruments all OpenAI calls so LLM usage is captured with zero code changes.
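
The manual `decide()` call in the pipeline hard-codes its choice; in practice the routing usually derives from query features, and the chosen value is then passed to `waxell.decide()`. A toy heuristic for choosing among the three strategies (illustrative only; the thresholds and rules are assumptions, not part of waxell-observe):

```python
def choose_strategy(query: str) -> str:
    """Pick a retrieval strategy from simple query features (toy heuristic)."""
    words = query.split()
    if any(w.startswith('"') for w in words):
        return "keyword"   # quoted phrases suggest exact matching
    if len(words) > 8:
        return "hybrid"    # long questions benefit from both signals
    return "semantic"      # default: embedding similarity

print(choose_strategy('find "RLHF" papers'))  # keyword
print(choose_strategy("short query"))         # semantic
```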

Run it

export OPENAI_API_KEY="sk-..."
export WAXELL_API_KEY="your-waxell-api-key"
export WAXELL_API_URL="https://api.waxell.ai"

python rag_pipeline.py