/** * observer-queue/deferredDispatcher.ts — RFC-001 Block 5: deferred delivery façade. * * Pattern: capture → enqueue → (microtask) flush → invoke, with per-listener * error isolation. Composes the whole pure pipeline: MergedQueue * (Block 3, which captures via Block 1) + FlushDriver (Block 4) + * a listener registry with timing/inflight accounting. * Role: The object the engine wiring (Block 6) will hold. Producers call * `capture()` (cheap, never throws, never blocks); listeners * receive envelopes at the next checkpoint, "one beat behind". * Pure module — zero engine imports. * * Delivery semantics (normative, RFC-001 §5 + amendments A2/A4): * - Per-listener FIFO: every listener sees envelopes in seq order * (invocation order; an async listener's COMPLETION order is its own * concern) — EXCEPT under `'block'` overflow, where a refused enqueue * is delivered inline and overtakes the queued backlog. `seq` always * records true arrival order, so order-sensitive consumers re-sort; * see the `'block'` caveat below. * - Error isolation: a throwing listener (sync) or rejecting listener * (async) never affects siblings or the producer. Both failure modes * route to the injected `onError`; a throwing `onError` is itself * swallowed. * - The flush NEVER awaits a listener. Async continuations are tracked in * an inflight set; `drain({ timeoutMs })` settles them * (`Promise.allSettled` + deadline, shaped like `flushAllDetached`). * - `'block'` overflow: a refused enqueue is delivered synchronously * INLINE from `capture()` — re-introducing blocking delivery by the * consumer's explicit choice. Ordering caveat (documented + tested): an * inline event overtakes the queued backlog — `'block'` trades global * ordering for zero loss and bounded memory. `seq` still tells the * true arrival order. * - Listener registry is idempotent by id (same id replaces, different * ids coexist) — mirrors the repo-wide recorder ID contract. Stats * accumulate per id across replacement; `removeListener` keeps the * id's accumulated stats for post-run reports. * - Events captured BEFORE any listener attaches stay queued — a listener * attached before the next checkpoint still receives the backlog. * * Per-listener time accounting (amendment A2 — "name the hog"): cumulative * `totalMs` and per-checkpoint `lastFlushMs` of SYNC time per listener id — * the time that actually blocks the flush. An async listener's continuation * time is intentionally not attributed (it does not block delivery). */ import { type CaptureEnvelope, type CaptureHooks, type CapturePolicy } from '../capture/envelope.js'; import { type FlushSyncResult } from './flushDriver.js'; import { type EnqueueInput } from './mergedQueue.js'; import { type OverflowPolicy } from './ring.js'; /** * One deferred observer. May return a Promise — the dispatcher tracks it in * the inflight set but NEVER awaits it during a flush. */ export type DeferredListener = (envelope: CaptureEnvelope) => void | Promise; export interface DispatchErrorContext { readonly listenerId: string; readonly envelope: CaptureEnvelope; /** `'sync'` = listener threw; `'async'` = returned promise rejected. */ readonly phase: 'sync' | 'async'; } /** Injected error sink — the wiring layer routes these (Block 6). */ export type DispatchErrorHandler = (error: unknown, context: DispatchErrorContext) => void; export interface DeferredDispatcherOptions { /** Queue bound — default 10 000 (see `MergedQueue`). */ readonly maxQueue?: number; /** Overflow policy — default `'drop-oldest'`. */ readonly overflow?: OverflowPolicy; /** `'sample'` overflow only — admit 1 in this many saturated arrivals. */ readonly sampleEvery?: number; /** Default capture policy — default `'summary'`. */ readonly capturePolicy?: CapturePolicy; /** Per-flush time budget, ms (A1) — default 2; `Infinity` = full drain. */ readonly flushBudgetMs?: number; /** Listener-failure sink. No default — without it, failures are silent. */ readonly onError?: DispatchErrorHandler; /** Capture seams (dev-warn, capturedAt clock) — see `CaptureHooks`. */ readonly hooks?: CaptureHooks; /** Timing clock for budget + per-listener accounting. Injectable. */ readonly now?: () => number; /** Checkpoint primitive — default `queueMicrotask`. Injectable. */ readonly schedule?: (cb: () => void) => void; } /** Per-listener accounting (A2/A4). */ export interface ListenerStats { /** Envelopes delivered (invocations, including ones that threw). */ readonly events: number; /** Cumulative sync delivery time, ms. */ readonly totalMs: number; /** Sync delivery time since the last flush started, ms. */ readonly lastFlushMs: number; } /** The Block 9 observability surface (amendment A4) — pure getter. */ export interface DispatcherStats { /** Current backlog. */ readonly depth: number; /** Events LOST (overflow) — never silent; also visible as seq gaps. */ readonly drops: number; /** Completed checkpoint flushes. */ readonly flushes: number; /** Flushes cut short by `flushBudgetMs` (A1). */ readonly budgetExhausted: number; /** p95 flush duration, ms (rolling window). */ readonly p95FlushMs: number; /** `'block'`-policy refusals delivered synchronously inline. */ readonly inlineDeliveries: number; /** Async listener continuations not yet settled. */ readonly inflight: number; /** Per-listener time accounting — "name the hog" (A2). */ readonly perListener: Readonly>; } /** Result of {@link DeferredDispatcher.drain} — `flushAllDetached` shape. */ export interface DrainResult { /** Async continuations seen settling fulfilled. Best-effort count — a * continuation that settles between checks is drained but may not be * counted (same semantics as `flushAllDetached`). */ readonly done: number; /** Continuations whose listener promise rejected (routed to onError). */ readonly failed: number; /** Still in flight (or queued) when the deadline expired. `0` = drained. */ readonly pending: number; } export declare class DeferredDispatcher { private readonly queue; private readonly driver; private readonly listeners; private readonly listenerStats; /** Tracked async continuations — resolve `true` (ok) / `false` (failed). */ private readonly inflight; private readonly onError?; private readonly now; private inlineDeliveries; constructor(opts?: DeferredDispatcherOptions); /** Idempotent by id — same id replaces (stats continue), ids coexist. */ addListener(id: string, listener: DeferredListener): void; /** Stop delivering to `id`. Accumulated stats are kept for reports. */ removeListener(id: string): void; /** * Producer entry point: capture the event (seq-stamped, payload per * policy) and stage it for the next checkpoint. Cheap; NEVER throws; * never blocks — except under `'block'` overflow, where a refused * enqueue is delivered synchronously inline (explicit consumer choice). */ capture(input: EnqueueInput, policy?: CapturePolicy): void; /** * Terminal flush — synchronously deliver everything queued (end of run / * shutdown). Async listener continuations are NOT awaited; follow with * `drain()` for that. */ flushNow(opts?: { maxRounds?: number; }): FlushSyncResult; /** * Flush the backlog, then settle all inflight async continuations — * `Promise.allSettled` under a deadline, shaped like `flushAllDetached`. * Loops while continuations spawn new captures, until quiescent or the * deadline expires. */ drain(opts?: { timeoutMs?: number; }): Promise; /** A4 — the stats object Block 9 consumes. Pure getter, fresh snapshot. */ getStats(): DispatcherStats; private deliverNext; /** Invoke every listener with full error isolation + time accounting. */ private deliver; /** Track an async continuation; route its rejection; never reject. */ private track; /** The error sink must never become an error source. */ private safeOnError; }