OpenAI Integration
Four ways to add observability to OpenAI calls -- from zero-code auto-instrumentation to fine-grained manual control.
Approach 1: Auto-Instrumentation (Recommended)
The simplest approach -- call init() before importing OpenAI:
import waxell_observe

# init() must run BEFORE `from openai import OpenAI` so the SDK can
# instrument the client at import time.
waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced with model, tokens, cost
Approach 2: Drop-in Import
Pre-instrumented OpenAI module -- no init() needed:
# Drop-in replacement: this import yields a pre-instrumented `openai`
# module, so no explicit init() call is needed.
from waxell_observe.openai import openai

client = openai.OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced
Approach 3: Context Manager (Full Control)
For fine-grained control over what gets recorded:
import waxell_observe
waxell_observe.init(api_key="wax_sk_...")
from waxell_observe import WaxellContext, generate_session_id
from openai import AsyncOpenAI
client = AsyncOpenAI()
async with WaxellContext(
agent_name="rag-demo",
workflow_name="document-qa",
inputs={"query": "What are AI best practices?"},
session_id=generate_session_id(),
user_id="user_123",
) as ctx:
# Add tags for categorization
ctx.set_tag("demo", "rag")
ctx.set_tag("query_type", "multi-step")
ctx.set_metadata("document_corpus_size", 5)
# Step 1: Analyze query
analysis_response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Identify key search terms."},
{"role": "user", "content": query},
],
)
analysis = analysis_response.choices[0].message.content
ctx.record_step("analyze_query", output={"terms": analysis})
ctx.record_llm_call(
model=analysis_response.model,
tokens_in=analysis_response.usage.prompt_tokens,
tokens_out=analysis_response.usage.completion_tokens,
task="analyze_query",
prompt_preview=query[:200],
response_preview=analysis[:200],
)
# Step 2: Retrieve documents
documents = retrieve_relevant_docs(analysis)
ctx.record_step("retrieve_documents", output={"count": len(documents)})
# Step 3: Synthesize answer
synthesis_response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Synthesize an answer from documents."},
{"role": "user", "content": f"Question: {query}\n\nDocuments: {documents}"},
],
)
answer = synthesis_response.choices[0].message.content
ctx.record_step("synthesize_answer", output={"length": len(answer)})
ctx.record_llm_call(
model=synthesis_response.model,
tokens_in=synthesis_response.usage.prompt_tokens,
tokens_out=synthesis_response.usage.completion_tokens,
task="synthesize_answer",
)
ctx.set_result({"answer": answer, "documents_used": len(documents)})
Approach 4: Decorator Pattern
Function-level tracing with automatic IO capture:
import waxell_observe

waxell_observe.init(api_key="wax_sk_...")

from waxell_observe import observe
from openai import AsyncOpenAI

client = AsyncOpenAI()


# The decorator wraps each call in a trace and captures inputs/outputs.
@observe(agent_name="chatbot", workflow_name="chat")
async def chat(message: str, waxell_ctx=None) -> str:
    """Chat function with automatic tracing."""
    # Add enrichment
    waxell_observe.tag("intent", "question")
    waxell_observe.metadata("model_config", {"temperature": 0.7})

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
    )
    content = response.choices[0].message.content

    # Manual LLM call recording for cost tracking.
    # waxell_ctx is presumably injected by @observe when tracing is active;
    # the guard makes recording best-effort when it is None.
    if waxell_ctx:
        waxell_ctx.record_llm_call(
            model=response.model,
            tokens_in=response.usage.prompt_tokens,
            tokens_out=response.usage.completion_tokens,
            task="chat",
        )
        waxell_ctx.record_step("generate_response", output={"length": len(content)})

    # Record quality score
    waxell_observe.score("quality", 0.9)
    return content


# Usage (from within an async context)
result = await chat("What is machine learning?")
RAG Pipeline Example
Complete RAG pipeline with full observability:
import waxell_observe

# Initialize BEFORE importing the OpenAI client so calls are auto-traced.
waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from waxell_observe import WaxellContext, generate_session_id
from waxell_observe.errors import PolicyViolationError
from openai import AsyncOpenAI

client = AsyncOpenAI()
async def run_rag_pipeline(query: str, user_id: str = ""):
session = generate_session_id()
async with WaxellContext(
agent_name="rag-agent",
workflow_name="document-qa",
inputs={"query": query},
session_id=session,
user_id=user_id,
enforce_policy=True,
mid_execution_governance=True,
) as ctx:
ctx.set_tag("pipeline", "rag")
ctx.set_metadata("corpus_version", "2024-01")
try:
# Step 1: Query analysis
analysis = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Extract search terms."},
{"role": "user", "content": query},
],
)
terms = analysis.choices[0].message.content
ctx.record_step("analyze", output={"terms": terms})
ctx.record_llm_call(
model=analysis.model,
tokens_in=analysis.usage.prompt_tokens,
tokens_out=analysis.usage.completion_tokens,
)
# Step 2: Document retrieval (simulated)
docs = [{"id": 1, "content": "AI safety involves..."}]
ctx.record_step("retrieve", output={"count": len(docs)})
# Step 3: Synthesis
synthesis = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Answer using the documents."},
{"role": "user", "content": f"Q: {query}\nDocs: {docs}"},
],
)
answer = synthesis.choices[0].message.content
ctx.record_step("synthesize", output={"length": len(answer)})
ctx.record_llm_call(
model=synthesis.model,
tokens_in=synthesis.usage.prompt_tokens,
tokens_out=synthesis.usage.completion_tokens,
)
ctx.set_result({"answer": answer})
return answer
except PolicyViolationError as e:
print(f"Policy violation: {e}")
raise
# Run it
answer = await run_rag_pipeline(
"What are AI safety best practices?",
user_id="user_456"
)
Streaming Support
For streaming responses, accumulate chunks and record after completion:
async with WaxellContext(agent_name="streaming-agent") as ctx:
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
stream=True,
)
content = ""
tokens_in = 0
tokens_out = 0
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content += chunk.choices[0].delta.content
if chunk.usage:
tokens_in = chunk.usage.prompt_tokens
tokens_out = chunk.usage.completion_tokens
# Estimate tokens if not provided in stream
if not tokens_out:
tokens_in = 150 # Estimate
tokens_out = len(content.split()) * 2
ctx.record_llm_call(
model="gpt-4o-mini",
tokens_in=tokens_in,
tokens_out=tokens_out,
response_preview=content[:200],
)
ctx.set_result({"response": content})
Best Practices
- Use auto-instrumentation for simple cases -- no code changes needed
- Add context manager for multi-step pipelines -- correlates all calls
- Record steps for debugging -- understand execution flow
- Set user_id for attribution -- track per-user costs
- Use session_id for conversations -- group related runs
- Handle PolicyViolationError -- governance can block execution
Next Steps
- Streaming Integration -- Detailed streaming patterns
- Multi-Agent -- Coordinate multiple agents
- Governance -- Policy enforcement