# a2a-amqp

AMQP-backed EventBus and WorkQueue for scaling A2A agents with long-running tasks.

## Why?

A2A agents often need to handle long-running tasks (LLM calls, complex processing, etc.). Running these tasks inline in HTTP handlers causes:

- **Timeout issues**: HTTP connections timeout on long tasks
- **Scaling problems**: Single server bottlenecks
- **Resource waste**: Servers blocked waiting for tasks to complete

This library solves these problems by:

1. **Queuing tasks** via AMQP instead of processing inline
2. **Distributing work** across multiple worker processes
3. **Event sourcing** all task events for replay and recovery
4. **Streaming results** back via SSE while workers process in the background

## Architecture

```
HTTP Request → Server (enqueues task) → Returns immediately
                    ↓
               AMQP Queue
                    ↓
          Worker Pool (scales horizontally)
                    ↓
          Process task & publish events
                    ↓
          AMQP Stream (event sourcing)
                    ↓
          Client streams results via SSE
```

## Installation

```bash
# Installing using bun
bun add @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client

# Or with npm
npm install @cloudamqp/a2a-amqp @a2a-js/sdk @cloudamqp/amqp-client
```

**Requires**: LavinMQ or RabbitMQ with stream support

```bash
docker run -d -p 5672:5672 -p 15672:15672 cloudamqp/lavinmq:latest
```

## Quick Start

### 1. HTTP Server (enqueues tasks)

```typescript
import { AMQPAgentBackend, QueuingRequestHandler } from "@84codes/a2a-amqp";
import { A2AExpressApp } from "@a2a-js/sdk/server/express";
import express from "express";

// Create AMQP backend
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Create request handler (handles task queuing + event projection)
const requestHandler = new QueuingRequestHandler(agentCard, backend);
await requestHandler.initialize();

// Setup Express with A2A routes
const app = express();
new A2AExpressApp(requestHandler).setupRoutes(app, "/");
app.listen(3000);
```

### 2. Worker Process (processes tasks)

```typescript
import { AMQPAgentBackend, WorkerEventBus } from "@84codes/a2a-amqp";
import { AgentExecutor, RequestContext } from "@a2a-js/sdk/server";

// Create backend with same agent name as server
const backend = await AMQPAgentBackend.create({
  url: "amqp://localhost:5672",
  agentName: "my-agent",
});

// Initialize work queue
await backend.workQueue.initialize();

class MyExecutor implements AgentExecutor {
  async execute(context: RequestContext, eventBus: ExecutionEventBus) {
    // Your long-running task logic here
    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "working", timestamp: new Date().toISOString() },
      final: false,
    });

    // ... do work ...

    eventBus.publish({
      kind: "status-update",
      taskId: context.taskId,
      contextId: context.contextId,
      status: { state: "completed", timestamp: new Date().toISOString() },
      final: true,
    });
    eventBus.finished();
  }
}

const executor = new MyExecutor();

// Start consuming with async generator pattern
const messages = backend.workQueue.start();

for await (const taskMessage of messages) {
  const { taskId, contextId, requestContext } = taskMessage;

  // Create request context
  const context = new RequestContext(
    requestContext.userMessage,
    taskId,
    contextId,
    requestContext.task,
    requestContext.referenceTasks
  );

  // Create event bus for publishing task events
  const eventBus = new WorkerEventBus(backend.amqpConnection, taskId, contextId);

  // Execute task
  await executor.execute(context, eventBus);
}
```

### 3. Scale horizontally

Run multiple workers to process tasks in parallel:

```bash
# Terminal 1: HTTP Server
bun run server

# Terminal 2-N: Workers (scale as needed)
bun run worker
bun run worker  # Add more workers for higher throughput
```

## Features

- **Work Queue**: Distribute tasks across multiple worker processes
- **Event Sourcing**: All task events stored in AMQP streams for replay
- **In-Memory Projection**: Fast task lookups with automatic recovery from streams
- **SSE Streaming**: Automatic streaming of task events back to clients
- **Horizontal Scaling**: Add more workers to increase throughput
- **Graceful Shutdown**: Clean consumer and connection handling
- **Type-Safe**: Full TypeScript support with Zod validation

## Configuration

```typescript
interface AMQPAgentBackendConfig {
  url: string;                    // AMQP broker URL
  agentName: string;              // Agent identifier
  streamRetention?: string;       // Event retention (default: "7d")
  streamMaxBytes?: number;        // Max stream size (default: 1GB)
  workQueueName?: string;         // Custom work queue name
  exchangeName?: string;          // Custom exchange name
  logger?: Logger;                // Custom logger
  connection?: {
    heartbeat?: number;           // Heartbeat interval in seconds
    reconnectDelay?: number;      // Reconnection delay in ms
    maxReconnectAttempts?: number;// Max reconnection attempts
  };
  publishing?: {
    persistent?: boolean;         // Persistent messages (default: true)
    confirmMode?: boolean;        // Publisher confirms (default: true)
    messageTtl?: number;          // Message TTL in ms (0 = no expiration)
  };
}
```

## Examples

See complete working examples:

- `src/examples/http-server.ts` - HTTP server with queuing
- `src/examples/worker.ts` - Worker process

```bash
# Run the example
bun run server  # Terminal 1
bun run worker  # Terminal 2

# Send a request
curl -X POST http://localhost:3000/ \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","id":1,"method":"messages/send","params":{"message":{"kind":"message","role":"user","messageId":"1","contextId":"ctx-1","parts":[{"kind":"text","text":"Hello"}]}}}'
```

## Testing

```bash
bun run test             # Run all tests (unit + integration)
bun run test:unit        # Unit tests only
bun run test:integration # Integration tests only
bun run test:watch       # Watch mode
bun run test:coverage    # With coverage
```

## License

MIT
