Anthropic Integration
Add observability to Anthropic Claude API calls with auto-instrumentation plus decorators for structure.
Quick Start
import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")
# Import Anthropic AFTER init() -- now auto-instrumented
import anthropic
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced with model, tokens, cost
Drop-in Import
Alternative approach using pre-instrumented module:
from waxell_observe.anthropic import anthropic
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-sonnet-4",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
# Automatically traced
Decorator Pattern (Recommended for Pipelines)
For multi-step Claude pipelines, wrap your function with @observe. Every client.messages.create(...) call is auto-captured -- no manual record_llm_call needed.
import waxell_observe as waxell
waxell.init(api_key="wax_sk_...", api_url="https://waxell.dev")
import anthropic
from waxell_observe.errors import PolicyViolationError
client = anthropic.AsyncAnthropic()
@waxell.observe(
agent_name="anthropic-demo",
workflow_name="content-analysis",
enforce_policy=True,
)
async def analyze_content(query: str) -> dict:
waxell.tag("demo", "anthropic")
waxell.tag("provider", "anthropic")
waxell.metadata("sdk", "anthropic-python")
# Step 1: Classify content (auto-captured)
classify_response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Classify this text: {query}",
}],
)
classification = classify_response.content[0].text
waxell.step("classify_content", output={"classification": classification[:200]})
# Step 2: Extract entities (auto-captured)
extract_response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Extract key entities from: {query}",
}],
)
entities = extract_response.content[0].text
waxell.step("extract_entities", output={"entities": entities[:200]})
# Step 3: Summarize (auto-captured)
summary_response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{
"role": "user",
"content": (
f"Summarize:\n\n"
f"Text: {query}\n"
f"Classification: {classification}\n"
f"Entities: {entities}"
),
}],
)
summary = summary_response.content[0].text
waxell.score("completeness", 0.95)
return {
"classification": classification,
"entities": entities,
"summary": summary,
}
# Run it -- pass session_id / user_id at call time
try:
result = await analyze_content(
"Analyze the impact of AI on healthcare",
session_id="sess_health_001",
user_id="user_789",
user_group="enterprise",
)
except PolicyViolationError as e:
print(f"Policy violation: {e}")
Streaming with Anthropic
Auto-instrumentation captures Anthropic streaming end-to-end -- the SDK aggregates content, input tokens (from message_start), and output tokens (from message_delta) automatically. Just iterate the stream:
import waxell_observe as waxell
waxell.init(api_key="wax_sk_...")
import anthropic
client = anthropic.AsyncAnthropic()
@waxell.observe(agent_name="streaming-claude")
async def stream_claude(query: str) -> str:
stream = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
stream=True,
)
content = ""
async for event in stream:
if event.type == "content_block_delta" and hasattr(event.delta, "text"):
content += event.delta.text
print(event.delta.text, end="", flush=True)
return content
See Streaming Integration for advanced patterns when you need custom stream processing.
Supported Models
| Model | Auto-Instrumented | Cost Tracking |
|---|---|---|
| claude-opus-4 | Yes | Yes |
| claude-sonnet-4 | Yes | Yes |
| claude-3-5-sonnet | Yes | Yes |
| claude-3-5-haiku | Yes | Yes |
| claude-3-haiku | Yes | Yes |
Tags and Metadata
Enrich traces with contextual information using convenience functions:
@waxell.observe(agent_name="claude-agent")
async def my_claude_agent(query: str) -> str:
# Tags: searchable in UI
waxell.tag("provider", "anthropic")
waxell.tag("model_tier", "premium")
waxell.tag("use_case", "analysis")
# Metadata: arbitrary JSON-serializable values
waxell.metadata("sdk_version", "0.25.0")
waxell.metadata("config", {"max_tokens": 500, "temperature": 0.7})
response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
)
return response.content[0].text
Error Handling
from waxell_observe.errors import PolicyViolationError
@waxell.observe(agent_name="claude-agent", enforce_policy=True)
async def my_claude_agent(query: str) -> str:
response = await client.messages.create(
model="claude-sonnet-4",
max_tokens=500,
messages=[{"role": "user", "content": query}],
)
return response.content[0].text
try:
result = await my_claude_agent("Hello")
except PolicyViolationError as e:
# Policy blocked execution (budget, rate limit, etc.)
print(f"Blocked: {e.policy_result.reason}")
except anthropic.APIError as e:
# Anthropic API error
print(f"API error: {e}")
Best Practices
- Call
init()before importing anthropic -- enables auto-instrumentation - Trust auto-instrumentation for token + cost capture -- Anthropic's
input_tokens/output_tokensand message-streaming events are handled for you - Wrap pipelines in
@observe-- groups all Claude calls into one run - Set max_tokens -- required by the Anthropic API
- Add a
providertag -- makes filtering easy in the UI
Advanced: Context Manager
If you need fine-grained control (batch loops, mid-execution policy checks, manual run lifecycle), see the Context Manager page. For most use cases, the decorator pattern above is the right call.
Next Steps
- Streaming Integration -- Detailed streaming patterns
- LiteLLM Integration -- Use Anthropic via LiteLLM
- Multi-Agent -- Coordinate Claude agents