# `eventa`

[![npm version][npm-version-src]][npm-version-href]
[![npm downloads][npm-downloads-src]][npm-downloads-href]
[![bundle][bundle-src]][bundle-href]
[![JSDocs][jsdocs-src]][jsdocs-href]
[![License][license-src]][license-href]
[![Ask DeepWiki][deepwiki-src]][deepwiki-href]

Transport-aware events powering ergonomic RPC and streaming flows.

> Heavily inspired by pragmatic RPC flows, but centred on pure events so transports stay swappable.

> [!WARNING]
> Eventa forwards whatever payload you emit. Validate data at the edges before sending it to untrusted peers.

## Installation

```sh
npm install @moeru/eventa
pnpm i @moeru/eventa
bun i @moeru/eventa
ni @moeru/eventa
yarn add @moeru/eventa
```

### Agent Skills

Install the [eventa skill](https://skills.sh) to your AI coding agent:

```sh
npx skills add moeru-ai/eventa
```

## Getting Started

### Event

It's very simple:

- `defineEventa`: all event should be defined with this util, it produces type safe constraints
- `context`: a channel bridges to peers (Electron, Worker, WebSocket Peer, you name it)
- `createContext`: to wrap any compatible event listener

If you need only events without RPC mechanism, then use with `context.emit(...)` and `context.on(...)`

```ts
import { createContext, defineEventa } from '@moeru/eventa'

const move = defineEventa<{ x: number, y: number }>()
const ctx = createContext()

ctx.emit(move, { x: 100, y: 200 })
ctx.on(move, ({ body }) => console.log(body.x, body.y))
```

### RPC/Stream RPC

Events can be seen as packets transferring in networks, so we can use pure event to form a RPC/Stream RPC like how gRPC and tRPC works.

- `defineInvokeEventa`: define types of RPC/Stream RPC
- `defineInvoke`: this produce a `function` returns `Promise` for your RPC call to be used later, you can store and use it everywhere you want
- `defineInvokeHandler`: similar to how Nuxt, h3 defines their handler, we use `defineInvokeHandler` to hook a auto
- `defineStreamInvokeHandler`: similar to gRPC, when one RPC invocation produces not only one response, but multiple intermediate events, you may want to use it

#### Simple Example

The most simple way to show how it works:

```ts
import { createContext, defineInvoke, defineInvokeEventa, defineInvokeHandler } from '@moeru/eventa'

const ctx = createContext()
const someMethodDefine = defineInvokeEventa<{ output: string }, { input: number }>('random name')
defineInvokeHandler(ctx, someMethodDefine, ({ input }) => ({ output: String(input) }))

const someMethod = defineInvoke(ctx, someMethodDefine)
console.log(await someMethod(42)) // => { output: '42' }
```

### Channels

Channels connect existing Eventa contexts. They forward ordinary events and the internal invoke events used by `defineInvoke`, so the same API works for event fan-out, RPC forwarding, and transport bridges such as iframe -> WebSocket -> server.

Use `pipeChannel(...)` for one-way forwarding:

```ts
import { createContext, defineEventa, pipeChannel } from '@moeru/eventa'

const source = createContext()
const firstTarget = createContext()
const secondTarget = createContext()

const message = defineEventa<{ text: string }>('message')

pipeChannel(source, firstTarget, secondTarget)

firstTarget.on(message, ({ body }) => console.log('first', body.text))
secondTarget.on(message, ({ body }) => console.log('second', body.text))

source.emit(message, { text: 'hello' })
```

Use `linkChannel(...)` for bidirectional forwarding:

```ts
import { linkChannel } from '@moeru/eventa'

const link = linkChannel(iframeContext, websocketContext)
```

`linkChannel(a, b, c)` creates a fully-connected bidirectional mesh. It is not a linear chain like `a <-> b <-> c`; every context is linked to every other context.

Channels propagate context aborts by default. In a pipe, aborting the source aborts every target. In a link, aborting one context aborts the linked mesh. Set `propagateAbort: false` when contexts should share events but keep independent lifetimes:

```ts
linkChannel(iframeContext, websocketContext, {
  propagateAbort: false,
})
```

#### Transform and filter

Channel plugins run before an event is forwarded. A plugin can:

- return `undefined` to keep forwarding the current event
- return a new event object to transform it
- return `false` to drop it for that pipe

Plugins receive the current event and a context object with `source`, `target`, and `direction`.

```ts
import { defineChannelPlugin, pipeChannel } from '@moeru/eventa'

const tagSource = defineChannelPlugin(event => ({
  ...event,
  body: {
    ...event.body,
    source: 'iframe',
  },
}))

const blockPrivateEvents = defineChannelPlugin((event) => {
  if (event.id.startsWith('private:')) {
    return false
  }
})

pipeChannel(iframeContext, websocketContext, {
  plugins: [tagSource, blockPrivateEvents],
})
```

You can add plugins after creating a channel. Calling `.use(...)` on the returned group applies the plugin to every pipe in that group:

```ts
const pipe = pipeChannel(sourceContext, firstTarget, secondTarget)

const removeTrace = pipe.use(event => ({
  ...event,
  metadata: {
    ...event.metadata,
    tracedAt: Date.now(),
  },
}))

removeTrace()
```

The exposed `pipes` array contains the individual directed pipes. Plugins added to a child pipe only affect that edge:

```ts
const pipe = pipeChannel(sourceContext, firstTarget, secondTarget)

pipe.pipes[0].use((event) => {
  if (event.id === 'debug') {
    return false
  }
})
```

### Adapters

Eventa comes with various adapters for common use scenarios across browsers and Node.js, including Electron, `window.postMessage`, Web Workers, Worker Threads, BroadcastChannel, EventTarget, EventEmitter, and WebSockets.

<details>
  <summary>Electron</summary>

  1. Create a shared events module:
      ```ts
      import { defineInvokeEventa } from '@moeru/eventa'

      export const readdir = defineInvokeEventa<{ directories: string[] }, { cwd: string, target: string }>('rpc:node:fs/promise:readdir')
      ```

  2. In the main process, bridge the adapter to `ipcMain` and your `BrowserWindow` instance:
     ```ts
     import { createContext as createMainContext } from '@moeru/eventa/adapters/electron/main'
     import { app, BrowserWindow, ipcMain } from 'electron'

     import { readdir } from './events/readdir'

     app.on('ready', () => {
       // ... other code
       const { context: mainCtx } = createMainContext(ipcMain, mainWindow.webContents)
       defineInvokeHandler(mainCtx, readdir, async ({ cwd, target }) => {
         const fs = await import('node:fs/promises')
         const path = await import('node:path')
         const fullPath = path.resolve(cwd, target)
         const directories = await fs.readdir(fullPath, { withFileTypes: true })
         return { directories: directories.filter(dirent => dirent.isDirectory()).map(dirent => dirent.name) }
       })
     })
     ```
  3. In the renderer (not restricted to preload scripts, but recommended), bridge to `ipcRenderer` and expose a safe API:
     ```ts
     import { createContext as createRendererContext } from '@moeru/eventa/adapters/electron/renderer'
     import { contextBridge, ipcRenderer } from 'electron'

     import { defineInvoke, readdir } from './events/readdir'

     const { context: rendererCtx } = createRendererContext(ipcRenderer)
     const invokeReaddir = defineInvoke(rendererCtx, readdir)

     document.addEventListener('DOMContentLoaded', () => {
       invokeReaddir({ cwd: '/', target: 'usr' }).then((result) => {
         console.log('directories', result.directories)
       })
     })
     ```
  4. The main and renderer contexts now share the invoke pipeline used throughout the examples in `src/adapters/electron/*.test.ts`.

</details>

<details>
  <summary>Web Worker</summary>

  1. Spawn the worker and wrap it with the main-thread adapter:
      ```ts
      import Worker from 'web-worker'

      import { createContext, defineInvoke, defineInvokeEventa } from '@moeru/eventa/adapters/webworkers'

      const worker = new Worker(new URL('./worker.js', import.meta.url), { type: 'module' })
      const { context: mainCtx } = createContext(worker)

      export const syncEvents = defineInvokeEventa<{ status: string }, { jobId: string }>('worker:sync')
      export const invokeSync = defineInvoke(mainCtx, syncEvents)
      ```
  2. Inside the worker entry, create the worker context and register handlers:
     ```ts
     import { defineInvokeHandler } from '@moeru/eventa'
     import { createContext } from '@moeru/eventa/adapters/webworkers/worker'

     import { syncEvents } from '../sync'

     const { context: workerCtx } = createContext()
     defineInvokeHandler(workerCtx, syncEvents, ({ jobId }) => ({ status: `synced ${jobId}` }))
     ```
  3. The same pattern works for streaming handlers and for sending transferrable(s) by switching to `defineStreamInvoke` or `defineOutboundWorkerEventa` as shown in `src/adapters/webworkers/index.test.ts`.

</details>

<details>
  <summary>BroadcastChannel</summary>

  ```ts
  import { defineEventa } from '@moeru/eventa'
  import { createContext } from '@moeru/eventa/adapters/broadcast-channel'

  const channel = new BroadcastChannel('eventa-demo')
  const { context: ctx } = createContext(channel)

  const ping = defineEventa<{ message: string }>('bc:ping')
  ctx.on(ping, ({ body }) => {
    console.log('received', body.message)
  })

  ctx.emit(ping, { message: 'Hello from BroadcastChannel' })
  ```
</details>

<details>
  <summary>Window Message (iframe / popup)</summary>

  1. Define shared invoke events once:
     ```ts
     import { defineInvokeEventa } from '@moeru/eventa'

     export const echoEvents = defineInvokeEventa<{ echoed: string }, { message: string }>('window:echo')
     ```
  2. In the host page, bridge Eventa to the child frame or popup:
     ```ts
     import { defineInvoke } from '@moeru/eventa'
     import { createContext } from '@moeru/eventa/adapters/window-message'

     import { echoEvents } from './shared-events'

     const iframe = document.querySelector('iframe')!
     const { context: hostCtx } = createContext({
       channel: 'demo:window-message',
       currentWindow: window,
       targetWindow: () => iframe.contentWindow,
       targetOrigin: '*',
     })

     const echo = defineInvoke(hostCtx, echoEvents)
     console.log(await echo({ message: 'hello iframe' })) // => { echoed: 'iframe:hello iframe' }
     ```
  3. In the iframe or popup window, create the peer context and register handlers:
     ```ts
     import { defineInvokeHandler } from '@moeru/eventa'
     import { createContext } from '@moeru/eventa/adapters/window-message'

     import { echoEvents } from './shared-events'

     const { context: childCtx } = createContext({
       channel: 'demo:window-message',
       currentWindow: window,
       targetWindow: () => window.parent,
       targetOrigin: '*',
     })

     defineInvokeHandler(childCtx, echoEvents, ({ message }) => ({
       echoed: `iframe:${message}`,
     }))
     ```
  4. This adapter handles normal invoke responses and handler-thrown errors once both sides have created their Eventa contexts. If the iframe or popup script throws before the bridge is established, Eventa may never start in that peer at all, so the caller will not get a transport-level failure automatically. More generally, `window.postMessage` does not expose a worker-style fatal error channel, so if the other window disappears, never boots, or crashes before replying, callers should use `AbortSignal` or their own timeout/liveness policy.

</details>

<details>
  <summary>EventTarget</summary>

  ```ts
  import { defineInvoke, defineInvokeEventa, defineInvokeHandler } from '@moeru/eventa'
  import { createContext } from '@moeru/eventa/adapters/event-target'

  const eventTarget = new EventTarget()
  const { context: ctx } = createContext(eventTarget)

  const echoEvents = defineInvokeEventa<{ output: string }, { input: string }>('et:echo')
  defineInvokeHandler(ctx, echoEvents, ({ input }) => ({ output: input.toUpperCase() }))

  const echo = defineInvoke(ctx, echoEvents)
  console.log(await echo({ input: 'eventa' })) // => { output: 'EVENTA' }
  ```
</details>

<details>
  <summary>EventEmitter (Node.js)</summary>

  ```ts
  import { EventEmitter } from 'node:events'

  import { defineEventa } from '@moeru/eventa'
  import { createContext } from '@moeru/eventa/adapters/event-emitter'

  const emitter = new EventEmitter()
  const { context: ctx } = createContext(emitter)

  const logEvent = defineEventa<{ message: string }>('emitter:log')
  ctx.on(logEvent, ({ body }) => console.log(body.message))
  ctx.emit(logEvent, { message: 'Hello from EventEmitter' })
  ```
</details>

<details>
  <summary>Worker Threads (Node.js)</summary>

  1. Main thread:
      ```ts
      import { Worker } from 'node:worker_threads'

      import { defineInvoke, defineInvokeEventa } from '@moeru/eventa'
      import { createContext } from '@moeru/eventa/adapters/worker-threads'

      const worker = new Worker(new URL('./worker.ts', import.meta.url), { type: 'module' })
      const { context: mainCtx } = createContext(worker)

      const helloEvents = defineInvokeEventa<{ output: string }, { input: string }>('node-worker-hello')
      const hello = defineInvoke(mainCtx, helloEvents)
      console.log(await hello({ input: 'Eventa' })) // => { output: 'Hello, Eventa' }
      ```
  2. Worker entry:
      ```ts
      import { defineInvokeEventa, defineInvokeHandler } from '@moeru/eventa'
      import { createContext } from '@moeru/eventa/adapters/worker-threads/worker'

      const helloEvents = defineInvokeEventa<{ output: string }, { input: string }>('node-worker-hello')

      const { context: workerCtx } = createContext()
      defineInvokeHandler(workerCtx, helloEvents, ({ input }) => ({ output: `Hello, ${input}` }))
      ```
</details>

<details>
  <summary>WebSocket (Client)</summary>

  1. Open a `WebSocket` and wrap it with the native adapter:
      ```ts
      import { defineInvoke, defineInvokeEventa } from '@moeru/eventa'
      import { createContext as createWsContext } from '@moeru/eventa/adapters/websocket/native'

      const socket = new WebSocket('wss://example.com/ws')
      const { context: wsCtx } = createWsContext(socket)

      const chatEvents = defineInvokeEventa<{ message: string }, { text: string }>('chat:send')
      export const sendChat = defineInvoke(wsCtx, chatEvents)
      ```
  2. Listen for connection lifecycle events to update UI state or retry logic:
     ```ts
     import { wsConnectedEvent, wsDisconnectedEvent } from '@moeru/eventa/adapters/websocket/native'

     wsCtx.on(wsConnectedEvent, () => console.log('connected'))
     wsCtx.on(wsDisconnectedEvent, () => console.log('disconnected'))
     ```
  3. Pair the client with either the H3 global or peer adapter on the server for a full RPC channel (`src/adapters/websocket/h3/*.test.ts`).

</details>

<details>
  <summary>WebSocket (Server with H3)</summary>

  ```ts
  import { defineInvoke, defineInvokeHandler } from '@moeru/eventa'
  // we support h3 by default, you can implement whatever you want, it's simple
  import { createContext } from '@moeru/eventa/adapters/websocket/h3'

  const chatEvents = defineInvokeEventa<{ message: string }, { text: string }>('chat:send')

  const app = new H3()
  const { untilLeastOneConnected, hooks } = createPeerHooks()
  app.get('/ws', defineWebSocketHandler(hooks))

  const { context } = await untilLeastOneConnected
  defineInvokeHandler(context, chatEvents, ({ text: string }) => {
    // you can safely throw any error you want, you can even make the error type safe when using `defineInvoke`
    return { message: `Echo: ${text}` }
  })
  ```
</details>

### Advanced Usage

#### Streaming RPC

`defineInvokeHandler` is complemented by `defineStreamInvokeHandler` for long-running operations that need to report progress or intermediate results.

```ts
import { createContext, defineInvokeEventa, defineStreamInvoke, defineStreamInvokeHandler, toStreamHandler } from '@moeru/eventa'

const ctx = createContext()
const syncEvents = defineInvokeEventa<
  { type: 'progress' | 'result', value: number },
  { jobId: string }
>('rpc:sync')

// toStreamHelper converts an async function into an async generator
// so you can use imperative code instead of a generator function.
defineStreamInvokeHandler(ctx, syncEvents, toStreamHandler(async ({ payload, emit }) => {
  emit({ type: 'progress', value: 0 })
  for (let i = 1; i <= 5; i++) {
    emit({ type: 'progress', value: i * 20 })
  }
  emit({ type: 'result', value: 100 })
}))

const sync = defineStreamInvoke(ctx, syncEvents)
for await (const update of sync({ jobId: 'import' })) {
  console.log(update.type, update.value)
}
```

Both generator-style and imperative handlers are exercised in `src/stream.test.ts:7`.

#### Abort/Cancel

Eventa supports cancellation via `AbortSignal` on the client side and exposes an `AbortController` inside handlers so you can stop work early.

Client-side (unary invoke):

```ts
import { createContext, defineInvoke, defineInvokeEventa } from '@moeru/eventa'

const ctx = createContext()
const slowEvents = defineInvokeEventa<{ output: string }, { input: string }>('rpc:slow')
const slow = defineInvoke(ctx, slowEvents)

const controller = new AbortController()
const promise = slow({ input: 'work' }, { signal: controller.signal })

controller.abort('user cancelled')
await promise // rejects with AbortError
```

Server-side handler (unary):

```ts
import { defineInvokeHandler } from '@moeru/eventa'

defineInvokeHandler(ctx, slowEvents, async ({ input }, options) => {
  const signal = options?.abortController?.signal
  if (signal?.aborted) {
    return { output: 'aborted' }
  }

  signal?.addEventListener('abort', () => {
    // clean up resources, cancel timers, close connections, etc.
  }, { once: true })

  // ... do work
  return { output: `done: ${input}` }
})
```

Client-side (stream invoke):

```ts
import { defineInvokeEventa, defineStreamInvoke } from '@moeru/eventa'

const streamEvents = defineInvokeEventa<{ type: 'progress' | 'done', value: number }, { jobId: string }>('rpc:stream')
const stream = defineStreamInvoke(ctx, streamEvents)

const controller = new AbortController()
const results = stream({ jobId: 'import' }, { signal: controller.signal })

setTimeout(() => controller.abort('timeout'), 1000)
for await (const msg of results) {
  console.log(msg)
}
```

Server-side handler (streaming):

```ts
import { defineStreamInvokeHandler } from '@moeru/eventa'

defineStreamInvokeHandler(ctx, streamEvents, async function* ({ jobId }, options) {
  const signal = options?.abortController?.signal

  for (let i = 0; i <= 5; i++) {
    if (signal?.aborted) {
      return
    }
    yield { type: 'progress', value: i * 20 }
    await new Promise(r => setTimeout(r, 200))
  }

  yield { type: 'done', value: 100 }
})
```

#### Streaming Input

Eventa supports stream inputs on unary invokes (client-streaming) and full bidirectional streaming. This mirrors the gRPC shapes:

```proto
// Client-streaming request -> unary response
rpc RecordRoute(stream Point) returns (RouteSummary) {}

// Bidirectional streaming request/response
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
```

Client-streaming input with `defineInvoke` (stream in, single response out):

```ts
import { createContext, defineInvoke, defineInvokeEventa, defineInvokeHandler } from '@moeru/eventa'

const ctx = createContext()

const recordRoute = defineInvokeEventa<
  { distance: number, points: number },
  ReadableStream<{ lat: number, lng: number }>
>('rpc:record-route')

defineInvokeHandler(ctx, recordRoute, async (stream) => {
  let points = 0
  for await (const _ of stream) {
    points += 1
  }
  return { distance: points * 10, points }
})

const input = new ReadableStream({
  start(controller) {
    controller.enqueue({ lat: 0, lng: 0 })
    controller.enqueue({ lat: 1, lng: 1 })
    controller.close()
  },
})

const invoke = defineInvoke(ctx, recordRoute)
console.log(await invoke(input))
```

Bidirectional streaming with `defineStreamInvoke` (stream in, stream out):

```ts
import { createContext, defineInvokeEventa, defineStreamInvoke, defineStreamInvokeHandler } from '@moeru/eventa'

const ctx = createContext()

const routeChat = defineInvokeEventa<
  { message: string },
  ReadableStream<{ message: string }>
>('rpc:route-chat')

defineStreamInvokeHandler(ctx, routeChat, async function* (incoming) {
  for await (const note of incoming) {
    yield { message: `echo: ${note.message}` }
  }
})

const outgoing = new ReadableStream({
  start(controller) {
    controller.enqueue({ message: 'hello' })
    controller.enqueue({ message: 'from stream' })
    controller.close()
  },
})

const stream = defineStreamInvoke(ctx, routeChat)
for await (const note of stream(outgoing)) {
  console.log(note.message)
}
```

#### Shorthands for `defineInvokeHandler` and `defineInvoke`

When you have multiple invoke events to register handlers for, or to create invoke functions for, you can use `defineInvokeHandlers` and `defineInvokes` to do so in bulk.

```ts
const events = {
  double: defineInvokeEventa<number, number>(),
  append: defineInvokeEventa<string, string>(),
}

defineInvokeHandlers(ctx, events, {
  double: input => input * 2,
  append: input => `${input}!`,
})

const {
  double: invokeDouble,
  append: invokeAppend,
} = defineInvokes(ctx, events)

await invokeDouble(5) // 10
await invokeAppend('test') // 'test!'
```

## Development

```sh
pnpm i
pnpm test
```

> [!NOTE]
> `pnpm test` runs Vitest interactively. Use `pnpm test:run` for a single pass.

## Similar projects

- [`birpc`](https://github.com/antfu-collective/birpc): We dislike the way the API designs, we want fully free sharable invok-able functions, streaming input, streaming output, etc.
- [`async-call-rpc`](https://github.com/Jack-Works/async-call-rpc): it only works with JSON-RPC, but the DX is similar

## License

MIT

[npm-version-src]: https://img.shields.io/npm/v/@moeru/eventa?style=flat&colorA=080f12&colorB=1fa669
[npm-version-href]: https://npmjs.com/package/@moeru/eventa
[npm-downloads-src]: https://img.shields.io/npm/dm/@moeru/eventa?style=flat&colorA=080f12&colorB=1fa669
[npm-downloads-href]: https://npmjs.com/package/@moeru/eventa
[bundle-src]: https://img.shields.io/bundlephobia/minzip/@moeru/eventa?style=flat&colorA=080f12&colorB=1fa669&label=minzip
[bundle-href]: https://bundlephobia.com/result?p=@moeru/eventa
[license-src]: https://img.shields.io/github/license/moeru-ai/eventa.svg?style=flat&colorA=080f12&colorB=1fa669
[license-href]: https://github.com/moeru-ai/eventa/blob/main/LICENSE
[jsdocs-src]: https://img.shields.io/badge/jsdocs-reference-080f12?style=flat&colorA=080f12&colorB=1fa669
[jsdocs-href]: https://www.jsdocs.io/package/@moeru/eventa
[deepwiki-src]: https://deepwiki.com/badge.svg
[deepwiki-href]: https://deepwiki.com/moeru-ai/eventa
