Context Manager
Most users should start with waxell.init() for zero-code tracing, then add @observe decorators for named traces. WaxellContext is for advanced cases where you need explicit control over the run lifecycle -- session IDs, user tracking, or multi-function workflows.
WaxellContext is a context manager that gives you explicit control over run lifecycle, LLM call recording, step tracking, and mid-execution policy checks. Use it when you need more flexibility than the @observe decorator provides.
It works as both async with (for async code) and plain with (for sync code).
Async Usage
from waxell_observe import WaxellContext

async with WaxellContext(agent_name="research-agent") as ctx:
    result = await run_research(query)
    ctx.record_llm_call(model="gpt-4o", tokens_in=300, tokens_out=150)
    ctx.record_step("research", output={"sources": 5})
    ctx.set_result({"answer": result})
Sync Usage
from waxell_observe import WaxellContext

with WaxellContext(agent_name="batch-processor") as ctx:
    result = process_data(input_data)
    ctx.record_llm_call(model="gpt-4o", tokens_in=300, tokens_out=150)
    ctx.record_step("process", output={"items": 42})
    ctx.set_result({"output": result})
The sync path uses native __enter__ / __exit__ with synchronous HTTP calls — ContextVars are set in the calling thread, so auto-instrumentation works correctly.
Use with (sync) for batch processing scripts, CLI tools, ETL pipelines, and any code that doesn't use async/await. Use async with for async web servers, async agent frameworks, and code that's already async.
Lifecycle
On entering the context:
- Policies are checked (if enforce_policy=True)
- A new execution run is started on the control plane
On exiting the context:
- Buffered LLM calls are flushed to the control plane
- Buffered steps are flushed to the control plane
- The run is completed with success or error status
Enhanced Context Options
Session and User Tracking
Group related runs into sessions and track end-user identity:
with WaxellContext(
    agent_name="my-chatbot",
    session_id="session-abc-123",  # Group related runs
    user_id="user-456",            # Track end-user
) as ctx:
    # Your LLM calls here
    response = call_llm(prompt)
Tags and Metadata
Add structured metadata to runs for filtering and analysis:
with WaxellContext(agent_name="my-agent") as ctx:
    ctx.set_tag("environment", "production")
    ctx.set_tag("pipeline", "rag-v2")
    ctx.set_metadata("retrieval_count", 5)
    ctx.set_metadata("model_version", "gpt-4-turbo")
    # Your LLM calls here
Recording Scores
Capture quality metrics and user feedback:
with WaxellContext(agent_name="my-agent") as ctx:
    response = call_llm(prompt)

    # Numeric score (0-1 range)
    ctx.record_score(
        name="relevance",
        value=0.92,
        data_type="numeric",
        comment="Highly relevant to the query",
    )

    # Boolean score
    ctx.record_score(
        name="contains_hallucination",
        value=False,
        data_type="boolean",
    )

    # Categorical score
    ctx.record_score(
        name="tone",
        value="professional",
        data_type="categorical",
    )
Recording Steps
Track sub-operations within a run:
with WaxellContext(agent_name="rag-pipeline") as ctx:
    # Step 1: Retrieval
    docs = retrieve_documents(query)
    ctx.record_step("retrieval", output={"doc_count": len(docs)})

    # Step 2: Generation
    response = generate_response(query, docs)
    ctx.record_step("generation", output={"response_length": len(response)})
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_name | str | (required) | Name for this agent in the control plane |
workflow_name | str | "default" | Workflow name for grouping runs |
inputs | dict \| None | None | Input data to record with the run |
metadata | dict \| None | None | Arbitrary metadata to attach to the run |
client | WaxellObserveClient \| None | None | Pre-configured client. If None, creates a new one using current configuration |
enforce_policy | bool | True | Check policies on context entry |
session_id | str | "" | Session ID for grouping related runs |
user_id | str | "" | End-user ID for per-user tracking and analytics |
user_group | str | "" | User group for authorization policies (e.g., "enterprise", "free") |
mid_execution_governance | bool | False | Flush data and check governance on each record_step() call |
Recording Methods
record_llm_call
Record an LLM API call. All parameters are keyword-only.
ctx.record_llm_call(
    model="gpt-4o",
    tokens_in=500,
    tokens_out=200,
    cost=0.0,               # Optional: auto-estimated if 0.0
    task="summarize",       # Optional: label for this call
    prompt_preview="...",   # Optional: first N chars of prompt
    response_preview="...", # Optional: first N chars of response
    duration_ms=350,        # Optional: call duration in milliseconds
    provider="openai",      # Optional: inferred from model name if empty
)
| Parameter | Type | Default | Description |
|---|---|---|---|
model | str | (required) | Model name (e.g., "gpt-4o", "claude-sonnet-4") |
tokens_in | int | (required) | Input/prompt token count |
tokens_out | int | (required) | Output/completion token count |
cost | float | 0.0 | Cost in USD. If 0.0, automatically estimated using built-in model pricing |
task | str | "" | A label describing this LLM call's purpose |
prompt_preview | str | "" | Preview of the prompt text |
response_preview | str | "" | Preview of the response text |
duration_ms | int \| None | None | LLM call duration in milliseconds |
provider | str | "" | Provider name (e.g., "openai", "anthropic"). If empty, inferred from model name |
LLM calls are buffered in memory and flushed to the control plane when the context exits.
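Provider SDKs typically return token counts on the response itself, so a thin helper can forward them to record_llm_call. The sketch below is not part of the SDK; it assumes an OpenAI-style usage object with prompt_tokens and completion_tokens attributes, and leaves cost at its default so the built-in pricing estimate applies.

```python
def record_chat_usage(ctx, model: str, usage) -> None:
    # Hypothetical helper: map an OpenAI-style usage object onto
    # ctx.record_llm_call. Assumes usage exposes prompt_tokens and
    # completion_tokens; cost defaults to 0.0, so the SDK's built-in
    # pricing estimate is used.
    ctx.record_llm_call(
        model=model,
        tokens_in=usage.prompt_tokens,
        tokens_out=usage.completion_tokens,
    )
```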
record_step
Record a named execution step.
ctx.record_step("extract_entities", output={"count": 12})
| Parameter | Type | Default | Description |
|---|---|---|---|
step_name | str | (required) | Name identifying this step |
output | dict \| None | None | Optional output data for the step |
Steps are automatically numbered in order of recording. Like LLM calls, they are buffered and flushed on context exit.
set_result
Set the final result for the run.
ctx.set_result({"answer": "The capital of France is Paris.", "confidence": 0.95})
| Parameter | Type | Default | Description |
|---|---|---|---|
result | dict | (required) | Result data to include when the run is completed |
Call this before the context exits. If not called, the run completes with an empty result.
check_policy / check_policy_sync
Perform a mid-execution policy check. This is useful for long-running agents that should re-validate policies between steps.
# Async
policy = await ctx.check_policy()

# Sync
policy = ctx.check_policy_sync()

if policy.blocked:
    print(f"Blocked: {policy.reason}")
    # Handle the block (e.g., stop processing)
elif policy.action == "warn":
    print(f"Warning: {policy.reason}")
    # Continue but log the warning
Returns a PolicyCheckResult with:
- action -- one of "allow", "block", "warn", "throttle"
- reason -- human-readable explanation
- metadata -- additional policy data
- allowed -- property, True if action is "allow" or "warn"
- blocked -- property, True if action is "block" or "throttle"
record_score
Record a quality score or feedback metric for the current run.
ctx.record_score(
    name="relevance",
    value=0.92,
    data_type="numeric",
    comment="Highly relevant to the query",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Score name (e.g., "relevance", "accuracy", "thumbs_up") |
value | float \| str \| bool | (required) | Score value. Type depends on data_type |
data_type | str | "numeric" | One of "numeric", "categorical", "boolean" |
comment | str | "" | Optional free-text comment |
Scores are buffered and flushed to the control plane when the context exits.
set_tag
Set a searchable tag on the current run. Tags become OTel span attributes and are queryable in Grafana TraceQL.
ctx.set_tag("environment", "production")
ctx.set_tag("pipeline", "rag-v2")
| Parameter | Type | Default | Description |
|---|---|---|---|
key | str | (required) | Tag name (alphanumeric, underscores, hyphens) |
value | str | (required) | Tag value (string only) |
set_metadata
Set arbitrary metadata on the current run. Complex values are JSON-serialized.
ctx.set_metadata("retrieval_count", 5)
ctx.set_metadata("model_version", "gpt-4-turbo")
| Parameter | Type | Default | Description |
|---|---|---|---|
key | str | (required) | Metadata key |
value | Any | (required) | Any JSON-serializable value |
Behavior Tracking
Track agent behaviors beyond LLM calls and steps. These methods buffer data as spans and flush on context exit.
record_tool_call
Record a tool or function call.
ctx.record_tool_call(
    name="web_search",
    input={"query": "latest news"},
    output={"results": [...]},
    duration_ms=250,
    status="ok",
    tool_type="api",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Tool name (e.g., "web_search", "database_query") |
input | dict \| str | "" | Tool input parameters |
output | dict \| str | "" | Tool output/result |
duration_ms | int \| None | None | Execution time in milliseconds |
status | str | "ok" | "ok" or "error" |
tool_type | str | "function" | Classification: "function", "api", "database", "retriever" |
error | str | "" | Error message if status is "error" |
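A small wrapper can time a tool function and record both the success and error paths in one place. This is a sketch, not part of the SDK; the helper name and the assumption that the tool takes keyword arguments are illustrative.

```python
import time

def traced_tool(ctx, name: str, fn, **kwargs):
    # Hypothetical wrapper: run a tool function, time it, and record
    # the call via ctx.record_tool_call, covering the error path too.
    start = time.monotonic()
    try:
        output = fn(**kwargs)
    except Exception as exc:
        ctx.record_tool_call(
            name=name,
            input=kwargs,
            duration_ms=int((time.monotonic() - start) * 1000),
            status="error",
            error=str(exc),
        )
        raise
    ctx.record_tool_call(
        name=name,
        input=kwargs,
        output=output,
        duration_ms=int((time.monotonic() - start) * 1000),
    )
    return output
```

Exceptions still propagate to the caller; the wrapper only records them before re-raising.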
record_retrieval
Record a RAG document retrieval.
ctx.record_retrieval(
    query="How does the billing system work?",
    documents=[{"id": "doc1", "title": "Billing FAQ", "score": 0.92}],
    source="pinecone",
    duration_ms=120,
    top_k=5,
    scores=[0.92, 0.87, 0.81],
)
| Parameter | Type | Default | Description |
|---|---|---|---|
query | str | (required) | The retrieval query string |
documents | list[dict] | (required) | Retrieved documents (e.g., [{id, title, score, snippet}]) |
source | str | "" | Data source name (e.g., "pinecone", "elasticsearch") |
duration_ms | int \| None | None | Retrieval time in milliseconds |
top_k | int \| None | None | Number of documents requested |
scores | list[float] \| None | None | Relevance scores for each retrieved document |
record_decision
Record a decision or routing point.
ctx.record_decision(
    name="route_to_agent",
    options=["billing", "technical", "general"],
    chosen="billing",
    reasoning="User mentioned invoice and payment",
    confidence=0.95,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Decision name (e.g., "route_to_agent", "select_model") |
options | list[str] | (required) | Available choices |
chosen | str | (required) | The selected option |
reasoning | str | "" | Why this option was chosen |
confidence | float \| None | None | Confidence score (0.0-1.0) |
metadata | dict \| None | None | Additional context |
instrumentation_type | str | "manual" | How this decision was captured: "manual", "decorator", or "auto" |
record_reasoning
Record a reasoning or chain-of-thought step.
ctx.record_reasoning(
    step="evaluate_sources",
    thought="Source A is more recent but Source B has higher authority",
    evidence=["Source A: 2024", "Source B: cited 500 times"],
    conclusion="Use Source B as primary, Source A as supplement",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
step | str | (required) | Reasoning step name |
thought | str | (required) | The reasoning text/thought process |
evidence | list[str] \| None | None | Supporting evidence or references |
conclusion | str | "" | Conclusion reached at this step |
record_retry
Record a retry or fallback event.
ctx.record_retry(
    attempt=2,
    reason="Rate limited by OpenAI",
    strategy="fallback",
    original_error="429 Too Many Requests",
    fallback_to="claude-sonnet-4",
    max_attempts=3,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
attempt | int | (required) | Current attempt number (1-based) |
reason | str | (required) | Why a retry/fallback occurred |
strategy | str | "retry" | "retry", "fallback", or "circuit_break" |
original_error | str | "" | The error that triggered the retry |
fallback_to | str | "" | Name of fallback target (model, agent, tool) |
max_attempts | int \| None | None | Maximum attempts configured |
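In a retry loop, record_retry naturally sits on the failure branch before the next attempt. The loop below is a sketch under that assumption (not an SDK helper); a real implementation would likely add backoff between attempts.

```python
def call_with_retries(ctx, fn, max_attempts: int = 3):
    # Hypothetical helper: retry fn() and record each failed attempt
    # via ctx.record_retry before trying again. The final failure is
    # re-raised rather than recorded as a retry.
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_attempts:
                raise
            ctx.record_retry(
                attempt=attempt,
                reason=str(exc),
                original_error=str(exc),
                max_attempts=max_attempts,
            )
```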
record_policy_check
Record a policy evaluation result as a governance span.
ctx.record_policy_check(
    policy_name="budget-limit",
    action="warn",
    category="budget",
    reason="Approaching 80% of daily budget",
    phase="mid_execution",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
policy_name | str | (required) | Name of the policy evaluated |
action | str | (required) | Evaluation result: "allow", "warn", "block", etc. |
category | str | "" | Policy category (e.g., "budget", "rate-limit") |
reason | str | "" | Reason for the action (empty for allow) |
duration_ms | float | 0 | Evaluation time in milliseconds |
phase | str | "pre_execution" | "pre_execution", "mid_execution", or "post_execution" |
priority | int | 100 | Policy priority (lower = evaluated first) |
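record_policy_check also works for application-level checks that the control plane knows nothing about. The helper below is an illustrative sketch (its name and shape are not part of the SDK): it times a custom boolean check and records the outcome as a governance span.

```python
import time

def timed_policy_check(ctx, policy_name: str, check_fn, category: str = "") -> bool:
    # Hypothetical helper: run a custom check_fn() -> bool, time it,
    # and record the result via ctx.record_policy_check.
    start = time.monotonic()
    allowed = bool(check_fn())
    ctx.record_policy_check(
        policy_name=policy_name,
        action="allow" if allowed else "block",
        category=category,
        reason="" if allowed else f"{policy_name} returned False",
        duration_ms=(time.monotonic() - start) * 1000.0,
        phase="mid_execution",
    )
    return allowed
```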
Properties
| Property | Type | Description |
|---|---|---|
run_id | str | The run ID from the control plane, or "" if the run has not started |
Error Handling
If an exception occurs inside the context, the run is automatically completed with status="error" and the error message. The exception is not suppressed -- it propagates normally:
# Async
try:
    async with WaxellContext(agent_name="my-agent") as ctx:
        raise ValueError("Something went wrong")
except ValueError:
    pass  # Run was completed with status="error"

# Sync
try:
    with WaxellContext(agent_name="my-agent") as ctx:
        raise ValueError("Something went wrong")
except ValueError:
    pass  # Run was completed with status="error"
If flushing telemetry to the control plane fails (e.g., network error), the failure is logged as a warning but does not interfere with your agent's execution.
Policy Enforcement on Entry
When enforce_policy=True, policies are checked before the run starts. If the policy result is block or throttle, a PolicyViolationError is raised and no run is created:
from waxell_observe.errors import PolicyViolationError

# Works identically with both async and sync context managers
try:
    with WaxellContext(
        agent_name="my-agent",
        enforce_policy=True,
    ) as ctx:
        ...
except PolicyViolationError as e:
    print(f"Blocked: {e}")
    print(f"Action: {e.policy_result.action}")
When to Use Context Manager vs Decorator
Choose WaxellContext over @observe when you need:
- Multi-step orchestration -- wrap complex logic that spans multiple functions
- Mid-execution policy checks -- re-validate policies between steps
- Explicit input/metadata control -- pass structured inputs and metadata at context creation
- Multiple runs in one function -- start and complete several runs in sequence
- Conditional observability -- only create a context under certain conditions
- Synchronous code -- batch scripts, CLI tools, and ETL pipelines that don't use async
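The conditional-observability case can be handled with contextlib.nullcontext as a no-op stand-in. The helper below is a sketch, not part of the SDK; traced_context and its factory argument are illustrative names.

```python
import contextlib

def traced_context(enabled: bool, factory):
    """Return factory() when tracing is enabled, else a no-op context.

    factory is a zero-argument callable returning a context manager,
    e.g. lambda: WaxellContext(agent_name="my-agent"). With tracing
    disabled, contextlib.nullcontext() is used and yields None, so
    callers must guard ctx before recording.
    """
    return factory() if enabled else contextlib.nullcontext()
```

Usage: `with traced_context(flag, lambda: WaxellContext(agent_name="my-agent")) as ctx:` followed by `if ctx is not None: ctx.set_result(...)`.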
Example of multiple runs (sync -- natural fit for batch processing):
def batch_process(items: list[str]):
    for item in items:
        with WaxellContext(
            agent_name="batch-processor",
            inputs={"item": item},
        ) as ctx:
            result = process_item(item)
            ctx.record_llm_call(model="gpt-4o-mini", tokens_in=50, tokens_out=30)
            ctx.set_result({"output": result})
The same pattern works with async with for async code:
async def batch_process(items: list[str]):
    for item in items:
        async with WaxellContext(
            agent_name="batch-processor",
            inputs={"item": item},
        ) as ctx:
            result = await process_item(item)
            ctx.record_llm_call(model="gpt-4o-mini", tokens_in=50, tokens_out=30)
            ctx.set_result({"output": result})
Full Example (Async)
from waxell_observe import WaxellObserveClient, WaxellContext

WaxellObserveClient.configure(
    api_url="https://acme.waxell.dev",
    api_key="wax_sk_...",
)

async def run_pipeline(query: str) -> dict:
    async with WaxellContext(
        agent_name="research-pipeline",
        workflow_name="deep-research",
        inputs={"query": query},
        metadata={"version": "2.1"},
        enforce_policy=True,
    ) as ctx:
        # Step 1: Search
        sources = await search(query)
        ctx.record_step("search", output={"source_count": len(sources)})

        # Step 2: Synthesize
        synthesis = await synthesize(query, sources)
        ctx.record_llm_call(
            model="claude-sonnet-4",
            tokens_in=2000,
            tokens_out=500,
            task="synthesize",
        )
        ctx.record_step("synthesize", output={"length": len(synthesis)})

        # Mid-execution policy check
        policy = await ctx.check_policy()
        if policy.blocked:
            ctx.set_result({"error": "Policy blocked continuation"})
            return {"error": policy.reason}

        # Step 3: Refine
        final = await refine(synthesis)
        ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=800,
            tokens_out=300,
            task="refine",
        )
        ctx.record_step("refine")

        result = {"answer": final, "sources": len(sources)}
        ctx.set_result(result)
        return result
Full Example (Sync)
from waxell_observe import WaxellObserveClient, WaxellContext

WaxellObserveClient.configure(
    api_url="https://acme.waxell.dev",
    api_key="wax_sk_...",
)

def process_tickets(tickets: list[dict]) -> list[dict]:
    results = []
    for ticket in tickets:
        with WaxellContext(
            agent_name="ticket-processor",
            workflow_name="support-pipeline",
            inputs={"ticket_id": ticket["id"], "subject": ticket["subject"]},
            enforce_policy=True,
        ) as ctx:
            ctx.set_tag("priority", ticket["priority"])

            # Step 1: Classify
            category = classify_ticket(ticket)
            ctx.record_llm_call(model="gpt-4o-mini", tokens_in=200, tokens_out=10, task="classify")
            ctx.record_step("classify", output={"category": category})

            # Step 2: Generate response
            response = generate_response(ticket, category)
            ctx.record_llm_call(model="gpt-4o", tokens_in=500, tokens_out=200, task="respond")
            ctx.record_step("respond", output={"length": len(response)})

            # Mid-execution policy check (sync variant)
            policy = ctx.check_policy_sync()
            if policy.blocked:
                ctx.set_result({"error": policy.reason})
                results.append({"ticket_id": ticket["id"], "error": policy.reason})
                continue

            ctx.record_score(name="response_quality", value=0.9)
            result = {"ticket_id": ticket["id"], "category": category, "response": response}
            ctx.set_result(result)
            results.append(result)
    return results
Conversation State
WaxellContext automatically tracks conversation metrics from LLM calls:
- ctx.conversation_turns -- number of user turns in the conversation
- ctx.context_utilization -- context window usage as a percentage (0-100%)
- ctx.message_count -- total messages in the LLM context
These properties are read-only and updated automatically when auto-instrumentation records LLM calls.
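One plausible use of these metrics is a threshold check before adding more turns. The helper below is a sketch, not part of the SDK; the function name and the 80% default are illustrative.

```python
def near_context_limit(ctx, threshold: float = 80.0) -> bool:
    # Hypothetical guard: ctx.context_utilization is a percentage
    # (0-100), so a simple threshold check can trigger history
    # summarization or a switch to a larger-context model.
    return ctx.context_utilization > threshold
```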
Manual Recording
For agents not using auto-instrumented LLM providers:
ctx.record_user_message("What's the weather?")
ctx.record_agent_response("It's sunny in Paris today.")
These methods create IO spans that appear in the trace timeline alongside LLM calls and tool invocations. See Conversation Tracking for full details.
Next Steps
- Decorator Pattern -- Simpler alternative for single-function agents
- LLM Call Tracking -- Details on captured LLM data
- Conversation Tracking -- Auto-captured conversation data
- Policy & Governance -- Policy actions and enforcement
- Sessions -- Group related runs
- User Tracking -- Track end-user identity
- Scoring -- Quality metrics