---
allowed-tools: run_command, view_file, write_to_file, list_dir
command: ai:a2a-setup
description: "Setup Google A2A (Agent-to-Agent) protocol with Context7-verified Vertex AI Agent Builder configuration and multi-agent orchestration."

---

# a2a:setup

Setup Google A2A (Agent-to-Agent) protocol with Context7-verified Vertex AI Agent Builder configuration and multi-agent orchestration.

## Description

Comprehensive A2A protocol implementation following Google Cloud best practices:
- Vertex AI Agent Builder configuration
- A2A protocol implementation (gRPC/REST)
- Multi-agent orchestration patterns
- Agent discovery and registration
- Message passing and state management
- Tool sharing between agents
- Observability and monitoring

## Required Documentation Access

**MANDATORY:** Before setup, query Context7 for Google A2A best practices:

**Documentation Queries:**
- `mcp://context7/googleapis/google-cloud-python/vertexai` - Vertex AI Agent Builder
- `mcp://context7/google-cloud/agent-builder` - Agent creation and management
- `mcp://context7/google-cloud/a2a-protocol` - A2A protocol specification
- `mcp://context7/google-cloud/multi-agent` - Multi-agent orchestration
- `mcp://context7/google-cloud/grpc-patterns` - gRPC communication patterns
- `mcp://context7/langchain/google-vertex` - LangChain Vertex AI integration

**Why This is Required:**
- Ensures setup follows official Google Cloud documentation
- Applies proven A2A protocol patterns
- Validates agent orchestration strategies
- Prevents integration issues
- Implements proper security and auth
- Optimizes multi-agent performance

## Usage

```bash
/a2a:setup [options]
```

## Options

- `--protocol <grpc|rest>` - Communication protocol (default: grpc)
- `--region <us-central1|europe-west1>` - GCP region (default: us-central1)
- `--project-id <id>` - GCP project ID
- `--output <file>` - Write setup report

## Examples

### Full A2A Setup
```bash
/a2a:setup --project-id my-project --region us-central1
```

### REST Protocol Setup
```bash
/a2a:setup --protocol rest --project-id my-project
```

### Generate Configuration
```bash
/a2a:setup --output a2a-config.yaml
```

## Setup Categories

### 1. Vertex AI Agent Builder Configuration (Context7-Verified)

**Pattern from Context7 (/googleapis/google-cloud-python/vertexai):**

#### Create Agent with Vertex AI
```python
from google.cloud import aiplatform
from vertexai.preview.agents import Agent

# Initialize Vertex AI
aiplatform.init(
    project="my-project-id",
    location="us-central1"
)

# Create agent
agent = Agent.create(
    display_name="research-agent",
    description="Agent for research and information gathering",
    instructions="""
    You are a research assistant that helps users find and summarize information.
    You have access to web search and document retrieval tools.
    Always cite your sources.
    """,
    model="gemini-2.0-flash-exp",
    tools=[
        {"google_search": {}},
        {"code_interpreter": {}},
        {"function_declarations": [
            {
                "name": "search_documents",
                "description": "Search internal document database",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "limit": {"type": "integer", "default": 10}
                    }
                }
            }
        ]}
    ]
)

print(f"Agent created: {agent.resource_name}")
print(f"Agent ID: {agent.name}")
```

**Agent Configuration:**
- Models: gemini-2.0-flash-exp, gemini-1.5-pro, gemini-1.5-flash
- Built-in tools: Google Search, Code Interpreter
- Custom tools: Function declarations, HTTP endpoints
- Instructions: System prompt for agent behavior

#### List and Manage Agents
```python
# List all agents
agents = Agent.list()

for agent in agents:
    print(f"Agent: {agent.display_name}")
    print(f"  ID: {agent.name}")
    print(f"  Model: {agent.model}")

# Get specific agent
agent = Agent.get("projects/123/locations/us-central1/agents/456")

# Update agent
agent.instructions = "Updated instructions..."
agent.update()

# Delete agent
agent.delete()
```

### 2. A2A Protocol Implementation (Context7-Verified)

**Pattern from Context7 (/google-cloud/a2a-protocol):**

