OpenAI Integration
Four ways to add observability to OpenAI calls -- from zero-code auto-instrumentation to function-level decorators.
Approach 1: Auto-Instrumentation (Recommended)
The simplest approach -- call init() before importing OpenAI:
import waxell_observe

# init() has to run before the OpenAI import so the client is instrumented.
waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from openai import OpenAI

client = OpenAI()
messages = [{"role": "user", "content": "Hello!"}]
response = client.chat.completions.create(model="gpt-4o", messages=messages)
# Automatically traced with model, tokens, cost
Approach 2: Drop-in Import
Pre-instrumented OpenAI module -- no init() needed:
# The drop-in module ships pre-instrumented, so no init() call is needed here.
from waxell_observe.openai import openai

client = openai.OpenAI()
messages = [{"role": "user", "content": "Hello!"}]
response = client.chat.completions.create(model="gpt-4o", messages=messages)
# Automatically traced
Approach 3: Multi-Step Pipeline with Decorators
Use @observe for the pipeline and @retrieval / convenience functions for the steps in between. init() auto-captures every OpenAI call -- no manual record_llm_call needed.
import waxell_observe as waxell

waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

# Import AFTER init() so OpenAI is auto-instrumented
from openai import AsyncOpenAI

client = AsyncOpenAI()


@waxell.retrieval(source="docs-corpus")
def retrieve_documents(terms: str) -> list[dict]:
    """Search the document corpus for *terms* and return the top 5 hits.

    @retrieval auto-extracts query, documents, and scores from the return value.
    ``vector_store`` is a placeholder for your own vector-store client.
    """
    raw = vector_store.search(terms, top_k=5)
    return [
        {"id": r.id, "text": r.text, "score": r.score}
        for r in raw
    ]
@waxell.observe(agent_name="rag-demo", workflow_name="document-qa")
async def document_qa(query: str) -> dict:
    """Answer *query* with an analyze -> retrieve -> synthesize pipeline.

    Returns a dict with the generated ``answer`` and ``documents_used``
    (the number of retrieved documents).
    """
    waxell.tag("demo", "rag")
    waxell.tag("query_type", "multi-step")
    waxell.metadata("document_corpus_size", 5)

    # Step 1: Analyze query (LLM call auto-captured)
    analysis_response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Identify key search terms."},
            {"role": "user", "content": query},
        ],
    )
    # message.content is Optional in the OpenAI SDK; fall back to the raw
    # query so retrieval always receives a string.
    terms = analysis_response.choices[0].message.content or query
    waxell.step("analyze_query", output={"terms": terms})

    # Step 2: Retrieve documents (auto-recorded by @retrieval)
    documents = retrieve_documents(terms)

    # Step 3: Synthesize answer (LLM call auto-captured)
    synthesis_response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Synthesize an answer from documents."},
            {"role": "user", "content": f"Question: {query}\n\nDocuments: {documents}"},
        ],
    )
    answer = synthesis_response.choices[0].message.content or ""
    waxell.score("grounded", True, data_type="boolean")
    return {"answer": answer, "documents_used": len(documents)}
# Run with session and user tracking
# Run with session and user tracking. session_id / user_id are passed at call
# time and intercepted by the @observe decorator.
# (Inside a notebook or an already-running event loop, use
# `result = await document_qa(...)` instead of asyncio.run.)
import asyncio

result = asyncio.run(
    document_qa(
        "What are AI best practices?",
        session_id="sess_abc123",
        user_id="user_123",
    )
)
Approach 4: Decorator Pattern
Function-level tracing with automatic IO capture. The @observe decorator handles the run lifecycle; init() auto-captures every LLM call -- no manual recording needed.
import waxell_observe as waxell

waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from openai import AsyncOpenAI

client = AsyncOpenAI()


@waxell.observe(agent_name="chatbot", workflow_name="chat")
async def chat(message: str) -> str:
    """Send *message* to the model and return its reply, with automatic tracing."""
    # Single source of truth: the same config is recorded as metadata AND
    # sent with the request, so the trace reflects what was actually used.
    model_config = {"temperature": 0.7}

    # Enrichment via convenience functions
    waxell.tag("intent", "question")
    waxell.metadata("model_config", model_config)

    # LLM call auto-captured by init() -- no manual record_llm_call needed
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        temperature=model_config["temperature"],
    )
    # message.content is Optional in the OpenAI SDK; honor the `-> str` contract.
    content = response.choices[0].message.content or ""
    waxell.score("quality", 0.9)
    return content
