# CachingPubSub

`CachingPubSub` wraps any [`PubSub`](https://mastra.ai/reference/pubsub/base) implementation and adds event caching and replay. It records every published event per topic in a cache, so a subscriber that connects late or reconnects after a disconnect can replay the events it missed before continuing with live events.

Use it to build resumable streams on top of a transport that does not keep history, such as [`EventEmitterPubSub`](https://mastra.ai/reference/pubsub/event-emitter). Transports that already persist events, such as [`RedisStreamsPubSub`](https://mastra.ai/reference/pubsub/redis-streams), do not need this wrapper.
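The cache-then-replay idea can be illustrated with a small self-contained model. This is a sketch only, not the `CachingPubSub` implementation: `ToyReplayBus`, `ToyEvent`, and `ToyCallback` are hypothetical names, the history lives in a plain `Map` rather than a server cache, and everything is synchronous for brevity.

```typescript
// Toy model only: none of these names come from @mastra/core.
type ToyEvent = { type: string; data: unknown }
type ToyCallback = (event: ToyEvent) => void

class ToyReplayBus {
  // Per-topic history; stands in for the server cache.
  private history = new Map<string, ToyEvent[]>()
  // Per-topic live listeners; stands in for the inner transport.
  private listeners = new Map<string, ToyCallback[]>()

  publish(topic: string, event: ToyEvent): void {
    // Record first, so the event can be replayed even if nobody is listening yet.
    const events = this.history.get(topic) ?? []
    events.push(event)
    this.history.set(topic, events)

    const callbacks = this.listeners.get(topic) ?? []
    callbacks.forEach(cb => cb(event))
  }

  subscribeWithReplay(topic: string, cb: ToyCallback): void {
    // Deliver what was missed, then attach for live events.
    const missed = this.history.get(topic) ?? []
    missed.forEach(event => cb(event))

    const callbacks = this.listeners.get(topic) ?? []
    callbacks.push(cb)
    this.listeners.set(topic, callbacks)
  }
}

const bus = new ToyReplayBus()
bus.publish('run-1', { type: 'step', data: 1 }) // published before anyone subscribes

const seen: unknown[] = []
bus.subscribeWithReplay('run-1', event => seen.push(event.data))
bus.publish('run-1', { type: 'step', data: 2 }) // delivered live

// seen now holds the replayed event followed by the live one: 1, then 2
```

A transport without history would have delivered only the second event to this late subscriber.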

## Usage example

Wrap an inner pub/sub and provide a server cache to store events.

```typescript
import { Mastra } from '@mastra/core'
import { CachingPubSub, EventEmitterPubSub } from '@mastra/core/events'
import { InMemoryServerCache } from '@mastra/core/cache'

const cache = new InMemoryServerCache()
const pubsub = new CachingPubSub(new EventEmitterPubSub(), cache)

export const mastra = new Mastra({
  pubsub,
})
```

The `withCaching` helper builds the same wrapper and reads better when wrapping inline:

```typescript
import { withCaching, EventEmitterPubSub } from '@mastra/core/events'
import { InMemoryServerCache } from '@mastra/core/cache'

const pubsub = withCaching(new EventEmitterPubSub(), new InMemoryServerCache())
```

## Constructor parameters

**inner** (`PubSub`): The pub/sub implementation to wrap. All publishes and live subscriptions pass through to this instance.

**cache** (`MastraServerCache`): Cache used to store events per topic for replay.

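**options** (`CachingPubSubOptions`): Optional configuration, such as the `keyPrefix` setting shown in the [`withCaching`](#withcachingpubsub-cache-options) example.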

## Methods

`CachingPubSub` implements the [`PubSub`](https://mastra.ai/reference/pubsub/base) contract and overrides the replay methods so that they read from the cache. The sections below describe the caching behavior of each method.

### `publish(topic, event)`

Caches the event with a sequential index, then publishes it to the inner pub/sub.

```typescript
await pubsub.publish('my-topic', {
  type: 'example',
  data: { value: 1 },
  runId: 'run-123',
})
```
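As a rough illustration of what "a sequential index" means, the sketch below appends events to a per-topic log and returns the position each one was stored at. It rests on assumptions: `ToyIndexedLog` and `IndexedEvent` are hypothetical names, indices are taken to start at 0 and to be counted separately per topic, and the actual storage format used by the cache is not shown in this page.

```typescript
// Hypothetical per-topic log; not the storage format CachingPubSub uses.
type IndexedEvent<T> = { index: number; event: T }

class ToyIndexedLog<T> {
  private entries = new Map<string, IndexedEvent<T>[]>()

  // Appends the event and returns the index it was stored under.
  append(topic: string, event: T): number {
    const topicLog = this.entries.get(topic) ?? []
    const index = topicLog.length // assumed: starts at 0, grows by one per event
    topicLog.push({ index, event })
    this.entries.set(topic, topicLog)
    return index
  }
}

const indexedLog = new ToyIndexedLog<string>()
const firstIndex = indexedLog.append('topic-a', 'first') // 0
const secondIndex = indexedLog.append('topic-a', 'second') // 1
const otherIndex = indexedLog.append('topic-b', 'other') // 0: topic-b has its own sequence
```

Assigning the index at write time is what makes an offset a stable way to name a position in the topic's history.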

### `subscribeWithReplay(topic, cb)`

Replays all cached events for the topic, then subscribes to live events.

```typescript
await pubsub.subscribeWithReplay('my-topic', event => {
  console.log(event)
})
```

### `subscribeFromOffset(topic, offset, cb)`

Replays cached events starting at `offset`, then subscribes to live events. Use this when the client knows its last position, to avoid replaying the whole history.

```typescript
await pubsub.subscribeFromOffset('my-topic', 42, event => {
  console.log(event)
})
```
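The resume pattern can be sketched without any transport at all. In the minimal model below, `replayFrom` is a hypothetical helper (not part of `@mastra/core`) that treats `offset` as the index of the first event to deliver and returns the next offset for the client to remember. How a real client learns its position is an assumption here.

```typescript
type Handler<T> = (event: T) => void

// Hypothetical helper: delivers history[offset..] and returns the next offset to request.
function replayFrom<T>(history: readonly T[], offset: number, handler: Handler<T>): number {
  const start = Math.max(0, offset) // treat negative offsets as "from the beginning"
  history.slice(start).forEach(event => handler(event))
  return Math.max(start, history.length)
}

const topicHistory = ['a', 'b', 'c', 'd']

// First connection: only two events exist so far.
const firstPass: string[] = []
let nextOffset = replayFrom(topicHistory.slice(0, 2), 0, e => firstPass.push(e))

// The client disconnects, two more events are cached, and it reconnects at its saved offset.
const secondPass: string[] = []
nextOffset = replayFrom(topicHistory, nextOffset, e => secondPass.push(e))

// firstPass holds 'a', 'b'; secondPass holds 'c', 'd'; nextOffset is 4
```

Because the second call starts at the saved offset, the client receives each event once instead of replaying the whole history.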

### `getHistory(topic, offset?)`

Returns cached events for the topic, starting at `offset`.

```typescript
const events = await pubsub.getHistory('my-topic', 0)
```

Returns: `Promise<Event[]>`

## Functions

### `withCaching(pubsub, cache, options?)`

Convenience wrapper that constructs a `CachingPubSub`. Accepts the same arguments as the constructor and returns the new instance.

```typescript
import { withCaching, EventEmitterPubSub } from '@mastra/core/events'
import { InMemoryServerCache } from '@mastra/core/cache'

const pubsub = withCaching(new EventEmitterPubSub(), new InMemoryServerCache(), {
  keyPrefix: 'events:',
})
```

Returns: `CachingPubSub`