Skip to main content

Anthropic Integration

Add observability to Anthropic Claude API calls with auto-instrumentation plus decorators for structure.

Quick Start

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

# Import Anthropic AFTER init() -- now auto-instrumented
import anthropic

client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced with model, tokens, cost

Drop-in Import

Alternative approach using pre-instrumented module:

from waxell_observe.anthropic import anthropic

client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced

For multi-step Claude pipelines, wrap your function with @observe. Every client.messages.create(...) call is auto-captured -- no manual record_llm_call needed.

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")

import anthropic
from waxell_observe.errors import PolicyViolationError

client = anthropic.AsyncAnthropic()


@waxell.observe(
agent_name="anthropic-demo",
workflow_name="content-analysis",
enforce_policy=True,
)
async def analyze_content(query: str) -> dict:
waxell.tag("demo", "anthropic")
waxell.tag("provider", "anthropic")
waxell.metadata("sdk", "anthropic-python")

# Step 1: Classify content (auto-captured)
classify_response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Classify this text: {query}",
}],
)
classification = classify_response.content[0].text
waxell.step("classify_content", output={"classification": classification[:200]})

# Step 2: Extract entities (auto-captured)
extract_response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Extract key entities from: {query}",
}],
)
entities = extract_response.content[0].text
waxell.step("extract_entities", output={"entities": entities[:200]})

# Step 3: Summarize (auto-captured)
summary_response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": (
f"Summarize:\n\n"
f"Text: {query}\n"
f"Classification: {classification}\n"
f"Entities: {entities}"
),
}],
)
summary = summary_response.content[0].text

waxell.score("completeness", 0.95)
return {
"classification": classification,
"entities": entities,
"summary": summary,
}


# Run it -- pass session_id / user_id at call time
try:
result = await analyze_content(
"Analyze the impact of AI on healthcare",
session_id="sess_health_001",
user_id="user_789",
user_group="enterprise",
)
except PolicyViolationError as e:
print(f"Policy violation: {e}")

Streaming with Anthropic

Auto-instrumentation captures Anthropic streaming end-to-end -- the SDK aggregates content, input tokens (from message_start), and output tokens (from message_delta) automatically. Just iterate the stream:

import waxell_observe as waxell
waxell.init(api_key="wax_sk_...")

import anthropic

client = anthropic.AsyncAnthropic()

@waxell.observe(agent_name="streaming-claude")
async def stream_claude(query: str) -> str:
stream = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
stream=True,
)

content = ""
async for event in stream:
if event.type == "content_block_delta" and hasattr(event.delta, "text"):
content += event.delta.text
print(event.delta.text, end="", flush=True)

return content

See Streaming Integration for advanced patterns when you need custom stream processing.

Supported Models

ModelAuto-InstrumentedCost Tracking
claude-opus-4YesYes
claude-sonnet-4YesYes
claude-3-5-sonnetYesYes
claude-3-5-haikuYesYes
claude-3-haikuYesYes

Tags and Metadata

Enrich traces with contextual information using convenience functions:

@waxell.observe(agent_name="claude-agent")
async def my_claude_agent(query: str) -> str:
# Tags: searchable in UI
waxell.tag("provider", "anthropic")
waxell.tag("model_tier", "premium")
waxell.tag("use_case", "analysis")

# Metadata: arbitrary JSON-serializable values
waxell.metadata("sdk_version", "0.25.0")
waxell.metadata("config", {"max_tokens": 500, "temperature": 0.7})

response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
)
return response.content[0].text

Error Handling

from waxell_observe.errors import PolicyViolationError

@waxell.observe(agent_name="claude-agent", enforce_policy=True)
async def my_claude_agent(query: str) -> str:
response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
)
return response.content[0].text


try:
result = await my_claude_agent("Hello")
except PolicyViolationError as e:
# Policy blocked execution (budget, rate limit, etc.)
print(f"Blocked: {e.policy_result.reason}")
except anthropic.APIError as e:
# Anthropic API error
print(f"API error: {e}")

Best Practices

  1. Call init() before importing anthropic -- enables auto-instrumentation
  2. Trust auto-instrumentation for token + cost capture -- Anthropic's input_tokens / output_tokens and message-streaming events are handled for you
  3. Wrap pipelines in @observe -- groups all Claude calls into one run
  4. Set max_tokens -- required by the Anthropic API
  5. Add a provider tag -- makes filtering easy in the UI

Advanced: Context Manager

If you need fine-grained control (batch loops, mid-execution policy checks, manual run lifecycle), see the Context Manager page. For most use cases, the decorator pattern above is the right call.

Next Steps