#### A2A Message Format (Protocol Buffers)
```protobuf
// a2a.proto
syntax = "proto3";

package a2a;

// Agent-to-Agent message
message A2AMessage {
  string sender_id = 1;          // Sending agent ID
  string receiver_id = 2;        // Target agent ID
  string conversation_id = 3;    // Conversation context
  MessageType type = 4;
  oneof content {
    Request request = 5;
    Response response = 6;
    Event event = 7;
  }
  map<string, string> metadata = 8;
  int64 timestamp = 9;
}

enum MessageType {
  REQUEST = 0;
  RESPONSE = 1;
  EVENT = 2;
  ERROR = 3;
}

message Request {
  string action = 1;             // Action to perform
  map<string, string> params = 2;
}

message Response {
  bool success = 1;
  string result = 2;
  string error = 3;
}

message Event {
  string event_type = 1;
  string data = 2;
}
```

#### gRPC A2A Service
```python
import grpc
from concurrent import futures
import a2a_pb2
import a2a_pb2_grpc

class A2AService(a2a_pb2_grpc.A2AServiceServicer):
    """Agent-to-Agent communication service."""

    def __init__(self, agent_registry):
        self.agent_registry = agent_registry

    def SendMessage(self, request, context):
        """Send message from one agent to another."""
        sender_id = request.sender_id
        receiver_id = request.receiver_id

        # Validate agents exist
        if not self.agent_registry.exists(sender_id):
            context.abort(grpc.StatusCode.NOT_FOUND, f"Sender {sender_id} not found")

        if not self.agent_registry.exists(receiver_id):
            context.abort(grpc.StatusCode.NOT_FOUND, f"Receiver {receiver_id} not found")

        # Route message to receiver
        receiver_agent = self.agent_registry.get(receiver_id)
        response = receiver_agent.handle_message(request)

        return response

    def StreamMessages(self, request_iterator, context):
        """Bidirectional streaming for real-time communication."""
        for message in request_iterator:
            # Process message
            receiver = self.agent_registry.get(message.receiver_id)
            response = receiver.handle_message(message)
            yield response

# Start gRPC server
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

    agent_registry = AgentRegistry()
    a2a_service = A2AService(agent_registry)

    a2a_pb2_grpc.add_A2AServiceServicer_to_server(a2a_service, server)

    server.add_insecure_port('[::]:50051')
    server.start()
    print("A2A gRPC server listening on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
```

#### A2A Client (Agent Communication)
```python
import grpc
import a2a_pb2
import a2a_pb2_grpc

class A2AClient:
    """Client for agent-to-agent communication."""

    def __init__(self, server_address='localhost:50051'):
        self.channel = grpc.insecure_channel(server_address)
        self.stub = a2a_pb2_grpc.A2AServiceStub(self.channel)

    def send_message(self, sender_id, receiver_id, action, params):
        """Send message to another agent."""
        message = a2a_pb2.A2AMessage(
            sender_id=sender_id,
            receiver_id=receiver_id,
            conversation_id="conv-123",
            type=a2a_pb2.REQUEST,
            request=a2a_pb2.Request(
                action=action,
                params=params
            )
        )

        response = self.stub.SendMessage(message)
        return response

    def stream_conversation(self, messages):
        """Stream bidirectional messages."""
        def message_generator():
            for msg in messages:
                yield msg

        responses = self.stub.StreamMessages(message_generator())

        for response in responses:
            yield response

# Usage
client = A2AClient()

# Research agent asks data-analyst agent for analysis
response = client.send_message(
    sender_id="research-agent",
    receiver_id="data-analyst-agent",
    action="analyze_data",
    params={"dataset": "sales_2024.csv", "metrics": ["revenue", "growth"]}
)

print(f"Analysis result: {response.response.result}")
```

**Benefits:**
- Type-safe communication with Protocol Buffers
- Bidirectional streaming for real-time collaboration
- Built-in error handling and retries
- Production-ready scalability

### 3. Multi-Agent Orchestration (Context7-Verified)

**Pattern from Context7 (/google-cloud/multi-agent):**

