Track a RAG Pipeline

Build and instrument a retrieval-augmented generation (RAG) pipeline with full observability using the decorator pattern -- including retrieval steps, LLM calls, sessions, user tracking, and quality scoring.

Prerequisites

  • Python 3.10+
  • A Waxell API key (get one from your control plane dashboard)
  • A vector database (this tutorial uses ChromaDB, but any will work)

What You'll Learn

  • Wrap retrieval and generation functions with @waxell.retrieval and @waxell.observe
  • Let waxell.init() auto-instrument the LLM calls
  • Group related interactions with sessions and user tracking
  • Add metadata, tags, and quality scores inline
  • View traces in the Waxell dashboard

Step 1: Install Dependencies

pip install waxell-observe openai chromadb

Set your credentials:

export WAXELL_API_URL="https://acme.waxell.dev"
export WAXELL_API_KEY="wax_sk_..."
export OPENAI_API_KEY="sk-..."
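
Before moving on, it can help to confirm that all three variables are actually set in the environment your script will run in. The following is a minimal standard-library sketch; the helper name `missing_env_vars` is hypothetical and not part of waxell-observe.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = ("WAXELL_API_URL", "WAXELL_API_KEY", "OPENAI_API_KEY")


def missing_env_vars(names=REQUIRED_VARS, environ=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


# Demonstration with a fake environment: one value is empty, one is absent.
example_env = {"WAXELL_API_URL": "https://acme.waxell.dev", "WAXELL_API_KEY": ""}
print(missing_env_vars(environ=example_env))  # ['WAXELL_API_KEY', 'OPENAI_API_KEY']
```

Calling `missing_env_vars()` with no arguments checks the real process environment.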

Step 2: Initialize Waxell

Call init() before importing OpenAI -- this enables auto-instrumentation, so every OpenAI call is captured automatically with model, tokens, cost, and latency.

import waxell_observe as waxell

waxell.init() # reads WAXELL_API_KEY + WAXELL_API_URL from env

# Import OpenAI AFTER init() so it's auto-instrumented
import openai
client = openai.OpenAI()
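
Why import order can matter: instrumentation libraries commonly work by replacing functions on a module at init time, and any reference bound before that replacement keeps pointing at the original. The sketch below illustrates that general mechanism with a stand-in module. It is an assumption about how such patching typically works, not a description of Waxell's internals; `fake_sdk`, `complete`, and `instrument` are hypothetical names.

```python
import types

# Stand-in for a third-party SDK module (hypothetical).
fake_sdk = types.SimpleNamespace()
fake_sdk.complete = lambda prompt: f"echo: {prompt}"

captured = []  # calls seen by the instrumentation


def instrument(module):
    """Replace module.complete with a wrapper that records each call."""
    original = module.complete

    def wrapper(prompt):
        captured.append(prompt)
        return original(prompt)

    module.complete = wrapper


# Bound BEFORE instrumentation: keeps pointing at the unpatched function.
early_ref = fake_sdk.complete

instrument(fake_sdk)

# Bound AFTER instrumentation: points at the recording wrapper.
late_ref = fake_sdk.complete

early_ref("first")   # not recorded
late_ref("second")   # recorded
print(captured)  # ['second']
```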

Step 3: Set Up a Vector Store

Create a ChromaDB collection and populate it. ChromaDB is also auto-instrumented -- queries appear automatically in the trace.

import chromadb

chroma = chromadb.Client()
collection = chroma.create_collection("knowledge_base")

collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell.observe decorator captures inputs, outputs, and LLM calls automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)

Step 4: Decorate Retrieval and Generation

Wrap your retrieval function with @waxell.retrieval to capture the query, documents, and scores as a structured span:

@waxell.retrieval(source="chromadb")
def retrieve_documents(query: str, n_results: int = 3) -> list[dict]:
    """Returns documents with scores so the decorator can record them."""
    results = collection.query(query_texts=[query], n_results=n_results)
    # Use the IDs ChromaDB returns so each result maps to the stored document
    ids = results["ids"][0]
    docs = results["documents"][0]
    distances = results["distances"][0]
    # Convert distances to similarity scores (higher is better).
    # This is a simple heuristic; it goes negative when a distance exceeds 1.
    return [
        {"id": doc_id, "text": doc, "score": 1.0 - dist}
        for doc_id, doc, dist in zip(ids, docs, distances)
    ]
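
The `1.0 - dist` conversion above is a simple heuristic and goes negative whenever a distance exceeds 1. If you need scores bounded in (0, 1], one common alternative is `1 / (1 + dist)`. The sketch below uses a hypothetical helper, `distance_to_score`; whether this mapping suits your collection's distance metric is an assumption you should verify.

```python
def distance_to_score(distance: float) -> float:
    """Map a non-negative distance to a score in (0, 1]; smaller distance, higher score."""
    if distance < 0:
        raise ValueError("distance must be non-negative")
    return 1.0 / (1.0 + distance)


distances = [0.0, 1.0, 3.0]
scores = [distance_to_score(d) for d in distances]
print(scores)  # [1.0, 0.5, 0.25]
```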

The generation step doesn't need a decorator -- waxell.init() auto-instruments OpenAI:

def generate_answer(query: str, docs: list[dict]) -> str:
    context = "\n".join(f"- {d['text']}" for d in docs)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
        ],
    )
    return response.choices[0].message.content
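
If you want to test prompt assembly without calling the model, the string-building part of generate_answer can be pulled into a pure function. The sketch below does that and also caps the context at a character budget. The helper name `build_prompt` and the `max_context_chars` parameter are hypothetical additions, not part of the tutorial's pipeline.

```python
def build_prompt(query: str, docs: list[dict], max_context_chars: int = 2000) -> str:
    """Assemble the prompt, adding documents in order until the character budget is reached."""
    lines = []
    used = 0
    for doc in docs:
        line = f"- {doc['text']}"
        if used + len(line) > max_context_chars:
            break  # stop before exceeding the budget
        lines.append(line)
        used += len(line) + 1  # +1 for the joining newline
    context = "\n".join(lines)
    return f"Context:\n{context}\n\nQuestion: {query}"


docs = [{"text": "alpha"}, {"text": "beta"}]
print(build_prompt("What is alpha?", docs))
```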

Step 5: Wrap the Pipeline with @observe

The @waxell.observe decorator creates a tracked execution run. Note that session_id and user_id are not parameters of the function itself; you supply them at call time (Step 6) to group conversations and attribute usage:

@waxell.observe(agent_name="rag-pipeline")
async def rag_query(query: str) -> str:
    docs = retrieve_documents(query)  # auto-recorded by @retrieval
    answer = generate_answer(query, docs)  # auto-recorded by init()

    # Inline enrichment
    waxell.tag("pipeline", "rag-v1")
    waxell.tag("environment", "production")
    waxell.metadata("retrieval_count", len(docs))
    waxell.score("relevance", 0.85)
    waxell.score("grounded", True, data_type="boolean")

    return answer
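
The relevance value of 0.85 above appears to be a fixed illustrative number. In a real pipeline you would likely compute it. One simple option, sketched below under that assumption, is to average the per-document retrieval scores; `mean_relevance` is a hypothetical helper, and an averaged retrieval score is only a rough proxy for answer quality.

```python
def mean_relevance(docs: list[dict]) -> float:
    """Average the per-document scores, clamped to [0, 1]; 0.0 when there are no documents."""
    if not docs:
        return 0.0
    avg = sum(d["score"] for d in docs) / len(docs)
    return max(0.0, min(1.0, avg))


docs = [{"score": 0.5}, {"score": 1.0}]
print(mean_relevance(docs))  # 0.75
```

The result could then be passed as the second argument to waxell.score("relevance", ...) in place of the constant.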

That's the entire instrumented pipeline. No WaxellContext, no ctx.record_llm_call, no ctx.record_step -- decorators and auto-instrumentation handle everything.

Step 6: Call with Session and User

Pass session_id and user_id at call time. The @observe decorator intercepts these kwargs and applies them to the run:

import asyncio
from waxell_observe import generate_session_id

async def main():
    session = generate_session_id()

    answer = await rag_query(
        "How does Waxell handle observability?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Answer: {answer}")

    # Follow-up in the same session
    answer2 = await rag_query(
        "What about governance?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Follow-up: {answer2}")

asyncio.run(main())
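
rag_query declares only a query parameter, yet the calls above pass session_id and user_id. A decorator can make that work by consuming those keyword arguments before invoking the wrapped function. The sketch below shows the general technique with the standard library only. It is not Waxell's code; `with_attribution` and `attributions` are hypothetical names.

```python
import asyncio
import functools

attributions = []  # stand-in for run metadata sent to a backend


def with_attribution(func):
    """Strip session_id/user_id from the call, record them, then call func."""
    @functools.wraps(func)
    async def wrapper(*args, session_id=None, user_id=None, **kwargs):
        attributions.append({"session_id": session_id, "user_id": user_id})
        # func never sees session_id or user_id, so its signature stays clean.
        return await func(*args, **kwargs)
    return wrapper


@with_attribution
async def rag_query(query: str) -> str:
    return f"answer to: {query}"


answer = asyncio.run(rag_query("hello", session_id="s1", user_id="user_alice"))
print(answer)        # answer to: hello
print(attributions)  # [{'session_id': 's1', 'user_id': 'user_alice'}]
```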

Step 7: View Results in the Dashboard

After running your pipeline, open your Waxell dashboard:

  1. Observability > Overview -- Find your rag-pipeline agent run with its query, generated answer, token counts, and cost estimate.
  2. Observability > Sessions -- Click your session ID to see the full conversation timeline with both queries grouped.
  3. Run Detail -- See the retrieval span (with documents and scores), the auto-captured LLM call (model, tokens, cost), and the inline scores/tags/metadata.

Full Working Example

import asyncio
import waxell_observe as waxell

waxell.init() # before OpenAI import

import openai
import chromadb
from waxell_observe import generate_session_id

# ---- Setup ----
client = openai.OpenAI()
chroma = chromadb.Client()
collection = chroma.create_collection("knowledge_base")
collection.add(
    documents=[
        "Waxell agents use a decorator pattern for observability.",
        "The @waxell.observe decorator captures inputs and outputs automatically.",
        "Governance policies can block, warn, or throttle agent executions.",
        "Sessions group related runs together for conversation tracking.",
        "Cost management tracks token usage and estimates spending per model.",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"],
)


# ---- Instrumented pipeline ----
@waxell.retrieval(source="chromadb")
def retrieve_documents(query: str, n_results: int = 3) -> list[dict]:
    results = collection.query(query_texts=[query], n_results=n_results)
    # Use the IDs ChromaDB returns so each result maps to the stored document
    ids = results["ids"][0]
    docs = results["documents"][0]
    distances = results["distances"][0]
    # Convert distances to similarity scores (higher is better)
    return [
        {"id": doc_id, "text": doc, "score": 1.0 - dist}
        for doc_id, doc, dist in zip(ids, docs, distances)
    ]


def generate_answer(query: str, docs: list[dict]) -> str:
    context = "\n".join(f"- {d['text']}" for d in docs)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}],
    )
    return response.choices[0].message.content


@waxell.observe(agent_name="rag-pipeline")
async def rag_query(query: str) -> str:
    docs = retrieve_documents(query)
    answer = generate_answer(query, docs)

    waxell.tag("pipeline", "rag-v1")
    waxell.metadata("retrieval_count", len(docs))
    waxell.score("relevance", 0.85)

    return answer


# ---- Run it ----
async def main():
    session = generate_session_id()

    answer = await rag_query(
        "How does Waxell handle observability?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Answer: {answer}")

    answer2 = await rag_query(
        "What about governance?",
        session_id=session,
        user_id="user_alice",
    )
    print(f"Follow-up: {answer2}")


if __name__ == "__main__":
    asyncio.run(main())

Why Decorators, Not WaxellContext?

This tutorial uses the decorator pattern because:

  • waxell.init() auto-instruments OpenAI and ChromaDB -- no manual record_llm_call or record_step needed
  • @waxell.retrieval captures query, documents, and scores from the function's return value
  • @waxell.observe creates the run with session/user attribution
  • waxell.score() / tag() / metadata() convenience functions add inline enrichment

WaxellContext is reserved for advanced cases: batch loops with many runs, multi-step orchestration spanning multiple agent functions, or mid-execution policy checks. See the Context Manager page for those scenarios.

Next Steps