Data Ingestion Agent

A 14-step data ingestion pipeline stress test that scrapes content, embeds with two models, indexes into three vector databases, queries all three, reranks with two rerankers, and evaluates retrieval quality with RAGAS -- all traced with waxell-observe decorators.

Environment variables

This example requires WAXELL_API_KEY and WAXELL_API_URL. No LLM API key is needed (the pipeline makes no LLM calls). Use --dry-run to skip observe reporting.

Architecture

Key Code

Web Scraping Tools

Two scraping integrations are wrapped as @tool spans -- Crawl4AI for markdown extraction and ScrapeGraphAI for structured schema extraction.

@waxell.tool(name="crawl4ai.crawl", tool_type="web_scraping")
async def tool_crawl4ai(urls: list[str], extract: str = "markdown+metadata") -> dict:
    """Crawl multiple URLs via Crawl4AI."""
    crawler = MockCrawl4AIClient()
    crawl_results = await crawler.crawl(urls)
    return {
        "pages_crawled": len(crawl_results),
        "total_chars": sum(len(r["markdown"]) for r in crawl_results),
        "results": crawl_results,
    }

@waxell.tool(name="scrapegraphai.smart_scrape", tool_type="web_scraping")
async def tool_scrapegraphai(url: str, schema_type: str = "benchmark") -> dict:
    """Intelligent scrape with schema extraction via ScrapeGraphAI."""
    scraper = MockScrapeGraphAIClient()
    return await scraper.smart_scrape(url=url, schema={"type": schema_type})
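The demo's mock clients are not shown above; a minimal sketch of what `MockCrawl4AIClient` could look like (hypothetical, assuming only the `markdown` field that `tool_crawl4ai` aggregates) clarifies the shape of the data flowing into the span:

```python
import asyncio

class MockCrawl4AIClient:
    """Hypothetical stand-in for the demo's mock crawler: returns one
    result dict per URL with the "markdown" key that tool_crawl4ai sums."""

    async def crawl(self, urls: list[str]) -> list[dict]:
        return [{"url": u, "markdown": f"# Page at {u}\n\nBody text."} for u in urls]

async def main():
    crawler = MockCrawl4AIClient()
    results = await crawler.crawl(["https://example.com/a", "https://example.com/b"])
    # Same aggregation the @tool span records.
    print({"pages_crawled": len(results),
           "total_chars": sum(len(r["markdown"]) for r in results)})

asyncio.run(main())
```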

Embedding and Indexing

Two embedding models feed three vector databases, each wrapped as separate tool spans with distinct tool_type values.

@waxell.tool(name="sentence_transformers.encode", tool_type="embedding")
def tool_embed_sentence_transformers(texts: list[str]) -> dict:
    """Embed texts using Sentence Transformers (384-dim)."""
    st_model = MockSentenceTransformerModel()
    embeddings = st_model.encode(texts)
    return {"model": st_model.model_name, "embeddings_count": len(embeddings), "dimension": 384}

@waxell.tool(name="nomic.embed", tool_type="embedding")
def tool_embed_nomic(texts: list[str], task_type: str = "search_document") -> dict:
    """Embed texts using Nomic (768-dim)."""
    nomic_model = MockNomicEmbedder()
    return {"model": nomic_model.model_name, "embeddings_count": len(texts), "dimension": 768}
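The mock models just need to produce fixed-dimension vectors. A toy deterministic embedder (hypothetical, not the demo's mock) makes the dimension contract concrete: every input maps to an L2-normalized vector of the declared size, so inner product equals cosine similarity downstream:

```python
import hashlib
import math

def toy_embed(text: str, dimension: int = 384) -> list[float]:
    """Deterministic bag-of-words hash embedding: each token bumps one
    dimension, then the vector is L2-normalized."""
    vec = [0.0] * dimension
    for token in text.lower().split():
        idx = int(hashlib.md5(token.encode()).hexdigest(), 16) % dimension
        vec[idx] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

emb = toy_embed("vector databases store embeddings")
print(len(emb))  # 384
```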

@waxell.tool(name="faiss.index_flat_ip.add", tool_type="vector_db")
def tool_faiss_index(embeddings: list[list[float]], dimension: int = 384) -> dict: ...

@waxell.tool(name="qdrant.upsert", tool_type="vector_db")
def tool_qdrant_upsert(collection_name: str, points: list[dict]) -> dict: ...

@waxell.tool(name="weaviate.batch_import", tool_type="vector_db")
def tool_weaviate_import(objects: list[dict], collection: str = "VectorDBChunks") -> dict: ...

Retrieval, Reranking, and Evaluation

Three @retrieval-decorated queries feed two @tool-wrapped rerankers; the reranker outputs are compared via a @reasoning step and retrieval quality is scored with RAGAS.

@waxell.retrieval(source="faiss")
def retrieval_faiss(query: str, faiss_index, chunks, st_model, top_k=5) -> list[dict]:
    """Search FAISS index and return matched documents."""
    ...

@waxell.retrieval(source="qdrant")
def retrieval_qdrant(query: str, qdrant, nomic_model, collection_name, top_k=5) -> list[dict]: ...

@waxell.retrieval(source="weaviate")
def retrieval_weaviate(query: str, weaviate, collection, top_k=5) -> list[dict]:
    """Hybrid search in Weaviate (BM25 + vector)."""
    ...
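The hybrid search the Weaviate retrieval uses blends keyword and vector scores with an alpha weight. A simplified fusion sketch (min-max normalization per channel, then a convex combination; Weaviate's actual fusion options differ in detail) shows how alpha trades keyword against semantic relevance:

```python
def hybrid_scores(bm25: dict[str, float],
                  vector: dict[str, float],
                  alpha: float = 0.5) -> dict[str, float]:
    """Blend two score channels: alpha weights the vector score,
    (1 - alpha) the BM25 score. Each channel is min-max normalized
    first so the two scales are comparable. Simplified sketch."""
    def norm(scores: dict[str, float]) -> dict[str, float]:
        if not scores:
            return {}
        lo, hi = min(scores.values()), max(scores.values())
        span = (hi - lo) or 1.0
        return {k: (v - lo) / span for k, v in scores.items()}
    b, v = norm(bm25), norm(vector)
    return {d: alpha * v.get(d, 0.0) + (1 - alpha) * b.get(d, 0.0)
            for d in set(b) | set(v)}

# alpha=1.0 is pure vector search, alpha=0.0 pure BM25.
print(hybrid_scores({"doc1": 12.0, "doc2": 4.0}, {"doc1": 0.2, "doc2": 0.9}, alpha=0.6))
```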

@waxell.reasoning_dec(step="reranker_comparison")
def reason_reranker_comparison(ce_ranked, voyage_results, ce_avg, voyage_avg) -> dict:
    """Compare cross-encoder and Voyage reranker results."""
    overlap_count = len(set(idx for _, idx in ce_ranked) & set(vr["index"] for vr in voyage_results))
    return {
        "thought": f"Cross-encoder avg: {ce_avg:.4f}, Voyage avg: {voyage_avg:.4f}",
        "conclusion": f"Both rerankers agree on {overlap_count} of top-8 results",
    }
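The overlap count above feeds the `reranker_agreement` score recorded below. A standalone sketch of that ratio (hypothetical helper name; the input shapes match `ce_ranked` and `voyage_results` in the reasoning step) makes the 0.75 value concrete:

```python
def reranker_agreement(ce_ranked: list[tuple[float, int]],
                       voyage_results: list[dict],
                       top_n: int = 8) -> float:
    """Fraction of top-N documents both rerankers agree on:
    intersection of the two top-N index sets, divided by N."""
    ce_top = {idx for _, idx in ce_ranked[:top_n]}
    voyage_top = {r["index"] for r in voyage_results[:top_n]}
    return len(ce_top & voyage_top) / top_n

ce = [(0.9, 3), (0.8, 1), (0.7, 7), (0.6, 2)]
voyage = [{"index": 3}, {"index": 2}, {"index": 5}, {"index": 1}]
print(reranker_agreement(ce, voyage, top_n=4))  # 0.75
```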

# RAGAS evaluation scores
waxell.score("ragas_context_precision", 0.85, comment="RAGAS context_precision")
waxell.score("ragas_context_recall", 0.81, comment="RAGAS context_recall")
waxell.score("ragas_overall", 0.84, comment="RAGAS overall average")
waxell.score("reranker_agreement", 0.75, comment="Cross-encoder vs Voyage overlap ratio")
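For intuition on what the `ragas_context_precision` score measures: RAGAS computes a rank-weighted precision over retrieved chunks. A simplified version (the real metric judges chunk relevance with an LLM; here relevance is a given set) looks like:

```python
def context_precision(retrieved: list[str], relevant: set[str]) -> float:
    """Simplified context precision: average of precision@k taken at
    each rank k where a relevant chunk appears. Rewards putting
    relevant chunks near the top of the retrieved list."""
    hits = 0
    weighted = 0.0
    for k, chunk in enumerate(retrieved, start=1):
        if chunk in relevant:
            hits += 1
            weighted += hits / k  # precision@k at this relevant position
    return weighted / hits if hits else 0.0

print(context_precision(["a", "b", "c"], relevant={"a", "c"}))
```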

What this demonstrates

  • 14-step pipeline in a single @observe -- the entire scrape-embed-index-query-rerank-evaluate pipeline runs inside one agent trace with granular step visibility.
  • Web scraping tools -- crawl4ai.crawl and scrapegraphai.smart_scrape recorded as tool_type="web_scraping" spans.
  • Dual embedding models -- Sentence Transformers (384-dim) and Nomic (768-dim) recorded as tool_type="embedding" spans with dimension metadata.
  • Three vector database indexing tools -- FAISS, Qdrant, and Weaviate operations recorded as tool_type="vector_db" spans.
  • @retrieval decorator -- three source-attributed retrieval spans (faiss, qdrant, weaviate) auto-record query, result count, and top scores.
  • Weaviate hybrid search -- BM25 + vector combined search with configurable alpha for keyword/semantic balance.
  • Dual reranker comparison -- cross-encoder and Voyage reranker results compared via @reasoning with overlap analysis.
  • RAGAS evaluation scores -- context_precision, context_recall, and an overall average recorded via waxell.score().
  • waxell.metadata() -- pipeline configuration (vector DBs, embedding models, rerankers) recorded as structured metadata.

Run it

# Dry-run mode (no API key needed for this pipeline)
cd dev/waxell-dev
python -m app.demos.data_ingestion_pipeline_agent --dry-run

# With observe reporting
export WAXELL_API_KEY="your-waxell-api-key"
export WAXELL_API_URL="https://api.waxell.ai"
python -m app.demos.data_ingestion_pipeline_agent

Source

dev/waxell-dev/app/demos/data_ingestion_pipeline_agent.py