#### Agent Registry
```python
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class AgentCapability:
    """Agent capability definition."""
    name: str
    description: str
    parameters: Dict

@dataclass
class AgentInfo:
    """Agent registration information."""
    agent_id: str
    name: str
    description: str
    capabilities: List[AgentCapability]
    endpoint: str
    status: str  # active, inactive, error

class AgentRegistry:
    """Central registry for agent discovery."""

    def __init__(self):
        self.agents: Dict[str, AgentInfo] = {}

    def register(self, agent_info: AgentInfo):
        """Register new agent."""
        self.agents[agent_info.agent_id] = agent_info
        print(f"Registered agent: {agent_info.name} ({agent_info.agent_id})")

    def unregister(self, agent_id: str):
        """Unregister agent."""
        if agent_id in self.agents:
            del self.agents[agent_id]
            print(f"Unregistered agent: {agent_id}")

    def discover(self, capability: str) -> List[AgentInfo]:
        """Find agents with specific capability."""
        matching = []

        for agent in self.agents.values():
            for cap in agent.capabilities:
                if cap.name == capability:
                    matching.append(agent)
                    break

        return matching

    def get(self, agent_id: str) -> Optional[AgentInfo]:
        """Get agent by ID."""
        return self.agents.get(agent_id)

    def exists(self, agent_id: str) -> bool:
        """Check if agent exists."""
        return agent_id in self.agents

# Usage
registry = AgentRegistry()

# Register research agent
registry.register(AgentInfo(
    agent_id="research-agent-1",
    name="Research Assistant",
    description="Searches and summarizes information",
    capabilities=[
        AgentCapability("web_search", "Search the web", {}),
        AgentCapability("document_retrieval", "Retrieve documents", {})
    ],
    endpoint="localhost:50051",
    status="active"
))

# Register data analyst agent
registry.register(AgentInfo(
    agent_id="data-analyst-1",
    name="Data Analyst",
    description="Analyzes datasets and generates insights",
    capabilities=[
        AgentCapability("data_analysis", "Analyze data", {}),
        AgentCapability("visualization", "Create charts", {})
    ],
    endpoint="localhost:50052",
    status="active"
))

# Discover agents with web_search capability
agents = registry.discover("web_search")
print(f"Found {len(agents)} agents with web_search capability")
```

#### Orchestrator (Supervisor Agent)
```python
class Orchestrator:
    """Orchestrates multi-agent workflows."""

    def __init__(self, registry: AgentRegistry, a2a_client: A2AClient):
        self.registry = registry
        self.a2a = a2a_client

    async def execute_workflow(self, task: str) -> str:
        """
        Execute multi-agent workflow.

        Example: "Research AI trends and create a report with visualizations"

        Steps:
        1. Research agent: Gather information
        2. Data analyst agent: Analyze trends
        3. Report agent: Generate report
        """
        # Step 1: Research
        research_agents = self.registry.discover("web_search")
        if not research_agents:
            raise ValueError("No research agents available")

        research_result = self.a2a.send_message(
            sender_id="orchestrator",
            receiver_id=research_agents[0].agent_id,
            action="research",
            params={"topic": "AI trends 2025"}
        )

        # Step 2: Analysis
        analyst_agents = self.registry.discover("data_analysis")
        if not analyst_agents:
            raise ValueError("No analyst agents available")

        analysis_result = self.a2a.send_message(
            sender_id="orchestrator",
            receiver_id=analyst_agents[0].agent_id,
            action="analyze",
            params={"data": research_result.response.result}
        )

        # Step 3: Report generation
        report_agents = self.registry.discover("report_generation")
        if not report_agents:
            # Fallback: Generate report ourselves
            return self._generate_simple_report(analysis_result.response.result)

        report_result = self.a2a.send_message(
            sender_id="orchestrator",
            receiver_id=report_agents[0].agent_id,
            action="generate_report",
            params={"analysis": analysis_result.response.result}
        )

        return report_result.response.result

    def _generate_simple_report(self, analysis: str) -> str:
        """Fallback report generation."""
        return f"# AI Trends Report\n\n{analysis}"

# Usage
orchestrator = Orchestrator(registry, a2a_client)

result = await orchestrator.execute_workflow(
    "Research AI trends and create analysis report"
)

print(result)
```

**Benefits:**
- Dynamic agent discovery
- Fault tolerance (fallback agents)
- Parallel execution where possible
- Centralized orchestration

### 4. State Management (Context7-Verified)

**Pattern from Context7:**

