# Events

Long Tail publishes structured events when workflows reach milestones. The default adapter uses Socket.IO, which is included in the Express server configuration and works out of the box with no additional infrastructure. The event system is pluggable: register additional adapters at startup to fan out events to NATS, SNS, webhooks, or any other pub/sub system alongside (or instead of) the default Socket.IO transport.

## Configuration via start()

The simplest way to enable event publishing is through the `start()` config:

```typescript
import { start } from '@hotmeshio/long-tail';

// Built-in NATS adapter
await start({
  database: { connectionString: process.env.DATABASE_URL },
  workers: [ ... ],
  events: { nats: { url: 'nats://localhost:4222' } },
});

// Custom adapters (multiple supported)
await start({
  database: { connectionString: process.env.DATABASE_URL },
  workers: [ ... ],
  events: { adapters: [new SnsEventAdapter(topicArn), new WebhookEventAdapter(url)] },
});
```

`start()` handles adapter connection and graceful disconnection on shutdown automatically.

## Dashboard Transport Selection

The dashboard auto-detects its event transport via `GET /api/settings`. By default, Socket.IO is reported — it works in-process with no additional infrastructure.

For multi-container deployments where the API and workers run as separate processes, set `EVENT_TRANSPORT=nats` to tell the dashboard to connect via NATS WebSocket instead of Socket.IO:

```yaml
# docker-compose.yml
services:
  app:
    environment:
      - EVENT_TRANSPORT=nats
      - NATS_URL=nats://nats:4222
      - NATS_WS_URL=ws://localhost:9222
      - NATS_TOKEN=your-token
```

The Socket.IO and NATS adapters both publish events regardless of `EVENT_TRANSPORT`; the setting only controls which transport the dashboard listens on. Server-side event consumers (callbacks, NATS subscribers) therefore work independently of the dashboard transport.

## The event envelope

An event is a tiny **universal envelope** plus a topic-specific **payload** (`data`). Only three fields are universal; everything else is a per-family extension, added by the family that needs it.

### Universal envelope

```typescript
interface LTEventBase {
  id?: string;        // idempotency key — minted by eventRegistry.publish() if omitted
  type: string;       // the subject (system: `system.workflow.{id}.failed`; custom: `app.*`)
  timestamp: string;  // ISO 8601 — stamped by the publisher; ensured by eventRegistry
  source?: string;    // producer tag (optional)
  data?: Record<string, any>; // the payload — topic-specific shape
}
```

`type` and `timestamp` are the only required fields; `id` is minted if the publisher doesn't supply one. A **custom** event (e.g. `app.image.resized` from an MCP tool) is just this base plus `data` — **no workflow fields are injected.**
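As a minimal sketch of what such a custom event looks like, the snippet below redeclares the base envelope locally and fills in the universals the way the text describes. `mintCustomEvent` is a hypothetical helper for illustration, not a Long Tail export, and its id scheme is an assumption; in the library, `eventRegistry.publish()` is what mints a missing `id`.

```typescript
// Local copy of the universal envelope so the sketch is self-contained.
interface LTEventBase {
  id?: string;
  type: string;
  timestamp: string;
  source?: string;
  data?: Record<string, any>;
}

let counter = 0;

// Hypothetical helper: builds a custom event and fills in the universals.
// The id format is an assumption; the real registry may mint ids differently.
function mintCustomEvent(
  type: string,
  data: Record<string, any>,
  id?: string,
): LTEventBase {
  return {
    id: id ?? `evt-${Date.now()}-${counter++}`,
    type,
    timestamp: new Date().toISOString(),
    data,
  };
}

// A custom event carries only envelope fields plus `data`:
// no workflowId, workflowName, or taskQueue is attached.
const resized = mintCustomEvent('app.image.resized', { width: 800, height: 600 });
console.log(JSON.stringify(resized, null, 2));
```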

### Per-family extensions ("the system injects fields by type")

System families declare the fields they populate. Which fields exist is a function of the subject family, not a universal requirement:

| Family | Subject | Adds |
|---|---|---|
| workflow | `system.workflow.{id}.{started\|completed\|failed}` | `workflowId`, `workflowName`, `taskQueue`, `status` |
| task | `system.task.{taskId}.{created\|started\|completed\|escalated\|failed}` | workflow fields (above) + `taskId` |
| escalation | `system.escalation.{id}.{created\|resolved\|claimed\|released}` | `escalationId` (+ workflow context), `status` |
| activity | `system.activity.{wfId}.{activity}.{started\|completed\|failed}` | `activityName` (+ workflow context) |
| milestone | `system.milestone.{wfId}` | `milestones` (+ workflow context) |
| agent | `system.agent.{name}.{started\|completed\|failed\|…}` | `workflowId`=agentId, `workflowName`=agentName |
| knowledge / file | `system.knowledge.{domain}.*` / `system.file.*` | none — payload only |

`status` is a **per-family vocabulary**, not one global enum: workflow `started|in_progress|running|completed|failed`; task `created|in_progress|completed|needs_intervention|failed`; escalation `pending|claimed|released|resolved|cancelled`.

Consumers (adapters, the agent trigger registry, input_mapping) type against the broad **`LTEvent`** — the base plus every extension as **optional** — since they can't assume any extension is present. Producers (`lib/events/publish.ts`) construct the precise `LT*Event` family types, all assignable to `LTEvent`.
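The following sketch shows what typing against the broad shape means for a consumer: every extension is checked before use. The `LTEvent` interface here is a local approximation built from the family table, and `familyOf` and `summarize` are hypothetical helpers, not library functions.

```typescript
// Local approximation of the broad consumer type: base plus optional extensions.
// Field names come from the family table; the real LTEvent may differ.
interface LTEvent {
  id?: string;
  type: string;
  timestamp: string;
  source?: string;
  data?: Record<string, any>;
  workflowId?: string;
  workflowName?: string;
  taskQueue?: string;
  taskId?: string;
  escalationId?: string;
  activityName?: string;
  status?: string;
}

// Hypothetical helper: returns the family segment of a `system.*` subject,
// or 'custom' for anything outside the system namespace.
function familyOf(event: LTEvent): string {
  const [namespace, family] = event.type.split('.');
  return namespace === 'system' && family ? family : 'custom';
}

// A consumer cannot assume any extension is present, so it checks each one.
function summarize(event: LTEvent): string {
  const family = familyOf(event);
  if (family === 'task' && event.taskId !== undefined) {
    return `task ${event.taskId} is ${event.status ?? 'unknown'}`;
  }
  if (event.workflowId !== undefined) {
    return `${family} event for workflow ${event.workflowId}`;
  }
  return `${family} event ${event.type}`;
}

const now = new Date().toISOString();
console.log(summarize({ type: 'system.task.t1.completed', timestamp: now, taskId: 't1', status: 'completed' }));
console.log(summarize({ type: 'app.image.resized', timestamp: now }));
```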

### Who mints it

`lib/events/publish.ts` holds one typed constructor per family, so every system event is built in one place. `eventRegistry.publish()` is the dispatcher: it guarantees the universals (mints `id`, stamps `timestamp` if absent) and fans out to every adapter. For custom events it leaves the envelope minimal.

Milestone events carry a `milestones` array. Each milestone is a name/value pair:

```typescript
interface LTMilestone {
  name: string;
  value: string | number | boolean | Record<string, any>;
}
```

## LTEventAdapter

Adapters implement three methods:

```typescript
interface LTEventAdapter {
  connect(): Promise<void>;
  publish(event: LTEvent): Promise<void>;
  disconnect(): Promise<void>;
}
```

`connect()` is called once during startup. `publish()` is called for each event. `disconnect()` is called during graceful shutdown.

## Event Registry

`eventRegistry` is a singleton that manages adapters and dispatches events. The lifecycle is register, connect, publish, disconnect.

```typescript
import { eventRegistry, NatsEventAdapter } from '@hotmeshio/long-tail';

// 1. Register adapters (before connect)
eventRegistry.register(new NatsEventAdapter());

// 2. Connect all adapters
await eventRegistry.connect();

// ... application runs, events are published automatically ...

// 3. Disconnect during shutdown
await eventRegistry.disconnect();
```

### Behavior

- **Multiple adapters.** Call `register()` more than once to fan out events to several systems simultaneously.
- **Best-effort delivery.** `publish()` uses `Promise.allSettled`. A failure in one adapter does not affect the others and does not throw. Errors are logged.
- **Idempotent connect.** Calling `connect()` a second time is a no-op.
- **`hasAdapters`** returns `true` if at least one adapter is registered. The publishing functions check this flag and short-circuit when no adapters exist.
- **`clear()`** removes all adapters and resets connection state. Intended for test teardown.
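The behaviors above can be sketched with a simplified stand-in registry. This is not the library's implementation: `SketchRegistry`, `SketchAdapter`, and the `errors` array are hypothetical names used only to show how `Promise.allSettled` isolates a failing adapter and how an idempotent `connect()` might look.

```typescript
interface SketchEvent { type: string; timestamp: string; }

interface SketchAdapter {
  connect(): Promise<void>;
  publish(event: SketchEvent): Promise<void>;
  disconnect(): Promise<void>;
}

// Simplified stand-in for the registry, for illustration only.
class SketchRegistry {
  private adapters: SketchAdapter[] = [];
  private connected = false;
  readonly errors: unknown[] = []; // stands in for the error log

  register(adapter: SketchAdapter): void {
    this.adapters.push(adapter);
  }

  get hasAdapters(): boolean {
    return this.adapters.length > 0;
  }

  async connect(): Promise<void> {
    if (this.connected) return; // a second call is a no-op
    await Promise.all(this.adapters.map((a) => a.connect()));
    this.connected = true;
  }

  async publish(event: SketchEvent): Promise<void> {
    if (!this.hasAdapters) return; // short-circuit when nothing is registered
    const results = await Promise.allSettled(
      this.adapters.map((a) => a.publish(event)),
    );
    for (const result of results) {
      // A rejected adapter is recorded, never rethrown.
      if (result.status === 'rejected') this.errors.push(result.reason);
    }
  }
}

const delivered: string[] = [];
const failing: SketchAdapter = {
  connect: async () => {},
  publish: async () => { throw new Error('broker down'); },
  disconnect: async () => {},
};
const healthy: SketchAdapter = {
  connect: async () => {},
  publish: async (event) => { delivered.push(event.type); },
  disconnect: async () => {},
};

const registry = new SketchRegistry();
registry.register(failing);
registry.register(healthy);

// The healthy adapter still receives the event even though the other rejects.
const done = registry
  .connect()
  .then(() => registry.publish({ type: 'system.milestone.wf-1', timestamp: new Date().toISOString() }));
```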

## NATS Adapter

The built-in `NatsEventAdapter` publishes events as JSON-encoded strings to NATS subjects.

### Constructor Options

| Option          | Default                                      | Description                      |
|-----------------|----------------------------------------------|----------------------------------|
| `url`           | `process.env.NATS_URL` or `nats://localhost:4222` | NATS server URL             |
| `subjectPrefix` | `lt.events`                                  | Prefix for NATS subject names    |

### Subject Format

Events are published to `{subjectPrefix}.{event.type}`. Because `type` is the full event subject, a milestone event (`system.milestone.{wfId}`) with the default prefix lands on:

```
lt.events.system.milestone.{wfId}
```
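Assuming event types are dot-delimited as in the family table, the resulting NATS subject has several tokens after the prefix, which affects which wildcard a subscriber needs. In NATS, `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below illustrates this with two hypothetical helpers, `subjectFor` and `matchesNatsSubject`; a NATS server performs this matching itself, so the matcher is only for reasoning about subscriptions.

```typescript
// Builds the subject the adapter is described as publishing to.
function subjectFor(subjectPrefix: string, eventType: string): string {
  return `${subjectPrefix}.${eventType}`;
}

// Illustrative matcher for NATS subscription wildcards:
// `*` matches exactly one token, `>` matches one or more trailing tokens.
function matchesNatsSubject(pattern: string, subject: string): boolean {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    const token = p[i];
    if (token === '>') return i === p.length - 1 && s.length > i;
    if (i >= s.length) return false;
    if (token !== '*' && token !== s[i]) return false;
  }
  return p.length === s.length;
}

const subject = subjectFor('lt.events', 'system.workflow.wf-1.failed');

// A single `*` after the prefix does not reach a multi-token event type...
console.log(matchesNatsSubject('lt.events.*', subject));
// ...whereas `>` covers everything under the prefix,
console.log(matchesNatsSubject('lt.events.>', subject));
// and `*` can stand in for the workflow id token.
console.log(matchesNatsSubject('lt.events.system.workflow.*.failed', subject));
```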

### Usage

```typescript
import { eventRegistry, NatsEventAdapter } from '@hotmeshio/long-tail';

// Default: connects to nats://localhost:4222, publishes to lt.events.*
eventRegistry.register(new NatsEventAdapter());

// Custom server and prefix
eventRegistry.register(new NatsEventAdapter({
  url: 'nats://nats.prod.internal:4222',
  subjectPrefix: 'myapp.events',
}));

await eventRegistry.connect();
```

On disconnect, the adapter calls `drain()` on the NATS connection, ensuring in-flight publishes complete before closing.

## Callback Adapter

`CallbackEventAdapter` delivers events via registered callbacks. Use it when Long Tail runs as an embedded package and you want event subscriptions without Socket.IO or NATS.

```typescript
import { eventRegistry, CallbackEventAdapter } from '@hotmeshio/long-tail';

const adapter = new CallbackEventAdapter();
eventRegistry.register(adapter);
await adapter.connect();

// Exact match
const unsub = adapter.on('escalation.claimed', (event) => {
  console.log('claimed:', event.escalationId);
});

// Category wildcard — matches all task.* events
adapter.on('task.*', (event) => { ... });

// Global wildcard
adapter.on('*', (event) => { ... });

// Unsubscribe
unsub();
```

The adapter is registered automatically by `start()`. If you use `createClient()`, the SDK exposes it as `lt.events.on()` — see the [SDK guide](sdk.md) for details.
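To make the three subscription forms concrete, here is a minimal sketch of how exact, category-wildcard, and global-wildcard patterns could be matched. `matchesPattern` and `SketchEmitter` are hypothetical and the matching rules are an assumption; the real `CallbackEventAdapter` may resolve patterns differently, in particular against full `system.*` subjects.

```typescript
type Handler<E> = (event: E) => void;

// Assumed semantics: '*' matches everything, 'task.*' matches any type
// beginning with 'task.', anything else must match exactly.
function matchesPattern(pattern: string, type: string): boolean {
  if (pattern === '*') return true;
  if (pattern.endsWith('.*')) {
    return type.startsWith(pattern.slice(0, -1)); // 'task.*' -> prefix 'task.'
  }
  return pattern === type;
}

class SketchEmitter<E extends { type: string }> {
  private subs = new Map<number, { pattern: string; handler: Handler<E> }>();
  private nextId = 0;

  // Registers a handler and returns an unsubscribe function.
  on(pattern: string, handler: Handler<E>): () => void {
    const id = this.nextId++;
    this.subs.set(id, { pattern, handler });
    return () => { this.subs.delete(id); };
  }

  emit(event: E): void {
    // Copy first so handlers may unsubscribe while the event is dispatched.
    for (const { pattern, handler } of Array.from(this.subs.values())) {
      if (matchesPattern(pattern, event.type)) handler(event);
    }
  }
}

const emitter = new SketchEmitter<{ type: string }>();
const hits: string[] = [];

const unsubscribe = emitter.on('escalation.claimed', () => { hits.push('exact'); });
emitter.on('task.*', () => { hits.push('category'); });
emitter.on('*', () => { hits.push('global'); });

emitter.emit({ type: 'escalation.claimed' }); // exact and global handlers run
emitter.emit({ type: 'task.created' });       // category and global handlers run
unsubscribe();
emitter.emit({ type: 'escalation.claimed' }); // only the global handler runs
```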

## In-Memory Adapter

`InMemoryEventAdapter` captures events in an array. It exists for testing.

```typescript
import { eventRegistry, InMemoryEventAdapter } from '@hotmeshio/long-tail';

const adapter = new InMemoryEventAdapter();
eventRegistry.register(adapter);
await eventRegistry.connect();

// ... run a workflow ...

// Inspect captured events (milestone subjects are `system.milestone.{wfId}`)
expect(adapter.events).toContainEqual(
  expect.objectContaining({
    type: expect.stringMatching(/^system\.milestone\./),
    workflowName: 'reviewContent',
  })
);

// Reset between tests
adapter.clear();
```

After each test, call `eventRegistry.clear()` to remove the adapter and reset connection state.

## Custom Adapters

Implement `LTEventAdapter` to route events to your own pub/sub system in production. The registry handles error isolation, so adapters can throw freely: failures are caught, logged, and do not propagate.

### Example: SNS

```typescript
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns';
import type { LTEventAdapter, LTEvent } from '@hotmeshio/long-tail';

class SnsEventAdapter implements LTEventAdapter {
  private client: SNSClient;
  private topicArn: string;

  constructor(topicArn: string, region = 'us-east-1') {
    this.topicArn = topicArn;
    this.client = new SNSClient({ region });
  }

  async connect(): Promise<void> {
    // SNS client is ready on construction; nothing to do.
  }

  async publish(event: LTEvent): Promise<void> {
    await this.client.send(
      new PublishCommand({
        TopicArn: this.topicArn,
        Message: JSON.stringify(event),
        MessageAttributes: {
          eventType: {
            DataType: 'String',
            StringValue: event.type,
          },
        },
      }),
    );
  }

  async disconnect(): Promise<void> {
    this.client.destroy();
  }
}
```

### Example: Webhook

```typescript
import type { LTEventAdapter, LTEvent } from '@hotmeshio/long-tail';

class WebhookEventAdapter implements LTEventAdapter {
  constructor(private url: string) {}

  async connect(): Promise<void> {}

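  async publish(event: LTEvent): Promise<void> {
    const res = await fetch(this.url, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(event),
    });
    // fetch() only rejects on network errors; throw on HTTP failures so the registry logs them.
    if (!res.ok) {
      throw new Error(`Webhook responded with ${res.status}`);
    }
  }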

  async disconnect(): Promise<void> {}
}
```

## When Events Fire

Regardless of which adapter receives them, milestone events are published from three call sites, distinguished by the `source` field:

### 1. Workflow interceptor (`source: 'interceptor'`)

When a workflow returns `{ type: 'return', milestones, data }`, the interceptor's `handleCompletion` function publishes a milestone event before signaling the parent orchestrator. If the workflow is a re-run following an escalation, the interceptor appends two additional milestones -- `escalated: true` and `resolved_by_human: true` -- before publishing.

### 2. Orchestrator activity (`source: 'orchestrator'`)

When the orchestrator completes a task via `ltCompleteTask`, it publishes a milestone event with the task's milestones. This covers orchestrated workflows where the parent orchestrator is responsible for recording task completion.

### 3. Activity interceptor (`source: 'activity'`)

The activity interceptor inspects every activity result. If the result contains a `milestones` array, it publishes a milestone event with `activityName` set. This allows individual activities -- not just entire workflows -- to report progress.

### Delivery Semantics

All three call sites use `publishMilestoneEvent()`, which is fire-and-forget. It returns immediately, never throws, and swallows errors. Events are a non-durable side effect: they are not replayed on workflow recovery. If the process crashes between task completion and event publication, the event is lost. Design downstream consumers accordingly.
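The fire-and-forget contract can be sketched as a small wrapper. This is an illustration of the pattern only, not the body of `publishMilestoneEvent()`; `fireAndForget` and its `onError` hook are hypothetical names.

```typescript
// Hypothetical stand-in for a publish call that may reject.
type Publish<E> = (event: E) => Promise<void>;

// Wraps a publish function so the caller never waits and never sees an error.
function fireAndForget<E>(
  publish: Publish<E>,
  onError: (err: unknown) => void = () => {},
): (event: E) => void {
  return (event: E): void => {
    try {
      // Start the publish without awaiting it; the caller continues immediately.
      publish(event).catch(onError);
    } catch (err) {
      // Covers a publish function that throws synchronously.
      onError(err);
    }
  };
}

const errors: unknown[] = [];
const publishQuietly = fireAndForget<{ type: string }>(
  async () => { throw new Error('adapter offline'); },
  (err) => { errors.push(err); },
);

// Returns immediately and does not throw, even though the publish fails.
publishQuietly({ type: 'system.milestone.wf-1' });
```

Because nothing is awaited or retried, a crash right after this call loses the event, which is why consumers should treat events as notifications rather than as a durable record.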

## Multi-Container Idempotency

In a multi-container deployment, every container receives every published event. Agent subscriptions that react to events use **deterministic workflow IDs** to ensure exactly-once execution across the fleet. See [Agents: Distributed Safety](agents.md#distributed-safety) for the full mechanism.
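The idea behind deterministic IDs can be illustrated with a toy model. Everything here is an assumption made for the sketch: `reactionWorkflowId`, the `subscription:eventId` format, and `SketchEngine` are hypothetical, and the actual derivation is described in the Agents guide. The point is only that when every container computes the same workflow ID for the same event, an engine that refuses duplicate IDs runs the reaction once.

```typescript
// Hypothetical derivation: same subscription + same event id -> same workflow id.
function reactionWorkflowId(subscription: string, eventId: string): string {
  return `${subscription}:${eventId}`;
}

// Stand-in for a workflow engine that rejects a second start with the same id.
class SketchEngine {
  private started = new Set<string>();

  // Returns true if this call started the workflow, false if it already existed.
  startOnce(workflowId: string): boolean {
    if (this.started.has(workflowId)) return false;
    this.started.add(workflowId);
    return true;
  }
}

const engine = new SketchEngine();
const eventId = 'evt-42';

// Three containers receive the same event and each tries to start the reaction.
const outcomes = ['container-a', 'container-b', 'container-c'].map(() =>
  engine.startOnce(reactionWorkflowId('notify-on-failure', eventId)),
);

// Only the first attempt starts the workflow; the others find it already exists.
console.log(outcomes);
```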

Cron schedules use a different approach — HotMesh's `Virtual.cron()` uses internal JetStream consumer groups to elect a single container per tick.

## Related

- [Topic Catalog](topics.md) — persistent registry of known topics with schemas and discovery
- [Agents: Subscriptions](agents.md#subscriptions) — wiring topics to reactive workflows
- [Agents: Distributed Safety](agents.md#distributed-safety) — how deterministic IDs prevent duplicate reactions
- [Topics HTTP API](api/http/topics.md) — REST endpoints for the catalog
- [Topics SDK](api/sdk/topics.md) — `TopicService` programmatic access
