Skip to main content

OpenAI Integration

Four ways to add observability to OpenAI calls -- from zero-code auto-instrumentation to fine-grained manual control.

Approach 1: Auto-Instrumentation

The simplest approach -- call init() before importing OpenAI:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced with model, tokens, cost

Approach 2: Drop-in Import

Pre-instrumented OpenAI module -- no init() needed:

from waxell_observe.openai import openai

client = openai.OpenAI()
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced

Approach 3: Context Manager (Full Control)

For fine-grained control over what gets recorded:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...")

from waxell_observe import WaxellContext, generate_session_id
from openai import AsyncOpenAI

client = AsyncOpenAI()

async with WaxellContext(
agent_name="rag-demo",
workflow_name="document-qa",
inputs={"query": "What are AI best practices?"},
session_id=generate_session_id(),
user_id="user_123",
) as ctx:
# Add tags for categorization
ctx.set_tag("demo", "rag")
ctx.set_tag("query_type", "multi-step")
ctx.set_metadata("document_corpus_size", 5)

# Step 1: Analyze query
analysis_response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Identify key search terms."},
{"role": "user", "content": query},
],
)
analysis = analysis_response.choices[0].message.content

ctx.record_step("analyze_query", output={"terms": analysis})
ctx.record_llm_call(
model=analysis_response.model,
tokens_in=analysis_response.usage.prompt_tokens,
tokens_out=analysis_response.usage.completion_tokens,
task="analyze_query",
prompt_preview=query[:200],
response_preview=analysis[:200],
)

# Step 2: Retrieve documents
documents = retrieve_relevant_docs(analysis)
ctx.record_step("retrieve_documents", output={"count": len(documents)})

# Step 3: Synthesize answer
synthesis_response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Synthesize an answer from documents."},
{"role": "user", "content": f"Question: {query}\n\nDocuments: {documents}"},
],
)
answer = synthesis_response.choices[0].message.content

ctx.record_step("synthesize_answer", output={"length": len(answer)})
ctx.record_llm_call(
model=synthesis_response.model,
tokens_in=synthesis_response.usage.prompt_tokens,
tokens_out=synthesis_response.usage.completion_tokens,
task="synthesize_answer",
)

ctx.set_result({"answer": answer, "documents_used": len(documents)})

Approach 4: Decorator Pattern

Function-level tracing with automatic IO capture:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...")

from waxell_observe import observe
from openai import AsyncOpenAI

client = AsyncOpenAI()

@observe(agent_name="chatbot", workflow_name="chat")
async def chat(message: str, waxell_ctx=None) -> str:
"""Chat function with automatic tracing."""

# Add enrichment
waxell_observe.tag("intent", "question")
waxell_observe.metadata("model_config", {"temperature": 0.7})

response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": message}],
)
content = response.choices[0].message.content

# Manual LLM call recording for cost tracking
if waxell_ctx:
waxell_ctx.record_llm_call(
model=response.model,
tokens_in=response.usage.prompt_tokens,
tokens_out=response.usage.completion_tokens,
task="chat",
)
waxell_ctx.record_step("generate_response", output={"length": len(content)})

# Record quality score
waxell_observe.score("quality", 0.9)

return content

# Usage
result = await chat("What is machine learning?")

RAG Pipeline Example

Complete RAG pipeline with full observability:

import waxell_observe
waxell_observe.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from waxell_observe import WaxellContext, generate_session_id
from waxell_observe.errors import PolicyViolationError
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def run_rag_pipeline(query: str, user_id: str = ""):
session = generate_session_id()

async with WaxellContext(
agent_name="rag-agent",
workflow_name="document-qa",
inputs={"query": query},
session_id=session,
user_id=user_id,
enforce_policy=True,
mid_execution_governance=True,
) as ctx:
ctx.set_tag("pipeline", "rag")
ctx.set_metadata("corpus_version", "2024-01")

try:
# Step 1: Query analysis
analysis = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Extract search terms."},
{"role": "user", "content": query},
],
)
terms = analysis.choices[0].message.content
ctx.record_step("analyze", output={"terms": terms})
ctx.record_llm_call(
model=analysis.model,
tokens_in=analysis.usage.prompt_tokens,
tokens_out=analysis.usage.completion_tokens,
)

# Step 2: Document retrieval (simulated)
docs = [{"id": 1, "content": "AI safety involves..."}]
ctx.record_step("retrieve", output={"count": len(docs)})

# Step 3: Synthesis
synthesis = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Answer using the documents."},
{"role": "user", "content": f"Q: {query}\nDocs: {docs}"},
],
)
answer = synthesis.choices[0].message.content
ctx.record_step("synthesize", output={"length": len(answer)})
ctx.record_llm_call(
model=synthesis.model,
tokens_in=synthesis.usage.prompt_tokens,
tokens_out=synthesis.usage.completion_tokens,
)

ctx.set_result({"answer": answer})
return answer

except PolicyViolationError as e:
print(f"Policy violation: {e}")
raise

# Run it
answer = await run_rag_pipeline(
"What are AI safety best practices?",
user_id="user_456"
)

Streaming Support

For streaming responses, accumulate chunks and record after completion:

async with WaxellContext(agent_name="streaming-agent") as ctx:
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": query}],
stream=True,
)

content = ""
tokens_in = 0
tokens_out = 0

async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content += chunk.choices[0].delta.content
if chunk.usage:
tokens_in = chunk.usage.prompt_tokens
tokens_out = chunk.usage.completion_tokens

# Estimate tokens if not provided in stream
if not tokens_out:
tokens_in = 150 # Estimate
tokens_out = len(content.split()) * 2

ctx.record_llm_call(
model="gpt-4o-mini",
tokens_in=tokens_in,
tokens_out=tokens_out,
response_preview=content[:200],
)
ctx.set_result({"response": content})

Best Practices

  1. Use auto-instrumentation for simple cases -- no code changes needed
  2. Add context manager for multi-step pipelines -- correlates all calls
  3. Record steps for debugging -- understand execution flow
  4. Set user_id for attribution -- track per-user costs
  5. Use session_id for conversations -- group related runs
  6. Handle PolicyViolationError -- governance can block execution

Next Steps