Track a RAG Pipeline
Build and instrument a retrieval-augmented generation (RAG) pipeline with full observability using the decorator pattern -- including retrieval steps, LLM calls, sessions, user tracking, and quality scoring.
Prerequisites
- Python 3.10+
- A Waxell API key (get one from your control plane dashboard)
- A vector database (this tutorial uses ChromaDB, but any will work)
What You'll Learn
- Wrap the retrieval function with @waxell.retrieval and the pipeline entry point with @waxell.observe
- Let waxell.init() auto-instrument the LLM calls
- Group related interactions with sessions and user tracking
- Add metadata, tags, and quality scores inline
- View traces in the Waxell dashboard
Step 1: Install Dependencies
```bash
pip install waxell-observe openai chromadb
```
Set your credentials:
```bash
export WAXELL_API_URL="https://acme.waxell.dev"
export WAXELL_API_KEY="wax_sk_..."
export OPENAI_API_KEY="sk-..."
```
Step 2: Initialize Waxell
Call init() before importing OpenAI -- this enables auto-instrumentation, so every OpenAI call is captured automatically with model, tokens, cost, and latency.
```python
import waxell_observe as waxell

waxell.init()  # reads WAXELL_API_KEY + WAXELL_API_URL from env

# Import OpenAI AFTER init() so it's auto-instrumented
import openai

client = openai.OpenAI()
```
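Why does the import order matter? This tutorial doesn't say how waxell.init() hooks into OpenAI, so treat the following as an assumption about the general technique: auto-instrumentation libraries commonly monkey-patch, replacing a function or method with a recording wrapper at runtime. The toy below (fake_sdk, instrument, and CALLS are all hypothetical; no OpenAI code is involved) shows one common reason ordering guidance like this exists: a reference captured before the patch keeps calling the unwrapped original.

```python
import functools
import types

# Hypothetical stand-in for an SDK module (NOT the real openai package).
fake_sdk = types.SimpleNamespace()


def create(prompt: str) -> str:
    """Pretend LLM call."""
    return f"echo: {prompt}"


fake_sdk.create = create

CALLS: list[str] = []  # what our toy "instrumentation" has recorded


def instrument(module: types.SimpleNamespace) -> None:
    """Replace module.create with a wrapper that records each call."""
    original = module.create

    @functools.wraps(original)
    def wrapper(prompt: str) -> str:
        CALLS.append(prompt)
        return original(prompt)

    module.create = wrapper


# A reference bound BEFORE instrumentation keeps pointing at the original.
early_create = fake_sdk.create

instrument(fake_sdk)

fake_sdk.create("looked up after patching")  # goes through the wrapper
early_create("bound before patching")  # bypasses the wrapper

print(CALLS)  # ['looked up after patching']
```

Only the call made through the patched attribute is recorded; the early-bound reference silently escapes instrumentation.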
Step 3: Set Up a Vector Store
Create a ChromaDB collection and populate it. ChromaDB is also auto-instrumented -- queries appear automatically in the trace.
```python
import chromadb

chroma = chromadb.Client()
collection = chroma.create_collection("knowledge_base")
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell.observe decorator captures inputs, outputs, and LLM calls automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)
```
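If you want to exercise the retrieval and generation code without installing ChromaDB, you can swap in any object whose query() returns results in the same nested shape: one inner list per query text under "ids", "documents", and "distances". The InMemoryCollection class below is hypothetical (it is not part of chromadb or waxell-observe) and ranks by simple word overlap rather than embeddings, so its distances are not comparable to Chroma's. Presumably it would also not be picked up by Waxell's ChromaDB auto-instrumentation.

```python
import re


class InMemoryCollection:
    """Hypothetical, dependency-free stand-in for a Chroma collection.

    Mimics only the query() result shape used in this tutorial.
    Distance here is 1 - Jaccard word overlap, NOT an embedding distance.
    """

    def __init__(self) -> None:
        self._ids: list[str] = []
        self._docs: list[str] = []

    def add(self, documents: list[str], ids: list[str]) -> None:
        self._docs.extend(documents)
        self._ids.extend(ids)

    @staticmethod
    def _words(text: str) -> set[str]:
        return set(re.findall(r"[a-z0-9]+", text.lower()))

    def _distance(self, a: str, b: str) -> float:
        wa, wb = self._words(a), self._words(b)
        union = wa | wb
        return 1.0 - (len(wa & wb) / len(union) if union else 0.0)

    def query(self, query_texts: list[str], n_results: int = 10) -> dict:
        out: dict = {"ids": [], "documents": [], "distances": []}
        for q in query_texts:  # one inner list per query text
            ranked = sorted(
                zip(self._ids, self._docs),
                key=lambda pair: self._distance(q, pair[1]),
            )[:n_results]
            out["ids"].append([doc_id for doc_id, _ in ranked])
            out["documents"].append([doc for _, doc in ranked])
            out["distances"].append([self._distance(q, doc) for _, doc in ranked])
        return out


collection = InMemoryCollection()
collection.add(
    documents=[
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
    ],
    ids=["doc3", "doc4"],
)
result = collection.query(query_texts=["What about governance?"], n_results=1)
print(result["ids"])  # [['doc3']]
```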
Step 4: Decorate Retrieval and Generation
Wrap your retrieval function with @waxell.retrieval to capture the query, documents, and scores as a structured span:
```python
@waxell.retrieval(source="chromadb")
def retrieve_documents(query: str, n_results: int = 3) -> list[dict]:
    """Return documents with scores so the decorator can record them."""
    results = collection.query(query_texts=[query], n_results=n_results)
    # query() returns one inner list per query text; we sent one, so take [0]
    ids = results["ids"][0]
    docs = results["documents"][0]
    distances = results["distances"][0]
    # Lower distance means a closer match, so 1 - distance gives a
    # higher-is-better score (a relative ranking, not a normalized probability)
    return [
        {"id": doc_id, "text": doc, "score": 1.0 - dist}
        for doc_id, doc, dist in zip(ids, docs, distances)
    ]
```
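The conversion from a raw query result to scored documents is easy to get subtly wrong: numbering results with enumerate() labels them by rank ("doc0", "doc1", ...) instead of by their stored IDs. Pulling that step into a pure function lets you check it without a live database. The helper name to_scored_docs and the sample values below are made up for illustration; only the nested result shape follows what the retrieval function above reads.

```python
def to_scored_docs(results: dict, query_index: int = 0) -> list[dict]:
    """Flatten one query's worth of a Chroma-style result into scored docs."""
    ids = results["ids"][query_index]
    docs = results["documents"][query_index]
    distances = results["distances"][query_index]
    # Keep the stored IDs; convert distance (lower = closer) to score (higher = better)
    return [
        {"id": doc_id, "text": doc, "score": 1.0 - dist}
        for doc_id, doc, dist in zip(ids, docs, distances)
    ]


# Hand-written sample in the same nested shape (values are illustrative only).
sample = {
    "ids": [["doc3", "doc4"]],
    "documents": [["Governance policies ...", "Sessions group ..."]],
    "distances": [[0.25, 0.75]],
}
scored = to_scored_docs(sample)
print([(d["id"], d["score"]) for d in scored])  # [('doc3', 0.75), ('doc4', 0.25)]
```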
The generation step doesn't need a decorator -- waxell.init() auto-instruments OpenAI:
```python
def generate_answer(query: str, docs: list[dict]) -> str:
    context = "\n".join(f"- {d['text']}" for d in docs)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
    )
    return response.choices[0].message.content
```
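For unit tests that shouldn't hit the network, one option is to pass the client in as a parameter and substitute a fake that exposes the same attribute path the code reads (client.chat.completions.create(...) returning an object with choices[0].message.content). This is a hypothetical refactor: build_messages, FakeCompletions, and the extra client parameter are not part of the tutorial or of any library. A fake client bypasses the real OpenAI SDK, so nothing it does will appear in Waxell traces; use it only for testing the prompt and plumbing.

```python
from types import SimpleNamespace


def build_messages(query: str, docs: list[dict]) -> list[dict]:
    """Same prompt layout as generate_answer above, split out for testing."""
    context = "\n".join(f"- {d['text']}" for d in docs)
    return [{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}]


def generate_answer(client, query: str, docs: list[dict], model: str = "gpt-4o") -> str:
    """Variant of generate_answer that receives the client as a parameter."""
    response = client.chat.completions.create(model=model, messages=build_messages(query, docs))
    return response.choices[0].message.content


class FakeCompletions:
    """Hypothetical fake mimicking only client.chat.completions.create(...)."""

    def __init__(self, reply: str) -> None:
        self.reply = reply
        self.calls: list[dict] = []  # kwargs of every create() call

    def create(self, **kwargs):
        self.calls.append(kwargs)
        message = SimpleNamespace(content=self.reply)
        return SimpleNamespace(choices=[SimpleNamespace(message=message)])


completions = FakeCompletions(reply="Waxell uses decorators.")
fake_client = SimpleNamespace(chat=SimpleNamespace(completions=completions))

docs = [{"id": "doc1", "text": "Waxell agents use a decorator pattern for observability.", "score": 0.9}]
result = generate_answer(fake_client, "How does Waxell handle observability?", docs)
print(result)  # Waxell uses decorators.
```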
Step 5: Wrap the Pipeline with @observe
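The @observe decorator creates a tracked execution run for each call. Inside the run, waxell.tag(), waxell.metadata(), and waxell.score() enrich it inline; session_id and user_id are supplied when you call the function (Step 6):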
```python
@waxell.observe(agent_name="rag-pipeline")
async def rag_query(query: str) -> str:
    docs = retrieve_documents(query)  # auto-recorded by @retrieval
    answer = generate_answer(query, docs)  # auto-recorded by init()

    # Inline enrichment
    waxell.tag("pipeline", "rag-v1")
    waxell.tag("environment", "production")
    waxell.metadata("retrieval_count", len(docs))
    # Placeholder scores; in practice, compute these from the run itself
    waxell.score("relevance", 0.85)
    waxell.score("grounded", True, data_type="boolean")
    return answer
```
That's the entire instrumented pipeline. No WaxellContext, no ctx.record_llm_call, no ctx.record_step -- decorators and auto-instrumentation handle everything.
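The relevance and grounded values in Step 5 are hard-coded. One way to make them reflect the actual run is to derive them from data the pipeline already has. The two helpers below (mean_retrieval_score and naive_grounded) are hypothetical and deliberately crude heuristics, not a Waxell feature: the first averages the retrieval scores, the second checks what fraction of the answer's words appear in the retrieved text.

```python
import re


def _words(text: str) -> set[str]:
    """Lowercased alphanumeric tokens."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def mean_retrieval_score(docs: list[dict]) -> float:
    """Average retrieval score; 0.0 if nothing was retrieved."""
    if not docs:
        return 0.0
    return sum(d["score"] for d in docs) / len(docs)


def naive_grounded(answer: str, docs: list[dict], threshold: float = 0.5) -> bool:
    """True if at least `threshold` of the answer's words occur in the retrieved text."""
    answer_words = _words(answer)
    if not answer_words:
        return False
    context_words = set().union(*(_words(d["text"]) for d in docs))
    overlap = len(answer_words & context_words) / len(answer_words)
    return overlap >= threshold


docs = [
    {"id": "doc3", "text": "Governance policies can block, warn, or throttle agent executions.", "score": 0.5},
    {"id": "doc4", "text": "Sessions group related runs together for conversation tracking.", "score": 0.25},
]
answer = "Governance policies can block or throttle executions."
print(mean_retrieval_score(docs))  # 0.375
print(naive_grounded(answer, docs))  # True
```

Inside rag_query you would pass these results to waxell.score("relevance", ...) and waxell.score("grounded", ..., data_type="boolean") in place of the constants. A real evaluator will be far more reliable than word overlap; the point is only that scores should be computed per run.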
Step 6: Call with Session and User
Pass session_id and user_id at call time. The @observe decorator intercepts these kwargs and applies them to the run:
```python
import asyncio

from waxell_observe import generate_session_id


async def main():
    session = generate_session_id()
    answer = await rag_query(
        "How does Waxell handle observability?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Answer: {answer}")

    # Follow-up in the same session
    answer2 = await rag_query(
        "What about governance?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Follow-up: {answer2}")


asyncio.run(main())
```
Step 7: View Results in the Dashboard
After running your pipeline, open your Waxell dashboard:
- Observability > Overview -- Find your rag-pipeline agent run with its query, generated answer, token counts, and cost estimate.
- Observability > Sessions -- Click your session ID to see the full conversation timeline with both queries grouped.
- Run Detail -- See the retrieval span (with documents and scores), the auto-captured LLM call (model, tokens, cost), and the inline scores, tags, and metadata.
Full Working Example
```python
import asyncio

import waxell_observe as waxell

waxell.init()  # before OpenAI import

import openai
import chromadb
from waxell_observe import generate_session_id

# ---- Setup ----
client = openai.OpenAI()
chroma = chromadb.Client()
collection = chroma.create_collection("knowledge_base")
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell.observe decorator captures inputs, outputs, and LLM calls automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)


# ---- Instrumented pipeline ----
@waxell.retrieval(source="chromadb")
def retrieve_documents(query: str, n_results: int = 3) -> list[dict]:
    results = collection.query(query_texts=[query], n_results=n_results)
    ids = results["ids"][0]  # use the stored IDs, not the rank position
    docs = results["documents"][0]
    distances = results["distances"][0]
    return [
        {"id": doc_id, "text": doc, "score": 1.0 - dist}
        for doc_id, doc, dist in zip(ids, docs, distances)
    ]


def generate_answer(query: str, docs: list[dict]) -> str:
    context = "\n".join(f"- {d['text']}" for d in docs)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}],
    )
    return response.choices[0].message.content


@waxell.observe(agent_name="rag-pipeline")
async def rag_query(query: str) -> str:
    docs = retrieve_documents(query)
    answer = generate_answer(query, docs)
    waxell.tag("pipeline", "rag-v1")
    waxell.metadata("retrieval_count", len(docs))
    waxell.score("relevance", 0.85)  # placeholder score
    return answer


# ---- Run it ----
async def main():
    session = generate_session_id()
    answer = await rag_query(
        "How does Waxell handle observability?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Answer: {answer}")

    answer2 = await rag_query(
        "What about governance?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Follow-up: {answer2}")


if __name__ == "__main__":
    asyncio.run(main())
```
Why Decorators, Not WaxellContext?
This tutorial uses the decorator pattern because:
- waxell.init() auto-instruments OpenAI and ChromaDB -- no manual record_llm_call or record_step needed
- @waxell.retrieval captures query, documents, and scores from the function's return value
- @waxell.observe creates the run with session/user attribution
- waxell.score() / tag() / metadata() convenience functions add inline enrichment
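To make "captures ... from the function's return value" concrete: a decorator wraps the function once at its definition, so every call site gets recorded without extra code, whereas manual recording means a record call after each invocation. The sketch below is a hypothetical toy (toy_retrieval and SPANS are not the waxell API) showing that mechanism for a retrieval function that returns dicts with "id" and "score" keys, as in this tutorial.

```python
import functools
import time

SPANS: list[dict] = []  # toy stand-in for recorded spans


def toy_retrieval(source: str):
    """Hypothetical decorator that records a retrieval span from args + return value."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(query: str, *args, **kwargs):
            start = time.perf_counter()
            docs = fn(query, *args, **kwargs)
            SPANS.append({
                "kind": "retrieval",
                "source": source,
                "query": query,
                "doc_ids": [d["id"] for d in docs],
                "scores": [d["score"] for d in docs],
                "duration_s": time.perf_counter() - start,
            })
            return docs  # the caller receives the return value unchanged

        return wrapper

    return decorator


@toy_retrieval(source="toy")
def retrieve(query: str, n_results: int = 2) -> list[dict]:
    corpus = [
        {"id": "doc1", "text": "decorator pattern", "score": 0.9},
        {"id": "doc2", "text": "governance policies", "score": 0.4},
        {"id": "doc3", "text": "sessions", "score": 0.1},
    ]
    return corpus[:n_results]


docs = retrieve("How does observability work?")
print(SPANS[0]["doc_ids"], SPANS[0]["scores"])  # ['doc1', 'doc2'] [0.9, 0.4]
```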
WaxellContext is reserved for advanced cases: batch loops with many runs, multi-step orchestration spanning multiple agent functions, or mid-execution policy checks. See the Context Manager page for those scenarios.
Next Steps
- Decorator Pattern -- Full reference for @observe and behavior decorators
- Session Analytics -- Analyze conversation patterns across sessions
- Cost Optimization -- Reduce LLM spending with observability data
- Cost Management -- Track and control LLM spending