
Context Manager

Start with Auto-Instrumentation

Most users should start with waxell.init() for zero-code tracing, then add @observe decorators for named traces. WaxellContext is for advanced cases where you need explicit control over the run lifecycle -- session IDs, user tracking, or multi-function workflows.

WaxellContext is a context manager that gives you explicit control over run lifecycle, LLM call recording, step tracking, and mid-execution policy checks. Use it when you need more flexibility than the @observe decorator provides.

It works as both async with (for async code) and plain with (for sync code).
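Both protocols can live on one class. A minimal sketch of the dual sync/async pattern (illustrative only; it assumes nothing about WaxellContext's actual internals):

```python
class DualContext:
    """Sketch of a context manager usable with both `with` and `async with`."""

    def __init__(self, agent_name: str):
        self.agent_name = agent_name
        self.entered = False

    # Sync protocol: plain `with`
    def __enter__(self):
        self.entered = True
        return self

    def __exit__(self, exc_type, exc, tb):
        self.entered = False
        return False  # do not suppress exceptions

    # Async protocol: `async with` (delegates to the sync path in this sketch)
    async def __aenter__(self):
        return self.__enter__()

    async def __aexit__(self, exc_type, exc, tb):
        return self.__exit__(exc_type, exc, tb)
```

A real implementation would typically use async HTTP calls in `__aenter__`/`__aexit__` and sync ones in `__enter__`/`__exit__`, but the protocol shape is the same.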

Async Usage

from waxell_observe import WaxellContext

async with WaxellContext(agent_name="research-agent") as ctx:
    result = await run_research(query)

    ctx.record_llm_call(model="gpt-4o", tokens_in=300, tokens_out=150)
    ctx.record_step("research", output={"sources": 5})
    ctx.set_result({"answer": result})

Sync Usage

from waxell_observe import WaxellContext

with WaxellContext(agent_name="batch-processor") as ctx:
    result = process_data(input_data)

    ctx.record_llm_call(model="gpt-4o", tokens_in=300, tokens_out=150)
    ctx.record_step("process", output={"items": 42})
    ctx.set_result({"output": result})

The sync path uses native __enter__ / __exit__ with synchronous HTTP calls — ContextVars are set in the calling thread, so auto-instrumentation works correctly.

When to use sync vs async

Use with (sync) for batch processing scripts, CLI tools, ETL pipelines, and any code that doesn't use async/await. Use async with for async web servers, async agent frameworks, and code that's already async.

Lifecycle

On entering the context:

  1. Policies are checked (if enforce_policy=True)
  2. A new execution run is started on the control plane

On exiting the context:

  1. Buffered LLM calls are flushed to the control plane
  2. Buffered steps are flushed to the control plane
  3. The run is completed with success or error status
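The lifecycle above can be sketched as a minimal sync context manager, with list appends standing in for the SDK's control-plane HTTP calls (illustrative only):

```python
class RunLifecycle:
    """Sketch of the documented lifecycle: check policy, start run,
    buffer records, then flush and complete on exit."""

    def __init__(self, agent_name: str, enforce_policy: bool = True):
        self.agent_name = agent_name
        self.enforce_policy = enforce_policy
        self._llm_calls: list[dict] = []
        self._steps: list[dict] = []
        self.events: list[str] = []  # stand-in for control-plane requests

    def __enter__(self):
        if self.enforce_policy:
            self.events.append("policy_checked")  # 1. policies checked
        self.events.append("run_started")         # 2. run started
        return self

    def record_llm_call(self, **call):
        self._llm_calls.append(call)              # buffered in memory

    def record_step(self, name, output=None):
        self._steps.append({"name": name, "output": output})

    def __exit__(self, exc_type, exc, tb):
        self.events.append(f"flushed_{len(self._llm_calls)}_llm_calls")  # 1. flush LLM calls
        self.events.append(f"flushed_{len(self._steps)}_steps")          # 2. flush steps
        status = "error" if exc_type else "success"
        self.events.append(f"run_completed_{status}")                    # 3. complete run
        return False
```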

Enhanced Context Options

Session and User Tracking

Group related runs into sessions and track end-user identity:

with WaxellContext(
    agent_name="my-chatbot",
    session_id="session-abc-123",  # Group related runs
    user_id="user-456",            # Track end-user
) as ctx:
    # Your LLM calls here
    response = call_llm(prompt)

Tags and Metadata

Add structured metadata to runs for filtering and analysis:

with WaxellContext(agent_name="my-agent") as ctx:
    ctx.set_tag("environment", "production")
    ctx.set_tag("pipeline", "rag-v2")
    ctx.set_metadata("retrieval_count", 5)
    ctx.set_metadata("model_version", "gpt-4-turbo")

    # Your LLM calls here

Recording Scores

Capture quality metrics and user feedback:

with WaxellContext(agent_name="my-agent") as ctx:
    response = call_llm(prompt)

    # Numeric score (0-1 range)
    ctx.record_score(
        name="relevance",
        value=0.92,
        data_type="numeric",
        comment="Highly relevant to the query",
    )

    # Boolean score
    ctx.record_score(
        name="contains_hallucination",
        value=False,
        data_type="boolean",
    )

    # Categorical score
    ctx.record_score(
        name="tone",
        value="professional",
        data_type="categorical",
    )

Recording Steps

Track sub-operations within a run:

with WaxellContext(agent_name="rag-pipeline") as ctx:
    # Step 1: Retrieval
    docs = retrieve_documents(query)
    ctx.record_step("retrieval", output={"doc_count": len(docs)})

    # Step 2: Generation
    response = generate_response(query, docs)
    ctx.record_step("generation", output={"response_length": len(response)})

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agent_name | str | (required) | Name for this agent in the control plane |
| workflow_name | str | "default" | Workflow name for grouping runs |
| inputs | dict \| None | None | Input data to record with the run |
| metadata | dict \| None | None | Arbitrary metadata to attach to the run |
| client | WaxellObserveClient \| None | None | Pre-configured client. If None, creates a new one using current configuration |
| enforce_policy | bool | True | Check policies on context entry |
| session_id | str | "" | Session ID for grouping related runs |
| user_id | str | "" | End-user ID for per-user tracking and analytics |
| user_group | str | "" | User group for authorization policies (e.g., "enterprise", "free") |
| mid_execution_governance | bool | False | Flush data and check governance on each record_step() call |

Recording Methods

record_llm_call

Record an LLM API call. All parameters are keyword-only.

ctx.record_llm_call(
    model="gpt-4o",
    tokens_in=500,
    tokens_out=200,
    cost=0.0,               # Optional: auto-estimated if 0.0
    task="summarize",       # Optional: label for this call
    prompt_preview="...",   # Optional: first N chars of prompt
    response_preview="...", # Optional: first N chars of response
    duration_ms=350,        # Optional: call duration in milliseconds
    provider="openai",      # Optional: inferred from model name if empty
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| model | str | (required) | Model name (e.g., "gpt-4o", "claude-sonnet-4") |
| tokens_in | int | (required) | Input/prompt token count |
| tokens_out | int | (required) | Output/completion token count |
| cost | float | 0.0 | Cost in USD. If 0.0, automatically estimated using built-in model pricing |
| task | str | "" | A label describing this LLM call's purpose |
| prompt_preview | str | "" | Preview of the prompt text |
| response_preview | str | "" | Preview of the response text |
| duration_ms | int \| None | None | LLM call duration in milliseconds |
| provider | str | "" | Provider name (e.g., "openai", "anthropic"). If empty, inferred from model name |

LLM calls are buffered in memory and flushed to the control plane when the context exits.
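Cost auto-estimation and provider inference can be approximated as below. The per-million-token rates and the prefix heuristic are illustrative assumptions for the sketch, not the SDK's built-in pricing table or inference logic:

```python
# Illustrative only: made-up (input, output) USD rates per million tokens.
PRICING_PER_MTOK = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
}

def estimate_cost(model: str, tokens_in: int, tokens_out: int) -> float:
    """Estimate USD cost for a call; unknown models estimate to 0.0."""
    rate_in, rate_out = PRICING_PER_MTOK.get(model, (0.0, 0.0))
    return tokens_in / 1_000_000 * rate_in + tokens_out / 1_000_000 * rate_out

def infer_provider(model: str) -> str:
    """Guess the provider from the model name via a simple prefix heuristic."""
    if model.startswith(("gpt-", "o1", "o3")):
        return "openai"
    if model.startswith("claude"):
        return "anthropic"
    return ""
```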

record_step

Record a named execution step.

ctx.record_step("extract_entities", output={"count": 12})

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| step_name | str | (required) | Name identifying this step |
| output | dict \| None | None | Optional output data for the step |

Steps are automatically numbered in order of recording. Like LLM calls, they are buffered and flushed on context exit.

set_result

Set the final result for the run.

ctx.set_result({"answer": "The capital of France is Paris.", "confidence": 0.95})

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| result | dict | (required) | Result data to include when the run is completed |

Call this before the context exits. If not called, the run completes with an empty result.

check_policy / check_policy_sync

Perform a mid-execution policy check. This is useful for long-running agents that should re-validate policies between steps.

# Async
policy = await ctx.check_policy()

# Sync
policy = ctx.check_policy_sync()
if policy.blocked:
    print(f"Blocked: {policy.reason}")
    # Handle the block (e.g., stop processing)
elif policy.action == "warn":
    print(f"Warning: {policy.reason}")
    # Continue but log the warning

Returns a PolicyCheckResult with:

  • action -- one of "allow", "block", "warn", "throttle"
  • reason -- human-readable explanation
  • metadata -- additional policy data
  • allowed -- property, True if action is "allow" or "warn"
  • blocked -- property, True if action is "block" or "throttle"
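The described shape can be modeled as a small dataclass. This is a sketch matching the documented fields and properties, not the SDK's actual class definition:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class PolicyCheckResult:
    """Sketch of the result shape documented above."""
    action: str                     # "allow", "block", "warn", or "throttle"
    reason: str = ""                # human-readable explanation
    metadata: dict[str, Any] = field(default_factory=dict)

    @property
    def allowed(self) -> bool:
        # True if action is "allow" or "warn"
        return self.action in ("allow", "warn")

    @property
    def blocked(self) -> bool:
        # True if action is "block" or "throttle"
        return self.action in ("block", "throttle")
```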

record_score

Record a quality score or feedback metric for the current run.

ctx.record_score(
    name="relevance",
    value=0.92,
    data_type="numeric",
    comment="Highly relevant to the query",
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Score name (e.g., "relevance", "accuracy", "thumbs_up") |
| value | float \| str \| bool | (required) | Score value. Type depends on data_type |
| data_type | str | "numeric" | One of "numeric", "categorical", "boolean" |
| comment | str | "" | Optional free-text comment |

Scores are buffered and flushed to the control plane when the context exits.

set_tag

Set a searchable tag on the current run. Tags become OTel span attributes and are queryable in Grafana TraceQL.

ctx.set_tag("environment", "production")
ctx.set_tag("pipeline", "rag-v2")
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| key | str | (required) | Tag name (alphanumeric, underscores, hyphens) |
| value | str | (required) | Tag value (string only) |

set_metadata

Set arbitrary metadata on the current run. Complex values are JSON-serialized.

ctx.set_metadata("retrieval_count", 5)
ctx.set_metadata("model_version", "gpt-4-turbo")
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| key | str | (required) | Metadata key |
| value | Any | (required) | Any JSON-serializable value |

Behavior Tracking

Track agent behaviors beyond LLM calls and steps. These methods buffer data as spans and flush on context exit.

record_tool_call

Record a tool or function call.

ctx.record_tool_call(
    name="web_search",
    input={"query": "latest news"},
    output={"results": [...]},
    duration_ms=250,
    status="ok",
    tool_type="api",
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Tool name (e.g., "web_search", "database_query") |
| input | dict \| str | "" | Tool input parameters |
| output | dict \| str | "" | Tool output/result |
| duration_ms | int \| None | None | Execution time in milliseconds |
| status | str | "ok" | "ok" or "error" |
| tool_type | str | "function" | Classification: "function", "api", "database", "retriever" |
| error | str | "" | Error message if status is "error" |

record_retrieval

Record a RAG document retrieval.

ctx.record_retrieval(
    query="How does the billing system work?",
    documents=[{"id": "doc1", "title": "Billing FAQ", "score": 0.92}],
    source="pinecone",
    duration_ms=120,
    top_k=5,
    scores=[0.92, 0.87, 0.81],
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| query | str | (required) | The retrieval query string |
| documents | list[dict] | (required) | Retrieved documents (e.g., [{id, title, score, snippet}]) |
| source | str | "" | Data source name (e.g., "pinecone", "elasticsearch") |
| duration_ms | int \| None | None | Retrieval time in milliseconds |
| top_k | int \| None | None | Number of documents requested |
| scores | list[float] \| None | None | Relevance scores for each retrieved document |

record_decision

Record a decision or routing point.

ctx.record_decision(
    name="route_to_agent",
    options=["billing", "technical", "general"],
    chosen="billing",
    reasoning="User mentioned invoice and payment",
    confidence=0.95,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Decision name (e.g., "route_to_agent", "select_model") |
| options | list[str] | (required) | Available choices |
| chosen | str | (required) | The selected option |
| reasoning | str | "" | Why this option was chosen |
| confidence | float \| None | None | Confidence score (0.0-1.0) |
| metadata | dict \| None | None | Additional context |
| instrumentation_type | str | "manual" | How this decision was captured: "manual", "decorator", or "auto" |

record_reasoning

Record a reasoning or chain-of-thought step.

ctx.record_reasoning(
    step="evaluate_sources",
    thought="Source A is more recent but Source B has higher authority",
    evidence=["Source A: 2024", "Source B: cited 500 times"],
    conclusion="Use Source B as primary, Source A as supplement",
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| step | str | (required) | Reasoning step name |
| thought | str | (required) | The reasoning text/thought process |
| evidence | list[str] \| None | None | Supporting evidence or references |
| conclusion | str | "" | Conclusion reached at this step |

record_retry

Record a retry or fallback event.

ctx.record_retry(
    attempt=2,
    reason="Rate limited by OpenAI",
    strategy="fallback",
    original_error="429 Too Many Requests",
    fallback_to="claude-sonnet-4",
    max_attempts=3,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| attempt | int | (required) | Current attempt number (1-based) |
| reason | str | (required) | Why a retry/fallback occurred |
| strategy | str | "retry" | "retry", "fallback", or "circuit_break" |
| original_error | str | "" | The error that triggered the retry |
| fallback_to | str | "" | Name of fallback target (model, agent, tool) |
| max_attempts | int \| None | None | Maximum attempts configured |

record_policy_check

Record a policy evaluation result as a governance span.

ctx.record_policy_check(
    policy_name="budget-limit",
    action="warn",
    category="budget",
    reason="Approaching 80% of daily budget",
    phase="mid_execution",
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| policy_name | str | (required) | Name of the policy evaluated |
| action | str | (required) | Evaluation result: "allow", "warn", "block", etc. |
| category | str | "" | Policy category (e.g., "budget", "rate-limit") |
| reason | str | "" | Reason for the action (empty for allow) |
| duration_ms | float | 0 | Evaluation time in milliseconds |
| phase | str | "pre_execution" | "pre_execution", "mid_execution", or "post_execution" |
| priority | int | 100 | Policy priority (lower = evaluated first) |

Properties

| Property | Type | Description |
| --- | --- | --- |
| run_id | str | The run ID from the control plane, or "" if the run has not started |

Error Handling

If an exception occurs inside the context, the run is automatically completed with status="error" and the error message. The exception is not suppressed -- it propagates normally:

# Async
try:
async with WaxellContext(agent_name="my-agent") as ctx:
raise ValueError("Something went wrong")
except ValueError:
pass # Run was completed with status="error"

# Sync
try:
with WaxellContext(agent_name="my-agent") as ctx:
raise ValueError("Something went wrong")
except ValueError:
pass # Run was completed with status="error"

If flushing telemetry to the control plane fails (e.g., network error), the failure is logged as a warning but does not interfere with your agent's execution.
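The propagation behavior follows from `__exit__` returning a falsy value. A self-contained sketch (a hypothetical class, not the SDK's implementation) of completing the run with an error status while letting the exception escape:

```python
class ErrorAwareContext:
    """Sketch: record error status on exit without suppressing the exception."""

    def __init__(self):
        self.status = None
        self.error = ""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # Complete the run with status="error" and the error message.
            self.status, self.error = "error", str(exc)
        else:
            self.status = "success"
        return False  # falsy: the exception is NOT suppressed and propagates
```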

Policy Enforcement on Entry

When enforce_policy=True, policies are checked before the run starts. If the policy result is block or throttle, a PolicyViolationError is raised and no run is created:

from waxell_observe.errors import PolicyViolationError

# Works identically with both async and sync context managers
try:
    with WaxellContext(
        agent_name="my-agent",
        enforce_policy=True,
    ) as ctx:
        ...
except PolicyViolationError as e:
    print(f"Blocked: {e}")
    print(f"Action: {e.policy_result.action}")

When to Use Context Manager vs Decorator

Choose WaxellContext over @observe when you need:

  • Multi-step orchestration -- wrap complex logic that spans multiple functions
  • Mid-execution policy checks -- re-validate policies between steps
  • Explicit input/metadata control -- pass structured inputs and metadata at context creation
  • Multiple runs in one function -- start and complete several runs in sequence
  • Conditional observability -- only create a context under certain conditions
  • Synchronous code -- batch scripts, CLI tools, and ETL pipelines that don't use async

Example of multiple runs (sync -- natural fit for batch processing):

def batch_process(items: list[str]):
    for item in items:
        with WaxellContext(
            agent_name="batch-processor",
            inputs={"item": item},
        ) as ctx:
            result = process_item(item)
            ctx.record_llm_call(model="gpt-4o-mini", tokens_in=50, tokens_out=30)
            ctx.set_result({"output": result})

The same pattern works with async with for async code:

async def batch_process(items: list[str]):
    for item in items:
        async with WaxellContext(
            agent_name="batch-processor",
            inputs={"item": item},
        ) as ctx:
            result = await process_item(item)
            ctx.record_llm_call(model="gpt-4o-mini", tokens_in=50, tokens_out=30)
            ctx.set_result({"output": result})

Full Example (Async)

from waxell_observe import WaxellObserveClient, WaxellContext

WaxellObserveClient.configure(
    api_url="https://acme.waxell.dev",
    api_key="wax_sk_...",
)

async def run_pipeline(query: str) -> dict:
    async with WaxellContext(
        agent_name="research-pipeline",
        workflow_name="deep-research",
        inputs={"query": query},
        metadata={"version": "2.1"},
        enforce_policy=True,
    ) as ctx:
        # Step 1: Search
        sources = await search(query)
        ctx.record_step("search", output={"source_count": len(sources)})

        # Step 2: Synthesize
        synthesis = await synthesize(query, sources)
        ctx.record_llm_call(
            model="claude-sonnet-4",
            tokens_in=2000,
            tokens_out=500,
            task="synthesize",
        )
        ctx.record_step("synthesize", output={"length": len(synthesis)})

        # Mid-execution policy check
        policy = await ctx.check_policy()
        if policy.blocked:
            ctx.set_result({"error": "Policy blocked continuation"})
            return {"error": policy.reason}

        # Step 3: Refine
        final = await refine(synthesis)
        ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=800,
            tokens_out=300,
            task="refine",
        )
        ctx.record_step("refine")

        result = {"answer": final, "sources": len(sources)}
        ctx.set_result(result)
        return result

Full Example (Sync)

from waxell_observe import WaxellObserveClient, WaxellContext

WaxellObserveClient.configure(
    api_url="https://acme.waxell.dev",
    api_key="wax_sk_...",
)

def process_tickets(tickets: list[dict]) -> list[dict]:
    results = []
    for ticket in tickets:
        with WaxellContext(
            agent_name="ticket-processor",
            workflow_name="support-pipeline",
            inputs={"ticket_id": ticket["id"], "subject": ticket["subject"]},
            enforce_policy=True,
        ) as ctx:
            ctx.set_tag("priority", ticket["priority"])

            # Step 1: Classify
            category = classify_ticket(ticket)
            ctx.record_llm_call(model="gpt-4o-mini", tokens_in=200, tokens_out=10, task="classify")
            ctx.record_step("classify", output={"category": category})

            # Step 2: Generate response
            response = generate_response(ticket, category)
            ctx.record_llm_call(model="gpt-4o", tokens_in=500, tokens_out=200, task="respond")
            ctx.record_step("respond", output={"length": len(response)})

            # Mid-execution policy check (sync variant)
            policy = ctx.check_policy_sync()
            if policy.blocked:
                ctx.set_result({"error": policy.reason})
                results.append({"ticket_id": ticket["id"], "error": policy.reason})
                continue

            ctx.record_score("response_quality", 0.9)
            result = {"ticket_id": ticket["id"], "category": category, "response": response}
            ctx.set_result(result)
            results.append(result)

    return results

Conversation State

WaxellContext automatically tracks conversation metrics from LLM calls:

  • ctx.conversation_turns — number of user turns in the conversation
  • ctx.context_utilization — context window usage as a percentage (0-100%)
  • ctx.message_count — total messages in the LLM context

These properties are read-only and updated automatically when auto-instrumentation records LLM calls.

Manual Recording

For agents not using auto-instrumented LLM providers:

ctx.record_user_message("What's the weather?")
ctx.record_agent_response("It's sunny in Paris today.")

These methods create IO spans that appear in the trace timeline alongside LLM calls and tool invocations. See Conversation Tracking for full details.

Next Steps