RAG Pipeline
A complete retrieval-augmented generation pipeline that showcases every behavior tracking method in waxell-observe: retrieval, decisions, reasoning, tool calls, retries, scores, and tags.
Environment variables
This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL.
# Observability SDK — must be imported and initialized first (see note below).
import waxell_observe as waxell
# CRITICAL: init() BEFORE importing LLM SDKs so auto-instrumentors can patch them
waxell.init()
from openai import OpenAI # patched automatically by waxell.init()
import asyncio
# Shared OpenAI client; every call through it is auto-instrumented by waxell.
client = OpenAI()
# --- Decorated helpers (auto-record behavior on every call) ---
@waxell.decision(name="classify_query", options=["factual", "analytical"])
def classify_query(query: str) -> dict:
    """Classify *query* as 'factual' or 'analytical'; the decorator records the decision."""
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Classify as 'factual' or 'analytical': {query}"}],
    )
    # Normalize the model's answer so it matches the declared option labels.
    label = completion.choices[0].message.content.strip().lower()
    return {"chosen": label, "reasoning": f"Query classified as {label}"}
@waxell.retrieval(source="vector_store")
def search_documents(query: str) -> list[dict]:
    """Return documents for *query*; the decorator records the source and results."""
    # Stub corpus — swap in a real vector-store lookup here.
    hits = [
        ("doc1", "AI Safety Overview", 0.95),
        ("doc2", "Alignment Research", 0.87),
        ("doc3", "Deployment Best Practices", 0.82),
    ]
    return [{"id": doc_id, "title": title, "score": score} for doc_id, title, score in hits]
@waxell.tool(tool_type="api")
def web_search(query: str) -> dict:
    """Simulate a web-search API call; the decorator records inputs, outputs, and timing."""
    # Stubbed response — replace with a real search API integration.
    response = {"query": query, "result_count": 5}
    return response
@waxell.reasoning_dec(step="quality_assessment")
def assess_quality(answer: str, docs: list) -> dict:
    """Build a thought/evidence/conclusion record; the decorator logs the reasoning chain."""
    titles = [doc["title"] for doc in docs]
    return {
        "thought": f"Answer references {len(docs)} sources",
        "evidence": titles,
        "conclusion": "Answer adequately covers source material",
    }
# --- Main pipeline (decorator creates a tracked run) ---
@waxell.observe(agent_name="rag-agent")
async def rag_pipeline(query: str):
    """Run the full RAG flow for *query* inside a tracked run and return the answer."""
    waxell.tag("pipeline", "rag")

    # Classification, retrieval, and a tool call — each auto-recorded by its decorator.
    category = classify_query(query)
    docs = search_documents(query)
    web_search(f"{query} latest research")

    # Inline routing decision via the convenience function (no decorator needed).
    waxell.decide(
        "retrieval_strategy",
        chosen="semantic_search",
        options=["semantic", "keyword", "hybrid"],
        reasoning=f"Query is {category['chosen']}",
    )

    context = "\n".join(doc["title"] for doc in docs)
    # Auto-instrumented LLM call — captured by waxell.init() with no extra code.
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Answer using:\n{context}"},
            {"role": "user", "content": query},
        ],
    )
    answer = completion.choices[0].message.content

    assess_quality(answer, docs)  # @reasoning_dec auto-recorded
    waxell.score("answer_quality", 0.92)
    waxell.metadata("sources", [doc["title"] for doc in docs])
    return answer
asyncio.run(rag_pipeline("What are the latest developments in AI safety?"))
What this demonstrates
- @waxell.decision — auto-records the classification decision with options, chosen value, and reasoning.
- @waxell.retrieval — auto-records document retrieval with the source and returned documents.
- @waxell.tool — auto-records tool calls (web search) with inputs, outputs, and timing.
- @waxell.reasoning_dec — auto-records a reasoning chain with thought, evidence, and conclusion.
- waxell.decide() — records a manual routing decision inline (no decorator needed).
- waxell.score(), waxell.tag(), waxell.metadata() — top-level convenience functions that attach data to the current run without needing a context reference.
- waxell.init() — auto-instruments all OpenAI calls so LLM usage is captured with zero code changes.
Run it
export OPENAI_API_KEY="sk-..."
export WAXELL_API_KEY="your-waxell-api-key"
export WAXELL_API_URL="https://api.waxell.ai"
python rag_pipeline.py