#### Conversation State
```python
from dataclasses import dataclass, field
from typing import List, Dict
import json

@dataclass
class Message:
    """Single message in conversation."""
    sender_id: str
    content: str
    timestamp: float
    metadata: Dict = field(default_factory=dict)

@dataclass
class ConversationState:
    """Conversation state between agents."""
    conversation_id: str
    participants: List[str]
    messages: List[Message] = field(default_factory=list)
    shared_context: Dict = field(default_factory=dict)
    status: str = "active"  # active, paused, completed, error

    def add_message(self, message: Message):
        """Add message to conversation."""
        self.messages.append(message)

    def get_context(self) -> Dict:
        """Get shared context."""
        return self.shared_context

    def update_context(self, key: str, value):
        """Update shared context."""
        self.shared_context[key] = value

    def to_json(self) -> str:
        """Serialize to JSON."""
        return json.dumps({
            "conversation_id": self.conversation_id,
            "participants": self.participants,
            "messages": [
                {
                    "sender_id": m.sender_id,
                    "content": m.content,
                    "timestamp": m.timestamp,
                    "metadata": m.metadata
                }
                for m in self.messages
            ],
            "shared_context": self.shared_context,
            "status": self.status
        }, indent=2)

class StateManager:
    """Manage conversation states."""

    def __init__(self):
        self.states: Dict[str, ConversationState] = {}

    def create_conversation(self, conversation_id: str, participants: List[str]) -> ConversationState:
        """Create new conversation."""
        state = ConversationState(
            conversation_id=conversation_id,
            participants=participants
        )
        self.states[conversation_id] = state
        return state

    def get_conversation(self, conversation_id: str) -> Optional[ConversationState]:
        """Get conversation state."""
        return self.states.get(conversation_id)

    def save_state(self, conversation_id: str, storage_path: str):
        """Persist conversation state."""
        state = self.states.get(conversation_id)
        if state:
            with open(f"{storage_path}/{conversation_id}.json", "w") as f:
                f.write(state.to_json())

# Usage
state_manager = StateManager()

# Create conversation between research and analyst agents
conv = state_manager.create_conversation(
    "conv-123",
    ["research-agent-1", "data-analyst-1"]
)

# Add messages
conv.add_message(Message(
    sender_id="research-agent-1",
    content="I found 5 articles about AI trends",
    timestamp=1234567890.0
))

# Update shared context
conv.update_context("research_complete", True)
conv.update_context("articles_count", 5)

# Persist state
state_manager.save_state("conv-123", "/tmp/conversations")
```

### 5. Tool Sharing (Context7-Verified)

**Pattern from Context7:**

#### Shared Tool Registry
```python
from typing import Callable, Dict, Any

class Tool:
    """Shared tool definition."""

    def __init__(self, name: str, description: str, function: Callable, parameters: Dict):
        self.name = name
        self.description = description
        self.function = function
        self.parameters = parameters

    def execute(self, **kwargs) -> Any:
        """Execute tool."""
        return self.function(**kwargs)

class ToolRegistry:
    """Registry for shared tools."""

    def __init__(self):
        self.tools: Dict[str, Tool] = {}

    def register(self, tool: Tool):
        """Register tool."""
        self.tools[tool.name] = tool

    def get(self, name: str) -> Optional[Tool]:
        """Get tool by name."""
        return self.tools.get(name)

    def list_tools(self) -> List[str]:
        """List all tool names."""
        return list(self.tools.keys())

# Define shared tools
def search_web(query: str) -> str:
    """Search the web."""
    # Implementation
    return f"Search results for: {query}"

def analyze_data(data: str) -> Dict:
    """Analyze dataset."""
    # Implementation
    return {"summary": "Analysis complete", "insights": []}

# Register tools
tool_registry = ToolRegistry()

tool_registry.register(Tool(
    name="web_search",
    description="Search the web for information",
    function=search_web,
    parameters={"query": "string"}
))

tool_registry.register(Tool(
    name="data_analysis",
    description="Analyze datasets",
    function=analyze_data,
    parameters={"data": "string"}
))

# Agents can discover and use shared tools
class AgentWithTools:
    def __init__(self, agent_id: str, tool_registry: ToolRegistry):
        self.agent_id = agent_id
        self.tools = tool_registry

    def use_tool(self, tool_name: str, **kwargs):
        """Use a shared tool."""
        tool = self.tools.get(tool_name)
        if not tool:
            raise ValueError(f"Tool {tool_name} not found")

        return tool.execute(**kwargs)

# Usage
agent = AgentWithTools("research-agent-1", tool_registry)
result = agent.use_tool("web_search", query="AI trends 2025")
```

### 6. Observability (Context7-Verified)

**Pattern from Context7:**

#### Tracing and Monitoring
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
import time

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Add console exporter (use OTLP for production)
trace.get_tracer_provider().add_span_processor(
    SimpleSpanProcessor(ConsoleSpanExporter())
)

