Knowledge Graph RAG Agent

A 12-step stress test combining graph databases (Neo4j, FalkorDB) with vector databases (Pinecone, Milvus) for hybrid retrieval, followed by dual reranking (Cohere, Jina), LLM synthesis (Anthropic), evaluation (DeepEval), graph writeback (Neo4j), and session memory (Zep). Exercises the graph_db, vector_db, embedding, reranking, evaluation, and memory categories together in one pipeline.

Environment variables

This example requires ANTHROPIC_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to skip real API calls. All external services are mocked.
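The mode selection can be sketched as a small helper; a minimal sketch under assumptions (`resolve_mode` is a hypothetical function for illustration, not part of the demo's actual code):

```python
import os


def resolve_mode(argv: list, env=os.environ) -> str:
    """Pick execution mode: --dry-run skips real API calls; live mode
    requires all three variables (hypothetical helper for illustration)."""
    if "--dry-run" in argv:
        return "dry-run"
    required = ("ANTHROPIC_API_KEY", "WAXELL_API_KEY", "WAXELL_API_URL")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return "live"
```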

Architecture

Key Code

Graph + Vector Hybrid Retrieval

The pipeline queries both graph databases and vector databases, then merges the results for comprehensive context.

# Graph DB queries
@waxell.tool(tool_type="graph_db")
def neo4j_query(driver, query_embedding: list) -> dict:
    """Query Neo4j knowledge graph for related entities and relationships."""
    return {"entities": [...], "relationships": [...], "subgraph_size": 15}

@waxell.tool(tool_type="graph_db")
def falkordb_query(client, query: str) -> dict:
    """Query FalkorDB for graph patterns matching the query."""
    return {"paths": [...], "nodes": 8, "edges": 12}

# Vector DB queries
@waxell.tool(tool_type="vector_db")
def pinecone_query(index, query_embedding: list, top_k: int = 5) -> dict:
    return {"matches": [...], "top_score": 0.94}

@waxell.tool(tool_type="vector_db")
def milvus_search(collection, query_embedding: list, limit: int = 5) -> dict:
    return {"hits": [...], "top_distance": 0.12}

Dual Reranking and Graph Writeback

# Dual reranking
@waxell.tool(tool_type="reranking")
def cohere_rerank(query: str, documents: list, top_n: int = 5) -> dict:
    return {"reranked": [...], "top_score": 0.96}

@waxell.tool(tool_type="reranking")
def jina_rerank(query: str, documents: list, top_n: int = 5) -> dict:
    return {"reranked": [...], "top_score": 0.93}

# Graph writeback -- store new knowledge
@waxell.tool(tool_type="graph_db")
def neo4j_writeback(driver, entities: list, relationships: list) -> dict:
    """Write new knowledge back to Neo4j graph."""
    return {"nodes_created": len(entities), "edges_created": len(relationships)}

# Session memory
@waxell.tool(tool_type="memory")
def zep_store(session_id: str, messages: list) -> dict:
    """Store conversation in Zep session memory."""
    return {"stored": len(messages), "session_id": session_id}
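One common way to combine two rerankers' orderings is reciprocal rank fusion; a minimal sketch (the demo does not specify its fusion strategy, so `fuse_rerankers` and the constant `k = 60` are illustrative assumptions):

```python
def fuse_rerankers(*ranked_lists: list, k: int = 60) -> list:
    """Reciprocal rank fusion: merge document orderings from several
    rerankers (e.g. Cohere and Jina) into one consensus ranking.
    Each document scores sum(1 / (k + rank)) across the input lists."""
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)
```

Documents ranked highly by both rerankers accumulate the largest fused score and surface first, without needing the two services' raw scores to be on the same scale.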

What this demonstrates

  • @waxell.observe -- single agent with 12-step lifecycle tracing
  • @waxell.tool(tool_type="graph_db") -- Neo4j and FalkorDB graph queries and writeback
  • @waxell.tool(tool_type="vector_db") -- Pinecone and Milvus vector queries
  • @waxell.tool(tool_type="embedding") -- Voyage AI query embedding
  • @waxell.tool(tool_type="reranking") -- dual reranking with Cohere and Jina
  • @waxell.tool(tool_type="evaluation") -- DeepEval faithfulness, relevancy, hallucination
  • @waxell.tool(tool_type="memory") -- Zep session memory storage
  • Graph + vector hybrid -- Neo4j/FalkorDB graph context merged with Pinecone/Milvus vector results
  • Auto-instrumented LLM calls -- Anthropic Claude 3.5 Sonnet call captured automatically
  • Graph writeback -- new knowledge written back to Neo4j after generation
  • 6 integration categories -- graph_db, vector_db, embedding, reranking, evaluation, memory in one pipeline
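The evaluation step can act as a gate before writeback: only persist new knowledge when the DeepEval scores clear thresholds. A hedged sketch (the threshold values, the `passes_eval` helper, and the score-dict keys are illustrative, not the demo's actual logic):

```python
def passes_eval(scores: dict,
                min_faithfulness: float = 0.8,
                min_relevancy: float = 0.7,
                max_hallucination: float = 0.2) -> bool:
    """Gate graph writeback on evaluation scores (thresholds illustrative).
    Faithfulness and relevancy must be high; hallucination must be low."""
    return (scores["faithfulness"] >= min_faithfulness
            and scores["relevancy"] >= min_relevancy
            and scores["hallucination"] <= max_hallucination)
```

In this arrangement, `neo4j_writeback` would only run when `passes_eval` returns True, keeping low-quality generations out of the knowledge graph.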

Run it

# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.knowledge_graph_rag_agent --dry-run

# Live mode
export ANTHROPIC_API_KEY="sk-ant-..."
python -m app.demos.knowledge_graph_rag_agent

# Custom query
python -m app.demos.knowledge_graph_rag_agent --dry-run --query "How do transformers relate to attention?"

Source

dev/waxell-dev/app/demos/knowledge_graph_rag_agent.py