# Usage -- pass session_id / user_id at call time
# Usage -- pass session_id / user_id at call time; the @observe decorator
# intercepts them. (In a notebook or running event loop, `await chat(...)`.)
import asyncio

result = asyncio.run(
    chat(
        "What is machine learning?",
        session_id="sess_abc123",
        user_id="user_456",
    )
)
RAG Pipeline Example
Complete RAG pipeline using @observe and @retrieval. Every OpenAI call is auto-captured by init().
import waxell_observe as waxell

waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from waxell_observe.errors import PolicyViolationError
from openai import AsyncOpenAI

client = AsyncOpenAI()


@waxell.retrieval(source="docs-corpus")
def retrieve_docs(terms: str) -> list[dict]:
    """Return documents relevant to *terms*.

    Stub implementation: it ignores *terms* and returns a fixed document.
    Replace with your vector store; @retrieval auto-extracts scores.
    """
    return [{"id": "doc1", "text": "AI safety involves...", "score": 0.91}]
@waxell.observe(
    agent_name="rag-agent",
    workflow_name="document-qa",
    enforce_policy=True,
)
async def run_rag_pipeline(query: str) -> str:
    """Answer *query* via query analysis, retrieval, and synthesis.

    Runs under ``enforce_policy=True``, so callers should be prepared to
    handle ``PolicyViolationError``.
    """
    waxell.tag("pipeline", "rag")
    waxell.metadata("corpus_version", "2024-01")

    # Step 1: Query analysis (auto-captured)
    analysis = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Extract search terms."},
            {"role": "user", "content": query},
        ],
    )
    # message.content is Optional in the OpenAI SDK; fall back to the raw query.
    terms = analysis.choices[0].message.content or query
    waxell.step("analyze", output={"terms": terms})

    # Step 2: Retrieval (auto-recorded by @retrieval)
    docs = retrieve_docs(terms)

    # Step 3: Synthesis (auto-captured)
    synthesis = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer using the documents."},
            {"role": "user", "content": f"Q: {query}\nDocs: {docs}"},
        ],
    )
    # Honor the `-> str` contract even if the model returns no content.
    answer = synthesis.choices[0].message.content or ""
    waxell.score("grounded", True, data_type="boolean")
    return answer
# Run it
# Run it. With enforce_policy=True, governance can block execution, which
# surfaces as PolicyViolationError.
# (In a notebook or running event loop, `await run_rag_pipeline(...)` instead.)
import asyncio

try:
    answer = asyncio.run(
        run_rag_pipeline(
            "What are AI safety best practices?",
            session_id="sess_xyz",
            user_id="user_456",
        )
    )
except PolicyViolationError as e:
    print(f"Policy violation: {e}")
Streaming Support
Auto-instrumentation handles OpenAI streaming end-to-end -- the SDK wraps the stream iterator to aggregate content, token counts, and timing. Just iterate the stream as you normally would:
import waxell_observe as waxell

waxell.init(api_key="wax_sk_...")

from openai import AsyncOpenAI

client = AsyncOpenAI()


@waxell.observe(agent_name="streaming-agent")
async def stream_response(query: str) -> str:
    """Stream a completion for *query* to stdout and return the full text.

    Auto-instrumentation wraps the stream iterator, so content, token counts,
    and timing are recorded without any extra code here.
    """
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": query}],
        stream=True,
    )
    # Collect pieces and join once instead of repeated string concatenation.
    parts: list[str] = []
    async for chunk in stream:
        # Some chunks may carry no choices, or a delta without content
        # (e.g. role-only or finish chunks); skip those.
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if piece:
            parts.append(piece)
            print(piece, end="", flush=True)
    return "".join(parts)
See Streaming Integration for advanced patterns when you need custom stream processing.
Best Practices
- Use auto-instrumentation for simple cases -- call `init()` before importing OpenAI; no other code changes are needed
- Wrap pipelines in `@observe` -- groups all LLM calls into a single run
- Use `waxell.step()` / `waxell.score()` / `waxell.tag()` -- enrichment without managing a context object
- Pass `session_id` and `user_id` at call time -- the decorator intercepts these kwargs
- Handle `PolicyViolationError` -- governance can block execution
Advanced: Context Manager
For batch loops or multi-agent orchestration that spans multiple functions, see the Context Manager page. For 90%+ of use cases, the decorator pattern above is the right call.
Next Steps
- Streaming Integration -- Detailed streaming patterns
- Multi-Agent -- Coordinate multiple agents
- Governance -- Policy enforcement