Vertex AI

A comprehensive Google Cloud Vertex AI pipeline demonstrating both the generate_content_async and chat-session send_message_async APIs. The orchestrator dispatches an analyzer child agent for topic analysis and an advisor child agent for conversational follow-up, generates embeddings via the @waxell.tool(tool_type="embedding") decorator, and evaluates analysis quality with @waxell.reasoning_dec. Covers Vertex AI's generative models, chat sessions, and embedding models.

Environment variables

This example requires GOOGLE_CLOUD_PROJECT, WAXELL_API_KEY, and WAXELL_API_URL. Use --dry-run to run without any API keys or GCP credentials.

Architecture

Key Code

Vertex AI mode decision and embedding generation

The orchestrator chooses among three modes: generate-only, chat-only, or both. Embedding generation uses the @waxell.tool(tool_type="embedding") decorator for typed span creation.

@waxell.decision(name="choose_vertex_mode", options=["generate", "chat", "both"])
async def choose_vertex_mode(query: str) -> dict:
    q_lower = query.lower()
    if any(kw in q_lower for kw in ["discuss", "conversation", "follow-up"]):
        chosen = "chat"
        reasoning = "Query implies conversational context, use chat session"
    elif any(kw in q_lower for kw in ["quick", "simple", "define"]):
        chosen = "generate"
        reasoning = "Direct question, single generation sufficient"
    else:
        chosen = "both"
        reasoning = "Complex query benefits from analysis + conversational follow-up"
    return {"chosen": chosen, "reasoning": reasoning}
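The routing logic above can be exercised offline, without the waxell SDK or GCP credentials. The sketch below substitutes a no-op stand-in for @waxell.decision (an assumption: the real decorator is presumed not to alter the wrapped function's return value):

```python
import asyncio

# Stand-in for @waxell.decision so the routing logic runs standalone
# (assumption: the real decorator returns the wrapped result unchanged).
def decision(name=None, options=None):
    def wrap(fn):
        return fn
    return wrap

@decision(name="choose_vertex_mode", options=["generate", "chat", "both"])
async def choose_vertex_mode(query: str) -> dict:
    q_lower = query.lower()
    if any(kw in q_lower for kw in ["discuss", "conversation", "follow-up"]):
        return {"chosen": "chat", "reasoning": "Query implies conversational context"}
    if any(kw in q_lower for kw in ["quick", "simple", "define"]):
        return {"chosen": "generate", "reasoning": "Direct question, single generation"}
    return {"chosen": "both", "reasoning": "Complex query: analysis + follow-up"}

print(asyncio.run(choose_vertex_mode("Please define recursion"))["chosen"])  # → generate
```

Note that the chat keywords are checked first, so a query containing both "discuss" and "define" routes to the chat session.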


@waxell.tool(tool_type="embedding")
def generate_embeddings(embedding_model, texts: list) -> dict:
    embeddings = embedding_model.get_embeddings(texts)
    dims = len(embeddings[0].values) if embeddings else 0
    return {"count": len(embeddings), "dimensions": dims, "vectors": [e.values for e in embeddings]}
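The embedding tool's return shape can be checked offline with a stub that mimics Vertex AI's TextEmbeddingModel: get_embeddings(texts) returns one object per text, each exposing a .values vector, as in the real SDK. The decorator is omitted here since it is orthogonal to the payload:

```python
from types import SimpleNamespace

# Stub mimicking vertexai's TextEmbeddingModel: get_embeddings(texts)
# returns objects with a .values vector (a fixed 3-dim dummy vector here).
class StubEmbeddingModel:
    def get_embeddings(self, texts):
        return [SimpleNamespace(values=[0.1, 0.2, 0.3]) for _ in texts]

def generate_embeddings(embedding_model, texts: list) -> dict:
    embeddings = embedding_model.get_embeddings(texts)
    dims = len(embeddings[0].values) if embeddings else 0
    return {"count": len(embeddings), "dimensions": dims,
            "vectors": [e.values for e in embeddings]}

result = generate_embeddings(StubEmbeddingModel(), ["alpha", "beta"])
print(result["count"], result["dimensions"])  # → 2 3
```

With a real model (e.g. TextEmbeddingModel.from_pretrained(...)), only the dimensionality changes; the count/dimensions/vectors contract stays the same.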

Child agents with different Vertex AI APIs

The analyzer uses generate_content_async for one-shot generation, while the advisor uses chat.send_message_async for conversational follow-up.

@waxell.observe(agent_name="vertex-analyzer", workflow_name="vertex-ai-pipeline")
async def run_vertex_analyzer(query: str, model, waxell_ctx=None):
    waxell.tag("agent_role", "analyzer")
    waxell.tag("model", getattr(model, "_model_name", "gemini-2.0-flash"))

    response = await model.generate_content_async(
        f"Analyze the following topic:\n\n{query}"
    )
    waxell.metadata("prompt_tokens", response.usage_metadata.prompt_token_count)
    return {"analysis": response.text, "model": getattr(model, "_model_name", "gemini-2.0-flash")}


@waxell.observe(agent_name="vertex-advisor", workflow_name="vertex-ai-pipeline")
async def run_vertex_advisor(analysis: str, model, waxell_ctx=None):
    waxell.tag("agent_role", "advisor")
    chat = model.start_chat()
    response = await chat.send_message_async(
        f"Based on this analysis, what are the top 3 recommendations?\n\n{analysis[:500]}"
    )
    return {"recommendations": response.text}

What this demonstrates

  • @waxell.observe -- parent orchestrator with 2 child agents
  • @waxell.step_dec -- query preprocessing
  • @waxell.decision -- Vertex AI mode selection (generate/chat/both)
  • @waxell.tool(embedding) -- embedding generation with Vertex AI TextEmbeddingModel
  • @waxell.reasoning_dec -- analysis quality evaluation
  • waxell.tag() -- agent role and model tagging
  • waxell.score() -- quality scores including boolean completion markers
  • waxell.metadata() -- token counts, embedding dimensions, pipeline info
  • Vertex AI APIs -- generate_content_async, chat send_message_async, and get_embeddings
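Putting the pieces together, here is a minimal offline sketch of the orchestrator's "both" mode: stub objects replace the Vertex AI GenerativeModel and chat session (mirroring what --dry-run does in this demo), and the waxell decorators are omitted for brevity:

```python
import asyncio
from types import SimpleNamespace

# Stubs standing in for a Vertex AI GenerativeModel and its chat session,
# so the analyzer -> advisor flow runs without GCP credentials.
class StubChat:
    async def send_message_async(self, prompt):
        return SimpleNamespace(text="1) ... 2) ... 3) ...")

class StubModel:
    _model_name = "gemini-2.0-flash"
    async def generate_content_async(self, prompt):
        return SimpleNamespace(text=f"Analysis: {prompt}")
    def start_chat(self):
        return StubChat()

async def orchestrate(query: str) -> dict:
    model = StubModel()
    # Analyzer child: one-shot generation via generate_content_async.
    analysis = (await model.generate_content_async(query)).text
    # Advisor child: conversational follow-up on the (truncated) analysis.
    chat = model.start_chat()
    recs = (await chat.send_message_async(analysis[:500])).text
    return {"analysis": analysis, "recommendations": recs}

out = asyncio.run(orchestrate("vector databases"))
print(out["analysis"])  # → Analysis: vector databases
```

Swapping the stub for vertexai.generative_models.GenerativeModel("gemini-2.0-flash") gives the live pipeline; the control flow is unchanged.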

Run it

cd dev/waxell-dev
python -m app.demos.vertex_ai_agent --dry-run

Source

dev/waxell-dev/app/demos/vertex_ai_agent.py