/**
 * observer-queue/flushDriver.ts — RFC-001 Block 4: armed-once microtask batcher.
 *
 * Pattern: Kernel-style bottom-half. A producer does nothing more than set
 *          a "work pending" flag and return. The real work happens at the
 *          next scheduling checkpoint (a microtask): it drains under a time
 *          budget and re-arms itself when backlog is left over. This is
 *          the same shape as the detach module's `microtaskBatchDriver` —
 *          accumulate during the current sync slice, drain at the boundary.
 * Role:    Scheduler of the deferred-observer pipeline. It owns WHEN
 *          delivery happens and knows nothing about envelopes or listeners
 *          (the dispatcher, Block 5, injects `depth`/`processNext`).
 *          Pure module — zero imports, zero engine knowledge.
 *
 * Scheduling semantics (normative, RFC-001 §5 + amendment A1):
 *   - `arm()` is idempotent: at most ONE flush is pending at any time
 *     (the armed flag). N captures between checkpoints ⇒ exactly 1 flush.
 *   - A flush drains a SNAPSHOT: no more than `depth()`-at-flush-start
 *     items. Events enqueued BY listeners during the flush fall outside
 *     the snapshot and are delivered at the NEXT checkpoint, so
 *     listener-driven cascades cannot starve the event loop.
 *   - `flushBudgetMs` (default 2; `Infinity` = full snapshot drain): once
 *     the budget is spent the flush stops, counts `budgetExhausted`, and
 *     re-arms. Regardless of budget, at least ONE item is processed per
 *     flush — guaranteed progress under any clock.
 *   - Whenever backlog remains after a flush (budget cut OR listener
 *     enqueues), the driver re-arms for the next checkpoint.
 *
 * Why stage boundaries make this safe: the engine `await`s every stage, so
 * the microtask queue runs at EVERY stage boundary — a flush is at most
 * "one beat behind" the stage that produced the work. See
 * `docs/guides/execution-model.md` ("Stage boundaries are scheduling
 * points") and the FAQ in `docs/design/rfc-001-deferred-observers.md`.
 *
 * Testability: both `now` (clock) and `schedule` (checkpoint primitive) are
 * injectable. Tests pump flushes deterministically using a fake clock and a
 * captured-callback scheduler; production uses `performance.now` and
 * `queueMicrotask`.
 */

/**
 * Summary of a single flush. The same object shape is handed to
 * `onFlushEnd`.
 */
export interface FlushOutcome {
  /** Number of items this flush processed. */
  readonly processed: number;

  /** True when the time budget stopped the flush before the snapshot was drained. */
  readonly budgetExhausted: boolean;

  /** True when backlog was left over and the driver re-armed itself. */
  readonly rearmed: boolean;
}

/** Summary of a synchronous {@link FlushDriver.flushSync} drain. */
export interface FlushSyncResult {
  /** Total items processed, summed over all rounds. */
  readonly drained: number;

  /** Items still in the queue when `maxRounds` halted a runaway cascade. */
  readonly remaining: number;
}

/** Construction options for {@link FlushDriver}. */
export interface FlushDriverOptions {
  /** Reports the current backlog of the queue this driver drains. */
  readonly depth: () => number;

  /** Processes exactly ONE queued item. Precondition: `depth() > 0`. */
  readonly processNext: () => void;

  /**
   * Time budget per flush, in ms. Defaults to 2. With `Infinity` the full
   * snapshot is drained at every checkpoint. Must be > 0.
   */
  readonly flushBudgetMs?: number;

  /** Clock. Defaults to `performance.now` (falls back to `Date.now`). */
  readonly now?: () => number;

  /** Checkpoint primitive. Defaults to `queueMicrotask`. */
  readonly schedule?: (cb: () => void) => void;

  /** Called before the first item of every flush (incl. `flushSync`). */
  readonly onFlushStart?: () => void;

  /** Called after every flush with its outcome (incl. `flushSync`). */
  readonly onFlushEnd?: (outcome: FlushOutcome) => void;
}

/** Counters and timings reported by {@link FlushDriver.getStats}. */
export interface FlushDriverStats {
  /** Count of completed flushes (zero-work wakeups are not counted). */
  readonly flushes: number;

  /** Count of flushes cut short by `flushBudgetMs` (A1 — backlog visibility). */
  readonly budgetExhausted: number;

  /** How long the most recent flush took, in ms. */
  readonly lastFlushMs: number;

  /** p95 of the last {@link FLUSH_SAMPLE_WINDOW} flush durations, in ms. */
  readonly p95FlushMs: number;

  /** True while a flush has been scheduled but has not run yet. */
  readonly armed: boolean;
}

/** Size of the rolling sample window behind the p95 flush-duration stat (A4). */
export declare const FLUSH_SAMPLE_WINDOW = 128;

/**
 * Armed-once flush scheduler. The scheduling contract (idempotent arming,
 * snapshot drain, time budget, re-arm on backlog) is spelled out in the
 * module header.
 */
export declare class FlushDriver {
  // These seven fields share their names with the members of FlushDriverOptions.
  private readonly depth;
  private readonly processNext;
  private readonly flushBudgetMs;
  private readonly now;
  private readonly schedule;
  private readonly onFlushStart?;
  private readonly onFlushEnd?;

  // Internal scheduling state and statistics.
  private armed;
  private flushes;
  private budgetExhaustedCount;
  private lastFlushMs;
  private readonly samples;
  private sampleWriteIdx;

  /** @param opts Queue accessors plus optional budget, clock, scheduler and hooks. */
  constructor(opts: FlushDriverOptions);

  /**
   * Ask for a flush at the next checkpoint. Idempotent: as long as a flush
   * is already pending, extra calls are free no-ops (the armed-once
   * invariant).
   */
  arm(): void;

  /**
   * Synchronous full drain — the terminal-flush primitive used at end of
   * run / shutdown. Snapshot rounds are repeated until the queue is empty,
   * so cascades enqueued by listeners are drained as well. The loop is
   * capped at `maxRounds` so that a listener which enqueues forever cannot
   * hang the process; `remaining` reports whatever the cap left behind.
   *
   * @param opts Optional settings; `maxRounds` caps the number of rounds.
   * @returns Items drained and items still queued.
   */
  flushSync(opts?: { maxRounds?: number }): FlushSyncResult;

  /** @returns The driver's current statistics. */
  getStats(): FlushDriverStats;

  /** The microtask body — its semantics are described in the module header. */
  private flush;

  private recordFlush;

  private p95FlushMs;
}