# Decorator Pattern

The `@observe` decorator (also available as `@waxell_agent`) is the simplest way to add observability to any Python function. It wraps your function with automatic run tracking, IO capture, and policy enforcement -- with zero changes to your function's logic.
## Basic Usage

```python
from waxell_observe import observe

@observe(agent_name="support-bot")
async def handle_ticket(query: str) -> str:
    return await process_query(query)
```

`@observe` and `@waxell_agent` are identical. Use whichever reads better in your codebase.
Every call to `handle_ticket` now:

- Checks policies (if `enforce_policy=True`)
- Starts an execution run on the control plane
- Captures function inputs and return value
- Completes the run with success/error status
## Enhanced Decorator Options

### Session and User Tracking

Pass a `session_id` to group related runs and a `user_id` for attribution:
```python
@observe(
    agent_name="my-chatbot",
    session_id="session-abc-123",
    user_id="user_456",
    user_group="enterprise",
)
def chat(message: str):
    return call_llm(message)
```
The `session_id` on the decorator can be set statically (applies to every invocation) or dynamically at call time. See Dynamic Call-time Overrides below.
### Scores, Tags, and Metadata

Use the injected `waxell_ctx` or the top-level convenience functions to enrich traces:
```python
import waxell_observe
from waxell_observe import observe

@observe(agent_name="my-agent")
async def run_agent(query: str, waxell_ctx=None) -> str:
    # Top-level convenience functions (no ctx needed)
    waxell_observe.tag("pipeline", "rag-v2")
    waxell_observe.metadata("model_version", "gpt-4-turbo")

    response = await call_llm(query)

    # Or use the context directly
    if waxell_ctx:
        waxell_ctx.record_score(
            name="relevance",
            value=0.95,
            data_type="numeric",
        )

    # Record multiple score types
    waxell_observe.score("quality", 0.92)
    waxell_observe.score("safety", True, data_type="boolean")
    waxell_observe.score("category", "informational", data_type="categorical")

    return response
```
## Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent_name` | `str \| None` | `None` | Name for this agent. Defaults to the decorated function's name |
| `workflow_name` | `str` | `"default"` | Workflow name for grouping runs |
| `enforce_policy` | `bool` | `True` | Check policies before execution. Raises `PolicyViolationError` if blocked |
| `capture_io` | `bool` | `True` | Capture function inputs and outputs in the run record |
| `session_id` | `str` | `""` | Session ID for grouping related runs |
| `user_id` | `str` | `""` | End-user ID for attribution and analytics |
| `user_group` | `str` | `""` | User group for authorization policies |
| `mid_execution_governance` | `bool` | `False` | Flush data and check governance on each `record_step()` call |
| `client` | `WaxellObserveClient \| None` | `None` | Pre-configured client instance. If `None`, uses current configuration |
## Async Functions

The decorator works natively with async functions:

```python
@observe(agent_name="research-agent", workflow_name="analyze")
async def analyze_data(dataset: dict) -> dict:
    results = await run_analysis(dataset)
    return {"findings": results}
```
## Sync Functions

Sync functions are also supported. The decorator wraps them in an async execution context internally:

```python
@observe(agent_name="classifier")
def classify_text(text: str) -> str:
    return model.predict(text)
```
Sync wrappers use `asyncio.run()` by default. When called inside an already-running event loop (e.g., Jupyter notebooks, uvicorn, or other async frameworks), the decorator falls back to running in a `ThreadPoolExecutor` to avoid blocking the event loop. This works but adds threading overhead -- if your application is async, prefer making the decorated function async for best performance.
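The fallback described above can be sketched in a few lines. This is an illustrative reimplementation of the general technique, not the SDK's internal code; `run_coro_blocking` and `classify` are hypothetical names:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_coro_blocking(coro):
    """Drive a coroutine to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: safe to drive the coroutine directly.
        return asyncio.run(coro)
    # A loop is already running (e.g. Jupyter, uvicorn): run the coroutine
    # on a private event loop in a worker thread to avoid blocking the loop.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()

async def classify():
    return "ok"

print(run_coro_blocking(classify()))  # -> ok
```

The thread hop is what makes the fallback safe but slower than a native `await`, which is why async callers should prefer async decorated functions.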
## Context Injection

To record LLM calls, steps, or perform mid-execution policy checks, add a `waxell_ctx` parameter to your function signature. The decorator automatically injects a `WaxellContext` instance:
```python
@observe(agent_name="support-bot")
async def handle_ticket(query: str, waxell_ctx=None) -> str:
    # Record an LLM call
    response = await call_openai(query)
    if waxell_ctx:
        waxell_ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=150,
            tokens_out=80,
            task="answer_question",
            prompt_preview=query[:500],
            response_preview=response[:500],
        )

    # Record an execution step
    if waxell_ctx:
        waxell_ctx.record_step("generate_response", output={"length": len(response)})

    return response
```
The `waxell_ctx=None` default ensures your function works normally when called without the decorator -- for example, in unit tests. Always guard with `if waxell_ctx:` before recording.
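A minimal sketch of this testing pattern (the function and its logic are hypothetical stand-ins, not part of the SDK):

```python
def summarize(text: str, waxell_ctx=None) -> str:
    """Business logic that records a step only when a context is injected."""
    result = text.strip()[:10]
    if waxell_ctx:  # guard: no-op when called without the decorator
        waxell_ctx.record_step("summarize", output={"length": len(result)})
    return result

# In a unit test, call the function directly: waxell_ctx stays None,
# nothing is recorded, and no SDK configuration is needed.
assert summarize("  hello world  ") == "hello worl"
```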
### Available Context Methods

When `waxell_ctx` is injected, you have access to all `WaxellContext` recording methods:
| Method | Description |
|---|---|
| `record_llm_call(*, model, tokens_in, tokens_out, cost=0.0, task="", prompt_preview="", response_preview="", duration_ms=None, provider="")` | Record an LLM call with token counts and optional cost |
| `record_step(step_name, output=None)` | Record a named execution step |
| `record_score(name, value, data_type="numeric", comment="")` | Record a quality score or feedback metric |
| `record_tool_call(*, name, input="", output="", duration_ms=None, status="ok", tool_type="function", error="")` | Record a tool/function call |
| `record_retrieval(*, query, documents, source="", duration_ms=None, top_k=None, scores=None)` | Record a RAG retrieval operation |
| `record_decision(*, name, options, chosen, reasoning="", confidence=None, metadata=None, instrumentation_type="manual")` | Record a decision/routing point |
| `record_reasoning(*, step, thought, evidence=None, conclusion="")` | Record a reasoning/chain-of-thought step |
| `record_retry(*, attempt, reason, strategy="retry", original_error="", fallback_to="", max_attempts=None)` | Record a retry or fallback event |
| `set_tag(key, value)` | Set a searchable tag (string value) on the current span |
| `set_metadata(key, value)` | Set arbitrary metadata (any JSON-serializable value) |
| `set_result(result)` | Set the run result (overrides auto-captured output) |
| `check_policy()` / `check_policy_sync()` | Perform a mid-execution policy check (async / sync) |
| `record_policy_check(*, policy_name, action, category="", reason="", duration_ms=0, phase="pre_execution", priority=100)` | Record a policy evaluation result |
| `run_id` | Property returning the current run ID |
## IO Capture

When `capture_io=True` (the default), the decorator captures:
- Inputs: All positional and keyword arguments, serialized to JSON-safe values
- Outputs: The return value, serialized as a dict
Non-serializable values are converted to their string representation. To disable capture (for sensitive data):
```python
@waxell_agent(agent_name="sensitive-agent", capture_io=False)
async def process_pii(data: dict) -> dict:
    ...
```
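As a rough sketch of the string-fallback behaviour described above -- this is illustrative only, not the SDK's actual serializer:

```python
import json

def to_json_safe(value):
    """JSON-native values pass through; dicts and lists are walked
    recursively; anything non-serializable falls back to str()."""
    try:
        json.dumps(value)
        return value
    except (TypeError, ValueError):
        if isinstance(value, dict):
            return {str(k): to_json_safe(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [to_json_safe(v) for v in value]
        return str(value)

captured = to_json_safe({"n": 1, "handle": object()})
# "n" passes through untouched; "handle" becomes its repr string
```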
## Error Handling

If the decorated function raises an exception, the run is automatically completed with `status="error"` and the error message is recorded:
```python
@observe(agent_name="risky-agent")
async def might_fail(input: str) -> str:
    if not input:
        raise ValueError("Input required")
    return await process(input)

# The run is recorded with status="error" and the ValueError message
try:
    result = await might_fail("")
except ValueError:
    pass  # The error is already recorded in the run
```
The original exception is always re-raised so your error handling works as expected.
## Policy Enforcement

With `enforce_policy=True`, the decorator checks policies before running your function:
```python
from waxell_observe.errors import PolicyViolationError

@observe(agent_name="my-agent", enforce_policy=True)
async def my_function(query: str) -> str:
    return await process(query)

try:
    result = await my_function("test")
except PolicyViolationError as e:
    print(f"Blocked: {e}")
    print(f"Action: {e.policy_result.action}")
    print(f"Reason: {e.policy_result.reason}")
```
Set `enforce_policy=False` to skip the check:

```python
@observe(agent_name="my-agent", enforce_policy=False)
async def my_function(query: str) -> str:
    ...
```
## When to Use Decorator vs Context Manager
| Use Decorator When... | Use Context Manager When... |
|---|---|
| You have a single function that represents an agent run | You need to wrap complex multi-step logic |
| You want minimal code changes | You need multiple policy checks during execution |
| Auto IO capture is sufficient | You want explicit control over run start/complete |
| One function = one run | One run spans multiple functions or classes |
## Full Example

When combined with `waxell.init()`, LLM calls are auto-captured. The `@observe` decorator adds run tracking, and specialized decorators (`@tool`, `@decision`, etc.) add behavior recording.
```python
import waxell_observe as waxell

# init() BEFORE importing LLM SDKs -- patches them for auto-instrumentation
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from openai import AsyncOpenAI

client = AsyncOpenAI()

@waxell.tool(tool_type="api")
async def fetch_context(question: str) -> dict:
    """Tool calls are auto-recorded with timing and IO."""
    docs = await retrieve_documents(question)
    return {"docs": docs, "count": len(docs)}

@waxell.observe(
    agent_name="qa-agent",
    workflow_name="answer-question",
    enforce_policy=True,
)
async def answer_question(question: str) -> str:
    # Tool call -- auto-recorded by @tool decorator
    result = await fetch_context(question)

    # LLM call -- auto-captured by init(), no manual recording needed
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer based on context."},
            {"role": "user", "content": f"{question}\n\n{result['docs']}"},
        ],
    )
    answer = response.choices[0].message.content

    # Enrich with scores and tags (convenience functions)
    waxell.score("relevance", 0.95)
    waxell.score("grounded", True, data_type="boolean")
    waxell.tag("pipeline", "rag-v2")
    waxell.metadata("doc_count", result["count"])

    return answer

# Run it
result = await answer_question("What is Waxell?")
```
## Dynamic Call-time Overrides

The `@observe` decorator supports passing context parameters at call time. Any keyword argument matching a `WaxellContext` parameter that is not in the wrapped function's signature is intercepted and passed to the context:
```python
@observe(agent_name="my-agent")
async def run(query: str):
    return await process(query)

# Dynamic session/user at call time:
result = await run(
    "What is RAG?",
    session_id="sess_abc123",
    user_id="user_456",
    user_group="enterprise",
)
```
Supported overrides: `session_id`, `user_id`, `user_group`, `enforce_policy`, `mid_execution_governance`, `client`, `inputs`, `metadata`, `workflow_name`.
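A minimal sketch of how such call-time interception can work -- hypothetical names, not the SDK's implementation:

```python
import functools
import inspect

# Keyword arguments in this set that are NOT in the wrapped function's
# signature are peeled off for the context instead of being passed through.
CONTEXT_KEYS = {"session_id", "user_id", "user_group"}

def observe_sketch(fn):
    sig_params = set(inspect.signature(fn).parameters)

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        ctx_overrides = {
            k: kwargs.pop(k)
            for k in list(kwargs)
            if k in CONTEXT_KEYS and k not in sig_params
        }
        # A real decorator would apply ctx_overrides to the run context;
        # here we just stash them so the split is observable.
        wrapper.last_overrides = ctx_overrides
        return fn(*args, **kwargs)

    return wrapper

@observe_sketch
def run(query: str):
    return query.upper()

print(run("hi", session_id="sess_abc"))  # -> HI
print(run.last_overrides)                # -> {'session_id': 'sess_abc'}
```

Because the split is driven by `inspect.signature`, a function that declares `session_id` itself would receive it normally rather than having it intercepted.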
## @tool Decorator
Auto-record function calls as tool invocations:
```python
import httpx
import waxell_observe as waxell

@waxell.tool(tool_type="vector_db")
def create_index(dim: int):
    import faiss
    return faiss.IndexFlatL2(dim)

@waxell.tool(tool_type="api")
async def call_weather_api(city: str):
    # httpx.get() is synchronous; use AsyncClient for an awaitable request
    async with httpx.AsyncClient() as http:
        return await http.get(f"https://api.weather.com/{city}")
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str \| None` | `None` | Tool name. Defaults to the function name |
| `tool_type` | `str` | `"function"` | Classification: `"function"`, `"vector_db"`, `"database"`, `"api"` |
## @decision Decorator
Auto-record a function's return value as a decision:
```python
import waxell_observe as waxell

@waxell.decision(name="route_task", options=["direct", "research", "multi_agent"])
async def route_task(query: str) -> dict:
    response = await client.chat.completions.create(...)
    return {"chosen": "research", "reasoning": "Complex query", "confidence": 0.92}
```
The SDK extracts `chosen`, `reasoning`, and `confidence` from dict returns. For string returns, the entire string is used as `chosen`.
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str \| None` | `None` | Decision name. Defaults to function name |
| `options` | `list[str] \| None` | `None` | Available choices |
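The extraction rule can be sketched as follows (illustrative only; `extract_decision` is a hypothetical name, not the SDK's code):

```python
def extract_decision(result):
    """Dict returns supply chosen/reasoning/confidence; string
    returns become the chosen value."""
    if isinstance(result, dict):
        return {
            "chosen": result.get("chosen"),
            "reasoning": result.get("reasoning", ""),
            "confidence": result.get("confidence"),
        }
    return {"chosen": str(result), "reasoning": "", "confidence": None}

print(extract_decision({"chosen": "research", "confidence": 0.92}))
# -> {'chosen': 'research', 'reasoning': '', 'confidence': 0.92}
print(extract_decision("direct"))
# -> {'chosen': 'direct', 'reasoning': '', 'confidence': None}
```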
## @retrieval Decorator
Auto-record search and retrieval operations. If you use a supported vector database SDK (Pinecone, Chroma, Weaviate, Qdrant, Milvus, FAISS, LanceDB, pgvector, etc.), retrieval operations are captured automatically with zero code; use `@retrieval` for custom search functions that aren't auto-instrumented:
```python
import waxell_observe as waxell

@waxell.retrieval(source="faiss")
async def search_docs(query: str, top_k: int = 5) -> list[dict]:
    results = await vector_store.search(query, top_k=top_k)
    return [{"id": r.id, "title": r.title, "score": r.score} for r in results]
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `source` | `str` | `""` | Data source name |
| `name` | `str \| None` | `None` | Override name. Defaults to function name |
## @reasoning Decorator
Auto-record chain-of-thought steps:
```python
import waxell_observe as waxell

@waxell.reasoning_dec(step="quality_check")
async def assess_quality(answer: str) -> dict:
    return {
        "thought": "Answer is well-grounded in sources",
        "evidence": ["Source A cited", "Source B referenced"],
        "conclusion": "High quality",
    }
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `step` | `str \| None` | `None` | Step name. Defaults to function name |
## @retry Decorator
Wrap a function with retry logic and automatic retry recording:
```python
import waxell_observe as waxell

@waxell.retry_dec(max_attempts=3, strategy="retry")
async def call_llm(prompt: str) -> str:
    return await client.chat.completions.create(...)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `max_attempts` | `int` | `3` | Maximum attempts |
| `strategy` | `str` | `"retry"` | `"retry"`, `"fallback"`, or `"circuit_break"` |
| `fallback_to` | `str` | `""` | Fallback target name |
## @step Decorator
Auto-record function calls as execution steps:
```python
import waxell_observe as waxell

@waxell.step_dec(name="preprocess")
async def preprocess(query: str) -> dict:
    return {"cleaned": query.strip().lower()}
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str \| None` | `None` | Step name. Defaults to function name |
## Top-Level Convenience Functions

These functions delegate to the current context and are no-ops outside a `WaxellContext`:
| Function | Description |
|---|---|
| `waxell.score(name, value, data_type="numeric", comment="")` | Record a quality score |
| `waxell.tag(key, value)` | Set a searchable tag |
| `waxell.metadata(key, value)` | Set arbitrary metadata |
| `waxell.decide(name, chosen, options=None, reasoning="", confidence=None)` | Record a decision |
| `waxell.step(name, output=None)` | Record an execution step |
| `waxell.reason(step, thought, evidence=None, conclusion="")` | Record a reasoning step |
| `waxell.retrieve(query, documents, source="", scores=None)` | Record a retrieval |
| `waxell.retry(attempt, reason, strategy="retry", original_error="", fallback_to="")` | Record a retry event |
## Next Steps
- Context Manager -- For more complex instrumentation needs
- Behavior Tracking -- Deep dive into all behavior types
- LangChain Integration -- Auto-capture with LangChain callbacks
- Policy & Governance -- Configure and enforce policies
- Sessions -- Group related runs
- User Tracking -- Track end-user identity
- Scoring -- Quality metrics