Elasticsearch Agent

A single-agent pipeline that uses Elasticsearch for vector search. The agent creates an index with a dense_vector mapping, bulk-indexes documents with embeddings, runs both pure KNN and hybrid (text match + KNN) searches, compares the two approaches, merges and deduplicates the results, and synthesizes an answer with an LLM. It demonstrates SDK primitives across real Elasticsearch KNN and hybrid search operations.

Environment variables

This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL, plus a running Elasticsearch instance on localhost:9200. Use --dry-run to skip real API calls.

Architecture

Key Code

KNN and Hybrid Search with @tool

The agent runs both pure KNN vector search and hybrid text + KNN search, comparing their effectiveness.

```python
@waxell.tool(tool_type="vector_db", name="es_knn_search")
def knn_search(es, query_embedding, k: int = 3) -> dict:
    """Pure KNN vector search."""
    result = es.search(index=INDEX_NAME, body={
        "knn": {
            "field": "embedding", "query_vector": query_embedding.tolist(),
            "k": k, "num_candidates": 50,
        },
        "_source": ["title", "text", "category"],
    })
    return {"hits": [...], "total": result["hits"]["total"]["value"]}
```

```python
@waxell.tool(tool_type="vector_db", name="es_hybrid_search")
def hybrid_search(es, query: str, query_embedding, k: int = 3) -> dict:
    """Hybrid text match + KNN vector search."""
    result = es.search(index=INDEX_NAME, body={
        "query": {"match": {"text": query}},
        "knn": {
            "field": "embedding", "query_vector": query_embedding.tolist(),
            "k": k, "num_candidates": 50,
        },
        "_source": ["title", "text", "category"],
    })
    return {"hits": [...], "total": result["hits"]["total"]["value"]}
```
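Both searches assume an index whose `embedding` field was created with a dense_vector mapping, as described in the overview. A minimal sketch of what that mapping could look like follows; the index name, field set, dimension, and similarity metric here are illustrative assumptions, not the demo's actual values:

```python
# Sketch of a dense_vector index mapping compatible with the searches above.
# INDEX_NAME, dims=384, and similarity="cosine" are assumptions for
# illustration; the real demo derives these from its embedding model.
INDEX_NAME = "demo_docs"

mapping = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "text": {"type": "text"},
            "category": {"type": "keyword"},
            "embedding": {
                "type": "dense_vector",
                "dims": 384,          # must match the embedding model's output size
                "index": True,        # required for KNN search
                "similarity": "cosine",
            },
        }
    }
}

# With an Elasticsearch client, the index would be created as:
# es.indices.create(index=INDEX_NAME, body=mapping)
```

The `dims` value must match the embedding model exactly, and `index: True` is what makes the field searchable via the `knn` clause used by both tools.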

Search Approach Comparison with @reasoning_dec and Result Merging with @retrieval

```python
@waxell.reasoning_dec(step="compare_search_approaches")
def compare_search_approaches(knn_results: dict, hybrid_results: dict) -> dict:
    knn_avg = sum(h["score"] for h in knn_results["hits"]) / len(knn_results["hits"])
    hybrid_avg = sum(h["score"] for h in hybrid_results["hits"]) / len(hybrid_results["hits"])
    return {
        "thought": "Comparing KNN and hybrid search quality",
        "evidence": [f"KNN avg: {knn_avg:.3f}", f"Hybrid avg: {hybrid_avg:.3f}"],
        "conclusion": f"Hybrid {'outperforms' if hybrid_avg >= knn_avg else 'underperforms'} KNN",
    }
```
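Stripped of the decorator, the comparison reduces to averaging the hit scores from each approach. A self-contained sketch with made-up hits (the function name and simplified return shape are illustrative, and it assumes non-empty hit lists, as in the demo):

```python
# Simplified, decorator-free version of the comparison step above.
def compare(knn_results, hybrid_results):
    # Average the relevance scores from each search approach.
    # Assumes both hit lists are non-empty, as in the demo.
    knn_avg = sum(h["score"] for h in knn_results["hits"]) / len(knn_results["hits"])
    hybrid_avg = sum(h["score"] for h in hybrid_results["hits"]) / len(hybrid_results["hits"])
    return {
        "evidence": [f"KNN avg: {knn_avg:.3f}", f"Hybrid avg: {hybrid_avg:.3f}"],
        "winner": "hybrid" if hybrid_avg >= knn_avg else "knn",
    }

knn = {"hits": [{"score": 0.81}, {"score": 0.74}]}       # avg 0.775
hybrid = {"hits": [{"score": 0.92}, {"score": 0.70}]}    # avg 0.810
print(compare(knn, hybrid)["winner"])  # → hybrid
```

Note that Elasticsearch BM25 text scores and KNN similarity scores live on different scales, so a raw average comparison like this is a rough heuristic rather than a calibrated quality metric.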

```python
@waxell.retrieval(source="elasticsearch")
def merge_and_rank_results(knn_hits: list, hybrid_hits: list) -> list[dict]:
    """Merge and deduplicate results from KNN and hybrid searches."""
    seen = {}
    for h in knn_hits + hybrid_hits:
        if h["id"] not in seen or h["score"] > seen[h["id"]]["score"]:
            seen[h["id"]] = h
    return sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:5]
```
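The merge keeps the highest-scoring copy of each document, then ranks by score. A pure-Python sketch of that behavior with made-up hits (IDs and scores are illustrative):

```python
# Decorator-free sketch of the merge/dedup step above.
def merge_and_rank(knn_hits, hybrid_hits, top_n=5):
    seen = {}
    for h in knn_hits + hybrid_hits:
        # Keep the highest-scoring copy of each document id.
        if h["id"] not in seen or h["score"] > seen[h["id"]]["score"]:
            seen[h["id"]] = h
    return sorted(seen.values(), key=lambda x: x["score"], reverse=True)[:top_n]

knn = [{"id": "a", "score": 0.9}, {"id": "b", "score": 0.6}]
hybrid = [{"id": "a", "score": 0.7}, {"id": "c", "score": 0.8}]
merged = merge_and_rank(knn, hybrid)
print([h["id"] for h in merged])  # → ['a', 'c', 'b'] — 'a' kept once, at 0.9
```

Document "a" appears in both result sets but survives only once, with its higher KNN score; this max-score merge is a simple alternative to rank-based fusion schemes such as reciprocal rank fusion.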

What this demonstrates

  • @waxell.observe -- single agent with full lifecycle tracing
  • @waxell.tool(tool_type="vector_db") -- Elasticsearch operations (create index, bulk index, KNN search, hybrid search, cleanup) recorded as tool spans
  • @waxell.retrieval(source="elasticsearch") -- merged KNN + hybrid result ranking with deduplication
  • @waxell.decision -- search strategy selection (knn, hybrid, text_only)
  • @waxell.reasoning_dec -- comparative analysis of KNN vs hybrid search quality
  • waxell.score() -- KNN and hybrid average scores attached to the trace
  • waxell.tag() / waxell.metadata() -- vector DB type, index name, Elasticsearch host, and search type
  • Auto-instrumented LLM calls -- OpenAI synthesis captured without extra code
  • Dual search comparison -- KNN and hybrid search run in parallel and compared

Run it

```shell
# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.elasticsearch_agent --dry-run

# Live mode (requires Elasticsearch on localhost:9200)
export OPENAI_API_KEY="sk-..."
python -m app.demos.elasticsearch_agent

# Custom query
python -m app.demos.elasticsearch_agent --dry-run --query "Search for ML deployment patterns"
```

Source

dev/waxell-dev/app/demos/elasticsearch_agent.py