# libeam

An Erlang/OTP-inspired actor system for TypeScript. Build distributed, fault-tolerant applications with location-transparent actors, automatic supervision, and gossip-based cluster membership.

## Features

- **Functional API**: Simple, closure-based actor definitions with `createSystem` and `createActor` (Recommended)
- **Actor Model**: Lightweight actors with message-passing semantics (`call` for request/response, `cast` for fire-and-forget)
- **Location Transparency**: Actors communicate via `ActorRef` regardless of whether the target is local or remote
- **Supervision**: Automatic crash handling with configurable restart strategies
- **Distributed Clustering**: Gossip-based membership detection with automatic peer discovery
- **Placement Strategies**: Control where actors are spawned (`local`, `round-robin`) with optional role-based filtering
- **Transport Abstraction**: Pluggable transport layer (in-memory for testing, ZeroMQ for production)

## Elixir/OTP Parity

libeam implements the core primitives from Elixir/OTP, adapted for the Node.js runtime.

### Implemented

| Feature | Elixir/OTP | libeam |
|---------|------------|--------|
| **Actor Model** |
| Spawn processes | `spawn/1`, `GenServer.start/2` | `system.spawn()` |
| Async messages | `send/2`, `GenServer.cast/2` | `ref.cast()` |
| Sync calls | `GenServer.call/2` | `ref.call()` |
| **GenServer Callbacks** |
| `init/1` | Yes | `init()` |
| `handle_call/3` | Yes | `handleCall()` |
| `handle_cast/2` | Yes | `handleCast()` |
| `handle_info/2` | Yes | `handleInfo()` |
| `handle_continue/2` | Yes | `handleContinue()` |
| `terminate/2` | Yes | `terminate()` |
| Idle timeout | `{:noreply, state, timeout}` | `setIdleTimeout()` |
| **Supervision** |
| Supervisors | `Supervisor` | `Supervisor`, `ChildSupervisor` |
| one-for-one | Yes | Yes |
| one-for-all | Yes | Yes |
| rest-for-one | Yes | Yes |
| Max restarts | Yes | Yes |
| Dynamic supervisors | `DynamicSupervisor` | `DynamicSupervisor` |
| **Process Features** |
| Links | `Process.link/1` | `link()` |
| Monitors | `Process.monitor/1` | `watch()` |
| Trap exit | `Process.flag(:trap_exit, true)` | `setTrapExit()` |
| Exit signals | `Process.exit/2` | `exit()` |
| Timers | `Process.send_after/3` | `sendAfter()`, `sendInterval()` |
| **Introspection** |
| List children | `Supervisor.which_children/1` | `getChildren()` |
| Count children | `Supervisor.count_children/1` | `countChildren()` |
| **Abstractions** |
| Agent | `Agent` | `Agent` |
| DynamicSupervisor | `DynamicSupervisor` | `DynamicSupervisor` |
| GenStage | `GenStage` | `Producer`, `Consumer`, `ProducerConsumer`, `ConsumerSupervisor` |
| **Distribution** |
| Cluster membership | `:net_kernel` | `Cluster`, `GossipProtocol` |
| Remote messaging | Transparent | Via `Transport` |
| Registry | `Registry`, `:global` | `Registry`, `DistributedRegistry` |
| Actor migration | Manual process migration | `system.migrate()` |
| Node roles | Node profiles | `roles` config + `role` spawn option |
| Process groups | `pg` | `joinGroup()`, `getGroup()`, `broadcast()` |

### Not Implemented (Not Needed in Node.js)

| Feature | Reason |
|---------|--------|
| `Task.async/await` | Use native `Promise` / `async-await` |
| Selective receive | Not practical without the BEAM VM |
| Hot code upgrades | Use `libeam deploy` for rolling deploys |
| `Application` behaviour | Use standard Node.js entry points |
| ETS/DETS | Use `Map` or external stores (Redis, etc.) |

### Not Yet Implemented

All core OTP features have been implemented. See the feature table above for the full list.

## Installation

### npm / pnpm / yarn

```bash
# npm
npm install libeam

# pnpm
pnpm add libeam

# yarn
yarn add libeam
```

### Deno (JSR)

```bash
deno add jsr:@libeam/core
```

Or import directly:

```typescript
import { createSystem, createActor } from "jsr:@libeam/core";
```

## Quick Start

### Functional API (Recommended)

The functional API provides a simple, closure-based approach to defining actors:

```typescript
import { createSystem, createActor } from "libeam";

// Define an actor with closure-based state
const Counter = createActor((ctx, self, initialValue: number) => {
  let count = initialValue;

  return self
    .onCall("get", () => count)
    .onCall("increment", () => ++count)
    .onCast("set", (value: number) => { count = value; });
});

// Create a system (one line!)
const system = createSystem();

// Spawn and interact with typed refs
const counter = system.spawn(Counter, { args: [0] });

const value = await counter.call("get");     // 0 — fully typed!
await counter.call("increment");              // 1
counter.cast("set", 100);                     // fire-and-forget

await system.shutdown();
```

### Class-Based API

For full control, extend the `Actor` class directly:

```typescript
import { Actor, ActorRef } from "libeam";

class CounterActor extends Actor {
  private count = 0;

  init(initialValue: number = 0) {
    this.count = initialValue;
    console.log(`Counter initialized with ${this.count}`);
  }

  // Synchronous request/response
  handleCall(message: { type: string }): number {
    switch (message.type) {
      case "get":
        return this.count;
      case "increment":
        return ++this.count;
      default:
        throw new Error(`Unknown message: ${message.type}`);
    }
  }

  // Fire-and-forget messages
  handleCast(message: { type: string; value?: number }): void {
    if (message.type === "set" && message.value !== undefined) {
      this.count = message.value;
    }
  }
}
```

### Single-Node Setup (In-Memory)

For testing or single-process applications:

```typescript
import {
  ActorSystem,
  InMemoryTransport,
  LocalCluster,
  LocalRegistry,
} from "libeam";

async function main() {
  const cluster = new LocalCluster("node1");
  const transport = new InMemoryTransport("node1");
  const registry = new LocalRegistry();

  await transport.connect();

  const system = new ActorSystem(cluster, transport, registry);
  system.registerActorClass(CounterActor);
  await system.start();

  // Spawn an actor
  const counter = system.spawn(CounterActor, {
    name: "my-counter",
    args: [10], // Initial value
  });

  // Interact with the actor
  const value = await counter.call({ type: "get" });
  console.log(`Current value: ${value}`); // 10

  await counter.call({ type: "increment" });
  console.log(`After increment: ${await counter.call({ type: "get" })}`); // 11

  counter.cast({ type: "set", value: 100 });
}

main().catch(console.error);
```

### Multi-Node Setup (Functional API)

For distributed applications across multiple processes/machines, `createSystem` handles all the wiring — ZeroMQ transport, gossip protocol, cluster membership, and registry sync:

```typescript
import { createSystem, createActor, ActorRegistry } from "libeam";

// Define actors
const Ping = createActor((ctx, self) => {
  return self.onCast("ping", async (n: number) => {
    console.log(`Ping received: ${n}`);
    const pong = await ctx.getActorByName("pong");
    if (pong) pong.cast("pong", n + 1);
  });
});

const Pong = createActor((ctx, self) => {
  return self.onCast("pong", async (n: number) => {
    console.log(`Pong received: ${n}`);
    const ping = await ctx.getActorByName("ping");
    if (ping) ping.cast("ping", n + 1);
  });
});

// Register actors for typed getActorByName — same codebase on all nodes
declare module "libeam" {
  interface ActorRegistry {
    ping: typeof Ping;
    pong: typeof Pong;
  }
}

// Node 1 — port convention: rpc=5000, pub=5001, gossip=5002
const system1 = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  cookie: "my-cluster-secret",
});
const ping = system1.spawn(Ping, { name: "ping" });

// Node 2 — joins via node1's gossip port
const system2 = await createSystem({
  type: "distributed",
  port: 5010,
  seedNodes: ["127.0.0.1:5002"],
  cookie: "my-cluster-secret",
});
system2.spawn(Pong, { name: "pong" });

// Start the game — typed ref from spawn() supports clean syntax
ping.cast("ping", 0);
```

Run the full distributed example:

```bash
# Terminal 1
npx tsx examples/high-level/distributed.ts node1

# Terminal 2
npx tsx examples/high-level/distributed.ts node2
```
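The comments in the example above use a port convention: RPC on `port`, pub on `port + 1`, and gossip on `port + 2`. Each seed entry points at another node's gossip port. The helper below is a hypothetical sketch and is not a libeam export. It derives those ports and builds a seed address, assuming that convention holds.

```typescript
// Hypothetical helper (not part of libeam). It assumes the
// rpc=port, pub=port+1, gossip=port+2 convention from the example comments.
interface NodePorts {
  rpc: number;
  pub: number;
  gossip: number;
}

function derivePorts(port: number): NodePorts {
  return { rpc: port, pub: port + 1, gossip: port + 2 };
}

// Seed entries target a peer's gossip port, not its RPC port.
function seedAddress(host: string, basePort: number): string {
  return `${host}:${derivePorts(basePort).gossip}`;
}

const node1Ports = derivePorts(5000); // { rpc: 5000, pub: 5001, gossip: 5002 }
const node2Seeds = [seedAddress("127.0.0.1", 5000)]; // ["127.0.0.1:5002"]
```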

### Multi-Node Setup (Class-Based API)

For full control over transport, gossip, and cluster configuration:

<details>
<summary>Manual wiring example</summary>

```typescript
import {
  ActorSystem,
  ZeroMQTransport,
  GossipProtocol,
  GossipUDP,
  DistributedCluster,
  DistributedRegistry,
  RegistrySync,
} from "libeam";

async function startNode(config: {
  nodeId: string;
  rpcPort: number;
  pubPort: number;
  gossipPort: number;
  seedNodes: string[];
}) {
  const { nodeId, rpcPort, pubPort, gossipPort, seedNodes } = config;

  // 1. Setup transport (ZeroMQ)
  const transport = new ZeroMQTransport({
    nodeId,
    rpcPort,
    pubPort,
    bindAddress: "0.0.0.0",
  });
  await transport.connect();

  // 2. Setup gossip protocol for membership
  const gossipUDP = new GossipUDP(gossipPort);
  const gossipProtocol = new GossipProtocol(
    nodeId,
    `tcp://127.0.0.1:${rpcPort}`, // RPC address for peers
    `127.0.0.1:${gossipPort}`, // Gossip address
    gossipUDP,
    {
      gossipIntervalMs: 1000,
      cleanupIntervalMs: 2000,
      failureTimeoutMs: 5000,
      gossipFanout: 3,
      seedNodes,
    },
  );

  // 3. Setup cluster (wraps gossip protocol)
  const cluster = new DistributedCluster(gossipProtocol);
  await cluster.start();

  // 4. Setup registry sync for actor name resolution
  const registrySync = new RegistrySync(nodeId, transport, cluster);
  const registry = new DistributedRegistry(nodeId, registrySync);

  // 5. Wire cluster membership changes to transport
  cluster.on("member_join", (peerId: string) => {
    const peer = cluster.getPeerState(peerId);
    if (peer) {
      transport.updatePeers([[peerId, peer.address]]);
    }
  });

  // 6. Create actor system
  const system = new ActorSystem(cluster, transport, registry);
  system.registerActorClass(CounterActor);
  await system.start();

  return { system, cluster, transport };
}

// Node 1 (seed node)
const node1 = await startNode({
  nodeId: "node1",
  rpcPort: 5000,
  pubPort: 5001,
  gossipPort: 6000,
  seedNodes: [], // No seeds for first node
});

// Node 2 (joins via seed)
const node2 = await startNode({
  nodeId: "node2",
  rpcPort: 5010,
  pubPort: 5011,
  gossipPort: 6010,
  seedNodes: ["127.0.0.1:6000"], // Connect to node1's gossip port
});
```

</details>
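The manual wiring example passes `gossipIntervalMs`, `failureTimeoutMs`, and `gossipFanout` options, which point to heartbeat-style membership. The sketch below is a simplified, standalone model of that idea and not libeam's `GossipProtocol`:

- Each node records the highest heartbeat it has seen for every peer.
- Incoming gossip is merged by keeping the larger heartbeat.
- A peer is treated as failed once its heartbeat has not advanced for `failureTimeoutMs`.

Time is passed in explicitly so the behavior is deterministic. Peer selection by fanout is omitted.

```typescript
// Simplified heartbeat-gossip membership table (illustrative only).
interface PeerEntry {
  heartbeat: number; // counter incremented by the peer itself
  updatedAt: number; // local time (ms) when the heartbeat last advanced
}

class MembershipTable {
  private peers = new Map<string, PeerEntry>();
  private readonly failureTimeoutMs: number;

  constructor(failureTimeoutMs: number) {
    this.failureTimeoutMs = failureTimeoutMs;
  }

  // Merge a gossiped view, keeping the higher heartbeat per peer.
  // Returns the ids of peers seen for the first time ("member_join").
  merge(remote: Record<string, number>, now: number): string[] {
    const joined: string[] = [];
    for (const [id, heartbeat] of Object.entries(remote)) {
      const local = this.peers.get(id);
      if (!local) {
        joined.push(id);
        this.peers.set(id, { heartbeat, updatedAt: now });
      } else if (heartbeat > local.heartbeat) {
        this.peers.set(id, { heartbeat, updatedAt: now });
      }
    }
    return joined;
  }

  // Drop peers whose heartbeat has not advanced within the timeout.
  cleanup(now: number): string[] {
    const failed: string[] = [];
    for (const [id, entry] of this.peers) {
      if (now - entry.updatedAt > this.failureTimeoutMs) {
        this.peers.delete(id);
        failed.push(id);
      }
    }
    return failed;
  }

  members(): string[] {
    return [...this.peers.keys()];
  }
}

const table = new MembershipTable(5000);
const joinedAtStart = table.merge({ node2: 1, node3: 1 }, 0); // ["node2", "node3"]
table.merge({ node2: 2, node3: 1 }, 3000); // only node2's heartbeat advanced
const failedAt6000 = table.cleanup(6000); // ["node3"]
```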

## Functional API

The functional API is the recommended way to build applications with libeam. Compared with the class-based API, it infers message types automatically (`TypedActorRef`), needs less boilerplate, and lets `createSystem()` handle system setup for you.

### createSystem

The `createSystem` factory simplifies system creation and configuration.

```typescript
import { createSystem } from "libeam";

// Local system (synchronous, zero config)
const system = createSystem();

// Local with options
const namedSystem = createSystem({ nodeId: "my-node" });

// Distributed (async, with ZeroMQ + Gossip)
const distributedSystem = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"], // gossip port of a peer started with port 6000
});
```

**System Interface:**

- `spawn(actorClass, options?)`: Spawn an actor and return a `TypedActorRef`
- `register(actorClass)`: Register an actor class for remote spawning
- `getActorByName(name)`: Look up a named actor (local or remote)
- `shutdown()`: Gracefully shut down the system and all actors
- `stop(ref)`: Stop an individual actor (cascading termination of children)
- `nodeId`: The unique ID of this node
- `transport`: Access to the underlying transport layer
- `cluster`: Access to the cluster membership interface
- `registry`: Access to the actor registry
- `system`: Escape hatch to the raw `ActorSystem` instance

### createActor

Define actors using a closure-based factory function.

```typescript
const MyActor = createActor((ctx, self, ...args) => {
  // Initialization logic here
  
  return self.onCall("ping", () => "pong");
});
```

The factory function receives:
- `ctx`: The actor context (spawn, watch, link, stash, etc.)
- `self`: The actor builder (register handlers, timers, etc.)
- `...args`: Arguments passed during `spawn`

**ActorContext (ctx) Methods:**
- `self`: Reference to this actor
- `parent`: Reference to the parent actor
- `spawn(actor, options?)`: Spawn a child actor
- `watch(ref)` / `unwatch(ref)`: Monitor other actors
- `link(ref)` / `unlink(ref)`: Bidirectional crash propagation
- `exit(reason?)`: Stop this actor (with optional reason)
- `setTrapExit(boolean)`: Enable/disable exit trapping
- `getActorByName(name)`: Look up a named actor (local or remote)
- `stash()` / `unstash()` / `unstashAll()` / `clearStash()`: Message stashing

**ActorBuilder (self) Methods:**
- `onCall(name, handler)`: Register a request-reply handler
- `onCast(name, handler)`: Register a fire-and-forget handler
- `onInfo(type, handler)`: Register a handler for system messages (`"down"`, `"exit"`, `"timeout"`, `"moved"`)
- `onTerminate(handler)`: Cleanup logic
- `onContinue(handler)`: Deferred initialization
- `sendAfter(msg, delay)` / `sendInterval(msg, interval)`: Timers
- `setIdleTimeout(ms)`: Configure idle timeout
- `migratable({ getState, setState })`: Enable actor migration
- `childSupervision(options)`: Configure supervision strategy for children

### TypedActorRef

When you spawn an actor created with `createActor`, you get a `TypedActorRef`. This provides full TypeScript autocompletion and type checking for `call` and `cast`.

```typescript
const counter = system.spawn(Counter, { args: [0] });

// TypeScript knows "get" and "increment" are valid calls
const value = await counter.call("get");
await counter.call("increment");

// TypeScript knows "set" is a valid cast and requires a number
counter.cast("set", 42);
```

### Type Inference

When you `return` the builder chain from a `createActor` factory, TypeScript automatically infers the handler types:

```typescript
// With return — full type inference
const Counter = createActor((ctx, self, initial: number) => {
  let count = initial;
  return self
    .onCall("get", () => count)
    .onCall("increment", () => ++count)
    .onCast("set", (v: number) => { count = v; });
});

const counter = system.spawn(Counter, { args: [0] });
counter.call("get");        // TypeScript knows this returns Promise<number>
counter.cast("set", 42);    // TypeScript knows "set" expects a number
counter.call("typo");       // Type error! "typo" is not a valid method
```

Without `return`, the factory still works but handlers are untyped:

```typescript
// Without return — still works, but no type inference
const Untyped = createActor((ctx, self) => {
  self.onCall("get", () => 42);
});
```

**Timer methods** (`sendAfter`, `sendInterval`) return a `TimerRef` rather than the builder, so they cannot be chained. Call them as separate statements before the `return`:

```typescript
const Heartbeat = createActor((ctx, self) => {
  self.sendInterval({ type: "tick" }, 1000);  // separate line (returns TimerRef)
  return self
    .onCast("tick", () => console.log("tick"))
    .onTerminate(() => console.log("stopped"));
});
```

### Module Augmentation (Typed `getActorByName`)

By default, `getActorByName` returns an untyped `ActorRef`. To get fully typed refs, augment the `ActorRegistry` interface:

```typescript
// types.d.ts (or any .ts file)
import "libeam";

declare module "libeam" {
  interface ActorRegistry {
    counter: typeof Counter;
    "chat-room": typeof ChatRoom;
  }
}
```

Now `getActorByName` returns typed refs when called with registered names:

```typescript
const counter = await system.getActorByName("counter");
if (counter) {
  const value = await counter.call("get");  // fully typed!
  counter.cast("set", 100);                 // fully typed!
}
```

**Utility types** for working with actor definitions:

- `ActorRefFrom<T>` — Extract `TypedActorRef` from an `ActorDefinition`
- `ExtractCalls<T>` — Extract call handler types from an `ActorDefinition`
- `ExtractCasts<T>` — Extract cast handler types from an `ActorDefinition`

```typescript
import { ActorRefFrom } from "libeam";

type CounterRef = ActorRefFrom<typeof Counter>;
// TypedActorRef<{ get: () => number; increment: () => number }, { set: (v: number) => void }>
```

### Feature Comparison

| Feature | Functional API | Class-Based API |
|---------|---------------|-----------------|
| State management | Closures | Class fields |
| Message handlers | `self.onCall()` / `self.onCast()` | `handleCall()` / `handleCast()` |
| Type safety | Automatic (TypedActorRef) | Manual typing |
| System setup | `createSystem()` | Manual wiring |
| Child supervision | `self.childSupervision()` | `childSupervision()` override |
| Deferred init | `self.onContinue()` | `handleContinue()` override |
| Message stashing | `ctx.stash()` / `ctx.unstashAll()` | `this.stash()` / `this.unstashAll()` |

## Class-Based API Reference

This section documents the low-level, class-based API. While the Functional API is recommended for most use cases, the class-based API provides full control and is used internally by the system.

### Actor

Base class for all actors.

```typescript
class MyActor extends Actor {
  // Called when actor starts. Receives spawn arguments.
  init(...args: any[]): void | Promise<void>;

  // Called when actor is stopped.
  terminate(): void | Promise<void>;

  // Handle synchronous requests (must return a value).
  handleCall(message: any): any | Promise<any>;

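  // Handle asynchronous messages (fire-and-forget).
  handleCast(message: any): void | Promise<void>;

  // Handle system messages ("down", "exit", "timeout", "moved").
  handleInfo(message: InfoMessage): void | Promise<void>;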

  // Reference to self for sending to other actors
  self: ActorRef;
}
```

### handleContinue

Called when `init()` returns `{ continue: data }` for deferred async initialization.

```typescript
class DatabaseActor extends Actor {
  private db!: DatabaseConnection;

  init() {
    // Return immediately, continue async work
    return { continue: "connect" };
  }

  async handleContinue(data: string) {
    if (data === "connect") {
      this.db = await connectToDatabase();
    }
  }
}
```

See `examples/high-level/handle_continue.ts` and `examples/low-level/handle_continue.ts` for more examples.
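The point of returning `{ continue: data }` is that `init()` completes quickly and the deferred work runs before any queued message is handled. The sketch below models that ordering with a hypothetical `runActor` helper. It illustrates the assumed ordering and is not libeam's scheduler.

```typescript
// Hypothetical model of init -> handleContinue -> mailbox ordering.
type InitResult = { continue: unknown } | undefined;

interface Behavior {
  init(): InitResult;
  handleContinue?(data: unknown): void;
  handleMessage(msg: string): void;
}

function runActor(behavior: Behavior, mailbox: string[]): void {
  const result = behavior.init();
  // Deferred initialization runs before the first mailbox message.
  if (result !== undefined && behavior.handleContinue) {
    behavior.handleContinue(result.continue);
  }
  for (const msg of mailbox) behavior.handleMessage(msg);
}

const log: string[] = [];
runActor(
  {
    init: () => {
      log.push("init");
      return { continue: "connect" };
    },
    handleContinue: (data) => log.push(`continue:${String(data)}`),
    handleMessage: (msg) => log.push(`msg:${msg}`),
  },
  ["query"],
);
// log is now: init, continue:connect, msg:query
```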

### Idle Timeout

Set a timeout that fires when the actor is idle (no messages received).

```typescript
class SessionActor extends Actor {
  init() {
    // Timeout after 30 seconds of inactivity
    this.setIdleTimeout(30000);
  }

  handleInfo(message: InfoMessage) {
    if (message.type === "timeout") {
      console.log(`Session idle for ${message.idleMs}ms, closing...`);
      this.exit(this.self, "normal");
    }
  }
}
```

**Methods:**

- `setIdleTimeout(timeoutMs: number): void` - Set idle timeout in milliseconds
- `getIdleTimeout(): number` - Get current idle timeout

The actor receives a `TimeoutMessage` via `handleInfo()` when the timeout fires.

See `examples/high-level/idle_timeout.ts` and `test/idle_timeout.test.ts` for more examples.
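An idle timeout is usually implemented by resetting a deadline whenever a message arrives. The sketch below models this with an explicit clock. `IdleTracker` is a hypothetical class, and the sketch assumes the timeout fires at most once per idle period and reports the elapsed time, like `TimeoutMessage.idleMs`.

```typescript
// Hypothetical idle-timeout bookkeeping driven by an explicit clock (ms).
class IdleTracker {
  private readonly timeoutMs: number;
  private lastActivity: number;
  private fired = false;

  constructor(timeoutMs: number, now: number) {
    this.timeoutMs = timeoutMs;
    this.lastActivity = now;
  }

  // Any incoming message counts as activity and re-arms the timeout.
  onMessage(now: number): void {
    this.lastActivity = now;
    this.fired = false;
  }

  // Returns the idle duration if the timeout should fire now, else null.
  check(now: number): number | null {
    const idleMs = now - this.lastActivity;
    if (!this.fired && idleMs >= this.timeoutMs) {
      this.fired = true;
      return idleMs;
    }
    return null;
  }
}

const idle = new IdleTracker(30_000, 0);
idle.onMessage(10_000);
const early = idle.check(35_000); // null: only 25s since the last message
const fired = idle.check(40_000); // 30000
const again = idle.check(50_000); // null: already fired for this idle period
```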

### Timers

Schedule delayed and periodic messages to yourself.

```typescript
class ReminderActor extends Actor {
  private reminderRef!: TimerRef;

  init() {
    // One-shot timer: remind after 5 seconds
    this.reminderRef = this.sendAfter({ type: "remind" }, 5000);
    
    // Periodic timer: tick every second
    this.sendInterval({ type: "tick" }, 1000);
  }

  handleCast(message: { type: string }) {
    if (message.type === "remind") {
      console.log("Time's up!");
    } else if (message.type === "tick") {
      console.log("Tick...");
    }
  }

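  // Cancel just the one-shot reminder (e.g. if it is dismissed early)
  cancelReminder() {
    this.cancelTimer(this.reminderRef);
  }

  terminate() {
    // Cancel any timers that are still active
    this.cancelAllTimers();
  }
}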
```

**Methods:**

- `sendAfter(message, delayMs): TimerRef` - Schedule one-shot message
- `sendInterval(message, intervalMs): TimerRef` - Schedule repeating message
- `cancelTimer(timerRef): boolean` - Cancel a specific timer
- `cancelAllTimers(): void` - Cancel all active timers

See `examples/high-level/timers.ts` and `examples/low-level/timers.ts` for more examples.
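The four timer methods map onto a table of active timers keyed by a timer reference. The sketch below is a hypothetical, deterministic model and not libeam's implementation. It uses a virtual clock (`advance(ms)`) instead of real `setTimeout`/`setInterval`, which shows when one-shot and periodic messages fire and what `cancelTimer` returns.

```typescript
// Hypothetical timer table with a virtual clock (illustrative only).
type TimerId = number;

interface Timer {
  message: string;
  dueAt: number;
  intervalMs?: number; // set only for periodic timers
}

class VirtualTimers {
  private now = 0;
  private nextId = 1;
  private timers = new Map<TimerId, Timer>();
  readonly delivered: string[] = [];

  sendAfter(message: string, delayMs: number): TimerId {
    const id = this.nextId++;
    this.timers.set(id, { message, dueAt: this.now + delayMs });
    return id;
  }

  sendInterval(message: string, intervalMs: number): TimerId {
    const id = this.nextId++;
    this.timers.set(id, { message, dueAt: this.now + intervalMs, intervalMs });
    return id;
  }

  // true if the timer was still active; false if it already fired or was cancelled.
  cancelTimer(id: TimerId): boolean {
    return this.timers.delete(id);
  }

  cancelAllTimers(): void {
    this.timers.clear();
  }

  // Move the clock forward 1 ms at a time, delivering due messages.
  advance(ms: number): void {
    for (let i = 0; i < ms; i++) {
      this.now++;
      for (const [id, timer] of this.timers) {
        if (timer.dueAt !== this.now) continue;
        this.delivered.push(timer.message);
        if (timer.intervalMs === undefined) this.timers.delete(id); // one-shot
        else timer.dueAt += timer.intervalMs; // re-arm periodic timer
      }
    }
  }
}

const timers = new VirtualTimers();
const remind = timers.sendAfter("remind", 5000);
const tick = timers.sendInterval("tick", 1000);
timers.advance(5000); // four ticks, then "remind" and a fifth tick at t=5000
const remindStillActive = timers.cancelTimer(remind); // false: it already fired
const tickWasActive = timers.cancelTimer(tick); // true: intervals run until cancelled
timers.advance(2000); // nothing else is delivered
```

With a real clock, each entry would instead hold the handle returned by `setTimeout`/`setInterval`, and cancellation would call `clearTimeout`/`clearInterval`.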

### Watching

Monitor another actor's lifecycle and receive notification when it terminates.

```typescript
class WorkerSupervisor extends Actor {
  private workerWatch!: WatchRef;

  init(workerRef: ActorRef) {
    // Start watching the worker
    this.workerWatch = this.watch(workerRef);
  }

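  handleInfo(message: InfoMessage) {
    if (message.type === "down") {
      const down = message as DownMessage;
      console.log(`Worker ${down.actorRef.id} terminated: ${down.reason.type}`);
      // No unwatch() needed here: the watch is removed once DOWN is delivered
    }
  }

  stopWatching() {
    // Cancel the watch early if the worker is no longer of interest
    this.unwatch(this.workerWatch);
  }
}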
```

**Methods:**

- `watch(actorRef): WatchRef` - Start watching an actor
- `unwatch(watchRef): void` - Stop watching

**Behavior:**
- One-shot notification: You receive exactly one `DownMessage` when the watched actor terminates
- Auto-cleanup: The watch is automatically removed after the DOWN message is delivered
- Works across nodes: Can watch actors on remote nodes

See `examples/high-level/watching.ts` and `examples/low-level/actor_watching.ts` for more examples.
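The one-shot behavior above can be modeled as a table of watch entries that is consulted, then pruned, when an actor terminates. The sketch below is a hypothetical, single-process model. `WatchTable` and the string actor ids are illustrative, and the model does not cover watching across nodes.

```typescript
// Hypothetical single-node model of watch/unwatch and DOWN delivery.
interface DownMessage {
  type: "down";
  watchRef: number;
  actorId: string;
  reason: string;
}

class WatchTable {
  private nextRef = 1;
  private watches = new Map<number, { watcher: string; watched: string }>();

  watch(watcher: string, watched: string): number {
    const ref = this.nextRef++;
    this.watches.set(ref, { watcher, watched });
    return ref;
  }

  unwatch(ref: number): void {
    this.watches.delete(ref);
  }

  // Called when `actorId` terminates: one DOWN per watch, then the watch is removed.
  terminated(actorId: string, reason: string): Array<{ to: string; msg: DownMessage }> {
    const out: Array<{ to: string; msg: DownMessage }> = [];
    for (const [ref, w] of this.watches) {
      if (w.watched !== actorId) continue;
      out.push({ to: w.watcher, msg: { type: "down", watchRef: ref, actorId, reason } });
      this.watches.delete(ref);
    }
    return out;
  }
}

const watchTable = new WatchTable();
watchTable.watch("supervisor", "worker-1");
const cancelled = watchTable.watch("logger", "worker-1");
watchTable.unwatch(cancelled); // the logger no longer gets notified
const first = watchTable.terminated("worker-1", "error"); // one DOWN, to "supervisor"
const second = watchTable.terminated("worker-1", "error"); // []: watches were removed
```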

### Links

Bidirectional crash propagation between actors. If one linked actor crashes, the other crashes too (unless trapExit is enabled).

```typescript
class ParentActor extends Actor {
  private childLink!: LinkRef;

  init(childRef: ActorRef) {
    // Link to child - bidirectional crash propagation
    this.childLink = this.link(childRef);
    
    // Enable trap exit to receive ExitMessage instead of crashing
    this.setTrapExit(true);
  }

  handleInfo(message: InfoMessage) {
    if (message.type === "exit") {
      const exit = message as ExitMessage;
      console.log(`Linked actor exited: ${exit.reason.type}`);
      
      // Unlink when done
      this.unlink(exit.linkRef!);
    }
  }

  terminateChild() {
    // Send exit signal to linked actor
    this.exit(this.childLink.actorRef, "shutdown");
  }
}
```

**Methods:**

- `link(actorRef): LinkRef` - Create bidirectional link
- `unlink(linkRef): void` - Remove link
- `setTrapExit(trap: boolean): void` - Enable/disable exit trapping
- `isTrapExit(): boolean` - Check if exit trapping is enabled
- `exit(actorRef, reason?): void` - Send exit signal to actor

**Exit Reasons:**
- `"normal"` - No effect on linked actors
- `"kill"` - Always terminates, ignores trapExit
- Custom string - Delivered to linked actors with trapExit enabled

See `examples/high-level/links.ts` and `examples/low-level/actor_links.ts` for more examples.
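The exit-reason rules listed above amount to a small decision table on the receiving side. The sketch below encodes them as a pure function. The name `onExitSignal` and the three outcome labels are hypothetical, and the handling of `"normal"` follows the bullet above ("no effect on linked actors").

```typescript
// Hypothetical decision table for an exit signal arriving from a linked actor.
type ExitOutcome = "ignore" | "terminate" | "deliver-exit-message";

function onExitSignal(reason: string, trapExit: boolean): ExitOutcome {
  if (reason === "kill") return "terminate"; // cannot be trapped
  if (reason === "normal") return "ignore"; // no effect on linked actors
  // Any other reason: trapping actors receive an ExitMessage; others crash too.
  return trapExit ? "deliver-exit-message" : "terminate";
}
```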

### Message Stashing

Defer message processing until the actor is ready. Useful for state-dependent message handling.

```typescript
class StatefulActor extends Actor {
  // Actor starts in the "not ready" state
  private ready = false;

  async handleCall(message: any) {
    if (!this.ready) {
      // Stash message for later processing
      this.stash();
      return "stashed";
    }
    // Process message normally
    return this.processMessage(message);
  }

  setReady() {
    this.ready = true;
    // Replay all stashed messages
    this.unstashAll();
  }

  private processMessage(message: any) {
    return `Processed: ${message}`;
  }
}
```

**Methods:**

- `stash(): void` - Save current message to stash
- `unstash(): void` - Replay one stashed message (FIFO order)
- `unstashAll(): void` - Replay all stashed messages
- `clearStash(): void` - Discard all stashed messages

See `examples/high-level/message_stashing.ts` and `test/message_stashing.test.ts` for more examples.
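Stashing is easiest to reason about as two queues: the mailbox and the stash. The sketch below is a hypothetical model and not libeam's mailbox:

- `stash()` sets the current message aside.
- `unstashAll()` puts the stashed messages back at the *front* of the mailbox in their original FIFO order.

The front-of-mailbox placement is an assumption, borrowed from how Akka's stash behaves. With it, deferred messages are handled before anything that arrived later.

```typescript
// Hypothetical mailbox + stash model (assumes unstashed messages go to the front).
class StashingMailbox {
  private mailbox: string[] = [];
  private stashed: string[] = [];
  readonly processed: string[] = [];
  ready = false;

  send(msg: string): void {
    this.mailbox.push(msg);
  }

  // Drain the mailbox, deferring everything except "ready" until ready.
  run(): void {
    while (this.mailbox.length > 0) {
      const msg = this.mailbox.shift()!;
      if (msg === "ready") {
        this.ready = true;
        this.unstashAll();
      } else if (!this.ready) {
        this.stashed.push(msg); // stash(): defer the current message
      } else {
        this.processed.push(msg);
      }
    }
  }

  // unstashAll(): replay stashed messages first, oldest first.
  private unstashAll(): void {
    this.mailbox.unshift(...this.stashed);
    this.stashed = [];
  }
}

const box = new StashingMailbox();
["a", "b", "ready", "c"].forEach((m) => box.send(m));
box.run();
// box.processed is a, b, c: the stashed messages run before "c"
```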

### InfoMessage Types

System messages delivered via `handleInfo()`. Switch on the `type` field to distinguish between message variants. Wrapping each `case` in braces keeps its `const` scoped to that branch.

```typescript
handleInfo(message: InfoMessage) {
  switch (message.type) {
    case "down": {
      const down = message as DownMessage;
      console.log(`Actor ${down.actorRef.id} terminated: ${down.reason.type}`);
      break;
    }
    case "exit": {
      const exit = message as ExitMessage;
      console.log(`Linked actor exited: ${exit.reason.type}`);
      break;
    }
    case "timeout": {
      const timeout = message as TimeoutMessage;
      console.log(`Idle for ${timeout.idleMs}ms`);
      break;
    }
    case "moved": {
      const moved = message as MovedMessage;
      console.log(`Actor moved to ${moved.newNodeId}`);
      break;
    }
  }
}
```

| Message | Type | Fields | Description |
|---------|------|--------|-------------|
| `DownMessage` | `"down"` | `watchRef`, `actorRef`, `reason` | Watched actor terminated |
| `ExitMessage` | `"exit"` | `linkRef?`, `actorRef`, `reason` | Linked actor exited (trapExit only) |
| `TimeoutMessage` | `"timeout"` | `idleMs` | Idle timeout fired |
| `MovedMessage` | `"moved"` | `watchRef?`, `linkRef?`, `actorRef`, `oldNodeId`, `newNodeId`, `newActorId` | Actor migrated to another node |

**TerminationReason:**
```typescript
type TerminationReason = 
  | { type: "normal" }
  | { type: "error"; error: any }
  | { type: "killed" };
```
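`TerminationReason` is a discriminated union on `type`, so a `switch` over `reason.type` narrows each branch without casts. Adding a `never` check makes the compiler flag any variant you forget to handle. Below is a minimal standalone illustration, with the type copied from above and a hypothetical `describeReason` helper.

```typescript
type TerminationReason =
  | { type: "normal" }
  | { type: "error"; error: any }
  | { type: "killed" };

// Hypothetical helper: exhaustive handling of every termination variant.
function describeReason(reason: TerminationReason): string {
  switch (reason.type) {
    case "normal":
      return "stopped normally";
    case "error":
      // Narrowed: `reason.error` is only accessible in this branch.
      return `crashed: ${String(reason.error)}`;
    case "killed":
      return "killed";
    default: {
      // Compile error here if a new variant is added but not handled above.
      const unreachable: never = reason;
      return unreachable;
    }
  }
}
```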

### ActorRef

Location-transparent reference to an actor.

```typescript
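// Request/response (default timeout: 5000ms)
const result = await actorRef.call(message);

// Request/response with a custom timeout in milliseconds
const slowResult = await actorRef.call(message, 10_000);

// Fire-and-forget
actorRef.cast(message);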
```

### Agent

A minimal abstraction for holding a single piece of state inside an actor.

```typescript
// Create an agent
const counter = Agent.start(system, 0);

// Read state
const value = await counter.get();

// Update state (waits for completion)
await counter.update(n => n + 1);

// Fire-and-forget update
counter.cast(n => n + 1);

// Stop the agent
await counter.stop();
```

**Methods:**

- `Agent.start<T>(system, initialState, options?): Agent<T>` - Create an agent
- `get(timeout?): Promise<T>` - Get current state
- `update(fn, timeout?): Promise<T>` - Update state, returns new value
- `getAndUpdate(fn, timeout?): Promise<T>` - Update state, returns old value
- `cast(fn): void` - Fire-and-forget state update
- `stop(): Promise<void>` - Stop the agent
- `getRef(): ActorRef` - Access underlying actor reference

See `test/agent.test.ts` for more examples.
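The difference between `update` (returns the new value) and `getAndUpdate` (returns the old value) is easy to mix up. The synchronous sketch below mirrors those documented return values in a hypothetical `MiniAgent`. The documented `Agent` wraps an actor and returns promises, which this stand-in leaves out.

```typescript
// Hypothetical synchronous stand-in for Agent's documented return values.
class MiniAgent<T> {
  private state: T;

  constructor(initial: T) {
    this.state = initial;
  }

  get(): T {
    return this.state;
  }

  // Apply fn and return the NEW state.
  update(fn: (s: T) => T): T {
    this.state = fn(this.state);
    return this.state;
  }

  // Apply fn and return the OLD state.
  getAndUpdate(fn: (s: T) => T): T {
    const old = this.state;
    this.state = fn(old);
    return old;
  }
}

const agent = new MiniAgent(0);
const afterUpdate = agent.update((n) => n + 1); // 1
const before = agent.getAndUpdate((n) => n + 10); // 1 (state is now 11)
```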

### DynamicSupervisor

On-demand supervised child spawning. Unlike static supervision trees where children are defined at init time, a DynamicSupervisor starts with zero children and allows adding them at runtime. All children are supervised with a one-for-one strategy.

```typescript
import { DynamicSupervisor } from "libeam";

// Start a dynamic supervisor
const dynSup = DynamicSupervisor.start(system);

// With options
const boundedSup = DynamicSupervisor.start(system, {
  maxChildren: 100,  // Cap child count (default: Infinity)
  maxRestarts: 3,    // Per-child restart limit (default: 3)
  periodMs: 5000,    // Restart counting window (default: 5000)
});

// Named supervisor
const namedSup = DynamicSupervisor.start(system, {}, { name: "worker-pool" });

// Start children on demand
const workerRef = await dynSup.startChild(WorkerActor, { args: ["job-1"] });

// Functional actors return TypedActorRef with full type inference
const counter = await dynSup.startChild(Counter, { args: [0] });
await counter.call("get");       // fully typed!
counter.cast("set", 42);         // fully typed!

// Inspect children
const children = await dynSup.whichChildren();
// [{ ref: ActorRef, className: "WorkerActor", name?: string }]

const counts = await dynSup.countChildren();
// { specs: 2, active: 2 }

// Terminate a specific child
await dynSup.terminateChild(workerRef); // true if found, false otherwise

// Stop supervisor (cascades to all children)
await dynSup.stop();
```

**Methods:**

- `DynamicSupervisor.start(system, options?, spawnOptions?): DynamicSupervisor` - Create a supervisor
- `startChild(actorClass, options?): Promise<ActorRef | TypedActorRef>` - Spawn a supervised child
- `terminateChild(ref): Promise<boolean>` - Stop a child by ref
- `whichChildren(): Promise<ChildInfo[]>` - List active children with metadata
- `countChildren(): Promise<ChildCounts>` - Get child count statistics
- `stop(): Promise<void>` - Stop supervisor and all children
- `getRef(): ActorRef` - Access underlying actor reference

**Supervision behavior:**

- Children that crash are automatically restarted (one-for-one)
- If a child exceeds `maxRestarts` within `periodMs`, it is stopped permanently
- `startChild` throws `MaxChildrenError` when the `maxChildren` limit is reached
- Stopping the supervisor cascades to all children (depth-first termination)

See `test/dynamic_supervisor.test.ts` for more examples.
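The `maxRestarts`/`periodMs` rule above is a sliding-window restart limit, tracked per child. The sketch below is a hypothetical model of that check and not libeam's code. It keeps the timestamps of recent restarts and refuses a restart once `maxRestarts` restarts have already happened within the last `periodMs`.

```typescript
// Hypothetical per-child restart-intensity tracker (sliding time window).
class RestartWindow {
  private restarts: number[] = [];
  private readonly maxRestarts: number;
  private readonly periodMs: number;

  constructor(maxRestarts = 3, periodMs = 5000) {
    this.maxRestarts = maxRestarts;
    this.periodMs = periodMs;
  }

  // Called on each crash; returns true if the child may be restarted.
  allowRestart(now: number): boolean {
    // Forget restarts that have fallen out of the window.
    this.restarts = this.restarts.filter((t) => now - t < this.periodMs);
    if (this.restarts.length >= this.maxRestarts) return false; // stop permanently
    this.restarts.push(now);
    return true;
  }
}

const win = new RestartWindow(3, 5000);
const decisions = [0, 1000, 2000, 3000].map((t) => win.allowRestart(t));
// true, true, true, false: the 4th crash within 5s exceeds maxRestarts

const later = new RestartWindow(3, 5000);
[0, 1000, 2000].forEach((t) => later.allowRestart(t));
const afterWindow = later.allowRestart(6000); // true: only the restart at 2000 is still in the window
```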

### GenStage

Demand-driven producer-consumer pipelines with back-pressure. Inspired by Elixir's [GenStage](https://hexdocs.pm/gen_stage). Consumers tell producers how many events they can handle; producers never emit more than requested.

```typescript
import { Producer, Consumer, ProducerConsumer, ConsumerSupervisor } from "libeam";

// Producer: emits sequential numbers on demand
const producer = Producer.start(system, {
  init: () => 0,
  handleDemand: (demand, counter) => {
    const events = Array.from({ length: demand }, (_, i) => counter + i);
    return [events, counter + demand];
  },
});

// Consumer: prints received events
const consumer = Consumer.start(system, {
  handleEvents: (events, _from, state) => {
    console.log("Received:", events);
    return state;
  },
});

// Subscribe with back-pressure options
await consumer.subscribe(producer.getRef(), { maxDemand: 100, minDemand: 50 });

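// 3-stage pipeline with transformation: producer -> multiplier -> consumer
// (in a real topology, subscribe the consumer to the producer or the multiplier, not both)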
const multiplier = ProducerConsumer.start(system, {
  init: () => 10,
  handleEvents: (events, _from, factor) => {
    return [events.map(e => e * factor), factor];
  },
});

await multiplier.subscribe(producer.getRef(), { maxDemand: 50 });
await consumer.subscribe(multiplier.getRef(), { maxDemand: 50 });
```

**Stage Types:**

- `Producer` — Emits events in response to downstream demand. Buffers events when demand is zero.
- `Consumer` — Subscribes to producers and processes received events.
- `ProducerConsumer` — Receives events from upstream, transforms them, and dispatches downstream.
- `ConsumerSupervisor` — Subscribes to a producer and spawns a supervised worker per event. Back-pressure is tied to worker lifecycle.

**Producer Methods:**

- `Producer.start(system, callbacks, options?, spawnOptions?): Producer` — Create a producer
- `stop(): Promise<void>` — Stop the producer
- `demand(mode): void` — Switch demand mode (call with `"forward"` to resume after `demand: "accumulate"`)
- `getRef(): ActorRef` — Get the producer's ref (for consumer subscriptions)

**Consumer Methods:**

- `Consumer.start(system, callbacks, spawnOptions?): Consumer` — Create a consumer
- `subscribe(producerRef, options?): Promise<SubscriptionRef>` — Subscribe to a producer
- `cancel(ref): boolean` — Cancel a subscription
- `stop(): Promise<void>` — Stop and cancel all subscriptions
- `getRef(): ActorRef` — Get the consumer's ref

**ProducerConsumer Methods:**

- `ProducerConsumer.start(system, callbacks, producerOptions?, spawnOptions?): ProducerConsumer` — Create a stage
- `subscribe(producerRef, options?): Promise<SubscriptionRef>` — Subscribe to upstream
- `cancelUpstream(ref): boolean` — Cancel an upstream subscription
- `stop(): Promise<void>` — Stop and cancel all subscriptions
- `getRef(): ActorRef` — Get the stage's ref

**ConsumerSupervisor Methods:**

- `ConsumerSupervisor.start(system, childSpec, options?, spawnOptions?): ConsumerSupervisor` — Create a consumer supervisor
- `subscribe(producerRef, options?): Promise<SubscriptionRef>` — Subscribe to a producer
- `cancel(ref): boolean` — Cancel a subscription
- `whichChildren(): Promise<ChildInfo[]>` — List active worker children
- `countChildren(): Promise<ChildCounts>` — Get worker count statistics
- `stop(): Promise<void>` — Stop and cancel all subscriptions, terminate all workers
- `getRef(): ActorRef` — Get the supervisor's ref

**Subscription Options:**

- `maxDemand` — Max events in flight per subscription (default: 1000)
- `minDemand` — Threshold to request more events (default: 75% of maxDemand)
- `cancel` — Cancel behavior: `"permanent"` | `"transient"` | `"temporary"` (default)
- `partition` — Partition to subscribe to (required for PartitionDispatcher)

**Dispatcher Types:**

Producers can be configured with different dispatch strategies via `ProducerOptions.dispatcher`:

| Dispatcher | Description | Use Case |
|------------|-------------|----------|
| `{ type: "demand" }` | Sends to consumer with highest pending demand (default) | Work pools, load balancing |
| `{ type: "broadcast" }` | Sends all events to all consumers | Event buses, audit logging, fan-out |
| `{ type: "partition", partitions: N, hash? }` | Routes events by hash to fixed partitions | Ordered processing per key, sharding |

```typescript
// Default: DemandDispatcher (highest-demand-first)
const demandProducer = Producer.start(system, callbacks);

// BroadcastDispatcher: all consumers get all events
const broadcastProducer = Producer.start(system, callbacks, {
  dispatcher: { type: "broadcast" },
});

// PartitionDispatcher: hash-based routing
const partitionProducer = Producer.start(system, callbacks, {
  dispatcher: {
    type: "partition",
    partitions: 4,
    hash: (event) => event.userId % 4,  // route by user ID
  },
});

// Consumers subscribe to specific partitions
await consumer0.subscribe(partitionProducer.getRef(), { maxDemand: 10, partition: 0 });
await consumer1.subscribe(partitionProducer.getRef(), { maxDemand: 10, partition: 1 });
```

**BroadcastDispatcher behavior:**

- All consumers receive the same events (fan-out)
- Demand = `min(all consumer demands)` — slowest consumer throttles the pipeline
- Sequential subscribes are safe — initial demand is deferred to ensure all consumers register before events flow
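The two demand rules (highest pending demand wins for `demand`, slowest consumer wins for `broadcast`) can be illustrated with a small sketch over a map of pending demand per consumer. The function names and the `Map<string, number>` shape are hypothetical; libeam's dispatchers track this state internally.

```typescript
// Hypothetical sketch: pending demand keyed by consumer tag (not libeam's internal state).
type PendingDemand = Map<string, number>;

// DemandDispatcher rule: send the next event to the consumer with the highest pending demand.
function pickDemandConsumer(pending: PendingDemand): string | undefined {
  let best: string | undefined;
  let bestDemand = 0;
  for (const [tag, demand] of pending) {
    if (demand > bestDemand) {
      best = tag;
      bestDemand = demand;
    }
  }
  return best; // undefined when no consumer has outstanding demand
}

// BroadcastDispatcher rule: every consumer receives every event, so the
// producer may only emit as many events as the slowest consumer can accept.
function broadcastDemand(pending: PendingDemand): number {
  if (pending.size === 0) return 0;
  return Math.min(...Array.from(pending.values()));
}
```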

**Demand Mode (`demand: "accumulate"`):**

For complex topologies where you need deterministic setup, start the producer in accumulate mode:

```typescript
const producer = Producer.start(system, callbacks, {
  dispatcher: { type: "broadcast" },
  demand: "accumulate",  // Pauses event production
});

// Subscribe all consumers (order doesn't matter)
await consumer1.subscribe(producer.getRef(), { maxDemand: 10 });
await consumer2.subscribe(producer.getRef(), { maxDemand: 10 });
await consumer3.subscribe(producer.getRef(), { maxDemand: 10 });

// Resume — now all consumers receive the same events
producer.demand("forward");
```

Inspired by Elixir's `{:producer, state, demand: :accumulate}`.

**PartitionDispatcher behavior:**

- One consumer per partition (subscribing to a taken partition throws)
- Hash function maps each event to a partition: `(event) => partitionIndex | null`
- Returning `null` from hash discards the event
- Events for partitions with no consumer are buffered per-partition
- Default hash: modulo for numbers, djb2 for strings, `event.key` for objects
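The default hash rules above can be sketched as a plain function. This is an illustration, not libeam's source: the djb2 variant (unsigned 32-bit wraparound), the normalization of negative numbers, and returning `null` for values that are not numbers, strings, or objects with a `key` are assumptions.

```typescript
// Hypothetical sketch of a default partition hash; not libeam's implementation.

// djb2: seed 5381, then h = h * 33 + charCode, wrapped to an unsigned 32-bit value (assumed variant).
function djb2(s: string): number {
  let h = 5381;
  for (let i = 0; i < s.length; i++) {
    h = (Math.imul(h, 33) + s.charCodeAt(i)) >>> 0;
  }
  return h;
}

// Map an event to a partition index in [0, partitions), or null to discard it.
function defaultPartition(event: unknown, partitions: number): number | null {
  if (typeof event === "number") {
    // Modulo for numbers; negatives are folded back into range (assumption).
    return ((Math.trunc(event) % partitions) + partitions) % partitions;
  }
  if (typeof event === "string") {
    return djb2(event) % partitions;
  }
  if (event !== null && typeof event === "object" && "key" in event) {
    // Objects are routed by their `key` field.
    return defaultPartition((event as { key: unknown }).key, partitions);
  }
  return null; // Assumption: anything else is discarded.
}
```

Passing a custom `hash` in the dispatcher options replaces this routing logic entirely.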

**Back-pressure behavior:**

- Consumer sends initial demand of `maxDemand` on subscribe (deferred to next tick for broadcast safety)
- When pending demand drops to `minDemand`, consumer automatically re-asks
- Producer never emits more events than total demand from all consumers
- Excess events are buffered in the producer (configurable `bufferSize`, default: 10000)
- Multiple consumers receive events via the configured dispatcher
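The consumer-side rules can be modeled as a small counter: ask for `maxDemand` on subscribe, subtract arriving events, and top back up once pending demand falls to `minDemand`. This is a simplified sketch under those stated rules; the class name and the choice to refill exactly up to `maxDemand` are assumptions, not libeam internals.

```typescript
// Hypothetical model of one subscription's demand bookkeeping.
class DemandCounter {
  private pending = 0;
  private readonly maxDemand: number;
  private readonly minDemand: number;

  // minDemand defaults to 75% of maxDemand, matching the documented default.
  constructor(maxDemand: number, minDemand = Math.floor(maxDemand * 0.75)) {
    this.maxDemand = maxDemand;
    this.minDemand = minDemand;
  }

  // Called on subscribe: ask for a full window of events.
  initialAsk(): number {
    this.pending = this.maxDemand;
    return this.maxDemand;
  }

  // Called when `count` events arrive. Returns how many more events to
  // request from the producer (0 means "don't ask yet").
  onEvents(count: number): number {
    this.pending -= count;
    if (this.pending > this.minDemand) return 0;
    const ask = this.maxDemand - this.pending;
    this.pending = this.maxDemand;
    return ask;
  }
}
```

In this model, the defaults (1000/750) make the consumer re-ask in batches of 250 events.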

See `test/gen_stage.test.ts` for more examples.

**ConsumerSupervisor behavior:**

ConsumerSupervisor spawns one supervised worker per event. Demand is tied to worker lifecycle:

- On subscribe, sends initial demand of `maxDemand` to producer
- Each received event spawns a worker: `actorClass.init(...baseArgs, event)`
- `maxDemand` = max concurrent workers (never more active at once)
- When a worker exits (normal or crash), its demand slot is released
- Released slots accumulate; when they reach `minDemand`, more events are requested
- Workers are supervised with one-for-one strategy (configurable `maxRestarts`/`periodMs`)

```typescript
import { Producer, ConsumerSupervisor } from "libeam";

// Producer emits jobs
const producer = Producer.start(system, {
  init: () => 0,
  handleDemand: (demand, counter) => {
    const jobs = Array.from({ length: demand }, (_, i) => ({
      id: counter + i,
      payload: `task-${counter + i}`,
    }));
    return [jobs, counter + demand];
  },
});

// ConsumerSupervisor spawns a JobWorker per event
const supervisor = ConsumerSupervisor.start(system, {
  actorClass: JobWorker,
  args: ["base-config"],  // event appended as last arg
});

// max 10 concurrent workers, re-ask when 7 complete
await supervisor.subscribe(producer.getRef(), {
  maxDemand: 10,
  minDemand: 7,
});

// Inspect active workers
const workers = await supervisor.whichChildren();
const counts = await supervisor.countChildren();
```
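The slot rules listed above differ from a plain consumer: demand is returned when workers exit, not when events arrive. A sketch of that accounting follows; `SlotCounter` is a hypothetical name, and the model ignores restarts of crashed workers.

```typescript
// Hypothetical model of ConsumerSupervisor demand slots (restarts not modeled).
class SlotCounter {
  private active = 0;
  private released = 0;
  private readonly maxDemand: number;
  private readonly minDemand: number;

  constructor(maxDemand: number, minDemand: number) {
    this.maxDemand = maxDemand;
    this.minDemand = minDemand;
  }

  // On subscribe: one event per allowed concurrent worker.
  initialAsk(): number {
    return this.maxDemand;
  }

  // An event arrived and a worker was spawned for it.
  onEvent(): void {
    if (this.active >= this.maxDemand) throw new Error("producer exceeded demand");
    this.active++;
  }

  // A worker exited (normally or by crashing). Returns how many events to request now.
  onWorkerExit(): number {
    this.active--;
    this.released++;
    if (this.released < this.minDemand) return 0;
    const ask = this.released;
    this.released = 0;
    return ask;
  }

  activeWorkers(): number {
    return this.active;
  }
}
```

With `maxDemand: 10, minDemand: 7` as in the example, seven worker exits trigger a request for seven new events while three workers are still running.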

### ActorSystem

Manages actor lifecycle on a node.

```typescript
const system = new ActorSystem(cluster, transport, registry); // optional 4th arg: supervision options

// Register actor classes for remote spawning
system.registerActorClass(MyActor);
system.registerActorClasses([ActorA, ActorB]);

// Spawn actors (every option is optional)
const ref = system.spawn(MyActor, {
  name: "my-actor",   // registered name
  args: [],           // arguments passed to init()
  strategy: "local",  // placement strategy: "local" | "round-robin"
});

// Stop an actor
await system.stop(actorRef);

// Start processing messages
await system.start();

// Check system state
system.isRunning();      // true if running and not shutting down
system.isShuttingDown(); // true if shutdown in progress

// Graceful shutdown
await system.shutdown({
  timeout: 5000,        // Max time to wait for actors (default: 5000ms)
  drainMailboxes: true  // Wait for pending messages (default: true)
});
```

### Supervision

Configure crash handling behavior:

```typescript
const system = new ActorSystem(cluster, transport, registry, {
  strategy: "Restart", // or 'Stop'
  maxRestarts: 3, // Max restarts within period
  periodMs: 5000, // Time window for restart counting
});
```

### Supervision Trees

Actors can spawn child actors, creating a supervision tree hierarchy. When a parent actor is stopped, all its children are automatically terminated first (cascading termination).

```typescript
class WorkerActor extends Actor {
  handleCall(message: any) {
    if (message.type === "work") {
      return `Processed: ${message.data}`;
    }
  }
  handleCast(message: any) {}
}

class SupervisorActor extends Actor {
  private workers: ActorRef[] = [];

  init(workerCount: number) {
    // Spawn child workers under this supervisor
    for (let i = 0; i < workerCount; i++) {
      const worker = this.spawn(WorkerActor, { name: `worker-${i}` });
      this.workers.push(worker);
    }
  }

  handleCall(message: any) {
    if (message.type === "get_worker_count") {
      return this.getChildren().length;
    }
    if (message.type === "dispatch") {
      // Round-robin to workers
      const worker = this.workers[message.index % this.workers.length];
      return worker.call({ type: "work", data: message.data });
    }
  }

  handleCast(message: any) {}
}

// Usage
const supervisor = system.spawn(SupervisorActor, { args: [3] });

// Supervisor has 3 child workers
const count = await supervisor.call({ type: "get_worker_count" }); // 3

// When supervisor is stopped, all workers are terminated first
await system.stop(supervisor); // Stops workers, then supervisor
```

#### Actor Context

Each actor has access to its context:

```typescript
class MyActor extends Actor {
  someMethod() {
    // Reference to parent actor (undefined for root actors)
    const parent = this.context.parent;

    // Set of child actor references
    const children = this.context.children;

    // Reference to the actor system
    const system = this.context.system;
  }
}
```

#### Child Management Methods

Actors have protected methods for managing children:

```typescript
class ParentActor extends Actor {
  async handleCall(message: any) {
    if (message.type === "spawn_worker") {
      // Spawn a child actor
      const child = this.spawn(WorkerActor, {
        name: message.name,
        args: [message.config]
      });
      return child;
    }

    if (message.type === "stop_worker") {
      // Stop a specific child
      await this.stopChild(message.workerRef);
    }

    if (message.type === "list_workers") {
      // Get all children
      return this.getChildren();
    }
  }
}
```

#### Cascading Termination

When a parent is stopped:
1. All children are stopped recursively (depth-first)
2. Each child's `terminate()` is called
3. Children are removed from the system
4. Parent's `terminate()` is called last

```typescript
// Tree: root -> child1 -> grandchild
//            -> child2

await system.stop(rootRef);
// Termination order: grandchild, child1, child2, root
```
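The depth-first order above is a post-order traversal of the supervision tree: a node is stopped only after every one of its children. A small sketch over a hypothetical `TreeNode` shape (libeam itself tracks children via `this.context.children`) reproduces the example's order, assuming siblings are stopped in spawn order.

```typescript
// Hypothetical tree shape for illustration; not libeam's internal type.
interface TreeNode {
  name: string;
  children: TreeNode[]; // in spawn order
}

// Post-order traversal: stop every child subtree first, then the node itself.
function terminationOrder(node: TreeNode): string[] {
  const order: string[] = [];
  for (const child of node.children) {
    order.push(...terminationOrder(child));
  }
  order.push(node.name); // the parent's terminate() runs last
  return order;
}

const tree: TreeNode = {
  name: "root",
  children: [
    { name: "child1", children: [{ name: "grandchild", children: [] }] },
    { name: "child2", children: [] },
  ],
};

console.log(terminationOrder(tree).join(", ")); // grandchild, child1, child2, root
```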

#### Child Supervision Strategies

Parent actors can define how their children should be supervised when they crash. Override the `childSupervision()` method to customize the behavior:

```typescript
import { Actor, ChildSupervisionOptions } from "libeam";

class MySupervisor extends Actor {
  // Override to customize child supervision
  childSupervision(): ChildSupervisionOptions {
    return {
      strategy: "one-for-all",  // or "one-for-one", "rest-for-one"
      maxRestarts: 3,           // Max restarts within period
      periodMs: 5000,           // Time window for restart counting
    };
  }

  init() {
    this.spawn(WorkerActor, { args: ["worker1"] });
    this.spawn(WorkerActor, { args: ["worker2"] });
    this.spawn(WorkerActor, { args: ["worker3"] });
  }

  handleCall(message: any) { return "ok"; }
  handleCast(message: any) {}
}
```

**Available Strategies:**

| Strategy | Behavior |
|----------|----------|
| `one-for-one` | Only restart the crashed child (default) |
| `one-for-all` | Restart all children if one crashes |
| `rest-for-one` | Restart the crashed child and all children spawned after it |

**one-for-one** (default): Isolates failures - only the crashed actor is restarted.

```typescript
// If worker2 crashes, only worker2 is restarted
// worker1 and worker3 are unaffected
```

**one-for-all**: Use when children have interdependencies and must be restarted together.

```typescript
// If any worker crashes, all workers are stopped and restarted
// Useful for tightly coupled processes (e.g., producer-consumer pairs)
```

**rest-for-one**: Use when children have ordered dependencies.

```typescript
// Children spawned in order: db -> cache -> api
// If cache crashes, cache and api are restarted (db is unaffected)
// If db crashes, all three are restarted
```
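The three strategies differ only in which siblings are restarted alongside the crashed child. The following sketch is a pure function over children in spawn order; the names are illustrative and not part of libeam's API.

```typescript
type ChildStrategy = "one-for-one" | "one-for-all" | "rest-for-one";

// Given children in spawn order and the index of the crashed child,
// return the children that should be restarted (illustrative only).
function childrenToRestart<T>(children: T[], crashedIndex: number, strategy: ChildStrategy): T[] {
  switch (strategy) {
    case "one-for-one":
      return [children[crashedIndex]];     // only the crashed child
    case "one-for-all":
      return [...children];                // every child
    case "rest-for-one":
      return children.slice(crashedIndex); // crashed child and everything spawned after it
  }
}

// returns ["cache", "api"]
const restarted = childrenToRestart(["db", "cache", "api"], 1, "rest-for-one");
```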

**Max Restarts:**

If a child exceeds `maxRestarts` within `periodMs`, it will be stopped permanently instead of restarted.
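One way to enforce such a restart budget is a sliding window of restart timestamps. The sketch below is an assumption about the mechanism (libeam may count restarts differently, for example per fixed window); `RestartBudget` is a hypothetical name.

```typescript
// Hypothetical sliding-window restart budget.
class RestartBudget {
  private timestamps: number[] = [];
  private readonly maxRestarts: number;
  private readonly periodMs: number;

  constructor(maxRestarts: number, periodMs: number) {
    this.maxRestarts = maxRestarts;
    this.periodMs = periodMs;
  }

  // Returns true and records the restart if the budget allows it at `now`;
  // false means the child should be stopped permanently instead.
  tryRestart(now: number = Date.now()): boolean {
    // Forget restarts that have fallen out of the window.
    this.timestamps = this.timestamps.filter((t) => now - t < this.periodMs);
    if (this.timestamps.length >= this.maxRestarts) return false;
    this.timestamps.push(now);
    return true;
  }
}
```

With `maxRestarts: 3, periodMs: 5000`, a fourth crash within five seconds is refused, while a crash after older restarts have aged out of the window is allowed again.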

### Placement Strategies

Control where actors are spawned:

- `local`: Always spawn on the current node
- `round-robin`: Distribute across cluster members

Both strategies support an optional `role` filter. See [Node Roles](#node-roles) for details.

```typescript
// Spawn locally
system.spawn(MyActor, { strategy: "local" });

// Distribute across nodes
system.spawn(MyActor, { strategy: "round-robin" });

// Distribute only across nodes with the "worker" role
system.spawn(MyActor, { strategy: "round-robin", role: "worker" });
```

### Cluster Interface

Implement for custom cluster membership:

```typescript
interface Cluster {
  readonly nodeId: string;
  getMembers(): string[];
}
```
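As an illustration, a fixed-membership cluster (for tests, or when membership is managed externally) only has to satisfy the two members above. The interface is restated locally so the snippet stands alone; in an application you would import `Cluster` from `"libeam"`. Including the local node in `getMembers()` is an assumption, consistent with `waitForCluster` defaulting to `minMembers=2` for "one peer".

```typescript
// Mirrors the Cluster interface shown above (import it from "libeam" in real code).
interface Cluster {
  readonly nodeId: string;
  getMembers(): string[];
}

// Hypothetical static cluster: membership never changes after construction.
class StaticCluster implements Cluster {
  readonly nodeId: string;
  private readonly members: string[];

  constructor(nodeId: string, peers: string[] = []) {
    this.nodeId = nodeId;
    // Always include the local node, without duplicates.
    this.members = Array.from(new Set([nodeId, ...peers]));
  }

  getMembers(): string[] {
    return [...this.members]; // copy, so callers cannot mutate membership
  }
}
```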

### Transport Interface

Implement for custom network transport:

```typescript
interface Transport {
  getNodeId(): string;
  connect(): Promise<void>;
  disconnect(): Promise<void>;

  // Point-to-point messaging
  request(nodeId: string, message: any, timeout: number): Promise<any>;
  send(nodeId: string, message: any): Promise<void>;

  // Pub/sub for registry propagation
  publish(topic: string, message: any): Promise<void>;
  subscribe(topic: string, handler: MessageHandler): Promise<Subscription>;

  // Message handlers
  onRequest(handler: RequestHandler): void;
  onMessage(handler: MessageHandler): void;

  // Peer management
  updatePeers(peers: Array<[nodeId: string, address: string]>): void;
}
```

## Example: Chat Application

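A chat room example showing actors exchanging messages through refs. Because refs are location-transparent, the same actor code works whether participants run on the local node or on remote nodes.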

### Functional API (Recommended)

```typescript
import { createSystem, createActor, ActorRef } from "libeam";

const ChatRoom = createActor((ctx, self) => {
  const participants = new Map<string, ActorRef>();

  return self
    .onCall("getParticipants", () => Array.from(participants.keys()))
    .onCast("join", (name: string, ref: ActorRef) => {
      participants.set(name, ref);
      broadcast(`${name} joined the chat`);
    })
    .onCast("message", (from: string, text: string) => {
      broadcast(`[${from}] ${text}`);
    });

  function broadcast(text: string) {
    for (const ref of participants.values()) {
      ref.cast({ method: "notify", args: [text] });
    }
  }
});

const User = createActor((ctx, self, name: string, roomRef: ActorRef) => {
  // Join room on init (roomRef is untyped, so use raw message format)
  roomRef.cast({ method: "join", args: [name, ctx.self] });

  return self.onCast("notify", (text: string) => {
    console.log(`[${name}] ${text}`);
  });
});

// Usage:
const system = createSystem();
const room = system.spawn(ChatRoom);
system.spawn(User, { args: ["Alice", room] });
system.spawn(User, { args: ["Bob", room] });

// TypedActorRef — typed call/cast works directly on room
const members = await room.call("getParticipants"); // ["Alice", "Bob"]
```

### Class-Based API

```typescript
import {
  Actor,
  ActorRef,
  ActorSystem,
  InMemoryTransport,
  LocalRegistry,
  Cluster,
} from "libeam";

class ChatRoomActor extends Actor {
  private participants = new Map<string, ActorRef>();

  handleCast(
    message:
      | { type: "join"; name: string; ref: ActorRef }
      | { type: "message"; from: string; text: string },
  ) {
    if (message.type === "join") {
      this.participants.set(message.name, message.ref);
      this.broadcast(`${message.name} joined the chat`);
    } else if (message.type === "message") {
      this.broadcast(`[${message.from}] ${message.text}`);
    }
  }

  private broadcast(text: string) {
    for (const ref of this.participants.values()) {
      ref.cast({ type: "notification", text });
    }
  }
}

class UserActor extends Actor {
  private name = "";

  init(name: string, roomRef: ActorRef) {
    this.name = name;
    roomRef.cast({ type: "join", name, ref: this.self });
  }

  handleCast(message: { type: "notification"; text: string }) {
    console.log(`[${this.name}] ${message.text}`);
  }
}
```

Run the full example:

```bash
pnpm example:chat
```

## Graceful Shutdown

Proper shutdown ensures actors terminate cleanly and cluster peers are notified.

### Functional API (Recommended)

When using `createSystem`, a single call handles the entire shutdown sequence:

```typescript
const system = createSystem();
// ... spawn actors ...
await system.shutdown(); // Terminates actors, leaves cluster, disconnects transport
```

### Class-Based API

When manually wiring components, you must shut them down in order:

```typescript
async function shutdownNode(system, cluster, transport) {
  // 1. Shutdown actor system (terminates actors, unregisters names)
  await system.shutdown({
    timeout: 5000,        // Wait up to 5s for actors to terminate
    drainMailboxes: true  // Process pending messages first
  });

  // 2. Leave cluster gracefully (notifies peers)
  await cluster.leave();  // Broadcasts "leaving" status to peers

  // 3. Disconnect transport
  await transport.disconnect();
}

// Handle process signals
process.on("SIGTERM", () => shutdownNode(system, cluster, transport));
process.on("SIGINT", () => shutdownNode(system, cluster, transport));
```

### Shutdown Sequence

1. **ActorSystem.shutdown()**: Stops accepting new spawns, drains mailboxes, calls `terminate()` on all actors, unregisters named actors
2. **DistributedCluster.leave()**: Broadcasts "leaving" status to peers so they immediately remove this node from membership (instead of waiting for failure timeout)
3. **Transport.disconnect()**: Closes network connections

### Shutdown Options

```typescript
interface ShutdownOptions {
  timeout?: number;       // Max ms to wait for actors (default: 5000)
  drainMailboxes?: boolean; // Wait for pending messages (default: true)
}
```

## Cluster Readiness

Wait for the cluster to form before spawning actors that depend on peer connectivity:

```typescript
const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret",
});

// Wait for at least one peer to join (default: minMembers=2, timeout=30s)
await system.waitForCluster();

// Wait for a specific number of members
await system.waitForCluster({ minMembers: 3, timeout: 15000 });

// Wait for specific nodes
await system.waitForCluster({ nodes: ["gateway", "worker-1"] });

// Both conditions must be met (AND)
await system.waitForCluster({ minMembers: 3, nodes: ["gateway"] });
```

Or inline with system creation using the `ready` option:

```typescript
const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret",
  ready: { minMembers: 3, timeout: 15000 },
});
// System is guaranteed to have 3 members when createSystem resolves
```

`waitForCluster` uses gossip-based membership detection. If the timeout is reached before conditions are met, a `TimeoutError` is thrown. For local (non-distributed) systems, `waitForCluster()` resolves immediately.
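The AND semantics of `minMembers` and `nodes` can be expressed as a predicate plus a wait loop. This is a sketch, not libeam's implementation (which reacts to gossip updates rather than polling): `isReady`, `waitUntilReady`, and the 100 ms poll interval are hypothetical, and the sketch throws a plain `Error` where libeam throws `TimeoutError`.

```typescript
interface ReadyOptions {
  minMembers?: number; // default 2: this node plus one peer
  nodes?: string[];    // node ids that must all be present
  timeout?: number;    // ms, default 30000
}

// Both conditions must hold (AND).
function isReady(members: string[], opts: ReadyOptions): boolean {
  const minMembers = opts.minMembers ?? 2;
  const required = opts.nodes ?? [];
  return members.length >= minMembers && required.every((n) => members.includes(n));
}

// Hypothetical polling wait; libeam rejects with TimeoutError instead.
async function waitUntilReady(getMembers: () => string[], opts: ReadyOptions = {}): Promise<void> {
  const deadline = Date.now() + (opts.timeout ?? 30000);
  while (!isReady(getMembers(), opts)) {
    if (Date.now() >= deadline) {
      throw new Error("cluster not ready before timeout");
    }
    await new Promise<void>((resolve) => setTimeout(() => resolve(), 100));
  }
}
```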

## Node Roles

Nodes can declare roles to support heterogeneous clusters. Roles are propagated via gossip and used during actor placement to ensure actors land on appropriate nodes.

### Declaring Roles

```typescript
// This node is a gateway
const gateway = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  cookie: "my-cluster-secret",
  roles: ["gateway"],
});

// This node is a worker
const worker = await createSystem({
  type: "distributed",
  port: 5010,
  seedNodes: ["127.0.0.1:5002"],
  cookie: "my-cluster-secret",
  roles: ["worker", "compute"],
});
```

A node can have multiple roles. Roles are immutable — set once at startup and propagated to all peers via gossip.

### Role-Based Placement

The `role` option on `spawn()` filters candidate nodes before the placement strategy selects one:

```typescript
// Spawn on any node with the "worker" role (round-robin across workers)
const workerRef = system.spawn(MyActor, { strategy: "round-robin", role: "worker" });

// Default "local" strategy: assert the local node has the "gateway" role before spawning
const routerRef = system.spawn(Router, { role: "gateway" });
```

If no nodes in the cluster have the required role, a `NoRoleMatchError` is thrown immediately.

### Querying Roles

The cluster interface exposes role-based membership queries:

```typescript
// Get all nodes with a specific role
const workers = system.cluster.getMembersByRole("worker");
// ["node-abc", "node-def"]
```

### How It Works

Roles are stored in each node's gossip state (`PeerState.roles`) and propagated automatically via the existing gossip protocol. The placement engine filters `cluster.getMembersByRole(role)` before applying the selected strategy (`local` or `round-robin`). No additional network overhead — roles piggyback on the existing heartbeat cycle.
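The filter-then-select order can be sketched as follows. This is an illustration under assumptions: the `PlacementOptions` shape, the `rolesByNode` lookup map, the shared round-robin cursor, and throwing a plain `Error` (libeam throws `NoRoleMatchError`) are all hypothetical.

```typescript
type Strategy = "local" | "round-robin";

interface PlacementOptions {
  strategy?: Strategy; // default "local"
  role?: string;
}

// Hypothetical placer: filter candidate nodes by role, then apply the strategy.
function createPlacer(localNodeId: string, rolesByNode: Map<string, string[]>) {
  let next = 0; // round-robin cursor shared across calls

  return function place(members: string[], opts: PlacementOptions = {}): string {
    const role = opts.role;
    const candidates =
      role === undefined
        ? members
        : members.filter((id) => (rolesByNode.get(id) ?? []).includes(role));
    if (candidates.length === 0) {
      throw new Error(`no node has role "${role}"`); // NoRoleMatchError in libeam
    }
    if ((opts.strategy ?? "local") === "local") {
      // Local placement only checks that this node qualifies.
      if (!candidates.includes(localNodeId)) {
        throw new Error(`local node lacks role "${role}"`);
      }
      return localNodeId;
    }
    const chosen = candidates[next % candidates.length];
    next++;
    return chosen;
  };
}
```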

## Process Groups

Process groups provide named, dynamic groupings of actors. Any actor can join or leave groups at runtime, and you can broadcast messages to all members of a group. Inspired by Erlang's `pg` module.

### Basic Usage

```typescript
import { createSystem, createActor } from "libeam";

const Worker = createActor((ctx, self, id: string) => {
  return self
    .onCall("id", () => id)
    .onCast("work", (task: string) => {
      console.log(`Worker ${id} processing: ${task}`);
    });
});

const system = createSystem();

const w1 = system.spawn(Worker, { args: ["a"] });
const w2 = system.spawn(Worker, { args: ["b"] });
const w3 = system.spawn(Worker, { args: ["c"] });

// Add actors to a group
system.joinGroup("workers", w1);
system.joinGroup("workers", w2);
system.joinGroup("workers", w3);

// An actor can be in multiple groups
system.joinGroup("priority", w1);

// Get all members of a group
const members = system.getGroup("workers"); // [ActorRef, ActorRef, ActorRef]

// Broadcast a cast message to all members
system.broadcast("workers", { method: "work", args: ["job-42"] });

// Remove an actor from a group
system.leaveGroup("workers", w2);

// Groups are cleaned up automatically when actors stop
await system.stop(w1); // removed from "workers" and "priority"
```

### API

- `joinGroup(group, ref): void` — Add an actor to a named group. Idempotent — joining the same group twice has no effect.
- `leaveGroup(group, ref): void` — Remove an actor from a group. Safe to call even if the actor is not in the group.
- `getGroup(group): ActorRef[]` — Get all actor refs in a group. Returns `[]` for unknown groups.
- `broadcast(group, message): void` — Send a cast message to every actor in the group.

### Behavior

- **Auto-cleanup**: When an actor is stopped, it is automatically removed from all groups.
- **Distributed**: Group membership changes are propagated across nodes via the transport pub/sub layer. When a node leaves the cluster, all its members are removed from all groups.
- **Idempotent joins**: Calling `joinGroup` with the same actor and group multiple times is safe — only one membership entry is stored.
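On a single node, the behavior above maps naturally onto a map of sets: a `Set` makes joins idempotent, and a stop hook removes the actor from every group. The sketch below uses string ids in place of `ActorRef` and omits cross-node pub/sub propagation; `GroupTable` and its methods are hypothetical.

```typescript
// Hypothetical local group table: group name -> member actor ids.
class GroupTable {
  private groups = new Map<string, Set<string>>();

  join(group: string, actorId: string): void {
    let members = this.groups.get(group);
    if (!members) {
      members = new Set<string>();
      this.groups.set(group, members);
    }
    members.add(actorId); // no-op if the actor is already a member
  }

  leave(group: string, actorId: string): void {
    const members = this.groups.get(group);
    if (!members) return; // unknown group: nothing to do
    members.delete(actorId);
    if (members.size === 0) this.groups.delete(group);
  }

  get(group: string): string[] {
    return Array.from(this.groups.get(group) ?? []);
  }

  // Called when an actor stops: drop it from every group.
  removeEverywhere(actorId: string): void {
    for (const group of Array.from(this.groups.keys())) {
      this.leave(group, actorId);
    }
  }
}
```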

See `test/process_group.test.ts` for more examples.

## Authentication

Distributed systems can be secured with cookie-based authentication, inspired by Erlang's distribution cookie. When configured, nodes verify each other using HMAC-SHA256 signatures on gossip messages and CurveZMQ encryption on transport connections.

### Cookie Configuration

The cookie must be at least 16 characters for secure key derivation:

```typescript
const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret", // ≥16 characters
});
```

All nodes in a cluster must share the same cookie. Nodes with different cookies cannot communicate — gossip messages are silently dropped and transport connections are rejected.

### Distributed Configuration with Salt

For distributed setups, you can optionally customize the key derivation salt:

```typescript
const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: ["127.0.0.1:6002"],
  cookie: "my-cluster-secret",
  salt: "custom-salt", // Optional, default: "libeam-v2"
});
```

The salt is used in HKDF key derivation to separate gossip HMAC keys from CurveZMQ keypairs.
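For intuition, here is how one cookie can yield independent keys with HKDF using Node's built-in `crypto.hkdfSync`. The `info` labels, 32-byte lengths, and SHA-256 digest are assumptions for illustration; libeam's actual scheme lives in `src/auth.ts` (`deriveKeys`). In this sketch, distinct `info` labels keep the two outputs independent, and changing the salt changes both.

```typescript
import { hkdfSync } from "node:crypto";

// Hypothetical derivation sketch; see src/auth.ts (deriveKeys) for libeam's real scheme.
function deriveSketchKeys(cookie: string, salt = "libeam-v2") {
  if (cookie.length < 16) {
    throw new Error("cookie must be at least 16 characters");
  }
  // hkdfSync(digest, ikm, salt, info, keylen) returns an ArrayBuffer.
  const derive = (info: string) => Buffer.from(hkdfSync("sha256", cookie, salt, info, 32));
  return {
    gossipHmacKey: derive("gossip-hmac"),    // assumed label
    curveSecretSeed: derive("curve-secret"), // assumed label
  };
}
```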

### Environment Variable

Instead of passing the cookie in code, set the `LIBEAM_COOKIE` environment variable:

```bash
LIBEAM_COOKIE=my-cluster-secret npx tsx app.ts
```

The precedence order is: `auth` option > `cookie` option > `LIBEAM_COOKIE` env var. If none are set, the system starts in open mode with a warning logged.

### Custom Authenticator

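For advanced use cases, pass an `Authenticator` instance through the `auth` option. `CookieAuthenticator` is the built-in implementation; a custom class implementing the `Authenticator` interface is passed the same way:

```typescript
import { CookieAuthenticator } from "libeam";

const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  auth: new CookieAuthenticator("my-cluster-secret"),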
});
```

The `Authenticator` interface covers gossip message signing and verification (HMAC-SHA256). Transport-level security (encryption + authentication) is handled by CurveZMQ at the socket layer, driven by the cookie. See `src/auth.ts` for the full interface and `deriveKeys`, `z85Encode`, `z85Decode` utilities for advanced use cases.


### Cookie Rotation

Cookies can be rotated without restarting the cluster using the Consul-style keyring API. The rotation is operator-driven in three steps:

```typescript
const system = await createSystem({
  type: "distributed",
  port: 5000,
  seedNodes: [],
  cookie: "old-cluster-secret",
});

// Step 1: Install the new cookie on ALL nodes (gossip accepts both keys)
system.keyring!.install("new-cluster-secret");

// Step 2: Switch each node to the new cookie (recreates transport sockets)
await system.keyring!.use();

// Step 3: Remove the old cookie from the keyring on ALL nodes
system.keyring!.remove();
```

**Recommended procedure:**

1. **Install** the new cookie on every node. Order doesn't matter. After this step, gossip messages signed with either the old or new key are accepted.
2. **Use** the new cookie on each node (sequentially or in parallel). This triggers a brief transport reconnection window (~milliseconds) as ZeroMQ sockets are recreated with the new CurveZMQ keys. In-flight RPC requests during `use()` will fail with a `TransportError` — callers should retry.
3. **Remove** the old cookie from the keyring on every node. After this step, only the new key is accepted.

**Behavior during rotation:**

- Gossip continues uninterrupted during `install()` and `remove()`. Both keys are accepted.
- Transport has a brief reconnection window during `use()`. Pending requests are rejected with `TransportError`.
- Messages are delivered at-most-once. No duplicates are introduced by the rotation.
- `keyring` is only available on distributed systems. Local systems have `system.keyring === undefined`.
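Gossip keeps flowing between `install()` and `remove()` because a receiver accepts a signature made with any key currently in its keyring. A minimal sketch of that check with HMAC-SHA256, using Node's `createHmac` and `timingSafeEqual`; `SketchKeyring` is hypothetical, signs with the raw cookie for brevity (libeam derives keys first), and its `use()` only switches the signing key (libeam's `use()` also recreates transport sockets).

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical keyring mirroring the install / use / remove steps.
class SketchKeyring {
  private primary: string;    // key used to sign outgoing messages
  private accepted: string[]; // keys accepted when verifying

  constructor(cookie: string) {
    this.primary = cookie;
    this.accepted = [cookie];
  }

  // Step 1: accept the new key for verification, keep signing with the old one.
  install(cookie: string): void {
    if (!this.accepted.includes(cookie)) this.accepted.push(cookie);
  }

  // Step 2: sign with the most recently installed key.
  use(): void {
    this.primary = this.accepted[this.accepted.length - 1];
  }

  // Step 3: accept only the current signing key.
  remove(): void {
    this.accepted = [this.primary];
  }

  sign(payload: string): Buffer {
    return createHmac("sha256", this.primary).update(payload).digest();
  }

  verify(payload: string, signature: Buffer): boolean {
    return this.accepted.some((key) => {
      const expected = createHmac("sha256", key).update(payload).digest();
      return expected.length === signature.length && timingSafeEqual(expected, signature);
    });
  }
}
```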

**Inspecting the keyring:**

```typescript
// List key fingerprints (SHA-256 hex of public key, never raw cookies)
console.log(system.keyring!.list());
// Before install: ["a1b2c3..."]
// After install:  ["a1b2c3...", "d4e5f6..."]
// After remove:   ["d4e5f6..."]
```

### Known Limitations

| Limitation | Details |
|------------|---------|
| Gossip UDP is authenticated but not encrypted | HMAC-SHA256 proves identity but does not encrypt message payloads. Use network-level isolation (VPN/firewall) for defense in depth. |
| CurveZMQ failures are silent | Wrong cookie = messages dropped with no error. Ensure all nodes share the same cookie. |
| v2 auth is not wire-compatible with v1 | Nodes running v1 and v2 cannot communicate. Upgrade all nodes together. |

## Logging

Libeam includes a structured logging system with configurable log levels and handlers.

### Configuration

```typescript
import { loggerConfig } from "libeam";

// Set log level (debug, info, warn, error, none)
loggerConfig.level = "debug";

// Custom log handler
loggerConfig.handler = (entry) => {
  // entry: { level, message, context, timestamp, error? }
  console.log(JSON.stringify(entry));
};
```

### Log Levels

- `debug`: Detailed debugging information
- `info`: General operational messages
- `warn`: Warning conditions
- `error`: Error conditions
- `none`: Disable all logging

### Component Loggers

Each component creates its own logger with context:

```typescript
import { createLogger } from "libeam";

const log = createLogger("MyComponent", nodeId);
log.info("Operation completed", { duration: 100 });
log.error("Operation failed", error, { operationId: "123" });
```

## Telemetry

Libeam includes a lightweight telemetry/instrumentation system inspired by Elixir's [`:telemetry`](https://hexdocs.pm/telemetry) library. It provides synchronous event emission with zero overhead when no handlers are attached — safe for hot paths like message processing.

### Basic Usage

```typescript
import { telemetry, TelemetryEvents } from "libeam";

// Attach a handler to actor lifecycle events
const handlerId = telemetry.attach("my-metrics", [
  TelemetryEvents.actor.spawn,
  [...TelemetryEvents.actor.stop, "stop"],
], (eventName, measurements, metadata) => {
  console.log(`Event: ${eventName.join(".")}`, measurements, metadata);
});

// Detach when done
telemetry.detach(handlerId);
```

### Span API

Wrap a function in start/stop/exception telemetry events with automatic duration measurement:

```typescript
// Emits ["myapp", "db", "query", "start"] before, ["...", "stop"] after
const result = telemetry.span(
  ["myapp", "db", "query"],
  { table: "users" },
  () => db.query("SELECT * FROM users")
);
```

### Event Catalog

All events emitted by libeam:

| Event | Measurements | Metadata |
|-------|-------------|----------|
| `libeam.actor.spawn` | — | `actor_id`, `actor_class`, `name`, `node_id`, `parent_id?` |
| `libeam.actor.init.stop` | `duration_ms` | `actor_id`, `actor_class` |
| `libeam.actor.init.exception` | `duration_ms` | `actor_id`, `actor_class`, `error` |
| `libeam.actor.stop.stop` | `duration_ms` | `actor_id`, `name`, `children_stopped` |
| `libeam.actor.handle_call.start` | `system_time` | `actor_id` |
| `libeam.actor.handle_call.stop` | `duration_ms` | `actor_id` |
| `libeam.actor.handle_call.exception` | `duration_ms` | `actor_id`, `error` |
| `libeam.actor.handle_cast.start` | `system_time` | `actor_id` |
| `libeam.actor.handle_cast.stop` | `duration_ms` | `actor_id` |
| `libeam.actor.handle_cast.exception` | `duration_ms` | `actor_id`, `error` |
| `libeam.supervisor.crash` | — | `actor_id`, `error`, `strategy` |
| `libeam.supervisor.restart` | — | `actor_id`, `new_actor_id`, `attempt`, `strategy` |
| `libeam.supervisor.max_restarts` | `count` | `actor_id`, `max_restarts`, `period_ms` |
| `libeam.gen_stage.subscribe` | — | `producer_id`, `consumer_tag` |
| `libeam.gen_stage.cancel` | — | `producer_id`, `consumer_tag`, `reason` |
| `libeam.gen_stage.dispatch` | `event_count` | `producer_id` |
| `libeam.gen_stage.buffer_overflow` | `dropped_count`, `buffer_size` | `producer_id` |
| `libeam.mailbox.overflow` | — | `actor_id`, `message_type` |
| `libeam.cluster.join` | — | `peer_id`, `node_id` |
| `libeam.cluster.leave` | — | `peer_id`, `node_id` |
| `libeam.system.shutdown.stop` | `duration_ms`, `actor_count` | `node_id` |

### Metrics Integration

```typescript
import { telemetry, TelemetryEvents } from "libeam";

// Simple metrics collector
const metrics = {
  actorsSpawned: 0,
  actorsStopped: 0,
  callDurations: [] as number[],
  crashes: 0,
};

telemetry.attach("prometheus", [
  TelemetryEvents.actor.spawn,
  [...TelemetryEvents.actor.stop, "stop"],
  [...TelemetryEvents.actor.handleCall, "stop"],
  TelemetryEvents.supervisor.crash,
], (event, measurements) => {
  const name = event.join(".");
  if (name === "libeam.actor.spawn") metrics.actorsSpawned++;
  if (name === "libeam.actor.stop.stop") metrics.actorsStopped++;
  if (name === "libeam.actor.handle_call.stop") metrics.callDurations.push(measurements.duration_ms);
  if (name === "libeam.supervisor.crash") metrics.crashes++;
});
```

### Zero Overhead

When no handlers are attached, telemetry calls are effectively free — a single Map lookup that returns immediately. Hot paths like message processing (`handleCall`/`handleCast`) additionally gate on `hasHandlers()`, ensuring zero object allocation overhead.
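The zero-overhead pattern can be illustrated with a stripped-down emitter: handlers are indexed by the joined event name, so `execute` with no handlers costs a key join and one `Map` lookup, and hot-path callers can check `hasHandlers` before building measurement objects. `MiniTelemetry` is a hypothetical sketch of the pattern, not libeam's `telemetry` module; in particular, its `span` only handles synchronous functions.

```typescript
type EventName = string[];
type Handler = (
  event: EventName,
  measurements: Record<string, number>,
  metadata: Record<string, unknown>,
) => void;

// Hypothetical minimal emitter illustrating the attach/execute/span pattern.
class MiniTelemetry {
  private byEvent = new Map<string, Map<string, Handler>>();

  attach(id: string, events: EventName[], handler: Handler): string {
    for (const e of events) {
      const key = e.join(".");
      let handlers = this.byEvent.get(key);
      if (!handlers) {
        handlers = new Map<string, Handler>();
        this.byEvent.set(key, handlers);
      }
      handlers.set(id, handler);
    }
    return id;
  }

  detach(id: string): boolean {
    let found = false;
    for (const handlers of this.byEvent.values()) {
      if (handlers.delete(id)) found = true;
    }
    return found;
  }

  hasHandlers(event: EventName): boolean {
    return (this.byEvent.get(event.join("."))?.size ?? 0) > 0;
  }

  execute(event: EventName, measurements: Record<string, number>, metadata: Record<string, unknown>): void {
    const handlers = this.byEvent.get(event.join("."));
    if (!handlers) return; // fast path: nobody is listening
    for (const h of handlers.values()) h(event, measurements, metadata);
  }

  // Emits <prefix>.start, then <prefix>.stop or <prefix>.exception (sync functions only).
  span<T>(prefix: EventName, metadata: Record<string, unknown>, fn: () => T): T {
    const start = performance.now();
    this.execute([...prefix, "start"], { system_time: Date.now() }, metadata);
    try {
      const result = fn();
      this.execute([...prefix, "stop"], { duration_ms: performance.now() - start }, metadata);
      return result;
    } catch (error) {
      this.execute([...prefix, "exception"], { duration_ms: performance.now() - start }, { ...metadata, error });
      throw error;
    }
  }
}
```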

### API

| Method | Description |
|--------|-------------|
| `telemetry.attach(id, eventNames, handler)` | Attach handler to events. Returns handler id. |
| `telemetry.detach(id)` | Remove handler. Returns true if found. |
| `telemetry.execute(eventName, measurements, metadata)` | Emit event synchronously. No-op if no handlers. |
| `telemetry.span(eventName, metadata, fn)` | Wrap function in start/stop/exception events. |
| `telemetry.hasHandlers(eventName)` | Check if any handlers are attached. |
| `telemetry.reset()` | Remove all handlers (test cleanup). |

## Error Handling

Libeam throws typed error classes, so callers can distinguish failures with `instanceof`:

```typescript
import {
  LibeamError,
  ActorNotFoundError,
  RegistryLookupError,
  TimeoutError,
  SystemShuttingDownError,
  TransportError,
  PeerNotFoundError,
} from "libeam";

try {
  await actorRef.call({ type: "get" });
} catch (err) {
  if (err instanceof TimeoutError) {
    console.log(`Timed out after ${err.context?.timeoutMs}ms`);
  } else if (err instanceof ActorNotFoundError) {
    console.log(`Actor ${err.context?.actorId} not found`);
  }
}
```

### Error Types

| Error | Code | Description |
|-------|------|-------------|
| `ActorNotFoundError` | `ACTOR_NOT_FOUND` | Actor does not exist |
| `RegistryLookupError` | `REGISTRY_LOOKUP_FAILED` | Named actor not in registry |
| `TimeoutError` | `TIMEOUT` | Operation timed out |
| `SystemShuttingDownError` | `SYSTEM_SHUTTING_DOWN` | System is shutting down |
| `TransportError` | `TRANSPORT_ERROR` | Network transport failure |
| `PeerNotFoundError` | `PEER_NOT_FOUND` | Peer node not known |
| `ActorClassNotRegisteredError` | `ACTOR_CLASS_NOT_REGISTERED` | Actor class not registered for remote spawn |
| `AuthenticationError` | `AUTHENTICATION_FAILED` | Node authentication failed |
| `NoRoleMatchError` | `NO_ROLE_MATCH` | No nodes have the required role |
| `ActorNotMigratableError` | `ACTOR_NOT_MIGRATABLE` | Actor doesn't implement `Migratable` |
| `ActorHasChildrenError` | `ACTOR_HAS_CHILDREN` | Actor has child actors |
| `MaxChildrenError` | `MAX_CHILDREN` | DynamicSupervisor child limit reached |
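Because each error carries a stable code, a handler can also branch on the code, for example to decide whether a failed call is worth retrying. The sketch below is hypothetical: it defines a stand-in `LibeamError` with `code` and `context` properties (assuming the real class exposes the code as `code`, which the table implies but this README does not show), and the retry classification in `isRetryable` is illustrative, not a libeam recommendation.

```typescript
// Stand-in for libeam's base error; assumes `code` and `context` properties.
class LibeamError extends Error {
  constructor(
    message: string,
    readonly code: string,
    readonly context?: Record<string, unknown>,
  ) {
    super(message);
    this.name = new.target.name;
  }
}

class TimeoutError extends LibeamError {
  constructor(timeoutMs: number) {
    super(`Timed out after ${timeoutMs}ms`, "TIMEOUT", { timeoutMs });
  }
}

// Illustrative policy: treat timeouts and transport failures as transient.
const RETRYABLE_CODES = new Set(["TIMEOUT", "TRANSPORT_ERROR"]);

function isRetryable(err: unknown): boolean {
  return err instanceof LibeamError && RETRYABLE_CODES.has(err.code);
}
```

Retrying a timed-out `call` can run the request twice if the first attempt was in fact processed, so this policy suits idempotent messages only.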

## Health Checks

Libeam provides health check support for monitoring system status, useful for Kubernetes probes and monitoring systems.

### Component Health

Both `ActorSystem` and `DistributedCluster` implement `HealthCheckable`:

```typescript
import { ActorSystem, DistributedCluster } from "libeam";

// Get health from individual components
const systemHealth = system.getHealth();
// {
//   name: "ActorSystem",
//   status: "healthy",
//   message: "System is healthy",
//   details: { actorCount: 5, totalMailboxSize: 0, registeredClasses: 3 }
// }

const clusterHealth = cluster.getHealth();
// {
//   name: "Cluster",
//   status: "healthy",
//   message: "Connected to 3 peer(s)",
//   details: { peerCount: 3, peers: ["node1", "node2", "node3"] }
// }
```

### Health Aggregator

Use `HealthAggregator` to combine health from multiple components:

```typescript
import { HealthAggregator } from "libeam";

const health = new HealthAggregator(nodeId);
health.register("actorSystem", system);
health.register("cluster", cluster);

// Full health report
const report = await health.getHealth();
// {
//   status: "healthy",
//   timestamp: Date,
//   nodeId: "node1",
//   uptimeMs: 123456,
//   components: [...]
// }

// For Kubernetes probes
app.get("/health/live", (req, res) => {
  res.status(health.isAlive() ? 200 : 503).send();
});

app.get("/health/ready", async (req, res) => {
  const ready = await health.isReady();
  res.status(ready ? 200 : 503).send();
});

app.get("/health", async (req, res) => {
  const report = await health.getHealth();
  res.status(report.status === "unhealthy" ? 503 : 200).json(report);
});
```

### Health Status

| Status | Description |
|--------|-------------|
| `healthy` | Component is functioning normally |
| `degraded` | Component is working but with issues (e.g., shutting down, high load) |
| `unhealthy` | Component is not functioning |
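This README does not spell out how `HealthAggregator` combines component statuses. A common rule, assumed here, is "worst status wins". The hypothetical sketch below applies that rule and maps the result to an HTTP code the same way the `/health` route above does, where only `unhealthy` returns 503 and a `degraded` node keeps receiving traffic.

```typescript
type HealthStatus = "healthy" | "degraded" | "unhealthy";

interface ComponentHealth {
  name: string;
  status: HealthStatus;
  message: string;
}

// Higher rank means worse health.
const RANK: Record<HealthStatus, number> = { healthy: 0, degraded: 1, unhealthy: 2 };

// Assumed rule: the overall status is the worst component status.
function overallStatus(components: ComponentHealth[]): HealthStatus {
  let worst: HealthStatus = "healthy";
  for (const component of components) {
    if (RANK[component.status] > RANK[worst]) worst = component.status;
  }
  return worst;
}

// Mirrors the /health route: degraded still answers 200.
function httpStatusFor(status: HealthStatus): number {
  return status === "unhealthy" ? 503 : 200;
}
```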

## Actor Migration

Actors can be migrated between nodes while preserving their state and pending messages.

### Making Actors Migratable

Implement the `Migratable` interface to enable migration:

```typescript
import { Actor, Migratable } from "libeam";

interface CounterState {
  count: number;
  history: string[];
}

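class CounterActor extends Actor implements Migratable {
  private count = 0;
  private history: string[] = [];

  // Assumes spawn `args` are passed to init(), e.g. `args: [100]` below
  init(initialCount: number = 0): void {
    this.count = initialCount;
  }

  handleCall(message: any): number {
    if (message.type === "get") return this.count;
    if (message.type === "increment") {
      this.history.push("increment");
      return ++this.count;
    }
    return 0;
  }

  handleCast(message: any): void {
    if (message.type === "reset") {
      this.history.push("reset");
      this.count = 0;
    }
  }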

  getState(): CounterState {
    return { count: this.count, history: [...this.history] };
  }

  setState(state: CounterState): void {
    this.count = state.count;
    this.history = [...state.history];
  }
}
```

### Migrating an Actor

Use `system.migrate()` to move an actor to another node:

```typescript
const ref = system.spawn(CounterActor, { name: "my-counter", args: [100] });

// "increment" is handled in handleCall, so use call (a cast would be ignored)
await ref.call({ type: "increment" });
await ref.call({ type: "increment" });

const result = await system.migrate("my-counter", "node2");

if (result.success) {
  console.log(`Migrated to ${result.newNodeId}`);
  console.log(`New actor ID: ${result.newActorId}`);
} else {
  console.log(`Migration failed: ${result.error}`);
}
```

### Migration Process

1. **Pause mailbox** - Stop processing new messages on source node
2. **Reserve name** - Target node reserves the actor name (with TTL)
3. **Drain mailbox** - Collect pending messages
4. **Serialize state** - Call `getState()` on source actor
5. **Transfer** - Send state and pending messages to target
6. **Restore** - Create actor on target, call `init()` then `setState()`
7. **Inject messages** - Re-queue pending messages on target
8. **Notify watchers** - Send `MovedMessage` to all watchers/linked actors
9. **Cleanup** - Remove actor from source node
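The nine steps can be modeled in a few dozen lines. The following is a hypothetical, single-process simulation: `SimNode`, `migrate`, `Counter`, and the 5-second reservation TTL are illustrative, not libeam APIs. It shows the ordering: the mailbox is paused and drained before state is captured, state and pending messages cross a JSON boundary as they would over the network, and the source copy is removed only after watchers are notified.

```typescript
interface Migratable<S> {
  getState(): S;
  setState(state: S): void;
}

class Counter implements Migratable<{ count: number }> {
  count = 0;
  handle(msg: string): void {
    if (msg === "increment") this.count++;
  }
  getState() {
    return { count: this.count };
  }
  setState(state: { count: number }) {
    this.count = state.count;
  }
}

interface Hosted {
  actor: Counter;
  mailbox: string[];
  paused: boolean; // a real node would stop dispatching while this is true
}

interface Moved {
  type: "moved";
  name: string;
  oldNodeId: string;
  newNodeId: string;
}

class SimNode {
  readonly actors = new Map<string, Hosted>();
  private readonly reservations = new Map<string, number>(); // name -> expiry (ms since epoch)
  constructor(readonly id: string) {}

  reserve(name: string, ttlMs: number, now = Date.now()): boolean {
    const expiry = this.reservations.get(name);
    if (this.actors.has(name) || (expiry !== undefined && expiry > now)) return false;
    this.reservations.set(name, now + ttlMs);
    return true;
  }

  release(name: string): void {
    this.reservations.delete(name);
  }
}

function migrate(name: string, source: SimNode, target: SimNode, watchers: Array<(notice: Moved) => void>): void {
  const hosted = source.actors.get(name);
  if (!hosted) throw new Error(`${name} not found on ${source.id}`);
  hosted.paused = true; // 1. pause mailbox
  if (!target.reserve(name, 5000)) { // 2. reserve name on target with a TTL
    hosted.paused = false;
    throw new Error(`could not reserve ${name} on ${target.id}`);
  }
  const pending = hosted.mailbox.splice(0); // 3. drain mailbox
  const wire = JSON.stringify({ state: hosted.actor.getState(), pending }); // 4-5. serialize, "transfer"
  const received = JSON.parse(wire) as { state: { count: number }; pending: string[] };
  const restored = new Counter(); // 6. construct (stands in for init), then setState
  restored.setState(received.state);
  target.actors.set(name, { actor: restored, mailbox: received.pending, paused: false }); // 7. re-queue
  target.release(name);
  for (const notify of watchers) notify({ type: "moved", name, oldNodeId: source.id, newNodeId: target.id }); // 8.
  source.actors.delete(name); // 9. cleanup
}

// Usage: node1 hosts a counter at 2 with one "increment" still queued.
const node1 = new SimNode("node1");
const node2 = new SimNode("node2");
const counter = new Counter();
counter.handle("increment");
counter.handle("increment");
node1.actors.set("my-counter", { actor: counter, mailbox: ["increment"], paused: false });
const notices: Moved[] = [];
migrate("my-counter", node1, node2, [(notice) => notices.push(notice)]);
```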

### Watcher Notifications

Actors watching a migrating actor receive a `MovedMessage`:

```typescript
import { Actor, InfoMessage, MovedMessage } from "libeam";

class ObserverActor extends Actor {
  handleInfo(message: InfoMessage): void {
    if (message.type === "moved") {
      const moved = message as MovedMessage;
      console.log(`Actor moved: ${moved.oldNodeId} → ${moved.newNodeId}`);
      console.log(`New actor ID: ${moved.newActorId}`);
    }
  }
}
```

### Constraints

| Constraint | Reason |
|------------|--------|
| Actor must implement `Migratable` | State serialization required |
| Actor cannot have children | Child actors would be orphaned |
| State must be JSON-serializable | Transferred over network |
| Target node must have actor class registered | Cannot instantiate unknown class |
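Since state crosses the network as JSON, values that `JSON.stringify` silently drops or rewrites would not survive migration intact: a `Map` becomes `{}`, a `Date` becomes a string, `NaN` becomes `null`, and `undefined` fields vanish. A hypothetical helper like the one below (written for this README, not part of libeam) can flag such values in `getState()` output during testing. It covers the common lossy cases rather than every edge case.

```typescript
// Returns JSON-path-like locations of values that would not round-trip through JSON.
function findNonJsonValues(value: unknown, path = "$"): string[] {
  if (value === null || typeof value === "string" || typeof value === "boolean") return [];
  if (typeof value === "number") return isFinite(value) ? [] : [path]; // NaN/Infinity become null
  if (typeof value !== "object") return [path]; // undefined, function, bigint, symbol
  const problems: string[] = [];
  if (Array.isArray(value)) {
    value.forEach((item, i) => problems.push(...findNonJsonValues(item, `${path}[${i}]`)));
    return problems;
  }
  const proto = Object.getPrototypeOf(value);
  if (proto !== Object.prototype && proto !== null) return [path]; // Map, Set, Date, class instances
  const record = value as Record<string, unknown>;
  for (const key of Object.keys(record)) {
    problems.push(...findNonJsonValues(record[key], `${path}.${key}`));
  }
  return problems;
}
```

For example, a state like `{ count: 3, history: ["a"] }` yields no findings, while `{ seen: new Set() }` is reported at `$.seen`.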

### Error Types

| Error | Code | Description |
|-------|------|-------------|
| `ActorNotMigratableError` | `ACTOR_NOT_MIGRATABLE` | Actor doesn't implement `Migratable` |
| `ActorHasChildrenError` | `ACTOR_HAS_CHILDREN` | Actor has child actors |
| `MigrationFailedError` | `MIGRATION_FAILED` | General migration failure |
| `NameReservationError` | `NAME_RESERVATION_FAILED` | Could not reserve name on target |

Run the migration example:

```bash
npx ts-node examples/low-level/actor_migration.ts
```

## CLI

libeam ships a CLI for managing clusters — start/stop nodes, rolling deploys with zero-downtime actor migration, drain, and status.

```bash
# Run without installing
npx libeam --help

# Or install globally
npm install -g libeam
libeam --help
```

### Config File

Create a `libeam.config.ts` in your project root (or run `libeam init`):

```typescript
// libeam.config.ts
export default {
  cluster: {
    cookie: process.env.LIBEAM_COOKIE ?? "change-me-in-production",
    seedNodes: [],
  },
  nodes: {
    gateway: {
      entry: "./src/gateway.ts",
      roles: ["gateway"],
      count: 1,
      port: 5000,
    },
    worker: {
      entry: "./src/worker.ts",
      roles: ["worker"],
      count: 3,
      port: "auto",
    },
  },
  deploy: {
    strategy: "rolling",
    drainTimeout: 30000,
    healthCheck: {
      interval: 1000,
      retries: 10,
    },
  },
};
```

Each node entry module exports a default function that creates and returns a system:

```typescript
// src/worker.ts
import { createSystem, createActor } from "libeam";

export default async function ({ port, cookie, seedNodes, roles }) {
  const system = await createSystem({
    type: "distributed",
    port,
    cookie,
    seedNodes,
    roles,
  });

  const Worker = createActor((ctx, self) => {
    return self.onCall("ping", () => "pong");
  });
  system.spawn(Worker, { name: "worker" });

  return system;
}
```

### Commands

| Command | Description |
|---------|-------------|
| `libeam init` | Scaffold a `libeam.config.ts` |
| `libeam start` | Start cluster nodes from config |
| `libeam start --role=worker` | Start only nodes with a specific role |
| `libeam start --daemon` | Start in background |
| `libeam stop` | Graceful shutdown |
| `libeam status` | Show cluster health and node status |
| `libeam nodes` | List cluster members |
| `libeam actors` | List actors across the cluster |
| `libeam actors --migratable` | Show only migratable actors |
| `libeam drain <node> --target=<node>` | Drain node, migrate actors to target |
| `libeam migrate <actor> <target>` | Migrate a single actor |
| `libeam migrate --all --from=<n> --to=<n>` | Bulk migrate all migratable actors |
| `libeam deploy --role=worker` | Rolling deploy for a role |
| `libeam deploy --all` | Rolling deploy entire cluster |
| `libeam deploy --all --dry-run` | Preview deploy without executing |
| `libeam eval "expression"` | Run JS expression with libeam available |

### Rolling Deploy

`libeam deploy` performs zero-downtime rolling deployments with automatic actor migration:

```
libeam deploy --role=worker

For each old worker node:
  1. Start new instance (new code)
  2. Health check — wait until ready
  3. Drain old instance → migrate actors to new
  4. Stop old instance
  5. Repeat for next node
```

Use `--dry-run` to preview the deployment plan without executing it.
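The deploy loop can be sketched as a small driver. This is a hypothetical model of the sequence, not the CLI's source: `DeployOps`, `rollingDeploy`, and the choice to abort (leaving the old node running) when a replacement fails its health checks are assumptions. `retries` and `intervalMs` play the role of `deploy.healthCheck.retries` and `deploy.healthCheck.interval` from the config file.

```typescript
// Hypothetical hooks a deploy driver needs; the CLI's internals are not documented here.
interface DeployOps {
  startReplacement(oldNode: string): Promise<string>; // resolves to the new node's name
  isReady(node: string): Promise<boolean>;
  drain(from: string, to: string): Promise<void>; // migrate actors from -> to
  stop(node: string): Promise<void>;
}

interface DeployOptions {
  retries: number;
  intervalMs: number;
  dryRun?: boolean;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function waitReady(ops: DeployOps, node: string, opts: DeployOptions): Promise<boolean> {
  for (let attempt = 0; attempt < opts.retries; attempt++) {
    if (await ops.isReady(node)) return true;
    await sleep(opts.intervalMs);
  }
  return false;
}

// One node at a time: each old instance keeps serving until its replacement is ready.
async function rollingDeploy(oldNodes: string[], ops: DeployOps, opts: DeployOptions): Promise<string[]> {
  const steps: string[] = [];
  for (const oldNode of oldNodes) {
    if (opts.dryRun) {
      steps.push(`would replace ${oldNode}`);
      continue;
    }
    const newNode = await ops.startReplacement(oldNode); // 1. start new instance
    if (!(await waitReady(ops, newNode, opts))) { // 2. health check
      throw new Error(`${newNode} never became ready; ${oldNode} left running`);
    }
    await ops.drain(oldNode, newNode); // 3. migrate actors to the new instance
    await ops.stop(oldNode); // 4. stop old instance
    steps.push(`replaced ${oldNode} with ${newNode}`); // 5. continue with the next node
  }
  return steps;
}
```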

## Docker Deployment

libeam clusters can be deployed as Docker containers, with one container per server. This is ideal for multi-server setups managed by orchestrators like [Coolify](https://coolify.io), Portainer, or plain Docker Compose.

> **Single-server clusters**: If all nodes run on one machine, prefer `libeam start` (the CLI supervisor) over Docker — it avoids unnecessary container overhead and networking complexity.

### Port Convention

Each libeam node uses 3 consecutive ports:

| Port | Protocol | Purpose |
|------|----------|---------|
| `N` | TCP | ZeroMQ RPC (ROUTER socket) |
| `N+1` | TCP | ZeroMQ PUB (event broadcasts) |
| `N+2` | UDP | Gossip (cluster membership) |

When you pass `port: 5000` to `createSystem`, it binds RPC on 5000, PUB on 5001, and gossip on 5002.
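The convention is simple arithmetic, sketched below as a hypothetical helper (`portsFor` is not a libeam export). It also shows why seed addresses point at `N+2`: seeds are gossip endpoints, which is why the examples below list `:5002`.

```typescript
interface NodePorts {
  rpc: number; // TCP, ZeroMQ ROUTER
  pub: number; // TCP, ZeroMQ PUB
  gossip: number; // UDP
}

// Derives the three ports from a base port, following the N / N+1 / N+2 convention.
function portsFor(base: number): NodePorts {
  if (!Number.isInteger(base) || base < 1 || base + 2 > 65535) {
    throw new RangeError(`invalid base port ${base}: need an integer with base + 2 <= 65535`);
  }
  return { rpc: base, pub: base + 1, gossip: base + 2 };
}
```

Two nodes sharing a host therefore need base ports at least 3 apart (for example 5000 and 5003).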

### Environment Variables

| Variable | Description | Example |
|----------|-------------|---------|
| `LIBEAM_COOKIE` | Cluster authentication secret | `my-cluster-secret` |
| `LIBEAM_PORT` | Base port (RPC, then +1 PUB, +2 gossip) | `5000` |
| `LIBEAM_ADVERTISE_ADDRESS` | Address this node advertises to peers | `10.0.0.1` |
| `LIBEAM_SEED_NODES` | Comma-separated gossip addresses to join | `10.0.0.1:5002,10.0.0.2:5002` |
| `LIBEAM_ROLES` | Comma-separated node roles | `worker,compute` |
| `LIBEAM_NODE_NAME` | Human-readable node identifier | `worker-1` |

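`createSystem` reads `LIBEAM_COOKIE` and `LIBEAM_ADVERTISE_ADDRESS` from the environment automatically, and explicit config values take precedence. The entry module below parses `LIBEAM_PORT`, `LIBEAM_SEED_NODES`, and `LIBEAM_ROLES` itself and passes them in.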
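If you prefer to collect all six variables in one place, a hypothetical helper like the one below (not a libeam export) does it. It trims list entries and drops empty ones, so an empty `LIBEAM_SEED_NODES=` yields `[]`, and it lets explicit values override the environment, matching the precedence rule above.

```typescript
interface NodeEnvConfig {
  cookie?: string;
  advertiseAddress?: string;
  port: number;
  seedNodes: string[];
  roles: string[];
  nodeName?: string;
}

type Env = Record<string, string | undefined>;

// Splits "a, b,,c" into ["a", "b", "c"]; undefined or "" gives [].
const list = (raw: string | undefined): string[] =>
  (raw ?? "").split(",").map((s) => s.trim()).filter((s) => s.length > 0);

// Hypothetical: explicit values win over environment variables.
function resolveNodeConfig(env: Env, explicit: Partial<NodeEnvConfig> = {}): NodeEnvConfig {
  const port = explicit.port ?? Number.parseInt(env.LIBEAM_PORT ?? "5000", 10);
  if (Number.isNaN(port)) throw new Error(`LIBEAM_PORT is not a number: ${env.LIBEAM_PORT}`);
  return {
    cookie: explicit.cookie ?? env.LIBEAM_COOKIE,
    advertiseAddress: explicit.advertiseAddress ?? env.LIBEAM_ADVERTISE_ADDRESS,
    port,
    seedNodes: explicit.seedNodes ?? list(env.LIBEAM_SEED_NODES),
    roles: explicit.roles ?? list(env.LIBEAM_ROLES),
    nodeName: explicit.nodeName ?? env.LIBEAM_NODE_NAME,
  };
}
```

In Node you would call it as `resolveNodeConfig(process.env)` and spread the result into `createSystem`.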

### Entry Module

Each container runs an entry module that creates and returns a system:

```typescript
// src/worker.ts
import { createSystem, createActor } from "libeam";
import { createServer } from "http";

export default async function start() {
  const system = await createSystem({
    type: "distributed",
    port: parseInt(process.env.LIBEAM_PORT ?? "5000", 10),
    seedNodes: process.env.LIBEAM_SEED_NODES?.split(",").filter(Boolean) ?? [],
    roles: process.env.LIBEAM_ROLES?.split(",").filter(Boolean),
    // LIBEAM_COOKIE and LIBEAM_ADVERTISE_ADDRESS are read from env automatically
  });

  // Spawn actors...
  const Worker = createActor((ctx, self) => {
    return self.onCall("ping", () => "pong");
  });
  system.spawn(Worker, { name: "worker" });

  // Health endpoint (required for orchestrator health checks)
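  createServer((req, res) => {
    if (req.url === "/health") {
      const health = system.system.getHealth();
      res.writeHead(health.status === "healthy" ? 200 : 503, { "Content-Type": "application/json" });
      res.end(JSON.stringify(health));
    } else {
      // Answer every other path so requests don't hang
      res.writeHead(404);
      res.end();
    }
  }).listen(3000);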

  return system;
}

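// Start immediately: the Dockerfile runs this file directly (`node dist/worker.js`)
start().catch((err) => {
  console.error("Failed to start node:", err);
  process.exit(1);
});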
```

### Dockerfile

```dockerfile
FROM node:22-alpine AS builder
WORKDIR /app
COPY package.json pnpm-lock.yaml ./
RUN corepack enable && pnpm install --frozen-lockfile
COPY . .
RUN pnpm build

FROM node:22-alpine
RUN apk add --no-cache curl
WORKDIR /app
COPY --from=builder /app/dist ./dist
COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/package.json ./
EXPOSE 5000 5001 5002/udp 3000
CMD ["node", "dist/worker.js"]
```

### docker-compose.yml

For a 3-node cluster (1 gateway + 2 workers), using `network_mode: host` for the simplest cross-server networking:

```yaml
services:
  libeam:
    build: .
    container_name: libeam  # lets the `docker exec libeam ...` commands below find the container
    network_mode: host
    environment:
      - LIBEAM_COOKIE=${LIBEAM_COOKIE:?}
      - LIBEAM_PORT=${LIBEAM_PORT:-5000}
      - LIBEAM_ADVERTISE_ADDRESS=${SERVER_PRIVATE_IP:?}
      - LIBEAM_SEED_NODES=${SEED_NODES:-}
      - LIBEAM_ROLES=${NODE_ROLE:-worker}
      - LIBEAM_NODE_NAME=${NODE_NAME:-node}
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 10s
      timeout: 3s
      retries: 3
    restart: unless-stopped
```

Deploy this same compose file to each server, with per-server environment variables:

```bash
# Server 1 (gateway) — 10.0.0.1
LIBEAM_COOKIE=change-me-in-production
LIBEAM_PORT=5000
SERVER_PRIVATE_IP=10.0.0.1
SEED_NODES=
NODE_ROLE=gateway
NODE_NAME=gateway

# Server 2 (worker) — 10.0.0.2
LIBEAM_COOKIE=change-me-in-production
LIBEAM_PORT=5000
SERVER_PRIVATE_IP=10.0.0.2
SEED_NODES=10.0.0.1:5002
NODE_ROLE=worker
NODE_NAME=worker-1

# Server 3 (worker) — 10.0.0.3
LIBEAM_COOKIE=change-me-in-production
LIBEAM_PORT=5000
SERVER_PRIVATE_IP=10.0.0.3
SEED_NODES=10.0.0.1:5002,10.0.0.2:5002
NODE_ROLE=worker
NODE_NAME=worker-2
```

### Why `network_mode: host`?

With one container per server, host networking is the simplest option:

- **No port mapping needed** — the container binds directly to the host's network interfaces
- **UDP gossip works natively** — no bridge NAT translation issues
- **`advertiseAddress` is the server's real IP** — peers can reach it directly
- **No port conflicts** — there's only one libeam container per server

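Bridge networking works too, but it requires explicit mappings for all three ports (`"5000:5000"`, `"5001:5001"`, `"5002:5002/udp"`), and `LIBEAM_ADVERTISE_ADDRESS` must still be the host's IP, because the container's bridge IP is not reachable from other servers. With one container per host, that is unnecessary overhead.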

### Firewall Rules

Open these ports between your servers (private network):

| Port | Protocol | Direction |
|------|----------|-----------|
| `5000` | TCP | Bidirectional (RPC) |
| `5001` | TCP | Bidirectional (PUB/SUB) |
| `5002` | UDP | Bidirectional (Gossip) |
| `3000` | TCP | Inbound only (health checks, optional) |

### Rolling Deploys with Docker

Docker orchestrators typically restart containers, causing brief per-node downtime. For a multi-node cluster, deploy one server at a time — the remaining nodes keep the cluster alive:

```bash
# 1. Drain the node before redeploying (optional, preserves migratable actor state)
docker exec libeam npx libeam drain worker-2 --target=worker-1

# 2. Redeploy via your orchestrator (Coolify, etc.)
# The node restarts, rejoins the cluster via gossip

# 3. Repeat for next server
```

For clusters where actor state doesn't need migration, simply redeploy each server sequentially — gossip handles re-joining automatically.

### Cluster Introspection via Docker

The CLI commands remain useful for inspecting running clusters:

```bash
# Check cluster membership
docker exec libeam npx libeam nodes

# List actors across the cluster
docker exec libeam npx libeam actors

# Drain a node before maintenance
docker exec libeam npx libeam drain worker-1 --target=worker-2
```

## Architecture

```
Node Instance:
├── DistributedCluster (membership via UDP gossip)
│   └── GossipProtocol → GossipUDP
├── Transport (ZeroMQ or InMemory)
│   ├── ROUTER socket (RPC requests)
│   ├── PUB socket (registry broadcasts)
│   ├── SUB socket (registry subscriptions)
│   └── DEALER pool (outgoing RPC)
├── RegistrySync (actor name → nodeId mapping)
│   └── VectorClock (conflict resolution)
└── ActorSystem
    ├── Supervisor (crash handling)
    └── PlacementEngine (actor placement)
```
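The architecture lists a `VectorClock` under `RegistrySync` for conflict resolution. As background, the sketch below shows textbook vector clock logic for comparing two versions of a registry entry. It is not libeam's code, and how libeam breaks ties between concurrent registrations is not described in this README.

```typescript
type VectorClock = Record<string, number>; // nodeId -> number of updates seen from that node
type Ordering = "before" | "after" | "equal" | "concurrent";

// Record a local update on `nodeId`.
function tick(clock: VectorClock, nodeId: string): VectorClock {
  return { ...clock, [nodeId]: (clock[nodeId] ?? 0) + 1 };
}

// Element-wise maximum: the clock after seeing both histories.
function merge(a: VectorClock, b: VectorClock): VectorClock {
  const out: VectorClock = { ...a };
  for (const nodeId of Object.keys(b)) out[nodeId] = Math.max(out[nodeId] ?? 0, b[nodeId]);
  return out;
}

// How version `a` relates to version `b`.
function compare(a: VectorClock, b: VectorClock): Ordering {
  let aAhead = false;
  let bAhead = false;
  for (const nodeId of Object.keys({ ...a, ...b })) {
    const x = a[nodeId] ?? 0;
    const y = b[nodeId] ?? 0;
    if (x > y) aAhead = true;
    if (y > x) bAhead = true;
  }
  if (aAhead && bAhead) return "concurrent"; // neither saw the other's update: needs a tie-break
  if (aAhead) return "after";
  if (bAhead) return "before";
  return "equal";
}
```

`before`/`after` can be resolved by keeping the newer entry; only `concurrent` needs a deterministic tie-break rule.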

## Testing

```bash
# Run all tests
pnpm test

# Watch mode
pnpm test:watch

# Type check
pnpm typecheck
```

## Performance

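GenStage pipeline benchmarks measured on a single node (Apple Silicon, Node.js). Run with `pnpm bench`. In the GenStage tables, throughput counts complete benchmark runs per second (each run processes the scenario's full event batch), and mean latency is the time per run.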

### Baseline: Producer → Consumer

| Scenario | Throughput | Mean Latency |
|----------|-----------|-------------|
| 10k events, maxDemand=100 | ~675 ops/s | ~1.5 ms |
| 10k events, maxDemand=1000 | ~723 ops/s | ~1.4 ms |

Larger `maxDemand` means fewer demand round-trips between consumer and producer. With synchronous demand handling the benefit is modest here: about 7% more throughput (~723 vs ~675 ops/s).

### Fan-out: DemandDispatcher

| Consumers | Throughput | Mean Latency |
|-----------|-----------|-------------|
| 1 | ~687 ops/s | ~1.5 ms |
| 2 | ~695 ops/s | ~1.4 ms |
| 4 | ~681 ops/s | ~1.5 ms |
| 8 | ~739 ops/s | ~1.4 ms |

DemandDispatcher scales well: each event goes to exactly one consumer (events are distributed, not duplicated), so throughput stays in the ~681 to ~739 ops/s range from 1 to 8 consumers.

### Fan-out: BroadcastDispatcher

| Consumers | Throughput | Mean Latency | Events/consumer |
|-----------|-----------|-------------|-----------------|
| 1 | ~826 ops/s | ~1.2 ms | 2,000 |
| 2 | ~662 ops/s | ~1.5 ms | 2,000 |
| 4 | ~645 ops/s | ~1.6 ms | 2,000 |
| 8 | ~627 ops/s | ~1.6 ms | 2,000 |

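Broadcast delivers every event to every consumer, so total deliveries grow linearly with consumer count (16,000 with 8 consumers vs 2,000 with one). Measured throughput falls far less than linearly, from ~826 ops/s with one consumer to ~627 ops/s with eight.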

### Routing: PartitionDispatcher

| Partitions | Throughput | Mean Latency |
|------------|-----------|-------------|
| 2 | ~510 ops/s | ~2.0 ms |
| 4 | ~532 ops/s | ~1.9 ms |
| 8 | ~516 ops/s | ~1.9 ms |

### Pipeline Depth

| Hops | Throughput | Mean Latency |
|------|-----------|-------------|
| P → C (1 hop) | ~680 ops/s | ~1.5 ms |
| P → PC → C (2 hops) | ~621 ops/s | ~1.6 ms |
| P → PC → PC → C (3 hops) | ~601 ops/s | ~1.7 ms |

Each ProducerConsumer hop adds ~0.1 ms of latency for 10k events.

### ConsumerSupervisor

| maxDemand | Throughput | Mean Latency | Events |
|-----------|-----------|-------------|--------|
| 5 | ~13 ops/s | ~75 ms | 200 |
| 10 | ~30 ops/s | ~33 ms | 200 |
| 20 | ~66 ops/s | ~15 ms | 200 |

Higher `maxDemand` allows more concurrent workers, reducing wall-clock time.

### Dispatcher Comparison (4 consumers, same workload)

| Dispatcher | Throughput | Mean Latency |
|-----------|-----------|-------------|
| DemandDispatcher | ~675 ops/s | ~1.5 ms |
| BroadcastDispatcher | ~654 ops/s | ~1.5 ms |
| PartitionDispatcher | ~515 ops/s | ~1.9 ms |

### Call Performance

Actor `call` (request/response) benchmarks. The sync fast path bypasses Promise + setTimeout + setImmediate when the target actor is idle, reducing per-call overhead from ~14µs to ~3µs.

| Scenario | Throughput | Per-call Latency |
|----------|-----------|-----------------|
| 10k sequential calls | ~312 ops/s | ~3.2 µs |
| 10k sequential echo | ~314 ops/s | ~3.2 µs |
| 10k parallel → 1 actor | ~253 ops/s | ~4.0 µs |
| 10k parallel → 4 actors | ~289 ops/s | ~3.5 µs |
| 10k parallel → 16 actors | ~289 ops/s | ~3.5 µs |
| 5k contention, 10 in-flight | ~583 ops/s | ~1.7 ms |
| 5k contention, 100 in-flight | ~599 ops/s | ~1.7 ms |
| 10k chain: A → B (1 hop) | ~153 ops/s | ~6.6 µs |
| 10k chain: A → B → C (2 hops) | ~99 ops/s | ~10.1 µs |
| 10k mixed 50/50 call/cast | ~27 ops/s | ~3.7 µs/op |

Sequential and parallel calls hit the sync fast path (~3µs). Contention scenarios fall back to the mailbox path when messages queue up. Each call-chain hop adds ~3.5µs.
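The fast path amounts to this: if nothing is queued or running, handle the call inline instead of enqueuing it and scheduling a drain. The sketch below is a generic, hypothetical illustration. `MiniActor` is not libeam's class, it uses synchronous handlers only, and it schedules drains with a microtask rather than `setImmediate`. It also shows why calls fall back to the mailbox once messages are queued: handling them inline would let a call overtake earlier messages.

```typescript
interface Envelope<Req, Res> {
  req: Req;
  resolve: (res: Res) => void;
  reject: (err: unknown) => void;
}

// Hypothetical actor with a FIFO mailbox and an inline fast path for idle calls.
class MiniActor<Req, Res> {
  private readonly mailbox: Array<Envelope<Req, Res>> = [];
  private draining = false;
  private scheduled = false;
  fastPathHits = 0;
  slowPathHits = 0;

  constructor(private readonly handle: (req: Req) => Res) {}

  async call(req: Req): Promise<Res> {
    if (!this.draining && this.mailbox.length === 0) {
      // Idle: run the handler synchronously, before call() returns its promise.
      this.fastPathHits++;
      return this.handle(req);
    }
    // Busy or backlogged: queue behind earlier messages to keep FIFO order.
    this.slowPathHits++;
    return new Promise<Res>((resolve, reject) => this.enqueue({ req, resolve, reject }));
  }

  cast(req: Req): void {
    // Fire-and-forget messages always go through the mailbox in this sketch.
    this.enqueue({ req, resolve: () => {}, reject: () => {} });
  }

  private enqueue(envelope: Envelope<Req, Res>): void {
    this.mailbox.push(envelope);
    if (!this.scheduled) {
      this.scheduled = true;
      Promise.resolve().then(() => this.drain()); // stand-in for setImmediate-style scheduling
    }
  }

  private drain(): void {
    this.scheduled = false;
    this.draining = true;
    while (this.mailbox.length > 0) {
      const { req, resolve, reject } = this.mailbox.shift()!;
      try {
        resolve(this.handle(req));
      } catch (err) {
        reject(err);
      }
    }
    this.draining = false;
  }
}
```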

## License

ISC
