
Decorator Pattern

The @observe decorator (also available as @waxell_agent) is the simplest way to add observability to any Python function. It wraps your function with automatic run tracking, IO capture, and policy enforcement -- with zero changes to your function's logic.

Basic Usage

from waxell_observe import observe

@observe(agent_name="support-bot")
async def handle_ticket(query: str) -> str:
    return await process_query(query)
Alias

@observe and @waxell_agent are identical. Use whichever reads better in your codebase.

Every call to handle_ticket now:

  1. Checks policies (if enforce_policy=True)
  2. Starts an execution run on the control plane
  3. Captures function inputs and return value
  4. Completes the run with success/error status

Enhanced Decorator Options

Session and User Tracking

Pass a session ID to group related runs, a user ID for attribution, and an optional user group for authorization policies:

@observe(
    agent_name="my-chatbot",
    session_id="session-abc-123",
    user_id="user_456",
    user_group="enterprise",
)
def chat(message: str):
    return call_llm(message)
info

The session_id on the decorator can be set statically (applies to every invocation) or dynamically at call time. See Dynamic Call-time Overrides below.

Scores, Tags, and Metadata

Use the injected waxell_ctx or top-level convenience functions to enrich traces:

import waxell_observe
from waxell_observe import observe

@observe(agent_name="my-agent")
async def run_agent(query: str, waxell_ctx=None) -> str:
    # Top-level convenience functions (no ctx needed)
    waxell_observe.tag("pipeline", "rag-v2")
    waxell_observe.metadata("model_version", "gpt-4-turbo")

    response = await call_llm(query)

    # Or use the context directly
    if waxell_ctx:
        waxell_ctx.record_score(
            name="relevance",
            value=0.95,
            data_type="numeric",
        )

    # Record multiple score types
    waxell_observe.score("quality", 0.92)
    waxell_observe.score("safety", True, data_type="boolean")
    waxell_observe.score("category", "informational", data_type="categorical")

    return response

Parameters

  • agent_name (str | None, default None): Name for this agent. Defaults to the decorated function's name
  • workflow_name (str, default "default"): Workflow name for grouping runs
  • enforce_policy (bool, default True): Check policies before execution. Raises PolicyViolationError if blocked
  • capture_io (bool, default True): Capture function inputs and outputs in the run record
  • session_id (str, default ""): Session ID for grouping related runs
  • user_id (str, default ""): End-user ID for attribution and analytics
  • user_group (str, default ""): User group for authorization policies
  • mid_execution_governance (bool, default False): Flush data and check governance on each record_step() call
  • client (WaxellObserveClient | None, default None): Pre-configured client instance. If None, uses the current configuration

Async Functions

The decorator works natively with async functions:

@observe(agent_name="research-agent", workflow_name="analyze")
async def analyze_data(dataset: dict) -> dict:
    results = await run_analysis(dataset)
    return {"findings": results}

Sync Functions

Sync functions are also supported. The decorator wraps them in an async execution context internally:

@observe(agent_name="classifier")
def classify_text(text: str) -> str:
    return model.predict(text)
warning

Sync wrappers use asyncio.run() by default. When called inside an already-running event loop (e.g., Jupyter notebooks, uvicorn, or other async frameworks), the decorator falls back to running in a ThreadPoolExecutor to avoid blocking the event loop. This works but adds threading overhead -- if your application is async, prefer making the decorated function async for best performance.
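The fallback described above can be sketched as follows. This is an illustration of the pattern, assuming hypothetical names rather than the SDK's internals: use asyncio.run() when no loop is running, otherwise hand the coroutine to a worker thread with its own loop.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def run_coro_sync(coro_factory):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run() is safe.
        return asyncio.run(coro_factory())
    # A loop is already running (Jupyter, uvicorn): run in a worker
    # thread to avoid blocking the caller's event loop.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(coro_factory())).result()

async def wrapped():
    return "ok"
```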

Context Injection

To record LLM calls, steps, or perform mid-execution policy checks, add a waxell_ctx parameter to your function signature. The decorator automatically injects a WaxellContext instance:

@observe(agent_name="support-bot")
async def handle_ticket(query: str, waxell_ctx=None) -> str:
    # Record an LLM call
    response = await call_openai(query)
    if waxell_ctx:
        waxell_ctx.record_llm_call(
            model="gpt-4o",
            tokens_in=150,
            tokens_out=80,
            task="answer_question",
            prompt_preview=query[:500],
            response_preview=response[:500],
        )

    # Record an execution step
    if waxell_ctx:
        waxell_ctx.record_step("generate_response", output={"length": len(response)})

    return response
tip

Default waxell_ctx=None ensures your function works normally when called without the decorator -- for example, in unit tests. Always guard with if waxell_ctx: before recording.
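For example, a function written with this pattern runs unchanged in a plain unit test (no decorator, no SDK in play; the body below is illustrative):

```python
import asyncio

# waxell_ctx defaults to None, so the guard makes recording optional.
async def handle_ticket(query: str, waxell_ctx=None) -> str:
    if waxell_ctx:
        waxell_ctx.record_step("normalize")
    return query.upper()

assert asyncio.run(handle_ticket("hi")) == "HI"
```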

Available Context Methods

When waxell_ctx is injected, you have access to all WaxellContext recording methods:

  • record_llm_call(*, model, tokens_in, tokens_out, cost=0.0, task="", prompt_preview="", response_preview="", duration_ms=None, provider=""): Record an LLM call with token counts and optional cost
  • record_step(step_name, output=None): Record a named execution step
  • record_score(name, value, data_type="numeric", comment=""): Record a quality score or feedback metric
  • record_tool_call(*, name, input="", output="", duration_ms=None, status="ok", tool_type="function", error=""): Record a tool/function call
  • record_retrieval(*, query, documents, source="", duration_ms=None, top_k=None, scores=None): Record a RAG retrieval operation
  • record_decision(*, name, options, chosen, reasoning="", confidence=None, metadata=None, instrumentation_type="manual"): Record a decision/routing point
  • record_reasoning(*, step, thought, evidence=None, conclusion=""): Record a reasoning/chain-of-thought step
  • record_retry(*, attempt, reason, strategy="retry", original_error="", fallback_to="", max_attempts=None): Record a retry or fallback event
  • set_tag(key, value): Set a searchable tag (string value) on the current span
  • set_metadata(key, value): Set arbitrary metadata (any JSON-serializable value)
  • set_result(result): Set the run result (overrides auto-captured output)
  • check_policy() / check_policy_sync(): Perform a mid-execution policy check (async / sync)
  • record_policy_check(*, policy_name, action, category="", reason="", duration_ms=0, phase="pre_execution", priority=100): Record a policy evaluation result
  • run_id: Property returning the current run ID
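To see the keyword-only calling convention these methods share, here is a stand-in context used purely for demonstration (StubCtx is a stub with the signatures from the table, not the real WaxellContext):

```python
class StubCtx:
    def __init__(self):
        self.events = []

    def record_tool_call(self, *, name, input="", output="", duration_ms=None,
                         status="ok", tool_type="function", error=""):
        self.events.append(("tool", name, status))

    def record_decision(self, *, name, options, chosen, reasoning="",
                        confidence=None, metadata=None,
                        instrumentation_type="manual"):
        self.events.append(("decision", name, chosen))

ctx = StubCtx()
ctx.record_tool_call(name="search", input="rag", output="3 docs")
ctx.record_decision(name="route", options=["direct", "research"], chosen="research")
```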

IO Capture

When capture_io=True (the default), the decorator captures:

  • Inputs: All positional and keyword arguments, serialized to JSON-safe values
  • Outputs: The return value, serialized as a dict

Non-serializable values are converted to their string representation. To disable capture (for sensitive data):

@waxell_agent(agent_name="sensitive-agent", capture_io=False)
async def process_pii(data: dict) -> dict:
    ...
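The string-fallback serialization described above can be sketched like this (a minimal illustration of the stated behavior, not the SDK's actual serializer):

```python
import json

def to_json_safe(value):
    try:
        json.dumps(value)
        return value  # already JSON-serializable: keep as-is
    except (TypeError, ValueError):
        return str(value)  # fall back to the string representation
```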

Error Handling

If the decorated function raises an exception, the run is automatically completed with status="error" and the error message is recorded:

@observe(agent_name="risky-agent")
async def might_fail(input: str) -> str:
    if not input:
        raise ValueError("Input required")
    return await process(input)

# The run is recorded with status="error" and the ValueError message
try:
    result = await might_fail("")
except ValueError:
    pass  # The error is already recorded in the run

The original exception is always re-raised so your error handling works as expected.

Policy Enforcement

With enforce_policy=True, the decorator checks policies before running your function:

from waxell_observe.errors import PolicyViolationError

@observe(agent_name="my-agent", enforce_policy=True)
async def my_function(query: str) -> str:
    return await process(query)

try:
    result = await my_function("test")
except PolicyViolationError as e:
    print(f"Blocked: {e}")
    print(f"Action: {e.policy_result.action}")
    print(f"Reason: {e.policy_result.reason}")

Set enforce_policy=False to skip the check:

@observe(agent_name="my-agent", enforce_policy=False)
async def my_function(query: str) -> str:
    ...

When to Use Decorator vs Context Manager

Use the decorator when:

  • You have a single function that represents an agent run
  • You want minimal code changes
  • Auto IO capture is sufficient
  • One function = one run

Use the context manager when:

  • You need to wrap complex multi-step logic
  • You need multiple policy checks during execution
  • You want explicit control over run start/complete
  • One run spans multiple functions or classes

Full Example

When combined with waxell.init(), LLM calls are auto-captured. The @observe decorator adds run tracking, and specialized decorators (@tool, @decision, etc.) add behavior recording.

import waxell_observe as waxell

# init() BEFORE importing LLM SDKs -- patches them for auto-instrumentation
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

from openai import AsyncOpenAI

client = AsyncOpenAI()

@waxell.tool(tool_type="api")
async def fetch_context(question: str) -> dict:
    """Tool calls are auto-recorded with timing and IO."""
    docs = await retrieve_documents(question)
    return {"docs": docs, "count": len(docs)}

@waxell.observe(
    agent_name="qa-agent",
    workflow_name="answer-question",
    enforce_policy=True,
)
async def answer_question(question: str) -> str:
    # Tool call -- auto-recorded by @tool decorator
    result = await fetch_context(question)

    # LLM call -- auto-captured by init(), no manual recording needed
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer based on context."},
            {"role": "user", "content": f"{question}\n\n{result['docs']}"},
        ],
    )
    answer = response.choices[0].message.content

    # Enrich with scores and tags (convenience functions)
    waxell.score("relevance", 0.95)
    waxell.score("grounded", True, data_type="boolean")
    waxell.tag("pipeline", "rag-v2")
    waxell.metadata("doc_count", result["count"])

    return answer

# Run it
result = await answer_question("What is Waxell?")

Dynamic Call-time Overrides

The @observe decorator supports passing context parameters at call time. Any keyword argument matching a WaxellContext parameter that is not in the wrapped function's signature is intercepted and passed to the context:

@observe(agent_name="my-agent")
async def run(query: str):
    return await process(query)

# Dynamic session/user at call time:
result = await run(
    "What is RAG?",
    session_id="sess_abc123",
    user_id="user_456",
    user_group="enterprise",
)

Supported overrides: session_id, user_id, user_group, enforce_policy, mid_execution_governance, client, inputs, metadata, workflow_name.
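The interception rule can be sketched with inspect.signature: override kwargs that are not parameters of the wrapped function are pulled out before the call. The helper below is illustrative, not the SDK's implementation:

```python
import inspect

# Subset of the supported overrides, for illustration.
OVERRIDES = {"session_id", "user_id", "user_group"}

def split_overrides(fn, kwargs):
    params = set(inspect.signature(fn).parameters)
    # Pop anything that is an override name and NOT in fn's signature.
    ctx_kwargs = {k: kwargs.pop(k) for k in list(kwargs)
                  if k in OVERRIDES and k not in params}
    return ctx_kwargs, kwargs

def run(query):
    return query
```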


@tool Decorator

Auto-record function calls as tool invocations:

import waxell_observe as waxell

@waxell.tool(tool_type="vector_db")
def create_index(dim: int):
    import faiss
    return faiss.IndexFlatL2(dim)

@waxell.tool(tool_type="api")
async def call_weather_api(city: str):
    async with httpx.AsyncClient() as http:
        return await http.get(f"https://api.weather.com/{city}")
  • name (str | None, default None): Tool name. Defaults to the function name
  • tool_type (str, default "function"): Classification: "function", "vector_db", "database", "api"

@decision Decorator

Auto-record a function's return value as a decision:

import waxell_observe as waxell

@waxell.decision(name="route_task", options=["direct", "research", "multi_agent"])
async def route_task(query: str) -> dict:
    response = await client.chat.completions.create(...)
    return {"chosen": "research", "reasoning": "Complex query", "confidence": 0.92}

The SDK extracts chosen, reasoning, and confidence from dict returns. For string returns, the entire string is used as chosen.
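That extraction rule can be sketched as follows (an illustrative helper, not the SDK's code):

```python
def extract_decision(result):
    if isinstance(result, dict):
        # Dict returns: pull the named fields, tolerating absences.
        return {
            "chosen": result.get("chosen", ""),
            "reasoning": result.get("reasoning", ""),
            "confidence": result.get("confidence"),
        }
    # String returns: the entire string is the chosen option.
    return {"chosen": str(result), "reasoning": "", "confidence": None}
```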

  • name (str | None, default None): Decision name. Defaults to function name
  • options (list[str] | None, default None): Available choices

@retrieval Decorator

Auto-instrumented

If you use a supported vector database SDK (Pinecone, Chroma, Weaviate, Qdrant, Milvus, FAISS, LanceDB, pgvector, etc.), retrieval operations are captured automatically with zero code. Use @retrieval for custom search functions that aren't auto-instrumented.

Auto-record search and retrieval operations:

import waxell_observe as waxell

@waxell.retrieval(source="faiss")
async def search_docs(query: str, top_k: int = 5) -> list[dict]:
    results = await vector_store.search(query, top_k=top_k)
    return [{"id": r.id, "title": r.title, "score": r.score} for r in results]
  • source (str, default ""): Data source name
  • name (str | None, default None): Override name. Defaults to function name

@reasoning Decorator

Auto-record chain-of-thought steps:

import waxell_observe as waxell

@waxell.reasoning_dec(step="quality_check")
async def assess_quality(answer: str) -> dict:
    return {
        "thought": "Answer is well-grounded in sources",
        "evidence": ["Source A cited", "Source B referenced"],
        "conclusion": "High quality",
    }
  • step (str | None, default None): Step name. Defaults to function name

@retry Decorator

Wrap a function with retry logic and automatic retry recording:

import waxell_observe as waxell

@waxell.retry_dec(max_attempts=3, strategy="retry")
async def call_llm(prompt: str) -> str:
    return await client.chat.completions.create(...)
  • max_attempts (int, default 3): Maximum attempts
  • strategy (str, default "retry"): "retry", "fallback", or "circuit_break"
  • fallback_to (str, default ""): Fallback target name
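The retry-with-recording behavior can be sketched generically. The decorator below is an illustration of max_attempts semantics, not the SDK's retry_dec:

```python
import functools

def retry_sketch(max_attempts=3):
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    # A real implementation would record_retry(...) here.
                    if attempt == max_attempts:
                        raise  # out of attempts: propagate the error
        return inner
    return wrap

calls = {"n": 0}

@retry_sketch(max_attempts=3)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"
```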

@step Decorator

Auto-record function calls as execution steps:

import waxell_observe as waxell

@waxell.step_dec(name="preprocess")
async def preprocess(query: str) -> dict:
    return {"cleaned": query.strip().lower()}
  • name (str | None, default None): Step name. Defaults to function name

Top-Level Convenience Functions

These functions delegate to the current context and are no-ops outside a WaxellContext:

  • waxell.score(name, value, data_type="numeric", comment=""): Record a quality score
  • waxell.tag(key, value): Set a searchable tag
  • waxell.metadata(key, value): Set arbitrary metadata
  • waxell.decide(name, chosen, options=None, reasoning="", confidence=None): Record a decision
  • waxell.step(name, output=None): Record an execution step
  • waxell.reason(step, thought, evidence=None, conclusion=""): Record a reasoning step
  • waxell.retrieve(query, documents, source="", scores=None): Record a retrieval
  • waxell.retry(attempt, reason, strategy="retry", original_error="", fallback_to=""): Record a retry event
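The "delegate to the current context, no-op otherwise" behavior can be sketched with a contextvars lookup (an assumption about the mechanism, shown for illustration only):

```python
import contextvars

# Hypothetical current-context slot; None means no active run.
_current_ctx = contextvars.ContextVar("waxell_ctx", default=None)

def score(name, value, data_type="numeric", comment=""):
    ctx = _current_ctx.get()
    if ctx is None:
        return  # outside a WaxellContext: silently do nothing
    ctx.record_score(name, value, data_type=data_type, comment=comment)

score("relevance", 0.95)  # safe to call with no active context
```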

Next Steps