class TracedAgent:
    """Agent with distributed tracing."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id

    def process_request(self, request: str) -> str:
        """Process request with tracing."""
        with tracer.start_as_current_span("process_request") as span:
            span.set_attribute("agent.id", self.agent_id)
            span.set_attribute("request.content", request)

            # Simulate processing
            time.sleep(0.5)

            result = f"Processed: {request}"

            span.set_attribute("response.content", result)
            span.add_event("processing_complete")

            return result

# Trace A2A communication
def send_traced_message(sender_id: str, receiver_id: str, message: str):
    """Send message with distributed tracing."""
    with tracer.start_as_current_span("a2a_communication") as span:
        span.set_attribute("sender.id", sender_id)
        span.set_attribute("receiver.id", receiver_id)
        span.set_attribute("message.content", message)

        # Send message
        client = A2AClient()
        response = client.send_message(sender_id, receiver_id, "process", {"data": message})

        span.set_attribute("response.status", "success" if response.response.success else "error")

        return response
```

**Benefits:**
- Distributed tracing across agents
- Performance monitoring
- Error tracking
- Request flow visualization

## Setup Output

```
🔗 Google A2A Protocol Setup
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Project: my-project-id
Region: us-central1
Protocol: gRPC

📊 Configuration
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

  Vertex AI Agent Builder:
  - Enabled ✓
  - Model: gemini-2.0-flash-exp
  - Tools: Google Search, Code Interpreter
  - Custom functions: Configured

  A2A Protocol:
  - Protocol: gRPC (port 50051)
  - Message format: Protocol Buffers
  - Bidirectional streaming: Enabled
  - Error handling: Configured

  Agent Registry:
  - Service: Running on port 8080
  - Discovery: Enabled
  - Health checks: Configured

  Orchestration:
  - Supervisor agent: Configured
  - Workflow engine: Enabled
  - State management: Redis-backed

🎯 Agents Registered
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

  1. Research Agent (research-agent-1)
     - Capabilities: web_search, document_retrieval
     - Status: Active
     - Endpoint: localhost:50051

  2. Data Analyst (data-analyst-1)
     - Capabilities: data_analysis, visualization
     - Status: Active
     - Endpoint: localhost:50052

  3. Report Generator (report-agent-1)
     - Capabilities: report_generation, summarization
     - Status: Active
     - Endpoint: localhost:50053

✅ Setup Complete
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

  Next Steps:
  1. Test agent communication: python test_a2a.py
  2. Deploy to production: gcloud deploy
  3. Monitor agents: open http://localhost:8080/metrics

  Configuration saved to: a2a-config.yaml
```

## Implementation

This command uses the **@google-a2a-expert** agent with orchestration expertise:

1. Query Context7 for Google A2A patterns
2. Setup Vertex AI Agent Builder
3. Configure A2A protocol (gRPC/REST)
4. Implement agent registry
5. Setup orchestrator
6. Configure state management
7. Enable observability

## Best Practices Applied

Based on Context7 documentation from `/googleapis/google-cloud-python/vertexai`:

1. **Vertex AI Agent Builder** - Managed agent infrastructure
2. **gRPC Protocol** - High-performance communication
3. **Agent Registry** - Dynamic discovery
4. **State Management** - Persistent conversation state
5. **Tool Sharing** - Reusable capabilities
6. **Distributed Tracing** - Observability across agents
7. **Fault Tolerance** - Fallback and retry logic

## Related Commands

- `/ai:model-deployment` - Model deployment
- `/rag:setup-scaffold` - RAG system setup
- `/openai:optimize` - OpenAI optimization

## Troubleshooting

### Agent Communication Fails
- Check gRPC server is running (port 50051)
- Verify agent is registered in registry
- Check network connectivity
- Review firewall rules

### High Latency
- Use gRPC instead of REST (3x faster)
- Enable bidirectional streaming
- Optimize message size
- Add caching layer

### State Loss
- Enable Redis for state persistence
- Implement checkpoint/restore
- Use conversation logging
- Add retry logic

## Installation

```bash
# Install Google Cloud SDK
pip install google-cloud-aiplatform

# Install gRPC
pip install grpcio grpcio-tools

# Install observability
pip install opentelemetry-api opentelemetry-sdk

# Generate Protocol Buffer code
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. a2a.proto
```

## Version History

- v2.0.0 - Initial Schema v2.0 release with Context7 integration
- Vertex AI Agent Builder integration
- gRPC A2A protocol implementation
- Multi-agent orchestration patterns
- Agent registry and discovery
- State management with Redis
- Distributed tracing support
