Track a RAG Pipeline

Build and instrument a retrieval-augmented generation (RAG) pipeline with full observability -- including retrieval steps, LLM calls, sessions, user tracking, and quality scoring.

Prerequisites

  • Python 3.10+
  • A Waxell API key (get one from your control plane dashboard)
  • A vector database (this tutorial uses ChromaDB, but any will work)

What You'll Learn

  • Instrument retrieval and generation steps in a RAG pipeline
  • Use sessions and user tracking to group related interactions
  • Add metadata and tags for filtering and search
  • Record quality scores for evaluation
  • View traces in the Waxell dashboard

Step 1: Install Dependencies

pip install waxell-observe openai chromadb

Set your credentials:

export WAXELL_API_URL="https://acme.waxell.dev"
export WAXELL_API_KEY="wax_sk_..."
export OPENAI_API_KEY="sk-..."

Step 2: Set Up a Simple Vector Store

Create a ChromaDB collection and populate it with sample documents:

import chromadb

chroma_client = chromadb.Client()
collection = chroma_client.create_collection("knowledge_base")

# Add some documents
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell_agent decorator captures inputs, outputs, and LLM calls automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)
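The tutorial uses ChromaDB, but any store that returns a ranked list of strings will work. If you want to follow along without installing a vector database, here is a minimal in-memory stand-in that ranks documents by naive keyword overlap — an illustrative sketch only, not part of ChromaDB or the Waxell SDK:

```python
class InMemoryRetriever:
    """Toy retriever: ranks documents by shared keywords, no embeddings."""

    def __init__(self, documents: list[str]):
        self.documents = documents

    def query(self, text: str, n_results: int = 3) -> list[str]:
        terms = set(text.lower().split())
        # Rank documents by how many query terms they share.
        ranked = sorted(
            self.documents,
            key=lambda doc: len(terms & set(doc.lower().split())),
            reverse=True,
        )
        return ranked[:n_results]

store = InMemoryRetriever([
    "Sessions group related runs together.",
    "Governance policies can block agent executions.",
])
top_hit = store.query("how do sessions group runs?", n_results=1)
```

Real vector stores rank by embedding similarity instead, but the interface — text in, ranked documents out — is the same shape the rest of this tutorial relies on.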

Step 3: Create the RAG Function

Write a function that retrieves relevant documents and generates an answer:

import openai

client = openai.OpenAI()

def retrieve_documents(query: str, n_results: int = 3) -> list[str]:
    """Retrieve relevant documents from the vector store."""
    results = collection.query(query_texts=[query], n_results=n_results)
    return results["documents"][0]

def generate_answer(query: str, context_docs: list[str]) -> dict:
    """Generate an answer using retrieved context."""
    context = "\n".join(f"- {doc}" for doc in context_docs)
    prompt = f"""Answer the question using only the provided context.

Context:
{context}

Question: {query}

Answer:"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )

    message = response.choices[0].message.content
    usage = response.usage

    return {
        "answer": message,
        "prompt": prompt,
        "tokens_in": usage.prompt_tokens,
        "tokens_out": usage.completion_tokens,
    }
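Because generate_answer only touches client.chat.completions.create and a handful of response fields, you can smoke-test the plumbing without network access by swapping in a stub. This is an illustrative fake — the canned token counts are arbitrary — not part of the OpenAI SDK:

```python
from types import SimpleNamespace

class FakeChatClient:
    """Stand-in for openai.OpenAI() that returns a canned completion offline."""

    def __init__(self, canned_answer: str):
        def create(model: str, messages: list) -> SimpleNamespace:
            # Mirror only the fields generate_answer reads from the real response.
            return SimpleNamespace(
                choices=[SimpleNamespace(message=SimpleNamespace(content=canned_answer))],
                usage=SimpleNamespace(prompt_tokens=42, completion_tokens=7),
            )
        self.chat = SimpleNamespace(completions=SimpleNamespace(create=create))

fake = FakeChatClient("Waxell captures observability via decorators.")
response = fake.chat.completions.create(model="gpt-4o", messages=[])
```

Assigning an instance like this to the module-level client lets you run the whole pipeline in tests with predictable output and zero API spend.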

Step 4: Wrap with WaxellContext

Use WaxellContext to track the entire RAG pipeline as a single observed run. Set session_id and user_id to group and attribute interactions:

from waxell_observe import WaxellContext

async def rag_query(query: str, session_id: str, user_id: str) -> str:
    async with WaxellContext(
        agent_name="rag-pipeline",
        session_id=session_id,
        user_id=user_id,
        inputs={"query": query},
    ) as ctx:
        # Step 1: Retrieve
        docs = retrieve_documents(query)

        # Step 2: Record the retrieval step
        ctx.record_step(
            step_name="retrieval",
            output={"documents": docs, "count": len(docs)},
        )

        # Step 3: Generate answer
        result = generate_answer(query, docs)

        # Step 4: Record the LLM call
        ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=result["tokens_in"],
            tokens_out=result["tokens_out"],
            task="answer_generation",
            prompt_preview=result["prompt"][:200],
            response_preview=result["answer"][:200],
        )

        # Step 5: Set result and return
        ctx.set_result({"answer": result["answer"]})
        return result["answer"]
Info: The session_id groups related queries from the same conversation. The user_id attributes runs to a specific user for analytics.

Step 5: Add Metadata and Tags

Tags are searchable string key-value pairs. Metadata supports any JSON-serializable value. Both are useful for filtering runs in the dashboard:

# Inside the WaxellContext block, after retrieval:

# Tags: searchable string values
ctx.set_tag("pipeline", "rag-v1")
ctx.set_tag("environment", "production")

# Metadata: any JSON-serializable value
ctx.set_metadata("retrieval_count", len(docs))
ctx.set_metadata("model_config", {
    "model": "gpt-4o",
    "temperature": 0.7,
    "max_tokens": 1024,
})
Tip: Tags appear as OTel span attributes prefixed with waxell.tag. and are queryable in Grafana TraceQL: { span.waxell.tag.pipeline = "rag-v1" }.

Step 6: Add Quality Scoring

Record a relevance score to evaluate answer quality over time:

# Inside the WaxellContext block, after generating the answer:

ctx.record_score(
    name="relevance",
    value=0.85,
    data_type="numeric",
    comment="Automated relevance assessment",
)

You can record multiple scores per run:

ctx.record_score(name="relevance", value=0.85, data_type="numeric")
ctx.record_score(name="grounded", value=True, data_type="boolean")
ctx.record_score(name="tone", value="professional", data_type="categorical")
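The 0.85 above is a placeholder; in practice the numeric value comes from an evaluator. As one minimal sketch — a crude keyword-overlap heuristic, not part of the Waxell SDK — you could compute it like this:

```python
def keyword_overlap_score(query: str, answer: str) -> float:
    """Crude relevance heuristic: fraction of query terms present in the answer."""
    punctuation = ".,?!"
    q_terms = {w.lower().strip(punctuation) for w in query.split()}
    a_terms = {w.lower().strip(punctuation) for w in answer.split()}
    if not q_terms:
        return 0.0
    return len(q_terms & a_terms) / len(q_terms)

score = keyword_overlap_score(
    "How does Waxell handle observability?",
    "Waxell captures observability through decorators.",
)
```

You would then pass the computed value instead of the hardcoded constant, e.g. ctx.record_score(name="relevance", value=score, data_type="numeric"). Production pipelines typically use an LLM-as-judge or embedding similarity instead, but the recording call is the same.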

Step 7: View Results in the Dashboard

After running your pipeline, open your Waxell dashboard:

  1. Observability > Overview -- Find your rag-pipeline agent run with its input query, generated answer, token counts, and cost estimate.
  2. Observability > Sessions -- Click your session ID to see the full conversation timeline with all runs grouped together.
  3. Run Detail > Scores -- View the relevance scores attached to your runs.

Full Working Example

Here is the complete, runnable pipeline:

import asyncio

import chromadb
import openai
from waxell_observe import WaxellContext, WaxellObserveClient, generate_session_id

# ---- Configuration ----
WaxellObserveClient.configure(
    api_url="https://acme.waxell.dev",
    api_key="wax_sk_...",
)

openai_client = openai.OpenAI()

# ---- Vector Store ----
chroma = chromadb.Client()
collection = chroma.create_collection("knowledge_base")
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell_agent decorator captures inputs and outputs automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)


def retrieve_documents(query: str, n_results: int = 3) -> list[str]:
    results = collection.query(query_texts=[query], n_results=n_results)
    return results["documents"][0]


def generate_answer(query: str, context_docs: list[str]) -> dict:
    context = "\n".join(f"- {doc}" for doc in context_docs)
    prompt = f"Answer using this context:\n{context}\n\nQuestion: {query}\nAnswer:"

    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    msg = response.choices[0].message.content
    usage = response.usage
    return {
        "answer": msg,
        "prompt": prompt,
        "tokens_in": usage.prompt_tokens,
        "tokens_out": usage.completion_tokens,
    }


async def rag_query(query: str, session_id: str, user_id: str) -> str:
    async with WaxellContext(
        agent_name="rag-pipeline",
        session_id=session_id,
        user_id=user_id,
        inputs={"query": query},
    ) as ctx:
        # Retrieve
        docs = retrieve_documents(query)
        ctx.record_step(step_name="retrieval", output={"documents": docs})

        # Tag and metadata
        ctx.set_tag("pipeline", "rag-v1")
        ctx.set_metadata("retrieval_count", len(docs))

        # Generate
        result = generate_answer(query, docs)
        ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=result["tokens_in"],
            tokens_out=result["tokens_out"],
            task="answer_generation",
            prompt_preview=result["prompt"][:200],
            response_preview=result["answer"][:200],
        )

        # Score
        ctx.record_score(name="relevance", value=0.85, data_type="numeric")

        ctx.set_result({"answer": result["answer"]})
        return result["answer"]


async def main():
    session_id = generate_session_id()
    user_id = "user_alice"

    answer = await rag_query(
        "How does Waxell handle observability?",
        session_id=session_id,
        user_id=user_id,
    )
    print(f"Answer: {answer}")

    # Follow-up in the same session
    answer2 = await rag_query(
        "What about governance?",
        session_id=session_id,
        user_id=user_id,
    )
    print(f"Follow-up: {answer2}")


if __name__ == "__main__":
    asyncio.run(main())

Next Steps