LangGraph Agent
A LangGraph-style stateful graph pipeline with three graph nodes (classify, research, synthesize), coordinated by a parent orchestrator across two child agents. The graph maintains explicit state that flows between nodes, with `@tool`-decorated functions recording state updates and merges. This demonstrates how waxell-observe captures stateful, graph-based agent workflows with conditional routing.
This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to run without any API keys.
Architecture
Key Code
Orchestrator with graph routing via decide()
The parent agent decides the full graph route based on query classification, then delegates to runner and evaluator children.
```python
@waxell.observe(agent_name="langgraph-orchestrator", workflow_name="stateful-graph")
async def run_pipeline(query: str, dry_run: bool = False, waxell_ctx=None):
    waxell.tag("demo", "langgraph")
    waxell.tag("pipeline", "stateful-graph")
    waxell.metadata("num_nodes", 3)

    # @step -- preprocess query
    preprocessed = await preprocess_query(query)

    # @decision -- classify query type (OpenAI)
    classification = await classify_query_type(query=query, openai_client=openai_client)
    # Extract routing inputs from the classification result (keys assumed)
    chosen, depth = classification["query_type"], classification["depth"]

    # Manual decide() -- graph routing
    waxell.decide(
        "graph_route",
        chosen="classify -> research -> synthesize",
        options=[
            "classify -> research -> synthesize",
            "classify -> synthesize",
            "research -> synthesize",
        ],
        reasoning=f"Query type '{chosen}' with depth '{depth}' -- full 3-node path",
        confidence=0.95,
    )

    # Child agents execute graph nodes
    runner_result = await run_graph_nodes(query=query, openai_client=openai_client)
    eval_result = await run_graph_evaluation(
        query=query,
        classification=runner_result["classification"],
        research=runner_result["research"],
        state=runner_result["state"],
        openai_client=openai_client,
    )
```
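The conditional-routing idea behind the `decide()` call can be sketched as a plain function. This is a hypothetical helper (the route strings come from the `options` list above; the `query_type`/`depth` dispatch logic is an assumption, not the demo's actual code):

```python
def choose_graph_route(query_type: str, depth: str) -> str:
    """Pick a graph route from classification results (hypothetical sketch).

    A deep query takes the full three-node path; shallow factual queries
    can skip the research node; everything else skips classification.
    """
    if depth == "deep":
        return "classify -> research -> synthesize"
    if query_type == "factual":
        return "classify -> synthesize"
    return "research -> synthesize"

# A deep analytical query gets the full 3-node path
print(choose_graph_route("analytical", "deep"))
```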
Runner with stateful graph nodes, @tool, and @retrieval
The runner executes classify and research nodes, updating graph state after each node with @tool-decorated state management functions.
```python
@waxell.tool(tool_type="graph_state")
def update_graph_state(state: dict, node_name: str, output: dict) -> dict:
    """Update the graph state after a node executes."""
    updated = {**state}
    updated["nodes_executed"] = state.get("nodes_executed", []) + [node_name]
    updated[f"{node_name}_output"] = output
    updated["last_node"] = node_name
    return updated
```
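Threading the returned dict through successive node calls accumulates the execution trail. A minimal sketch (same function body as above, decorator omitted so it runs standalone):

```python
def update_graph_state(state: dict, node_name: str, output: dict) -> dict:
    """Append a node's output and execution record to the graph state."""
    updated = {**state}
    updated["nodes_executed"] = state.get("nodes_executed", []) + [node_name]
    updated[f"{node_name}_output"] = output
    updated["last_node"] = node_name
    return updated

# Each node call returns a new state dict; the old one is never mutated
state = {}
state = update_graph_state(state, "classify", {"type": "analytical"})
state = update_graph_state(state, "research", {"points": 3})
print(state["nodes_executed"])  # ['classify', 'research']
print(state["last_node"])       # 'research'
```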
```python
@waxell.retrieval(source="knowledge_base")
def search_knowledge_base(query: str, documents: list, top_k: int = 3):
    """Search the knowledge base for relevant documents."""
    query_words = {w.lower() for w in query.split() if len(w) > 2}
    scored = []
    for doc in documents:
        searchable = f"{doc['title']} {doc['content']}".lower()
        score = sum(1 for w in query_words if w in searchable)
        if score > 0:
            scored.append({**doc, "relevance_score": round(score / len(query_words), 2)})
    return sorted(scored, key=lambda d: d["relevance_score"], reverse=True)[:top_k]
```
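Called against a small corpus, the scorer keeps only documents matching at least one query word and normalizes by query length. A runnable sketch with made-up documents (decorator omitted):

```python
def search_knowledge_base(query: str, documents: list, top_k: int = 3):
    """Keyword-overlap search: score = matched query words / total query words."""
    query_words = {w.lower() for w in query.split() if len(w) > 2}
    scored = []
    for doc in documents:
        searchable = f"{doc['title']} {doc['content']}".lower()
        score = sum(1 for w in query_words if w in searchable)
        if score > 0:
            scored.append({**doc, "relevance_score": round(score / len(query_words), 2)})
    return sorted(scored, key=lambda d: d["relevance_score"], reverse=True)[:top_k]

docs = [
    {"title": "Graph state", "content": "How state flows between graph nodes"},
    {"title": "Routing", "content": "Conditional routing in agent pipelines"},
]
hits = search_knowledge_base("graph state flow", docs, top_k=2)
# Only the first doc matches all three query words; the second matches none
print(hits)  # [{'title': 'Graph state', ..., 'relevance_score': 1.0}]
```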
Evaluator with @reasoning, @decision, and score()
The evaluator synthesizes a final answer, assesses quality, and records scores.
```python
@waxell.reasoning_dec(step="synthesis_quality")
async def assess_synthesis_quality(answer: str, research: str, classification: str):
    # Split the research text into individual points (one per line)
    research_points = [p.strip() for p in research.split("\n") if p.strip()]
    referenced = len([p for p in research_points
                      if any(word in answer.lower() for word in p.lower().split()[:3])])
    return {
        "thought": f"Synthesis references {referenced}/{len(research_points)} research points.",
        "evidence": [f"Research point: {p[:60]}..." for p in research_points[:5]],
        "conclusion": "Synthesis adequately covers research",
    }

@waxell.decision(name="output_format", options=["structured", "narrative", "bullet_points"])
def choose_output_format(classification: str, context: str) -> dict:
    format_choice = "structured" if classification in ("analytical", "factual") else "narrative"
    return {"chosen": format_choice, "reasoning": f"Query type '{classification}'"}

@waxell.tool(tool_type="output_formatter")
def format_final_output(answer: str, metadata: dict) -> dict:
    return {"answer": answer[:500], "nodes_executed": metadata.get("nodes", [])}

# Scores
waxell.score("synthesis_quality", 0.90, comment="Based on research coverage")
waxell.score("graph_completeness", True, data_type="boolean",
             comment="All 3 nodes executed successfully")
```
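The format decision is a simple allow-list check with a narrative fallback; any classification outside the two structured types falls through. A standalone sketch (decorator omitted, same selection logic):

```python
def choose_output_format(classification: str, context: str) -> dict:
    """Map query classification to an output format, defaulting to narrative."""
    format_choice = "structured" if classification in ("analytical", "factual") else "narrative"
    return {"chosen": format_choice, "reasoning": f"Query type '{classification}'"}

print(choose_output_format("analytical", "")["chosen"])      # structured
print(choose_output_format("conversational", "")["chosen"])  # narrative
```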
What this demonstrates
- `@waxell.observe` -- parent-child agent hierarchy modeling a stateful graph
- `@waxell.tool` -- graph state updates (`graph_state`) and output formatting (`output_formatter`)
- `@waxell.retrieval` -- knowledge base search with relevance scoring
- `@waxell.decision` -- query classification and output format selection
- `waxell.decide()` -- manual graph routing decision with full 3-node path
- `@waxell.reasoning_dec` -- synthesis quality assessment with evidence
- `@waxell.step_dec` -- query preprocessing as an execution step
- `waxell.score()` -- synthesis quality (numeric) and graph completeness (boolean)
- `waxell.tag()` / `waxell.metadata()` -- framework, node count, and pipeline metadata
- Auto-instrumented LLM calls -- three OpenAI gpt-4o-mini calls across graph nodes
- Stateful graph pattern -- explicit state object flows between nodes, updated via `@tool`-decorated functions
Run it
```bash
# Dry-run (no API keys needed)
cd dev/waxell-dev
python -m app.demos.langgraph_agent --dry-run

# Live (real OpenAI)
export OPENAI_API_KEY="sk-..."
python -m app.demos.langgraph_agent

# Custom query
python -m app.demos.langgraph_agent --query "Analyze multi-agent coordination patterns"
```