Track a RAG Pipeline
Build and instrument a retrieval-augmented generation (RAG) pipeline with full observability -- including retrieval steps, LLM calls, sessions, user tracking, and quality scoring.
Prerequisites
- Python 3.10+
- A Waxell API key (get one from your control plane dashboard)
- A vector database (this tutorial uses ChromaDB, but any will work)
What You'll Learn
- Instrument retrieval and generation steps in a RAG pipeline
- Use sessions and user tracking to group related interactions
- Add metadata and tags for filtering and search
- Record quality scores for evaluation
- View traces in the Waxell dashboard
Step 1: Install Dependencies
pip install waxell-observe openai chromadb
Set your credentials:
export WAXELL_API_URL="https://acme.waxell.dev"
export WAXELL_API_KEY="wax_sk_..."
export OPENAI_API_KEY="sk-..."
Step 2: Set Up a Simple Vector Store
Create a ChromaDB collection and populate it with sample documents:
import chromadb
chroma_client = chromadb.Client()
collection = chroma_client.create_collection("knowledge_base")
# Add some documents
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell_agent decorator captures inputs, outputs, and LLM calls automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)
Step 3: Create the RAG Function
Write a function that retrieves relevant documents and generates an answer:
import openai
client = openai.OpenAI()
def retrieve_documents(query: str, n_results: int = 3) -> list[str]:
    """Retrieve relevant documents from the vector store."""
    results = collection.query(query_texts=[query], n_results=n_results)
    return results["documents"][0]

def generate_answer(query: str, context_docs: list[str]) -> dict:
    """Generate an answer using retrieved context."""
    context = "\n".join(f"- {doc}" for doc in context_docs)
    prompt = f"""Answer the question using only the provided context.

Context:
{context}

Question: {query}

Answer:"""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    message = response.choices[0].message.content
    usage = response.usage
    return {
        "answer": message,
        "prompt": prompt,
        "tokens_in": usage.prompt_tokens,
        "tokens_out": usage.completion_tokens,
    }
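The prompt above is plain string assembly. Running the same join logic on two of the sample documents shows the structure the model receives (the standalone `build_prompt` helper here is illustrative, not part of the SDK):

```python
def build_prompt(query: str, context_docs: list[str]) -> str:
    # Same context-assembly logic as generate_answer, minus the LLM call
    context = "\n".join(f"- {doc}" for doc in context_docs)
    return (
        "Answer the question using only the provided context.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\n\nAnswer:"
    )

docs = [
    "Waxell agents use a decorator pattern for observability.",
    "Sessions group related runs together for conversation tracking.",
]
prompt = build_prompt("How does Waxell handle observability?", docs)
print(prompt)
```

Each retrieved document becomes one `- ` bullet under `Context:`, which keeps the grounding material visually separate from the question.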
Step 4: Wrap with WaxellContext
Use WaxellContext to track the entire RAG pipeline as a single observed run. Set session_id and user_id to group and attribute interactions:
from waxell_observe import WaxellContext
async def rag_query(query: str, session_id: str, user_id: str) -> str:
    async with WaxellContext(
        agent_name="rag-pipeline",
        session_id=session_id,
        user_id=user_id,
        inputs={"query": query},
    ) as ctx:
        # Step 1: Retrieve
        docs = retrieve_documents(query)

        # Step 2: Record the retrieval step
        ctx.record_step(
            step_name="retrieval",
            output={"documents": docs, "count": len(docs)},
        )

        # Step 3: Generate answer
        result = generate_answer(query, docs)

        # Step 4: Record the LLM call
        ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=result["tokens_in"],
            tokens_out=result["tokens_out"],
            task="answer_generation",
            prompt_preview=result["prompt"][:200],
            response_preview=result["answer"][:200],
        )

        # Step 5: Set result and return
        ctx.set_result({"answer": result["answer"]})
        return result["answer"]
The session_id groups related queries from the same conversation. The user_id attributes runs to a specific user for analytics.
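Conceptually, the Sessions view is a group-by over session_id. A minimal sketch of that grouping, using made-up run records purely for illustration:

```python
from collections import defaultdict

# Hypothetical run records, shaped roughly like what the dashboard aggregates
runs = [
    {"session_id": "sess_1", "user_id": "user_alice", "query": "How does Waxell handle observability?"},
    {"session_id": "sess_1", "user_id": "user_alice", "query": "What about governance?"},
    {"session_id": "sess_2", "user_id": "user_bob", "query": "How are costs tracked?"},
]

# Group runs into conversations by session_id
sessions = defaultdict(list)
for run in runs:
    sessions[run["session_id"]].append(run)

print(len(sessions["sess_1"]))  # prints 2: both of Alice's queries land in one session
```

Reusing the same session_id across calls is what makes a follow-up question appear on the same conversation timeline.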
Step 5: Add Metadata and Tags
Tags are searchable string key-value pairs. Metadata supports any JSON-serializable value. Both are useful for filtering runs in the dashboard:
# Inside the WaxellContext block, after retrieval:
# Tags: searchable string values
ctx.set_tag("pipeline", "rag-v1")
ctx.set_tag("environment", "production")
# Metadata: any JSON-serializable value
ctx.set_metadata("retrieval_count", len(docs))
ctx.set_metadata("model_config", {
    "model": "gpt-4o",
    "temperature": 0.7,
    "max_tokens": 1024,
})
Tags appear as OTel span attributes prefixed with waxell.tag. and are queryable in Grafana TraceQL: { span.waxell.tag.pipeline = "rag-v1" }.
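The distinction matters when values are not strings. A sketch of the constraint, assuming tags are coerced to strings while metadata must survive a JSON round trip (the helper names are illustrative, not SDK functions):

```python
import json

def as_tag(value) -> str:
    # Tags are flat string key-value pairs; non-strings get stringified
    return value if isinstance(value, str) else str(value)

def as_metadata(value):
    # Metadata accepts any JSON-serializable value; json.dumps raises otherwise
    json.dumps(value)
    return value

tag = as_tag(3)  # stored as the string "3", searchable as such
meta = as_metadata({"model": "gpt-4o", "temperature": 0.7})
```

Under these assumptions, a numeric tag like `3` is matched as the string `"3"` in queries, whereas metadata preserves nested structure and types.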
Step 6: Add Quality Scoring
Record a relevance score to evaluate answer quality over time:
# Inside the WaxellContext block, after generating the answer:
ctx.record_score(
    name="relevance",
    value=0.85,
    data_type="numeric",
    comment="Automated relevance assessment",
)
You can record multiple scores per run:
ctx.record_score(name="relevance", value=0.85, data_type="numeric")
ctx.record_score(name="grounded", value=True, data_type="boolean")
ctx.record_score(name="tone", value="professional", data_type="categorical")
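The 0.85 above is a placeholder; in practice you compute a value before recording it. One deliberately naive heuristic (not the platform's scorer, just an illustration) is the fraction of answer tokens that also appear in the retrieved context:

```python
def token_overlap(answer: str, context_docs: list[str]) -> float:
    """Fraction of answer tokens that also appear in the retrieved context."""
    answer_tokens = set(answer.lower().split())
    context_tokens = set(" ".join(context_docs).lower().split())
    if not answer_tokens:
        return 0.0
    return len(answer_tokens & context_tokens) / len(answer_tokens)

score = token_overlap(
    "sessions group related runs",
    ["Sessions group related runs together for conversation tracking."],
)
# Pass the computed value to ctx.record_score(name="relevance", value=score, data_type="numeric")
```

A real pipeline would use an embedding-based or LLM-judged grounding check instead, but the recording call is the same either way.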
Step 7: View Results in the Dashboard
After running your pipeline, open your Waxell dashboard:
- Observability > Overview -- Find your rag-pipeline agent run with its input query, generated answer, token counts, and cost estimate.
- Observability > Sessions -- Click your session ID to see the full conversation timeline with all runs grouped together.
- Run Detail > Scores -- View the relevance scores attached to your runs.
Full Working Example
Here is the complete, runnable pipeline:
import asyncio
import chromadb
import openai
from waxell_observe import WaxellContext, WaxellObserveClient, generate_session_id

# ---- Configuration ----
WaxellObserveClient.configure(
    api_url="https://acme.waxell.dev",
    api_key="wax_sk_...",
)

openai_client = openai.OpenAI()

# ---- Vector Store ----
chroma = chromadb.Client()
collection = chroma.create_collection("knowledge_base")
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell_agent decorator captures inputs and outputs automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)

def retrieve_documents(query: str, n_results: int = 3) -> list[str]:
    results = collection.query(query_texts=[query], n_results=n_results)
    return results["documents"][0]

def generate_answer(query: str, context_docs: list[str]) -> dict:
    context = "\n".join(f"- {doc}" for doc in context_docs)
    prompt = f"Answer using this context:\n{context}\n\nQuestion: {query}\nAnswer:"
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    msg = response.choices[0].message.content
    usage = response.usage
    return {
        "answer": msg,
        "prompt": prompt,
        "tokens_in": usage.prompt_tokens,
        "tokens_out": usage.completion_tokens,
    }

async def rag_query(query: str, session_id: str, user_id: str) -> str:
    async with WaxellContext(
        agent_name="rag-pipeline",
        session_id=session_id,
        user_id=user_id,
        inputs={"query": query},
    ) as ctx:
        # Retrieve
        docs = retrieve_documents(query)
        ctx.record_step(step_name="retrieval", output={"documents": docs})

        # Tag and metadata
        ctx.set_tag("pipeline", "rag-v1")
        ctx.set_metadata("retrieval_count", len(docs))

        # Generate
        result = generate_answer(query, docs)
        ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=result["tokens_in"],
            tokens_out=result["tokens_out"],
            task="answer_generation",
            prompt_preview=result["prompt"][:200],
            response_preview=result["answer"][:200],
        )

        # Score
        ctx.record_score(name="relevance", value=0.85, data_type="numeric")
        ctx.set_result({"answer": result["answer"]})
        return result["answer"]

async def main():
    session_id = generate_session_id()
    user_id = "user_alice"

    answer = await rag_query(
        "How does Waxell handle observability?",
        session_id=session_id,
        user_id=user_id,
    )
    print(f"Answer: {answer}")

    # Follow-up in the same session
    answer2 = await rag_query(
        "What about governance?",
        session_id=session_id,
        user_id=user_id,
    )
    print(f"Follow-up: {answer2}")

if __name__ == "__main__":
    asyncio.run(main())
Next Steps
- Session Analytics -- Analyze conversation patterns across sessions
- Cost Optimization -- Reduce LLM spending with observability data
- Context Manager Reference -- Full WaxellContext API reference
- Cost Management -- Track and control LLM spending