# EventEmitterPubSub

`EventEmitterPubSub` is the default [`PubSub`](https://mastra.ai/reference/pubsub/base) implementation. It delivers events in-process using a Node.js [`EventEmitter`](https://nodejs.org/api/events.html#class-eventemitter), so it works without any external service.

Use it for single-process applications. For delivery across processes on one host, see [`UnixSocketPubSub`](https://mastra.ai/reference/pubsub/unix-socket-pubsub). For distributed delivery, see [`RedisStreamsPubSub`](https://mastra.ai/reference/pubsub/redis-streams) or [`GoogleCloudPubSub`](https://mastra.ai/reference/pubsub/google-cloud-pubsub).

Because it's in-process, events aren't persisted and aren't shared with other processes. Wrap it in [`CachingPubSub`](https://mastra.ai/reference/pubsub/caching-pubsub) when you need replay for resumable streams.

## Usage example

`EventEmitterPubSub` is used automatically when you don't configure a `pubsub` option, so most applications never construct it directly. Create one explicitly only when you want to configure or share it.

```typescript
import { Mastra } from '@mastra/core'
import { EventEmitterPubSub } from '@mastra/core/events'

export const mastra = new Mastra({
  pubsub: new EventEmitterPubSub(),
})
```

To share an emitter with other parts of your application, pass an existing `EventEmitter`:

```typescript
import EventEmitter from 'node:events'
import { EventEmitterPubSub } from '@mastra/core/events'

const emitter = new EventEmitter()
const pubsub = new EventEmitterPubSub(emitter)
```

To surface batched-delivery errors, pass a logger:

```typescript
import { EventEmitterPubSub } from '@mastra/core/events'

const pubsub = new EventEmitterPubSub(undefined, { logger })
```

## Constructor parameters

**existingEmitter** (`EventEmitter`): An existing Node.js EventEmitter to use for delivery. When omitted, a new EventEmitter is created.

**options** (`EventEmitterPubSubOptions`): Optional configuration.

## Properties

**supportedModes** (`ReadonlyArray<"pull" | "push">`): Returns \`\["pull", "push"]\`. The emitter can serve a pull-style worker or push events directly to listeners.

**supportsNativeBatching** (`boolean`): Returns \`true\`. Subscribers can opt in to batched delivery with \`options.batch\`.

## Methods

`EventEmitterPubSub` implements the [`PubSub`](https://mastra.ai/reference/pubsub/base) contract. The methods below have behavior specific to this implementation.

### `subscribe(topic, cb, options?)`

Registers a callback for a topic. Without `options.group`, every subscriber receives every event. With a group, events are distributed round-robin across members of that group.

Pass `options.batch` to opt in to batched delivery. See [Batching](#batching) below.

```typescript
await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  console.log(event)
})
```

### `flush()`

Waits for any pending redeliveries from `nack` to fire before resolving.

```typescript
await pubsub.flush()
```

### `close()`

Removes all listeners and cancels pending redeliveries. Call this during graceful shutdown.

```typescript
await pubsub.close()
```

## Redelivery

When a grouped subscriber calls `nack`, the event is redelivered to the group after a short delay, and its `deliveryAttempt` count increases. Calling `ack` clears the tracking for that event. Fan-out subscribers receive no-op `ack` and `nack` functions, since each event reaches every subscriber once.

## Batching

`EventEmitterPubSub` honors `options.batch` natively. When a subscriber opts in, events are held in a per-subscriber in-memory buffer and delivered as consecutive callback invocations once a flush condition is met. Both fan-out and group subscribers can batch. See [`SubscribeBatchOptions`](https://mastra.ai/reference/pubsub/base) for the full policy.

```typescript
await pubsub.subscribe(
  'workflow.events',
  event => {
    console.log(event)
  },
  {
    batch: {
      maxSize: 10, // flush once 10 events have queued
      maxWaitMs: 500, // ...or after 500ms, whichever comes first
    },
  },
)
```

The buffer is in-memory and per-process, so batched state isn't persisted and doesn't survive a restart. A flush triggered by `maxWaitMs` is best-effort: if a step such as a throwing `coalesce` fails, the error is surfaced through the configured `logger` rather than thrown. `flush()` drains every batched subscriber buffer before resolving.