# RedisStreamsPubSub

`RedisStreamsPubSub` is a [`PubSub`](https://mastra.ai/reference/pubsub/base) implementation backed by [Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/). It delivers events across processes and hosts, with persistence, consumer groups, and redelivery on failure.

Use it for distributed deployments where several services share an event stream. For single-process delivery, use [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter). For Google Cloud, use [`GoogleCloudPubSub`](https://mastra.ai/reference/pubsub/google-cloud-pubsub).

Each topic maps to a Redis stream key. Subscriptions with a group use a Redis consumer group, so members share work round-robin. Subscriptions without a group create a private consumer group, so every subscriber receives every event.

`RedisStreamsPubSub` is a pull transport: consumers read events with `XREADGROUP`, so Mastra runs an orchestration worker to read on its behalf.

## Installation

**npm**:

```bash
npm install @mastra/redis-streams
```

**pnpm**:

```bash
pnpm add @mastra/redis-streams
```

**Yarn**:

```bash
yarn add @mastra/redis-streams
```

**Bun**:

```bash
bun add @mastra/redis-streams
```

## Usage example

Provide a Redis connection URL.

```typescript
import { Mastra } from '@mastra/core'
import { RedisStreamsPubSub } from '@mastra/redis-streams'

export const mastra = new Mastra({
  pubsub: new RedisStreamsPubSub({
    url: 'redis://localhost:6379',
  }),
})
```

## Constructor parameters

**url** (`string`): Redis connection URL. Falls back to \`redisOptions.url\`. (Default: `redis://localhost:6379`)

**keyPrefix** (`string`): Prefix for stream keys. Each topic maps to \`\<keyPrefix>:\<topic>\`. (Default: `mastra:topic`)

**blockMs** (`number`): How long, in milliseconds, each read blocks while waiting for new events. (Default: `1000`)

**redisOptions** (`RedisClientOptions`): Options passed to the underlying \`redis\` client for advanced configuration.

**maxStreamLength** (`number`): Approximate maximum number of entries kept per stream. Set to 0 to disable trimming. (Default: `10000`)

**reclaimIntervalMs** (`number`): How often, in milliseconds, a subscription reclaims events that an earlier consumer read but never acknowledged. Set to 0 to disable. (Default: `30000`)

**reclaimIdleMs** (`number`): Minimum idle time, in milliseconds, before a pending event is eligible for reclaim. Keep this well above typical processing time to avoid double delivery. (Default: `60000`)

**maxDeliveryAttempts** (`number`): Maximum times an event is redelivered through \`nack\` before it is dropped. Pass \`Infinity\` to disable the cap. (Default: `5`)

**logger** (`{ debug?: Function; warn?: Function }`): Optional logger for diagnostics. When omitted, suppressed errors are silent.

## Properties

**supportedModes** (`ReadonlyArray<"pull" | "push">`): Returns \`\["pull"]\`.

## Methods

`RedisStreamsPubSub` implements the [`PubSub`](https://mastra.ai/reference/pubsub/base) contract. The methods below have behavior specific to this implementation.

### `subscribe(topic, cb, options?)`

Subscribes to a topic. With `options.group`, members of the group share events through a Redis consumer group. Without a group, the subscriber receives every event through a private consumer group.

```typescript
await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  console.log(event)
})
```

### `flush()`

Waits for in-flight publishes to complete.

```typescript
await pubsub.flush()
```

### `close()`

Closes the Redis connections and stops all subscriptions. Call this during graceful shutdown.

```typescript
await pubsub.close()
```

## Redelivery and reclaim

When a subscriber calls `nack`, the event is republished with an incremented `deliveryAttempt` and the original is acknowledged. Once an event reaches `maxDeliveryAttempts`, it is dropped instead of redelivered. Separately, each subscription periodically reclaims events that an earlier consumer in the group read but never acknowledged, controlled by `reclaimIntervalMs` and `reclaimIdleMs`.