# PubSub

`PubSub` is the abstract base class for Mastra's event system. It defines the contract that every pub/sub backend implements, so the rest of Mastra can publish and subscribe to events without knowing which transport is in use.

Mastra uses pub/sub internally for workflow event processing, streaming, and cross-component communication. Most applications use the default [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter) and never construct a `PubSub` directly. Implement this class only when you need a custom transport.

For built-in implementations, see [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter), [`UnixSocketPubSub`](https://mastra.ai/reference/pubsub/unix-socket-pubsub), [`CachingPubSub`](https://mastra.ai/reference/pubsub/caching-pubsub), [`RedisStreamsPubSub`](https://mastra.ai/reference/pubsub/redis-streams), and [`GoogleCloudPubSub`](https://mastra.ai/reference/pubsub/google-cloud-pubsub).

## Usage example

Extend `PubSub` and implement the four abstract methods to add a custom backend.

```typescript
import { PubSub } from '@mastra/core/events'
import type { Event, EventCallback, SubscribeOptions } from '@mastra/core/events'

export class CustomPubSub extends PubSub {
  async publish(topic: string, event: Omit<Event, 'id' | 'createdAt'>): Promise<void> {
    // Deliver the event to subscribers of `topic`.
  }

  async subscribe(topic: string, cb: EventCallback, options?: SubscribeOptions): Promise<void> {
    // Register `cb` to receive events published to `topic`.
  }

  async unsubscribe(topic: string, cb: EventCallback): Promise<void> {
    // Remove a previously registered callback.
  }

  async flush(): Promise<void> {
    // Wait for any in-flight deliveries to settle.
  }
}
```

Pass the instance to the [Mastra](https://mastra.ai/reference/core/mastra-class) constructor:

```typescript
import { Mastra } from '@mastra/core'
import { CustomPubSub } from './pubsub'

export const mastra = new Mastra({
  pubsub: new CustomPubSub(),
})
```

## Delivery modes

A `PubSub` declares which delivery modes it supports through the `supportedModes` property. Mastra reads this to decide whether to run a long-lived worker that pulls events.

| Mode   | Description                                                                                                                   |
| ------ | ----------------------------------------------------------------------------------------------------------------------------- |
| `pull` | Consumers actively read from the broker, for example Redis Streams `XREADGROUP`. Mastra runs an orchestration worker to read. |
| `push` | Events arrive without the consumer asking, either in-process or through an HTTP endpoint. No read loop is required.           |

The default is `['pull']`, so custom implementations keep the existing pull-based behavior unless they explicitly opt in to push delivery.
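The following is a minimal sketch of how a backend might declare its modes and how a host could act on them. `PubSubLike`, `PollingPubSub`, `WebhookPubSub`, and `needsPullWorker` are hypothetical names used for illustration only. The stand-in base class mirrors just the documented `supportedModes` property, and the worker decision shown here is an assumption based on the table above, not Mastra's actual logic.

```typescript
type DeliveryMode = 'pull' | 'push'

// Local stand-in for the documented `supportedModes` property (not the real Mastra class).
abstract class PubSubLike {
  readonly supportedModes: ReadonlyArray<DeliveryMode> = ['pull']
}

// Inherits the documented default of ['pull'].
class PollingPubSub extends PubSubLike {}

// Opts in to push-only delivery by overriding the property.
class WebhookPubSub extends PubSubLike {
  readonly supportedModes: ReadonlyArray<DeliveryMode> = ['push']
}

// Hypothetical helper: assume a read loop is only needed when the backend supports pull.
function needsPullWorker(pubsub: PubSubLike): boolean {
  return pubsub.supportedModes.some(mode => mode === 'pull')
}

console.log(needsPullWorker(new PollingPubSub())) // true
console.log(needsPullWorker(new WebhookPubSub())) // false
```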

## Methods

### Core methods

#### `publish(topic, event)`

Publishes an event to a topic. The `id` and `createdAt` fields are assigned by the implementation.

```typescript
await pubsub.publish('my-topic', {
  type: 'example',
  data: { value: 1 },
  runId: 'run-123',
})
```

#### `subscribe(topic, cb, options?)`

Registers a callback to receive events published to a topic. When `options.group` is set, subscribers in the same group compete for messages and each event is delivered to one member. Without a group, every subscriber receives every event.

```typescript
await pubsub.subscribe('my-topic', (event, ack, nack) => {
  console.log(event)
})
```
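The difference between grouped and ungrouped subscribers can be sketched with a small in-memory model. This is an illustration only, not how any built-in backend works: `InMemoryTopic` and `DemoEvent` are hypothetical, the methods are synchronous for brevity, and round-robin selection within a group is an assumption (the contract only says each event goes to one member).

```typescript
type DemoEvent = { type: string; data: unknown; runId: string }
type DemoCallback = (event: DemoEvent) => void

// Hypothetical single-topic model of the documented `group` semantics.
class InMemoryTopic {
  private subscribers: Array<{ cb: DemoCallback; group?: string }> = []
  private cursors = new Map<string, number>()

  subscribe(cb: DemoCallback, options?: { group?: string }): void {
    this.subscribers.push({ cb, group: options?.group })
  }

  publish(event: DemoEvent): void {
    const groups = new Map<string, DemoCallback[]>()
    for (const sub of this.subscribers) {
      if (sub.group === undefined) {
        // No group: every subscriber receives every event.
        sub.cb(event)
      } else {
        const members = groups.get(sub.group) ?? []
        members.push(sub.cb)
        groups.set(sub.group, members)
      }
    }
    // Grouped: exactly one member per group receives the event (round-robin, by assumption).
    groups.forEach((members, group) => {
      const next = this.cursors.get(group) ?? 0
      members[next % members.length](event)
      this.cursors.set(group, next + 1)
    })
  }
}
```

With two subscribers in the same group and one ungrouped subscriber, publishing four events delivers all four to the ungrouped subscriber and splits the four between the two group members.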

#### `unsubscribe(topic, cb)`

Removes a previously registered callback from a topic.

```typescript
// `callback` is the function previously passed to `subscribe`.
await pubsub.unsubscribe('my-topic', callback)
```

#### `flush()`

Waits for any in-flight deliveries to settle. Call this before shutdown to avoid dropping events.

```typescript
await pubsub.flush()
```

### Replay methods

These methods support resuming a stream after a disconnect. The default implementations fall back to a regular `subscribe`, so backends without history support behave as live-only. [`CachingPubSub`](https://mastra.ai/reference/pubsub/caching-pubsub) overrides them to replay cached events.

#### `getHistory(topic, offset?)`

Returns cached events for a topic, starting at `offset`. Returns an empty array when the backend has no history.

```typescript
const events = await pubsub.getHistory('my-topic', 0)
```

Returns: `Promise<Event[]>`

#### `subscribeWithReplay(topic, cb)`

Replays cached events, then subscribes to live events.

```typescript
await pubsub.subscribeWithReplay('my-topic', event => {
  console.log(event)
})
```

#### `subscribeFromOffset(topic, offset, cb)`

Replays cached events starting at a known position, then subscribes to live events. This is more efficient than a full replay when the client knows its last position.

```typescript
await pubsub.subscribeFromOffset('my-topic', 42, event => {
  console.log(event)
})
```
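The replay-then-live behavior can be sketched with a small in-memory buffer. This is a simplified model, not the `CachingPubSub` implementation: `ReplayBuffer` and `IndexedEvent` are hypothetical, the methods are synchronous, and the sketch assumes that `index` starts at 0 and that `offset` is inclusive.

```typescript
type IndexedEvent = { index: number; type: string; data: unknown }
type IndexedCallback = (event: IndexedEvent) => void

// Hypothetical single-topic buffer that keeps every published event.
class ReplayBuffer {
  private history: IndexedEvent[] = []
  private listeners: IndexedCallback[] = []

  publish(type: string, data: unknown): void {
    // Assumption: the index is the event's position in the history, starting at 0.
    const event: IndexedEvent = { index: this.history.length, type, data }
    this.history.push(event)
    for (const listener of this.listeners) listener(event)
  }

  getHistory(offset = 0): IndexedEvent[] {
    // Returns an empty array when nothing is cached at or after `offset`.
    return this.history.slice(offset)
  }

  subscribeFromOffset(offset: number, cb: IndexedCallback): void {
    // Replay cached events first, then register for live events.
    for (const event of this.getHistory(offset)) cb(event)
    this.listeners.push(cb)
  }
}
```

A client that last saw index 0 would call `subscribeFromOffset(1, cb)` to receive only the events it missed, followed by anything published afterwards. A real backend also has to avoid dropping or duplicating events published while the replay is in progress, which this synchronous sketch sidesteps.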

## Properties

**supportedModes** (`ReadonlyArray<"pull" | "push">`): Delivery modes the implementation supports. Defaults to `["pull"]`.

## Types

### `Event`

**type** (`string`): Event type identifier.

**id** (`string`): Unique event ID, assigned by the implementation on publish.

**data** (`any`): Event payload.

**runId** (`string`): Run the event belongs to.

**createdAt** (`Date`): Timestamp assigned by the implementation on publish.

**index** (`number`): Sequential position used to resume from a specific offset.

**deliveryAttempt** (`number`): Number of times the event has been delivered. Starts at 1. Defaults to 1 when the backend does not track redelivery.

### `SubscribeOptions`

**group** (`string`): When set, subscribers with the same group compete for messages and each event is delivered to one member. When omitted, every subscriber receives every event.

### `EventCallback`

The callback signature for subscribers: `(event: Event, ack?: () => Promise<void>, nack?: () => Promise<void>) => void`.

**event** (`Event`): The delivered event.

**ack** (`() => Promise<void>`): Acknowledge successful processing. The event is removed from the queue.

**nack** (`() => Promise<void>`): Negative acknowledge. The event is requeued for redelivery after a delay.
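One way to use `ack` and `nack` is to wrap a handler so that success acknowledges the event and failure requests redelivery. The sketch below is an assumption about usage, not a pattern prescribed by Mastra: `withAck` and `AckEvent` are hypothetical names, and the local types only mirror the documented callback signature.

```typescript
type AckEvent = { type: string; data: unknown; runId: string }
type AckFn = () => Promise<void>

// Hypothetical wrapper: ack when the handler resolves, nack when it throws.
function withAck(handler: (event: AckEvent) => Promise<void>) {
  return async (event: AckEvent, ack?: AckFn, nack?: AckFn): Promise<void> => {
    try {
      await handler(event)
    } catch {
      // Processing failed: ask the backend to redeliver, if it supports nack.
      await nack?.()
      return
    }
    // Processing succeeded: acknowledge, if the backend supports ack.
    await ack?.()
  }
}
```

Because `ack` and `nack` are optional in the signature, the wrapper uses optional calls so it also works with backends that do not pass them.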