Python SDK Reference
This is the complete API reference for the waxell-observe package. All public symbols are exported from the top-level waxell_observe module.
from waxell_observe import (
# Top-level functions
init,
shutdown,
generate_session_id,
# Decorators
observe, # Agent run decorator (alias for waxell_agent)
waxell_agent, # Original agent decorator (identical to observe)
tool, # @tool decorator
decision, # @decision decorator
retrieval, # @retrieval decorator
reasoning_dec, # @reasoning decorator
retry_dec, # @retry decorator
step_dec, # @step decorator
# Convenience functions (work within active context)
tag,
metadata,
score,
decide,
step,
reason,
retrieve,
retry,
get_context,
# Human-in-the-loop
input, # Drop-in replacement for input()
human_turn, # Context manager for non-terminal channels
human_interaction, # One-shot recording
# Approval handlers
prompt_approval, # Terminal Y/N prompt
auto_approve, # Always approve (testing)
auto_deny, # Always deny (testing)
# Core classes
WaxellObserveClient,
ObserveConfig,
WaxellContext,
HumanTurn,
# Types
ApprovalDecision,
LlmCallInfo,
PolicyCheckResult,
PromptGuardResult,
RunCompleteResult,
RunInfo,
# Errors
ConfigurationError,
ObserveError,
PolicyViolationError,
PromptGuardError,
)
init
import waxell_observe
waxell_observe.init(
api_key: str = "",
api_url: str = "",
capture_content: bool = False,
instrument: list[str] | None = None,
instrument_infra: bool = True,
infra_libraries: list[str] | None = None,
infra_exclude: list[str] | None = None,
resource_attributes: dict | None = None,
debug: bool = False,
prompt_guard: bool = False,
prompt_guard_server: bool = False,
prompt_guard_action: str = "block",
) -> None
One-line initialization for waxell-observe. This single call configures the HTTP client, initializes OTel tracing (if installed), and auto-instruments installed LLM libraries.
| Parameter | Type | Default | Description |
|---|---|---|---|
api_key | str | "" | Waxell API key (wax_sk_...). Falls back to WAXELL_API_KEY env var |
api_url | str | "" | Waxell API URL. Falls back to WAXELL_API_URL env var |
capture_content | bool | False | Include prompt/response content in OTel traces |
instrument | list[str] | None | None | Libraries to auto-instrument (e.g. ["openai", "anthropic"]). None means auto-detect all installed libraries |
instrument_infra | bool | True | Enable auto-instrumentation of infrastructure libraries (HTTP clients, databases, caches, queues). Falls back to WAXELL_INSTRUMENT_INFRA env var |
infra_libraries | list[str] | None | None | Only instrument these specific infra libraries (e.g. ["redis", "httpx"]). None means auto-detect all |
infra_exclude | list[str] | None | None | Instrument all infra libraries except these (e.g. ["celery", "grpc"]). Falls back to WAXELL_INFRA_EXCLUDE env var (comma-delimited) |
resource_attributes | dict | None | None | Custom OTel resource attributes applied to all spans (e.g. {"deployment.environment": "production"}) |
debug | bool | False | Enable debug logging and console span export |
prompt_guard | bool | False | Enable client-side prompt guard (PII, credential, injection detection). Falls back to WAXELL_PROMPT_GUARD env var |
prompt_guard_server | bool | False | Also check server-side guard service (ML-powered via Presidio + HuggingFace). Falls back to WAXELL_PROMPT_GUARD_SERVER env var |
prompt_guard_action | str | "block" | Action on violations: "block" (raise error), "warn" (log and continue), "redact" (replace with ##TYPE##). Falls back to WAXELL_PROMPT_GUARD_ACTION env var |
Behavior:
- Checks the WAXELL_OBSERVE environment variable kill switch first. If set to "false", "0", or "no", initialization is skipped entirely.
- Idempotent: calling init() multiple times is safe. Only the first call takes effect.
- OTel tracing failure does not block the HTTP path. If tracing initialization fails, a warning is logged and the HTTP-based telemetry continues to work.
- Auto-instrumentation failure does not block manual tracing.
Example:
import waxell_observe
# Minimal setup -- auto-detects URL from env, instruments all installed libraries
waxell_observe.init(api_key="wax_sk_abc123")
# Full control
waxell_observe.init(
api_key="wax_sk_abc123",
api_url="https://acme.waxell.dev",
capture_content=True,
instrument=["openai", "anthropic"],
debug=True,
)
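The "redact" action replaces detected spans with ##TYPE## placeholders. A minimal sketch of that behavior, using illustrative patterns (the SDK's real detectors cover PII, credentials, and injection and are not shown here):

```python
import re

# Illustrative patterns only -- NOT the SDK's actual detectors.
PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "API_KEY": re.compile(r"wax_sk_[A-Za-z0-9]+"),
}

def redact(text: str) -> str:
    # Replace each detected span with a ##TYPE## placeholder,
    # mirroring prompt_guard_action="redact".
    for type_name, pattern in PATTERNS.items():
        text = pattern.sub(f"##{type_name}##", text)
    return text

assert redact("key wax_sk_abc123 sent to a@b.co") == "key ##API_KEY## sent to ##EMAIL##"
```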
shutdown
import waxell_observe
waxell_observe.shutdown() -> None
Shut down waxell-observe: flush pending traces and remove auto-instrumentation.
Behavior:
- Calls shutdown_tracing() to flush the OTel span processor and shut down the TracerProvider.
- Calls uninstrument_all() to remove monkey-patches from instrumented libraries.
- Resets the internal _initialized flag so init() can be called again.
- Safe to call even if init() was never called.
Example:
import waxell_observe
import atexit
waxell_observe.init(api_key="wax_sk_abc123")
atexit.register(waxell_observe.shutdown)
generate_session_id
from waxell_observe import generate_session_id
generate_session_id() -> str
Generate a random session ID for grouping related runs.
Returns: A string in the format sess_ followed by 16 hex characters (e.g. sess_a1b2c3d4e5f60789).
Example:
from waxell_observe import generate_session_id, WaxellContext
session = generate_session_id()
async with WaxellContext(agent_name="agent-1", session_id=session) as ctx:
...
async with WaxellContext(agent_name="agent-2", session_id=session) as ctx:
...
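For illustration, the documented format (sess_ plus 16 hex characters) corresponds to 8 random bytes. A hypothetical local equivalent -- not the SDK's implementation -- could look like:

```python
import re
import secrets

def make_session_id() -> str:
    # "sess_" followed by 16 hex characters (8 random bytes).
    return "sess_" + secrets.token_hex(8)

sid = make_session_id()
assert re.fullmatch(r"sess_[0-9a-f]{16}", sid)
```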
Top-Level Convenience Functions
These functions operate on the current WaxellContext in scope. They are no-ops when called outside of an active context, making them safe to use in code that may or may not be wrapped by @observe or WaxellContext.
tag
import waxell_observe
waxell_observe.tag(key: str, value: str) -> None
Set a searchable tag on the current context. No-op if no context is active.
Example:
from waxell_observe import observe
import waxell_observe
@observe(agent_name="my-agent")
async def run_agent(query: str) -> str:
waxell_observe.tag("environment", "production")
waxell_observe.tag("pipeline", "rag-v2")
return await process(query)
metadata
import waxell_observe
waxell_observe.metadata(key: str, value: Any) -> None
Set metadata on the current context. Values can be any JSON-serializable type. No-op if no context is active.
Example:
waxell_observe.metadata("model_version", "gpt-4-turbo")
waxell_observe.metadata("config", {"temperature": 0.7})
score
import waxell_observe
waxell_observe.score(
name: str,
value: float | str | bool,
data_type: str = "numeric",
comment: str = "",
) -> None
Record a score on the current context. No-op if no context is active.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Score name |
value | float | str | bool | (required) | Score value |
data_type | str | "numeric" | "numeric", "categorical", or "boolean" |
comment | str | "" | Optional comment |
Example:
waxell_observe.score("relevance", 0.95)
waxell_observe.score("helpful", True, data_type="boolean")
waxell_observe.score("category", "informational", data_type="categorical")
decide
import waxell_observe
waxell_observe.decide(
name: str,
chosen: str,
options: list[str] | None = None,
reasoning: str = "",
confidence: float | None = None,
) -> None
Record a decision on the current context. No-op if no context is active.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Decision name |
chosen | str | (required) | The selected option |
options | list[str] | None | None | Available choices |
reasoning | str | "" | Why this option was chosen |
confidence | float | None | None | Confidence score (0.0-1.0) |
Example:
waxell_observe.decide(
"route_query",
chosen="semantic_search",
options=["semantic", "keyword", "hybrid"],
reasoning="Query contains natural language phrasing",
confidence=0.9,
)
step
waxell_observe.step(name: str, output: dict | None = None) -> None
Record an execution step on the current context. No-op if no context is active.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name |
output | dict | None | None | Step output data |
Example:
waxell_observe.step("preprocessing", output={"tokens": 150, "language": "en"})
waxell_observe.step("validation") # output is optional
reason
waxell_observe.reason(
step: str,
thought: str,
evidence: list[str] | None = None,
conclusion: str = "",
) -> None
Record a reasoning step on the current context. No-op if no context is active.
| Parameter | Type | Default | Description |
|---|---|---|---|
step | str | (required) | Reasoning step name |
thought | str | (required) | The reasoning thought process |
evidence | list[str] | None | None | Supporting evidence |
conclusion | str | "" | Final conclusion |
Example:
waxell_observe.reason(
"source_evaluation",
thought="Document A is from a peer-reviewed journal",
evidence=["Published 2024", "Cited 45 times"],
conclusion="High reliability source",
)
retrieve
waxell_observe.retrieve(
query: str,
documents: list[dict],
source: str = "",
scores: list[float] | None = None,
) -> None
Record a retrieval operation on the current context. No-op if no context is active.
| Parameter | Type | Default | Description |
|---|---|---|---|
query | str | (required) | The search query |
documents | list[dict] | (required) | Retrieved documents |
source | str | "" | Data source name |
scores | list[float] | None | None | Relevance scores |
Example:
waxell_observe.retrieve(
query="AI safety best practices",
documents=[{"id": "doc1", "title": "Safety Guide"}],
source="pinecone",
scores=[0.95],
)
retry
waxell_observe.retry(
attempt: int,
reason: str,
strategy: str = "retry",
original_error: str = "",
fallback_to: str = "",
) -> None
Record a retry/fallback event on the current context. No-op if no context is active.
| Parameter | Type | Default | Description |
|---|---|---|---|
attempt | int | (required) | Attempt number (1-indexed) |
reason | str | (required) | Why the retry occurred |
strategy | str | "retry" | Strategy: "retry", "fallback", "circuit_break" |
original_error | str | "" | Error that triggered the retry |
fallback_to | str | "" | Fallback target name |
Example:
waxell_observe.retry(
attempt=2,
reason="Rate limit exceeded",
strategy="fallback",
original_error="429 Too Many Requests",
fallback_to="gpt-4o-mini",
)
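A typical pattern is to call this from inside your own retry loop, once per failed attempt. A self-contained sketch with a pluggable recorder (the record callback is a stand-in for waxell_observe.retry; the loop itself is hypothetical, not part of the SDK):

```python
def call_with_retries(fn, max_attempts=3, record=lambda **kw: None):
    # Run fn(), recording each failed attempt through `record`
    # with the same keyword names waxell_observe.retry accepts.
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            last_error = exc
            record(attempt=attempt, reason=str(exc), strategy="retry",
                   original_error=type(exc).__name__)
    raise last_error

# Demo with a stub recorder: fails twice, then succeeds.
events = []
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("upstream timeout")
    return "ok"

assert call_with_retries(flaky, record=lambda **kw: events.append(kw)) == "ok"
assert [e["attempt"] for e in events] == [1, 2]
```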
get_context
import waxell_observe
waxell_observe.get_context() -> WaxellContext | None
Get the current WaxellContext if one is active, otherwise None.
Example:
ctx = waxell_observe.get_context()
if ctx:
ctx.record_llm_call(model="gpt-4o", tokens_in=100, tokens_out=50)
input
import waxell_observe
waxell_observe.input(prompt: str = "", *, action: str = "input") -> str
Drop-in replacement for Python's built-in input(). Calls input(prompt) and records the prompt, response, and elapsed time as a human_turn IO span.
Falls back to plain input() if called outside a WaxellContext.
Example:
answer = waxell_observe.input("Approve? (y/n): ")
human_turn
import waxell_observe
waxell_observe.human_turn(
prompt: str = "",
channel: str = "terminal",
action: str = "",
metadata: dict | None = None,
) -> HumanTurn | _NoOpHumanTurn
Returns a context manager that captures a human interaction as a timed IO span. Use for non-terminal channels (Slack, webhooks, UI).
Returns a no-op if called outside a WaxellContext.
Example:
with waxell_observe.human_turn(prompt="Deploy?", channel="slack", action="approval") as turn:
response = await wait_for_reaction()
turn.set_response(response)
human_interaction
import waxell_observe
waxell_observe.human_interaction(
prompt: str = "",
response: str = "",
channel: str = "terminal",
action: str = "",
elapsed_seconds: float | None = None,
metadata: dict | None = None,
) -> None
Record a completed human interaction. One-shot alternative to human_turn() when you already have all the data.
No-op if called outside a WaxellContext.
Example:
waxell_observe.human_interaction(
prompt="Pick target",
response="staging",
channel="slack",
elapsed_seconds=12.5,
)
prompt_approval
import waxell_observe
waxell_observe.prompt_approval(error: PolicyViolationError) -> ApprovalDecision
Built-in on_policy_block handler. Prints a terminal banner with the block reason, approvers, and timeout, then prompts y/n.
Example:
@waxell_observe.observe(
agent_name="my-agent",
enforce_policy=True,
on_policy_block=waxell_observe.prompt_approval,
)
async def my_function():
...
auto_approve / auto_deny
import waxell_observe
waxell_observe.auto_approve(error: PolicyViolationError) -> ApprovalDecision
waxell_observe.auto_deny(error: PolicyViolationError) -> ApprovalDecision
Test helpers. auto_approve always returns ApprovalDecision(approved=True). auto_deny always returns ApprovalDecision(approved=False).
Drop-in Imports
Pre-instrumented modules that you can import directly, no init() required:
from waxell_observe.openai import openai
from waxell_observe.anthropic import anthropic
These modules are thin wrappers around the real SDKs with auto-instrumentation already applied. All OpenAI/Anthropic calls made through these imports are automatically traced.
Example:
from waxell_observe.openai import openai
client = openai.OpenAI()
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced with model, tokens, cost, latency
Instrumentation Functions
instrument_all
from waxell_observe.instrumentors import instrument_all
instrument_all(libraries: list[str] | None = None) -> dict[str, bool]
Manually instrument LLM libraries. Called automatically by init(), but can be called directly if needed.
| Parameter | Type | Default | Description |
|---|---|---|---|
libraries | list[str] | None | None | Libraries to instrument. None means auto-detect all installed |
Returns: A dict mapping library name to whether instrumentation succeeded (e.g. {"openai": True, "anthropic": True}).
Supported libraries: 157+ LLM providers, vector databases, embedding models, frameworks, and more. Core providers include openai, anthropic, litellm, groq, huggingface, gemini, cohere, mistral, together, ai21, bedrock, vertex_ai. See Auto-Instrumentation for the full list.
uninstrument_all
from waxell_observe.instrumentors import uninstrument_all
uninstrument_all() -> None
Remove all instrumentation patches. Called automatically by shutdown().
OpenTelemetry Functions
These functions manage the OTel tracing layer. They are called automatically by init() and shutdown(), but can be used directly for advanced control.
init_tracing
from waxell_observe.tracing import init_tracing
init_tracing(
api_url: str | None = None,
api_key: str | None = None,
otel_endpoint: str | None = None,
tenant_id: str | None = None,
debug: bool | None = None,
capture_content: bool = False,
shutdown_on_exit: bool = True,
resource_attributes: dict | None = None,
) -> None
Initialize OpenTelemetry tracing with OTLP HTTP export to the Waxell backend.
| Parameter | Type | Default | Description |
|---|---|---|---|
api_url | str | None | None | Waxell API URL. Resolved from config if not provided |
api_key | str | None | None | Waxell API key. Resolved from config if not provided |
otel_endpoint | str | None | None | Explicit OTel collector endpoint. Auto-discovered if not provided |
tenant_id | str | None | None | Explicit tenant ID for trace routing. Auto-discovered from API key if not provided |
debug | bool | None | None | Enable debug logging and console span export. Defaults to WAXELL_DEBUG env var |
capture_content | bool | False | Include prompt/response content in spans |
shutdown_on_exit | bool | True | Register atexit handler for clean shutdown |
resource_attributes | dict | None | None | Extra OTel resource attributes |
flush_tracing
from waxell_observe.tracing import flush_tracing
flush_tracing(timeout_millis: int = 30000) -> None
Force flush pending spans to the backend.
shutdown_tracing
from waxell_observe.tracing import shutdown_tracing
shutdown_tracing() -> None
Shut down the TracerProvider and flush remaining spans.
WaxellObserveClient
from waxell_observe import WaxellObserveClient
HTTP client for the Waxell Observe API. Handles configuration resolution, authentication, and all API interactions.
Constructor
WaxellObserveClient(
api_url: str | None = None,
api_key: str | None = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
api_url | str | None | None | Control plane URL. Overrides all other config sources |
api_key | str | None | None | API key. Overrides all other config sources |
Class Methods
configure
@classmethod
WaxellObserveClient.configure(api_url: str, api_key: str) -> None
Set global configuration for all instances. Call once at application startup. All subsequent WaxellObserveClient() instances will use these values (unless overridden by constructor arguments).
get_config
@classmethod
WaxellObserveClient.get_config() -> ObserveConfig | None
Returns the current global configuration, or None if configure() has not been called.
is_configured
@classmethod
WaxellObserveClient.is_configured() -> bool
Returns True if global configuration is set and both api_url and api_key are non-empty.
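A sketch of the resulting precedence -- constructor arguments over configure() values over environment variables (the env-var fallback here is an assumption carried over from init(); plain dicts, not the SDK's internals):

```python
import os

GLOBAL_CONFIG = {}  # stand-in for the class-level config set by configure()

def configure(api_url, api_key):
    GLOBAL_CONFIG.update(api_url=api_url, api_key=api_key)

def resolve(api_url=None, api_key=None):
    # Constructor args win, then configure() values, then env vars.
    return {
        "api_url": api_url or GLOBAL_CONFIG.get("api_url") or os.environ.get("WAXELL_API_URL", ""),
        "api_key": api_key or GLOBAL_CONFIG.get("api_key") or os.environ.get("WAXELL_API_KEY", ""),
    }

configure("https://acme.waxell.dev", "wax_sk_global")
assert resolve()["api_key"] == "wax_sk_global"
assert resolve(api_key="wax_sk_local")["api_key"] == "wax_sk_local"
```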
Async Methods
start_run
async start_run(
agent_name: str,
workflow_name: str = "default",
inputs: dict | None = None,
metadata: dict | None = None,
trace_id: str = "",
user_id: str = "",
user_group: str = "",
session_id: str = "",
parent_workflow_id: str = "",
root_workflow_id: str = "",
) -> RunInfo
Start an execution run on the control plane.
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_name | str | (required) | Agent name |
workflow_name | str | "default" | Workflow name |
inputs | dict | None | None | Input data for the run |
metadata | dict | None | None | Arbitrary metadata |
trace_id | str | "" | External trace ID for correlation |
user_id | str | "" | User identifier for per-user analytics |
user_group | str | "" | User group for authorization policies |
session_id | str | "" | Session ID for grouping related runs |
parent_workflow_id | str | "" | Parent workflow ID for nested agent lineage |
root_workflow_id | str | "" | Root workflow ID for top-level lineage tracking |
Returns: RunInfo with run_id, workflow_id, and started_at.
complete_run
async complete_run(
run_id: str,
result: dict | None = None,
status: str = "success",
error: str = "",
error_type: str = "",
traceback: str = "",
steps: list | None = None,
trace_id: str = "",
root_span_id: str = "",
) -> RunCompleteResult
Complete an execution run. Returns governance info including retry feedback.
| Parameter | Type | Default | Description |
|---|---|---|---|
run_id | str | (required) | Run ID from start_run |
result | dict | None | None | Result data |
status | str | "success" | "success" or "error" |
error | str | "" | Error message |
error_type | str | "" | Exception class name (e.g. "ValueError") |
traceback | str | "" | Full traceback string |
steps | list | None | None | Additional steps |
trace_id | str | "" | OTel trace ID for correlation |
root_span_id | str | "" | OTel root span ID for correlation |
Returns: RunCompleteResult with run_id, duration, governance_action, governance_reason, retry_feedback, and max_retries.
record_llm_calls
async record_llm_calls(
run_id: str,
calls: list[dict],
) -> dict
Record one or more LLM calls for a run. No-op if calls is empty.
| Parameter | Type | Description |
|---|---|---|
run_id | str | Run ID |
calls | list[dict] | List of LLM call dicts with keys: model, tokens_in, tokens_out, and optionally cost, task, prompt_preview, response_preview |
Returns: Server response dict (includes governance field for mid-execution governance).
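A calls payload matching the documented key set can be assembled as plain dicts (field values here are invented for illustration):

```python
calls = [
    {
        # Required keys per the documented schema:
        "model": "gpt-4o",
        "tokens_in": 1200,
        "tokens_out": 340,
        # Optional keys:
        "cost": 0.0123,
        "task": "summarize",
        "prompt_preview": "Summarize the following...",
        "response_preview": "The document describes...",
    },
]
required = {"model", "tokens_in", "tokens_out"}
assert all(required <= set(call) for call in calls)
```

The list would then be sent with await client.record_llm_calls(run_id, calls=calls).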
record_steps
async record_steps(
run_id: str,
steps: list[dict],
) -> dict
Record execution steps for a run. No-op if steps is empty.
| Parameter | Type | Description |
|---|---|---|
run_id | str | Run ID |
steps | list[dict] | List of step dicts with keys: step_name and optionally output, position |
Returns: Server response dict (includes governance field for mid-execution governance).
record_scores
async record_scores(
run_id: str,
scores: list[dict],
) -> dict
Record scores (user feedback, evaluation results) for a run.
| Parameter | Type | Description |
|---|---|---|
run_id | str | Run ID |
scores | list[dict] | List of score dicts. Each dict should contain: name, data_type ("numeric", "categorical", or "boolean"), and either numeric_value or string_value depending on data type. Optional: comment |
No-op if scores is empty. Returns: Server response dict.
Example:
await client.record_scores(run_id, scores=[
{"name": "accuracy", "data_type": "numeric", "numeric_value": 0.95},
{"name": "thumbs_up", "data_type": "boolean", "numeric_value": 1.0, "string_value": "true"},
{"name": "category", "data_type": "categorical", "string_value": "helpful", "comment": "User feedback"},
])
get_prompt
async get_prompt(
name: str,
*,
label: str = "",
version: int = 0,
) -> PromptInfo
Fetch a prompt from the control plane. Returns the prompt content, config, and a compile() helper for template rendering.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Prompt name |
label | str | "" | Label to fetch (e.g. "production"). If empty, fetches latest version |
version | int | 0 | Specific version number. Takes precedence over label if both provided |
Returns: PromptInfo with name, version, prompt_type, content, config, labels, and compile() method.
Example:
# Fetch by label (recommended for production)
prompt = await client.get_prompt("summarizer", label="production")
rendered = prompt.compile(topic="AI safety", length="short")
# Fetch specific version
prompt = await client.get_prompt("summarizer", version=3)
# Fetch latest
prompt = await client.get_prompt("summarizer")
check_policy
async check_policy(
agent_name: str,
workflow_name: str = "",
agent_id: str = "",
) -> PolicyCheckResult
Check if execution is allowed by policies.
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_name | str | (required) | Agent name |
workflow_name | str | "" | Workflow name for scoped policies |
agent_id | str | "" | Specific agent instance ID |
Returns: PolicyCheckResult.
record_events
async record_events(events: list[dict]) -> None
Record governance events. No-op if events is empty.
close
async close() -> None
Close the underlying HTTP client. Call this when you are done using the client.
Sync Methods
Each async method has a synchronous counterpart that uses asyncio.run() internally:
| Async Method | Sync Method |
|---|---|
start_run() | start_run_sync() |
complete_run() | complete_run_sync() |
record_llm_calls() | record_llm_calls_sync() |
record_steps() | record_steps_sync() |
record_scores() | record_scores_sync() |
record_spans() | record_spans_sync() |
check_policy() | check_policy_sync() |
record_events() | record_events_sync() |
get_prompt() | get_prompt_sync() |
Sync methods accept the same keyword arguments as their async counterparts.
Sync methods cannot be used inside an already-running async event loop. If a running event loop is detected, the SDK delegates to a background thread with a 60-second timeout. Use the async versions in async code when possible.
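The delegation behavior described above can be sketched as follows (a simplified illustration, not the SDK's internal code):

```python
import asyncio
import concurrent.futures

def run_sync(coro, timeout=60.0):
    # If an event loop is already running in this thread, asyncio.run()
    # would raise -- so delegate to a background thread with a timeout,
    # mirroring the documented 60-second fallback.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)  # no loop running: run directly
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result(timeout=timeout)

async def answer():
    return 42

assert run_sync(answer()) == 42
```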
WaxellContext
from waxell_observe import WaxellContext
Context manager (sync and async) that wraps agent execution with observability and governance.
Constructor
WaxellContext(
agent_name: str,
workflow_name: str = "default",
inputs: dict | None = None,
metadata: dict | None = None,
client: WaxellObserveClient | None = None,
enforce_policy: bool = True,
session_id: str = "",
user_id: str = "",
user_group: str = "",
mid_execution_governance: bool = False,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_name | str | (required) | Agent name |
workflow_name | str | "default" | Workflow name |
inputs | dict | None | None | Input data for the run |
metadata | dict | None | None | Arbitrary metadata |
client | WaxellObserveClient | None | None | Pre-configured client. If None, creates one using global config |
enforce_policy | bool | True | Check policies on entry |
session_id | str | "" | Session ID for grouping related runs. Use generate_session_id() to create one |
user_id | str | "" | User identifier for per-user analytics and tracking |
user_group | str | "" | User group for authorization policies (e.g., "enterprise", "free") |
mid_execution_governance | bool | False | Enable cooperative mid-execution governance. When True, each record_step() flushes data and checks for policy violations |
Usage
Works as both async with (async code) and plain with (sync code):
# Async
async with WaxellContext(agent_name="my-agent") as ctx:
result = await my_agent.run(query)
ctx.set_result({"output": result})
# Sync
with WaxellContext(agent_name="my-agent") as ctx:
result = my_agent.run(query)
ctx.set_result({"output": result})
With session and user tracking:
from waxell_observe import WaxellContext, generate_session_id
session = generate_session_id()
async with WaxellContext(
agent_name="my-agent",
session_id=session,
user_id="user_456",
) as ctx:
ctx.set_tag("environment", "production")
ctx.set_metadata("request_id", "req_abc123")
result = await my_agent.run(query)
ctx.record_score("relevance", 0.92)
ctx.set_result({"output": result})
Lifecycle
On enter (__aenter__ / __enter__):
- Checks policies (if enforce_policy=True). Raises PolicyViolationError if blocked.
- Starts an execution run on the control plane. session_id and user_id are injected into the run metadata.
- Creates an OTel agent span (if tracing is initialized). Session and user IDs are set as span attributes.
- Sets the ContextVar so auto-instrumented LLM calls are associated with this run.
On exit (__aexit__ / __exit__):
- Flushes buffered LLM calls via record_llm_calls.
- Flushes buffered steps via record_steps.
- Flushes buffered scores via record_scores.
- Flushes buffered behavior spans via record_spans.
- Completes the run with result or error status.
- Ends the OTel agent span and clears the ContextVar.
The sync path (__enter__ / __exit__) uses synchronous HTTP calls and sets the ContextVar in the calling thread, ensuring auto-instrumentation works correctly.
Methods
record_llm_call
record_llm_call(
*,
model: str,
tokens_in: int,
tokens_out: int,
cost: float = 0.0,
task: str = "",
prompt_preview: str = "",
response_preview: str = "",
duration_ms: int | None = None,
provider: str = "",
) -> None
Buffer an LLM call for later flushing. All parameters are keyword-only. If cost is 0.0, it is automatically estimated using built-in model pricing. Also emits an OTel LLM span (if tracing is initialized).
record_step
record_step(step_name: str, output: dict | None = None) -> None
Buffer an execution step. Steps are automatically numbered in the order they are recorded (via an internal position counter). Also emits an OTel step span.
If mid_execution_governance is enabled, this method also flushes buffered data to the server and checks the governance response. Raises PolicyViolationError if the server returns a block action.
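The automatic numbering can be illustrated with a small buffer (a hypothetical stand-in, not the SDK's internal class):

```python
import itertools

class StepBuffer:
    def __init__(self):
        # Internal position counter: steps are numbered in recording order.
        self._position = itertools.count(1)
        self.steps = []

    def record_step(self, step_name, output=None):
        self.steps.append({"step_name": step_name,
                           "output": output,
                           "position": next(self._position)})

buf = StepBuffer()
buf.record_step("preprocessing", {"tokens": 150})
buf.record_step("validation")
assert [s["position"] for s in buf.steps] == [1, 2]
```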
set_result
set_result(result: dict) -> None
Set the result to include when the run is completed.
record_score
record_score(
name: str,
value: float | str | bool,
data_type: str = "numeric",
comment: str = "",
) -> None
Buffer a score (user feedback or evaluation result) for the current run. Scores are flushed to the server when the context exits.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Score name (e.g. "thumbs_up", "accuracy", "relevance") |
value | float | str | bool | (required) | Score value. Type depends on data_type |
data_type | str | "numeric" | One of "numeric", "categorical", or "boolean" |
comment | str | "" | Optional free-text comment |
Value handling by data type:
- "numeric": value is stored as numeric_value (converted to float)
- "boolean": value is stored as both numeric_value (1.0 for truthy, 0.0 for falsy) and string_value ("true" or "false")
- "categorical": value is stored as string_value (converted to string)
Example:
async with WaxellContext(agent_name="my-agent") as ctx:
result = await run_agent(query)
ctx.record_score("accuracy", 0.95)
ctx.record_score("thumbs_up", True, data_type="boolean")
ctx.record_score("category", "helpful", data_type="categorical", comment="User selected")
ctx.set_result({"output": result})
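The value-handling rules for each data type can be sketched as plain payload normalization (a hypothetical helper for illustration, not part of the SDK):

```python
def normalize_score(name, value, data_type="numeric", comment=""):
    # Apply the documented value-handling rules per data type.
    score = {"name": name, "data_type": data_type}
    if comment:
        score["comment"] = comment
    if data_type == "numeric":
        score["numeric_value"] = float(value)
    elif data_type == "boolean":
        score["numeric_value"] = 1.0 if value else 0.0
        score["string_value"] = "true" if value else "false"
    elif data_type == "categorical":
        score["string_value"] = str(value)
    return score

assert normalize_score("accuracy", 0.95) == {
    "name": "accuracy", "data_type": "numeric", "numeric_value": 0.95,
}
```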
set_tag
set_tag(key: str, value: str) -> None
Set a searchable tag on the current agent span. Tags are string key-value pairs that become OTel span attributes with the waxell.tag. prefix.
| Parameter | Type | Description |
|---|---|---|
key | str | Tag name (alphanumeric, underscores, hyphens) |
value | str | Tag value (string) |
Tags are queryable in Grafana TraceQL:
{ span.waxell.tag.environment = "production" }
Example:
async with WaxellContext(agent_name="my-agent") as ctx:
ctx.set_tag("environment", "production")
ctx.set_tag("customer_tier", "enterprise")
ctx.set_tag("region", "us-east-1")
set_metadata
set_metadata(key: str, value: Any) -> None
Set metadata on the current agent span. Unlike tags, metadata values can be any JSON-serializable type. Complex values are automatically JSON-serialized for OTel compatibility.
| Parameter | Type | Description |
|---|---|---|
key | str | Metadata key |
value | Any | Any JSON-serializable value |
Metadata is queryable in Grafana TraceQL:
{ span.waxell.meta.request_id != nil }
Example:
async with WaxellContext(agent_name="my-agent") as ctx:
ctx.set_metadata("request_id", "req_abc123")
ctx.set_metadata("config", {"temperature": 0.7, "max_tokens": 1000})
ctx.set_metadata("retry_count", 2)
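The JSON-serialization step can be sketched as follows (an illustrative helper, not the SDK's internal code): scalar values pass through unchanged, while complex values are serialized so they fit OTel's flat attribute model.

```python
import json

def to_attribute(value):
    # Scalars are valid OTel attribute values as-is;
    # anything else is JSON-serialized.
    if isinstance(value, (str, bool, int, float)):
        return value
    return json.dumps(value)

assert to_attribute("req_abc123") == "req_abc123"
assert to_attribute({"temperature": 0.7}) == '{"temperature": 0.7}'
```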
Behavior Tracking Methods
These methods buffer behavior data as spans, flushed to the server on context exit via the POST /runs/{run_id}/spans/ endpoint.
record_tool_call
record_tool_call(
*,
name: str,
input: dict | str = "",
output: dict | str = "",
duration_ms: int | None = None,
status: str = "ok",
tool_type: str = "function",
error: str = "",
) -> None
Buffer a tool/function call event.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Tool name (e.g. "web_search", "database_query") |
input | dict | str | "" | Tool input parameters |
output | dict | str | "" | Tool output/result |
duration_ms | int | None | None | Execution time in milliseconds |
status | str | "ok" | "ok" or "error" |
tool_type | str | "function" | "function", "api", "database", or "retriever" |
error | str | "" | Error message if status is "error" |
record_retrieval
record_retrieval(
*,
query: str,
documents: list[dict],
source: str = "",
duration_ms: int | None = None,
top_k: int | None = None,
scores: list[float] | None = None,
) -> None
Buffer a RAG retrieval operation.
| Parameter | Type | Default | Description |
|---|---|---|---|
query | str | (required) | The retrieval query string |
documents | list[dict] | (required) | Retrieved docs (e.g. [{id, title, score, snippet}]) |
source | str | "" | Data source name (e.g. "pinecone", "elasticsearch") |
duration_ms | int | None | None | Retrieval time in milliseconds |
top_k | int | None | None | Number of documents requested |
scores | list[float] | None | None | Relevance scores for each retrieved document |
record_decision
record_decision(
*,
name: str,
options: list[str],
chosen: str,
reasoning: str = "",
confidence: float | None = None,
metadata: dict | None = None,
) -> None
Buffer a decision/routing point.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Decision name (e.g. "route_to_agent", "select_model") |
options | list[str] | (required) | Available choices |
chosen | str | (required) | The selected option |
reasoning | str | "" | Why this option was chosen |
confidence | float | None | None | Confidence score (0.0-1.0) |
metadata | dict | None | None | Additional context |
record_reasoning
record_reasoning(
*,
step: str,
thought: str,
evidence: list[str] | None = None,
conclusion: str = "",
) -> None
Buffer a reasoning/chain-of-thought step.
| Parameter | Type | Default | Description |
|---|---|---|---|
step | str | (required) | Reasoning step name |
thought | str | (required) | The reasoning text/thought process |
evidence | list[str] | None | None | Supporting evidence or references |
conclusion | str | "" | Conclusion reached at this step |
record_retry
record_retry(
*,
attempt: int,
reason: str,
strategy: str = "retry",
original_error: str = "",
fallback_to: str = "",
max_attempts: int | None = None,
) -> None
Buffer a retry or fallback event.
| Parameter | Type | Default | Description |
|---|---|---|---|
attempt | int | (required) | Current attempt number (1-based) |
reason | str | (required) | Why a retry/fallback occurred |
strategy | str | "retry" | "retry", "fallback", or "circuit_break" |
original_error | str | "" | The error that triggered the retry |
fallback_to | str | "" | Name of fallback target (model, agent, tool) |
max_attempts | int | None | None | Maximum attempts configured |
check_policy / check_policy_sync
# Async
async check_policy() -> PolicyCheckResult
# Sync
check_policy_sync() -> PolicyCheckResult
Perform a mid-execution policy check. Returns a PolicyCheckResult. Use check_policy_sync() in synchronous code.
Properties
| Property | Type | Description |
|---|---|---|
run_id | str | The run ID from the control plane, or "" if not started |
session_id | str | The session ID passed to the constructor |
user_id | str | The user ID passed to the constructor |
user_group | str | The user group passed to the constructor |
@observe / @waxell_agent
from waxell_observe import observe # Alias for waxell_agent
from waxell_observe import waxell_agent # Original decorator (identical to observe)
Decorator that adds observability and governance to any function. @observe and @waxell_agent are identical -- use whichever reads better in your codebase.
Signature
@observe(
agent_name: str | None = None,
workflow_name: str = "default",
enforce_policy: bool = True,
capture_io: bool = True,
session_id: str = "",
user_id: str = "",
user_group: str = "",
mid_execution_governance: bool = False,
client: WaxellObserveClient | None = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_name | str | None | None | Agent name. Defaults to the function name |
workflow_name | str | "default" | Workflow name |
enforce_policy | bool | True | Check policies before execution |
capture_io | bool | True | Capture function inputs and outputs |
session_id | str | "" | Session ID for grouping related runs |
user_id | str | "" | End-user ID for attribution and analytics |
user_group | str | "" | User group for authorization policies |
mid_execution_governance | bool | False | Flush data and check governance on each record_step() call |
client | WaxellObserveClient | None | None | Pre-configured client |
Context Injection
If the decorated function has a waxell_ctx parameter, a WaxellContext instance is injected automatically:
@observe(agent_name="my-agent")
async def my_func(query: str, waxell_ctx=None) -> str:
if waxell_ctx:
waxell_ctx.record_llm_call(model="gpt-4o", tokens_in=100, tokens_out=50)
waxell_ctx.record_score("relevance", 0.9)
waxell_ctx.set_tag("source", "api")
return "result"
Example
import waxell_observe as waxell
@waxell.observe(agent_name="my-agent")
async def chat(query: str) -> str:
response = await openai_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}],
) # auto-captured by instrumentation
waxell.score("helpfulness", 0.9)
waxell.tag("intent", "question")
return response.choices[0].message.content
# Creates a full run with LLM call, score, and tag — all auto-recorded
Behavior
- Async functions are wrapped with an async wrapper
- Sync functions are wrapped with a sync wrapper that uses asyncio.run() internally
- On success, the run is completed with status="success" and the captured return value
- On exception, the run is completed with status="error" and the error message; the exception is re-raised
@tool
from waxell_observe import tool
Decorator that auto-records function calls as tool invocations on the current WaxellContext.
Signature
@tool(name: str | None = None, tool_type: str = "function")
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | None | Tool name. Defaults to function name |
tool_type | str | "function" | Classification: "function", "vector_db", "database", "api" |
Captures: function arguments as input, return value as output, execution time, status ("ok" or "error"). Re-raises any exceptions. No-op outside a WaxellContext. Works with sync and async functions.
Example:
import waxell_observe as waxell
@waxell.tool(tool_type="vector_db")
def search_index(query_vec, k: int = 5):
distances, indices = index.search(query_vec, k)
return {"distances": distances.tolist(), "indices": indices.tolist()}
# Auto-records: tool_call(name="search_index", input={...}, output={...}, duration_ms=...)
@decision
from waxell_observe import decision
Decorator that auto-records a function's return value as a decision.
Signature
@decision(name: str | None = None, options: list[str] | None = None)
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | None | Decision name. Defaults to function name |
options | list[str] | None | None | Available choices |
Return value handling: a dict return has its chosen, reasoning, and confidence keys extracted; a string return is recorded as chosen.
Example:
import waxell_observe as waxell
@waxell.decision(name="route_query", options=["factual", "analytical", "creative"])
async def classify_query(query: str) -> dict:
response = await client.chat.completions.create(...)
return {"chosen": "factual", "reasoning": "Direct question", "confidence": 0.92}
# Dict return: extracts chosen, reasoning, confidence automatically
@retrieval
from waxell_observe import retrieval
Decorator that auto-records function calls as retrieval operations.
Signature
@retrieval(source: str = "", name: str | None = None)
| Parameter | Type | Default | Description |
|---|---|---|---|
source | str | "" | Data source name (e.g., "faiss", "pinecone") |
name | str | None | None | Override name. Defaults to function name |
Extracts query from the first string argument, documents from the return value, and scores from doc["score"] fields.
Example:
import waxell_observe as waxell
@waxell.retrieval(source="faiss")
def search_documents(query: str, corpus: list) -> list[dict]:
return [{"id": 1, "title": "Result", "score": 0.95}]
# Auto-extracts: query from first str arg, documents from return, scores from "score" keys
@reasoning_dec
from waxell_observe import reasoning_dec
Decorator that auto-records a function's return value as a reasoning step.
Signature
@reasoning_dec(step: str | None = None)
| Parameter | Type | Default | Description |
|---|---|---|---|
step | str | None | None | Reasoning step name. Defaults to function name |
Return value handling: a dict return has its thought, evidence, and conclusion keys extracted; a string return is recorded as thought.
Example:
import waxell_observe as waxell
@waxell.reasoning_dec(step="quality_check")
async def assess_answer(answer: str) -> dict:
return {"thought": "Answer covers sources", "evidence": ["A cited"], "conclusion": "High quality"}
# Dict return: extracts thought, evidence, conclusion
@retry_dec
from waxell_observe import retry_dec
Decorator that wraps a function with retry logic AND records each attempt.
Signature
@retry_dec(max_attempts: int = 3, strategy: str = "retry", fallback_to: str = "")
| Parameter | Type | Default | Description |
|---|---|---|---|
max_attempts | int | 3 | Maximum attempts (including first) |
strategy | str | "retry" | "retry", "fallback", or "circuit_break" |
fallback_to | str | "" | Fallback target name |
On each failure, records a retry span. After exhausting attempts, re-raises the last exception.
Example:
import waxell_observe as waxell
@waxell.retry_dec(max_attempts=3, strategy="fallback", fallback_to="gpt-4o-mini")
async def call_llm(prompt: str) -> str:
response = await client.chat.completions.create(model="gpt-4o", messages=[...])
return response.choices[0].message.content
# Retries up to 3 times, records each attempt as a retry span
@step_dec
from waxell_observe import step_dec
Decorator that auto-records function calls as execution steps.
Signature
@step_dec(name: str | None = None)
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | None | None | Step name. Defaults to function name |
Records the function's return value as the step output. No-op outside a WaxellContext.
Example:
import waxell_observe as waxell
@waxell.step_dec(name="preprocess")
def clean_input(text: str) -> dict:
cleaned = text.strip().lower()
return {"original": text, "cleaned": cleaned, "length": len(cleaned)}
# Return value becomes step output
WaxellLangChainHandler
from waxell_observe.integrations.langchain import WaxellLangChainHandler
Factory function that returns a LangChain BaseCallbackHandler instance.
Signature
WaxellLangChainHandler(
agent_name: str,
workflow_name: str = "default",
client: WaxellObserveClient | None = None,
enforce_policy: bool = True,
auto_start_run: bool = True,
) -> BaseCallbackHandler
| Parameter | Type | Default | Description |
|---|---|---|---|
agent_name | str | (required) | Agent name |
workflow_name | str | "default" | Workflow name |
client | WaxellObserveClient | None | None | Pre-configured client |
enforce_policy | bool | True | Check policies on first callback |
auto_start_run | bool | True | Automatically start a run on first callback |
Requires langchain-core. Install with pip install waxell-observe[langchain].
Instance Methods
flush
async flush(
result: dict | None = None,
status: str = "success",
error: str = "",
) -> None
Flush all buffered telemetry to the control plane and complete the run.
flush_sync
flush_sync(**kwargs) -> None
Synchronous version of flush. Accepts the same keyword arguments.
Instance Properties
| Property | Type | Description |
|---|---|---|
run_id | str | The run ID from the control plane, or "" if no run started |
Captured Callbacks
| Callback | Data Captured |
|---|---|
on_llm_start | Model name, prompt preview (500 chars) |
on_llm_end | Token counts, cost estimate, response preview (500 chars) |
on_chain_start | Chain name as a step |
on_chain_end | Chain output |
on_tool_start | Tool name as a step (prefixed tool:) |
on_tool_end | Tool output (1000 chars) |
Types
RunInfo
from waxell_observe import RunInfo
@dataclass
class RunInfo:
run_id: str
workflow_id: str
started_at: str
Information about a started execution run. Returned by WaxellObserveClient.start_run().
RunCompleteResult
from waxell_observe import RunCompleteResult
@dataclass
class RunCompleteResult:
run_id: str
duration: float | None = None
governance_action: str = "allow"
governance_reason: str = ""
retry_feedback: str = ""
max_retries: int = 0
@property
def should_retry(self) -> bool: ... # True if governance_action == "retry"
Result from completing a run, including governance info. Returned by WaxellObserveClient.complete_run().
| Field | Type | Default | Description |
|---|---|---|---|
run_id | str | (required) | The run ID |
duration | float | None | None | Run duration in seconds |
governance_action | str | "allow" | Post-execution governance action |
governance_reason | str | "" | Reason for governance action |
retry_feedback | str | "" | Feedback for retry attempts |
max_retries | int | 0 | Maximum retry attempts allowed |
PolicyCheckResult
from waxell_observe import PolicyCheckResult
@dataclass
class PolicyCheckResult:
action: str # "allow", "block", "warn", "throttle", "retry"
reason: str = ""
metadata: dict = field(default_factory=dict)
evaluations: list = field(default_factory=list)
@property
def allowed(self) -> bool: ... # True if action in ("allow", "warn")
@property
def blocked(self) -> bool: ... # True if action in ("block", "throttle")
@property
def should_retry(self) -> bool: ... # True if action == "retry"
Result of a policy check. Returned by WaxellObserveClient.check_policy() and WaxellContext.check_policy().
LlmCallInfo
from waxell_observe import LlmCallInfo
@dataclass
class LlmCallInfo:
model: str
tokens_in: int
tokens_out: int
cost: float = 0.0
task: str = ""
prompt_preview: str = ""
response_preview: str = ""
Typed representation of an LLM API call. Useful for constructing call records programmatically.
PromptInfo
from waxell_observe import PromptInfo
@dataclass
class PromptInfo:
name: str
version: int
prompt_type: str # "text" or "chat"
content: object # str for text, list[dict] for chat
config: dict = field(default_factory=dict)
labels: list = field(default_factory=list)
def compile(self, **variables: str) -> object: ...
A prompt version retrieved from the control plane. Returned by WaxellObserveClient.get_prompt().
| Field | Type | Description |
|---|---|---|
name | str | Prompt name |
version | int | Version number |
prompt_type | str | "text" for plain text prompts, "chat" for chat message prompts |
content | str | list[dict] | Prompt content. A string for text prompts, a list of {"role": ..., "content": ...} message dicts for chat prompts |
config | dict | Model configuration (e.g. temperature, max_tokens, model) |
labels | list[str] | Labels attached to this version (e.g. ["production", "latest"]) |
compile
prompt.compile(**variables: str) -> str | list[dict]
Render the prompt by replacing {{variable}} placeholders in the content.
- Text prompts: Returns a string with all {{variable}} placeholders replaced.
- Chat prompts: Returns a list of message dicts with {{variable}} placeholders replaced in each message's content field.
Example:
# Text prompt
prompt = await client.get_prompt("summarizer", label="production")
rendered = prompt.compile(topic="AI safety", length="short")
# rendered: "Summarize the following about AI safety in short form: ..."
# Chat prompt
prompt = await client.get_prompt("assistant", label="production")
messages = prompt.compile(user_query="What is RAG?")
# messages: [{"role": "system", "content": "..."}, {"role": "user", "content": "What is RAG?"}]
# Use config for model parameters
response = openai.chat.completions.create(
model=prompt.config.get("model", "gpt-4o"),
messages=messages,
temperature=prompt.config.get("temperature", 0.7),
)
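The substitution itself is plain string replacement. A hedged reference sketch of the documented behavior (compile_prompt is illustrative; the SDK's actual implementation may differ, e.g. in how it treats unmatched placeholders):

```python
def compile_prompt(prompt_type, content, **variables):
    """Sketch of PromptInfo.compile(): replace {{var}} placeholders.

    Text prompts return a rendered string; chat prompts return new message
    dicts with each message's content field rendered.
    """
    def render(text):
        for name, value in variables.items():
            text = text.replace("{{" + name + "}}", value)
        return text

    if prompt_type == "text":
        return render(content)
    return [{**msg, "content": render(msg["content"])} for msg in content]
```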
ObserveConfig
from waxell_observe import ObserveConfig
@dataclass
class ObserveConfig:
api_url: str = ""
api_key: str = ""
otel_endpoint: str = ""
debug: bool = False
capture_content: bool = False
prompt_guard: bool = False
prompt_guard_server: bool = False
prompt_guard_action: str = "block"
instrument_infra: bool = True
infra_exclude: str = ""
@classmethod
def from_env(cls) -> ObserveConfig: ...
@classmethod
def from_cli_config(cls, config_path: Path | None = None) -> ObserveConfig: ...
@property
def is_configured(self) -> bool: ...
Configuration data class. Used internally by WaxellObserveClient to resolve settings.
| Field | Type | Default | Description |
|---|---|---|---|
api_url | str | "" | Waxell API URL |
api_key | str | "" | Waxell API key |
otel_endpoint | str | "" | Explicit OTel collector endpoint |
debug | bool | False | Enable debug logging |
capture_content | bool | False | Include prompt/response content in traces |
prompt_guard | bool | False | Enable client-side prompt guard |
prompt_guard_server | bool | False | Enable server-side prompt guard |
prompt_guard_action | str | "block" | Action on violations: "block", "warn", or "redact" |
instrument_infra | bool | True | Enable infrastructure library instrumentation |
infra_exclude | str | "" | Comma-delimited list of infra libraries to exclude |
| Class Method | Description |
|---|---|
from_env() | Load from environment variables (WAXELL_API_URL/WAXELL_API_KEY or WAX_API_URL/WAX_API_KEY) |
from_cli_config(config_path=None) | Load from CLI config file (default: ~/.waxell/config) |
| Property | Type | Description |
|---|---|---|
is_configured | bool | True if both api_url and api_key are non-empty |
ApprovalDecision
from waxell_observe import ApprovalDecision
@dataclass
class ApprovalDecision:
approved: bool
approver: str = ""
timed_out: bool = False
elapsed_seconds: float | None = None
Return type from on_policy_block handlers. Tells the decorator whether to retry the function (approved=True) or propagate the PolicyViolationError.
| Field | Type | Default | Description |
|---|---|---|---|
approved | bool | required | Whether to proceed with execution |
approver | str | "" | Who approved (email, username, system) |
timed_out | bool | False | Whether the approval window expired |
elapsed_seconds | float | None | None | Time from block to decision |
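A custom handler just needs to return one of these. The sketch below assumes the handler receives the PolicyCheckResult that triggered the block -- that signature, _Approval, and approve_known_safe are all illustrative assumptions, not SDK symbols:

```python
from dataclasses import dataclass

@dataclass
class _Approval:
    """Local stand-in mirroring the documented ApprovalDecision fields."""
    approved: bool
    approver: str = ""
    timed_out: bool = False

def approve_known_safe(policy_result):
    """Hypothetical on_policy_block handler: approve blocks flagged only
    for rate limiting, deny everything else."""
    if "rate" in policy_result.reason.lower():
        return _Approval(approved=True, approver="ops-bot")
    return _Approval(approved=False)
```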
HumanTurn
from waxell_observe import HumanTurn
Context manager returned by waxell.human_turn(). Records a human interaction as a timed IO span when it exits.
| Method | Description |
|---|---|
set_response(response: str) | Record what the human replied |
__enter__() | Start timing |
__exit__() | Record the span with elapsed time |
Errors
All errors inherit from ObserveError, which inherits from Exception.
ObserveError
from waxell_observe import ObserveError
Base error class for all waxell-observe errors.
PolicyViolationError
from waxell_observe import PolicyViolationError
class PolicyViolationError(ObserveError):
def __init__(self, message: str, policy_result=None): ...
policy_result: PolicyCheckResult | None
Raised when a policy check blocks execution (action is "block" or "throttle").
| Attribute | Type | Description |
|---|---|---|
policy_result | PolicyCheckResult | None | The full policy check result |
ConfigurationError
from waxell_observe import ConfigurationError
Raised when the client is not properly configured. Inherits from ObserveError.
Functions
estimate_cost
from waxell_observe.cost import estimate_cost
estimate_cost(model: str, tokens_in: int, tokens_out: int) -> float
Estimate the USD cost of an LLM call. Uses exact match first, then prefix matching for versioned model names. Returns 0.0 for unknown models.
| Parameter | Type | Description |
|---|---|---|
model | str | Model name or prefix |
tokens_in | int | Input token count |
tokens_out | int | Output token count |
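The matching order can be illustrated with a toy price table. estimate_cost_sketch and the per-1M-token prices below are hypothetical -- the real table lives inside waxell_observe.cost:

```python
# Hypothetical (input, output) USD prices per 1M tokens, for illustration only
_PRICES = {
    "gpt-4o": (2.50, 10.00),
    "gpt-4o-mini": (0.15, 0.60),
}

def estimate_cost_sketch(model, tokens_in, tokens_out):
    """Illustrates the documented lookup: exact match first, then the longest
    matching prefix (for versioned model names), else 0.0 for unknown models."""
    rate = _PRICES.get(model)
    if rate is None:
        for prefix in sorted(_PRICES, key=len, reverse=True):
            if model.startswith(prefix):
                rate = _PRICES[prefix]
                break
    if rate is None:
        return 0.0
    return (tokens_in * rate[0] + tokens_out * rate[1]) / 1_000_000
```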
Configuration Resolution
The SDK resolves configuration from multiple sources, in order of precedence (highest to lowest):
- Explicit constructor arguments -- WaxellObserveClient(api_url="...", api_key="...")
- Global config -- WaxellObserveClient.configure(...) or waxell_observe.init(...)
- CLI config file -- ~/.waxell/config (INI format)
- Environment variables -- WAXELL_API_URL/WAXELL_API_KEY (or WAX_API_URL/WAX_API_KEY)
Environment variables:
| Variable | Description |
|---|---|
WAXELL_API_URL | Control plane URL (e.g. https://acme.waxell.dev) |
WAXELL_API_KEY | API key (e.g. wax_sk_abc123) |
WAX_API_URL | Alias for WAXELL_API_URL |
WAX_API_KEY | Alias for WAXELL_API_KEY |
WAXELL_OBSERVE | Kill switch. Set to false, 0, or no to disable init() |
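The precedence chain amounts to taking the first non-empty value across the four sources. A sketch, with resolve as an illustrative helper (not an SDK function):

```python
def resolve(name, *, explicit="", global_cfg=None, cli_cfg=None, env=None):
    """Sketch of the documented precedence: explicit arg > global config >
    CLI config file > environment (WAXELL_* with WAX_* as an alias)."""
    global_cfg = global_cfg or {}
    cli_cfg = cli_cfg or {}
    env = env or {}
    env_value = (env.get("WAXELL_" + name.upper(), "")
                 or env.get("WAX_" + name.upper(), ""))
    for value in (explicit, global_cfg.get(name, ""),
                  cli_cfg.get(name, ""), env_value):
        if value:
            return value
    return ""
```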
Next Steps
- REST API Reference -- Direct HTTP API usage
- Quickstart -- Get started in 5 minutes
- Installation & Configuration -- Setup guide