# EventEmitterPubSub

`EventEmitterPubSub` is the default [`PubSub`](https://mastra.ai/reference/pubsub/base) implementation. It delivers events in-process using a Node.js [`EventEmitter`](https://nodejs.org/api/events.html#class-eventemitter), so it works without any external service.

Use it for single-process applications. For delivery across processes on one host, see [`UnixSocketPubSub`](https://mastra.ai/reference/pubsub/unix-socket-pubsub). For distributed delivery, see [`RedisStreamsPubSub`](https://mastra.ai/reference/pubsub/redis-streams) or [`GoogleCloudPubSub`](https://mastra.ai/reference/pubsub/google-cloud-pubsub).

Because it is in-process, events are not persisted and are not shared with other processes. Wrap it in [`CachingPubSub`](https://mastra.ai/reference/pubsub/caching-pubsub) when you need replay for resumable streams.

## Usage example

`EventEmitterPubSub` is used automatically when you do not configure a `pubsub` option, so most applications never construct it directly. Create one explicitly only when you want to configure or share it.

```typescript
import { Mastra } from '@mastra/core'
import { EventEmitterPubSub } from '@mastra/core/events'

export const mastra = new Mastra({
  pubsub: new EventEmitterPubSub(),
})
```

To share an emitter with other parts of your application, pass an existing `EventEmitter`:

```typescript
import EventEmitter from 'node:events'
import { EventEmitterPubSub } from '@mastra/core/events'

const emitter = new EventEmitter()
const pubsub = new EventEmitterPubSub(emitter)
```

## Constructor parameters

**existingEmitter** (`EventEmitter`): An existing Node.js EventEmitter to use for delivery. When omitted, a new EventEmitter is created.

## Properties

**supportedModes** (`ReadonlyArray<"pull" | "push">`): Returns `["pull", "push"]`. The emitter can serve a pull-style worker or push events directly to listeners.

## Methods

`EventEmitterPubSub` implements the [`PubSub`](https://mastra.ai/reference/pubsub/base) contract. The methods below have behavior specific to this implementation.

### `subscribe(topic, cb, options?)`

Registers a callback for a topic. Without `options.group`, every subscriber receives every event (fan-out). With `options.group`, each event goes to one member of that group, chosen round-robin.

```typescript
await pubsub.subscribe('workflow.events', (event, ack, nack) => {
  // No group is set, so this is a fan-out subscriber: ack and nack are no-ops.
  console.log(event)
})
```
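To make the two delivery modes concrete, here is a minimal standalone model built on `node:events`. It is a sketch, not Mastra's implementation: `MiniPubSub`, its `publish` method, and the string event shape are hypothetical names used only for illustration.

```typescript
import { EventEmitter } from 'node:events'

type Handler = (event: string) => void

// Hypothetical illustration only: a toy model of fan-out vs. grouped delivery.
class MiniPubSub {
  private emitter = new EventEmitter()
  // Per "topic:group" key: the group's members and a counter for the next turn.
  private groups = new Map<string, { members: Handler[]; next: number }>()

  subscribe(topic: string, cb: Handler, options?: { group?: string }): void {
    if (!options?.group) {
      // Fan-out: every subscriber gets its own listener.
      this.emitter.on(topic, cb)
      return
    }
    const key = `${topic}:${options.group}`
    let group = this.groups.get(key)
    if (!group) {
      const created = { members: [] as Handler[], next: 0 }
      this.groups.set(key, created)
      // One listener per group; it hands each event to a single member in turn.
      this.emitter.on(topic, (event: string) => {
        const member = created.members[created.next % created.members.length]
        created.next += 1
        if (member) member(event)
      })
      group = created
    }
    group.members.push(cb)
  }

  publish(topic: string, event: string): void {
    this.emitter.emit(topic, event)
  }
}

const received: string[] = []
const bus = new MiniPubSub()
bus.subscribe('jobs', e => received.push(`fanout-1:${e}`))
bus.subscribe('jobs', e => received.push(`fanout-2:${e}`))
bus.subscribe('jobs', e => received.push(`worker-a:${e}`), { group: 'workers' })
bus.subscribe('jobs', e => received.push(`worker-b:${e}`), { group: 'workers' })

bus.publish('jobs', 'e1')
bus.publish('jobs', 'e2')
```

In this model both fan-out subscribers see `e1` and `e2`, while `worker-a` handles only `e1` and `worker-b` handles only `e2`.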

### `flush()`

Waits for any pending redeliveries from `nack` to fire before resolving.

```typescript
await pubsub.flush()
```

### `close()`

Removes all listeners and cancels pending redeliveries. Call this during graceful shutdown.

```typescript
await pubsub.close()
```
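One way to combine `flush()` and `close()` at shutdown is sketched below. Because `close()` cancels pending redeliveries, awaiting `flush()` first gives them a chance to fire. The `Drainable` interface and `shutdownPubSub` helper are hypothetical; only the two method names come from this page, and their `Promise<void>` return types are an assumption.

```typescript
// Hypothetical structural type: anything exposing the flush() and close()
// methods described above. Return types are assumed, not taken from the API.
interface Drainable {
  flush(): Promise<void>
  close(): Promise<void>
}

// Let pending redeliveries fire, then remove listeners.
// close() still runs if flush() rejects, so listeners are not left attached.
async function shutdownPubSub(pubsub: Drainable): Promise<void> {
  try {
    await pubsub.flush()
  } finally {
    await pubsub.close()
  }
}
```

A helper like this could be called from a `SIGTERM` handler. If dropping pending redeliveries is acceptable, calling `close()` alone is enough.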

## Redelivery

When a grouped subscriber calls `nack`, the event is redelivered to the group after a short delay, and its `deliveryAttempt` count increases. Calling `ack` clears the tracking for that event. Fan-out subscribers receive no-op `ack` and `nack` functions, since each event reaches every subscriber once.
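
The `nack` and `ack` flow for a grouped subscriber can be modeled roughly as follows. This is a toy sketch under stated assumptions, not the library's code: `RedeliveryModel`, `GroupWorker`, and `runPending` are hypothetical names, the attempt counter is assumed to start at 1, and the delay is replaced by an explicit queue so the steps are easy to follow.

```typescript
type Ack = () => void
type Nack = () => void
interface Delivery { id: string; payload: string; deliveryAttempt: number }
type GroupWorker = (event: Delivery, ack: Ack, nack: Nack) => void

// Hypothetical toy model of nack-driven redelivery for a single group member.
// A real implementation would redeliver after a timer; here pending
// redeliveries wait in a queue until runPending() is called.
class RedeliveryModel {
  private attempts = new Map<string, number>() // event id -> attempts so far
  private pending: Array<() => void> = []
  private worker: GroupWorker

  constructor(worker: GroupWorker) {
    this.worker = worker
  }

  publish(id: string, payload: string): void {
    this.deliver(id, payload)
  }

  // Run every queued redelivery, including ones queued while draining.
  runPending(): void {
    while (this.pending.length > 0) {
      const redeliver = this.pending.shift()
      if (redeliver) redeliver()
    }
  }

  trackedCount(): number {
    return this.attempts.size
  }

  private deliver(id: string, payload: string): void {
    // Assumption: the first delivery counts as attempt 1.
    const deliveryAttempt = (this.attempts.get(id) ?? 0) + 1
    this.attempts.set(id, deliveryAttempt)
    // ack clears the tracking entry for this event.
    const ack: Ack = () => { this.attempts.delete(id) }
    // nack queues another delivery of the same event.
    const nack: Nack = () => { this.pending.push(() => this.deliver(id, payload)) }
    this.worker({ id, payload, deliveryAttempt }, ack, nack)
  }
}

const seen: number[] = []
const model = new RedeliveryModel((event, ack, nack) => {
  seen.push(event.deliveryAttempt)
  // Reject the first two attempts, then accept the third.
  if (event.deliveryAttempt < 3) nack()
  else ack()
})

model.publish('evt-1', 'hello')
model.runPending()
```

In this model the worker sees three attempts with an increasing `deliveryAttempt`, and the final `ack` leaves nothing tracked.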