Milvus Agent

A multi-agent vector search pipeline using Milvus/Zilliz. A parent orchestrator coordinates three child agents: an indexer that creates collections and inserts vectors, a searcher that runs cosine similarity searches and boolean expression queries and then ranks the results, and a synthesizer that generates answers with a quality assessment. The pipeline demonstrates SDK primitives across Milvus-specific operations, including collection creation, vector insert, ANN search, and expression-based queries.

Environment variables

This example requires OPENAI_API_KEY, WAXELL_API_KEY, and WAXELL_API_URL for live runs. Pass --dry-run to skip real API calls.
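For live runs, the variables can be exported before launching the demo. The values below are placeholders, not real credentials or endpoints:

```shell
# Placeholder values -- substitute your real credentials and endpoint.
export OPENAI_API_KEY="sk-..."
export WAXELL_API_KEY="..."
export WAXELL_API_URL="https://..."
```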

Architecture

Key Code

Milvus Collection and Vector Operations with @tool

The indexer creates a collection with schema and inserts vectors. The searcher performs ANN similarity search and expression-based filtering.

@waxell.tool(tool_type="vector_db")
def milvus_create_collection(name: str, dimension: int,
                             metric_type: str = "COSINE") -> dict:
    """Create or reference a Milvus collection."""
    return {"collection": name, "dimension": dimension, "metric_type": metric_type}

@waxell.tool(tool_type="vector_db")
def milvus_insert_vectors(collection, data: list) -> dict:
    """Insert vectors into a Milvus collection."""
    result = collection.insert(data)
    return {"insert_count": result.get("insert_count", len(data[0]))}

@waxell.tool(tool_type="vector_db")
def milvus_vector_search(collection, query_vector: list, limit: int = 3) -> dict:
    """Search Milvus by vector similarity."""
    search_results = collection.search(
        data=[query_vector], anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=limit, output_fields=["text", "source"],
    )
    hits = search_results[0]  # one hit list per query vector
    return {"hits": len(hits), "top_score": round(hits[0].distance, 3)}

@waxell.tool(tool_type="vector_db")
def milvus_query_by_expr(collection, expr: str) -> dict:
    """Query Milvus by boolean expression filter."""
    results = collection.query(expr=expr, output_fields=["text", "source"])
    return {"count": len(results), "expr": expr}
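With the COSINE metric, Milvus ranks hits by cosine similarity between the query vector and the stored embeddings. A minimal stdlib sketch of that scoring as a brute-force stand-in for `collection.search` (the corpus, field names, and helper functions here are illustrative, not part of the demo):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot(a, b) / (|a| * |b|)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def ann_search(query: list[float], corpus: list[dict], limit: int = 3) -> list[dict]:
    """Brute-force stand-in for collection.search: score every vector, keep top-k."""
    scored = sorted(corpus,
                    key=lambda d: cosine_similarity(query, d["embedding"]),
                    reverse=True)
    return scored[:limit]

corpus = [
    {"text": "vector search", "embedding": [1.0, 0.0]},
    {"text": "keyword search", "embedding": [0.0, 1.0]},
    {"text": "hybrid search", "embedding": [0.7, 0.7]},
]
top = ann_search([1.0, 0.1], corpus, limit=2)
# top[0] is the corpus entry closest in direction to the query vector
```

A real Milvus deployment uses an ANN index (e.g. IVF with the `nprobe` parameter shown above) rather than an exhaustive scan, but the ranking criterion is the same.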

Ranking with @retrieval

@waxell.retrieval(source="milvus")
def rank_search_hits(query: str, hits: list, embeddings: list) -> list[dict]:
    """Rank and enrich Milvus search hits with document metadata."""
    ranked = []
    for h in hits:
        doc = embeddings[h.id]
        ranked.append({
            "id": h.id, "text": doc["text"], "source": doc["source"],
            "score": round(1.0 / (1.0 + h.distance), 4),
        })
    ranked.sort(key=lambda x: x["score"], reverse=True)
    return ranked
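The score transform above maps a non-negative distance into (0, 1], with distance 0 giving a perfect score of 1.0 and larger distances decaying toward 0. A quick check of that mapping in isolation (the sample distances are illustrative):

```python
def distance_to_score(distance: float) -> float:
    # Same transform as rank_search_hits: smaller distance -> higher score.
    return round(1.0 / (1.0 + distance), 4)

scores = [distance_to_score(d) for d in (0.0, 0.25, 1.0, 3.0)]
# scores == [1.0, 0.8, 0.5, 0.25] -- monotonically decreasing in distance
```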

What this demonstrates

  • @waxell.observe -- parent-child agent hierarchy (orchestrator + 3 child agents) with automatic lineage via WaxellContext
  • @waxell.tool(tool_type="vector_db") -- Milvus operations (create collection, insert vectors, vector search, query by expression) recorded as tool spans
  • @waxell.retrieval(source="milvus") -- search hit ranking recorded with Milvus as the source
  • @waxell.decision -- search strategy selection via OpenAI (cosine, L2, inner product) and output format
  • waxell.decide() -- manual collection routing and merge strategy decisions
  • @waxell.reasoning_dec -- chain-of-thought quality assessment
  • @waxell.step_dec -- query preprocessing with keyword extraction
  • waxell.score() -- answer quality and relevance scores attached to the trace
  • waxell.tag() / waxell.metadata() -- vector DB type, collection name, dimension, and corpus size
  • Auto-instrumented LLM calls -- OpenAI calls captured without extra code
  • Dual query modes -- ANN vector search and boolean expression query combined in one pipeline
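The second query mode, exercised by `milvus_query_by_expr`, filters rows with a boolean expression (Milvus expressions look like `source == "docs"`). A minimal in-memory stand-in, with the expression grammar simplified to a Python predicate and the sample rows invented for illustration:

```python
from typing import Callable

docs = [
    {"text": "install guide", "source": "docs"},
    {"text": "bug report", "source": "issues"},
    {"text": "api reference", "source": "docs"},
]

def query_by_expr(rows: list[dict], predicate: Callable[[dict], bool]) -> dict:
    """In-memory stand-in for collection.query(expr=...): keep matching rows."""
    results = [r for r in rows if predicate(r)]
    return {"count": len(results), "rows": results}

# Stand-in for the Milvus expression: source == "docs"
out = query_by_expr(docs, lambda r: r["source"] == "docs")
```

Real Milvus parses the expression string server-side against indexed scalar fields; the predicate here only mimics the filtering semantics.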

Run it

# Dry-run mode (no API key needed)
cd dev/waxell-dev
python -m app.demos.milvus_agent --dry-run

# Live mode
export OPENAI_API_KEY="sk-..."
python -m app.demos.milvus_agent

# Custom query
python -m app.demos.milvus_agent --dry-run --query "Find documents about model evaluation"

Source

dev/waxell-dev/app/demos/milvus_agent.py