/**
 * runner/DeferredObserverTier.ts — RFC-001 Blocks 6–9: the engine wiring of
 * the deferred-observer pipeline.
 *
 * Pattern: Thin adapter between the executor's three observer channels and
 *          the PURE `observer-queue` module. The pure module stays
 *          engine-free (it imports nothing from the engine — the engine
 *          imports IT); this tier owns every engine-flavored concern:
 *            - the `isDevMode()`-gated, deduplicated `CaptureHooks.warn`
 *              binding (the dev-warn seam, RFC-001 §"Resolution");
 *            - routing `DeferredDispatcher.onError` into the existing
 *              recorder error channel (`onError` on sibling observers);
 *            - the capture TAPS — synthetic recorders placed on the
 *              existing inline dispatch lists so the three dispatch sites
 *              (`ScopeFacade._invokeHook`, `ScopeFacade.emitEvent`,
 *              `FlowRecorderDispatcher`) need NO per-site tier logic: a
 *              tap's hook body IS `dispatcher.capture(...)`, and because
 *              the tap sits in the same loop as inline recorders it sees
 *              exactly the post-redaction event object inline observers
 *              see — capture can never observe a pre-redaction value;
 *            - the terminal flush (Block 8) with honest stranding
 *              accounting, and the Block 9 stats surface.
 *
 * Role: One instance per `FlowChartExecutor`, created LAZILY on the
 *       first `delivery: 'deferred'` attach (zero allocation when nobody
 *       opts in — mirrors the emit fast-path precedent). Holds the ONE
 *       `DeferredDispatcher` (one merged queue, total order across
 *       channels) plus the registry of deferred recorders.
 *
 * Delivery: a deferred recorder's hooks are invoked through the SAME
 * `invokeRecorderHook` helper the inline tier uses (RFC-001 §9 mitigation) —
 * one beat behind, with `envelope.payload` materialized per the capture
 * policy (`'summary'` default — bounded, reference-free; `'clone'` — full
 * structural copy, event-shape compatible with inline; `'ref'` — the live
 * event object, dev-warned).
 *
 * NOTE(review): the paragraph above names `'summary'` as the default capture
 * policy, while the `capture` field documentation on `AttachRecorderOptions`
 * in this file names `'clone'`. Only one can be the tier's effective default —
 * confirm against the implementation and align the two statements.
 *
 * Channel filter: a registration remembers which channels would have
 * reached the recorder inline (scope-list recorders see `scope` + `emit`
 * envelopes; flow-list recorders see `flow` envelopes) and skips the rest —
 * same reach as the inline tier, one beat behind.
 */
import type { FlowRecorder } from '../engine/narrative/types.js';
import { type CaptureChannel, type CapturePolicy, type DispatcherStats, type DrainResult, type OverflowPolicy } from '../observer-queue/index.js';
import type { ScopeRecorder } from '../scope/types.js';

/** Delivery tier for an attached observer (RFC-001). */
export type ObserverDelivery = 'inline' | 'deferred';

/**
 * Options bag accepted by every `attach*Recorder` call.
 *
 * `delivery: 'deferred'` opts the recorder into the bounded capture queue
 * ("one beat behind"); absent / `'inline'` keeps the historical synchronous
 * call — byte-identical to the pre-RFC path.
 *
 * The remaining fields configure the executor's ONE shared dispatcher and
 * are applied when the FIRST deferred attach creates it; later attaches
 * passing different values get a dev-mode warning and keep the original
 * configuration (one queue per executor — per-recorder queues would break
 * the total cross-channel order).
 */
export interface AttachRecorderOptions {
    /** `'deferred'` = capture → queue → next-checkpoint delivery. Default `'inline'`. */
    readonly delivery?: ObserverDelivery;
    /**
     * Payload materialization at capture time. Default `'clone'` — your
     * recorder's hooks receive the SAME event shape as inline delivery
     * (`structuredClone`d; `e.key`/`e.value`/`e.runtimeStageId` all work
     * unchanged), so `{ delivery: 'deferred' }` is a drop-in port.
     * `'summary'` — a bounded type/size/preview digest (`PayloadSummary`)
     * instead of the event shape: cheaper capture for telemetry-only
     * consumers that don't read domain fields. `'ref'` — pass-through by
     * reference; the caller asserts immutability (dev-mode warns).
     */
    readonly capture?: CapturePolicy;
    /** Queue bound — default 10 000. */
    readonly maxQueue?: number;
    /** Overflow policy at `maxQueue` — default `'drop-oldest'`. */
    readonly overflow?: OverflowPolicy;
    /** `'sample'` overflow only — admit 1 in this many saturated arrivals. */
    readonly sampleEvery?: number;
    /** Per-checkpoint flush budget, ms (A1) — default 2; `Infinity` = full drain. */
    readonly flushBudgetMs?: number;
}

/**
 * The Block 9 observability surface — `snapshot.observerStats`. The A4
 * dispatcher stats plus the terminal-flush stranding count from Block 8.
 * Present on `RuntimeSnapshot` only when a deferred observer was attached.
 */
export interface ObserverStats extends DispatcherStats {
    /**
     * Envelopes still queued when a terminal flush hit its runaway-cascade
     * round cap (Block 8). `0` in any sane run — a non-zero value means a
     * listener kept enqueueing work at end-of-run and delivery was cut off
     * (also dev-warned at the moment it happened). Never silent.
     */
    readonly terminalStranded: number;
}

/** Result shape of `executor.drainObservers()` — see `DrainResult`. */
export type ObserverDrainResult = DrainResult;

/** Well-known ids of the synthetic capture taps (internal, documented for debugging). */
export declare const DEFERRED_SCOPE_TAP_ID = "__deferred-scope-tap__";
export declare const DEFERRED_FLOW_TAP_ID = "__deferred-flow-tap__";

export declare class DeferredObserverTier {
    private readonly dispatcher;
    private readonly registrations;
    private readonly appliedConfig;
    /** Dedup memory for the dev-warn seam (one warning per unique message). */
    private readonly warnedMessages;
    private terminalStranded;
    constructor(options?: AttachRecorderOptions);
    /**
     * Register a recorder for deferred delivery on the scope-list channels
     * (`scope` + `emit`) and/or the flow channel. Idempotent by id — same id
     * replaces the recorder object; channel lists MERGE across calls (same as
     * the inline tier, where `attachScopeRecorder(x)` + `attachFlowRecorder(x)`
     * lands `x` on both lists). Later attaches passing dispatcher-level
     * options that differ from the first attach's configuration keep the
     * original config and dev-warn.
     */
    register(recorder: ScopeRecorder | FlowRecorder, lists: {
        scope?: boolean;
        flow?: boolean;
    }, options?: AttachRecorderOptions): void;
    /**
     * Remove the given channel lists from a registration (mirrors the inline
     * tier, where `detachScopeRecorder` / `detachFlowRecorder` each clear one
     * list). When no channels remain, the listener is fully removed.
     */
    removeFromLists(id: string, lists: {
        scope?: boolean;
        flow?: boolean;
    }): void;
    /** True when `id` is registered for deferred delivery (any channel). */
    has(id: string): boolean;
    /** Deferred recorders whose reach includes the scope list (scope+emit). */
    scopeListRecorders(): ScopeRecorder[];
    /** Deferred recorders whose reach includes the flow channel. */
    flowListRecorders(): FlowRecorder[];
    /** Reset deferred recorders before a fresh run (same contract as inline). */
    clearRecorders(): void;
    /**
     * Build the synthetic scope-channel tap — a `ScopeRecorder` placed on the
     * normal scope-recorder list whose hooks capture into the queue. Built
     * fresh per traverser so it reflects the current registrations. Only
     * methods some deferred recorder actually implements are present (no
     * wasted captures). Returns `undefined` when nothing is registered for
     * the scope list.
     *
     * Redaction ordering: the tap is invoked from the SAME loops as inline
     * recorders (`_invokeHook` / `emitEvent`), which receive events AFTER the
     * redaction decision — so a captured payload can never contain a
     * pre-redaction value the inline tier would not have seen.
     */
    buildScopeTap(): ScopeRecorder | undefined;
    /**
     * Build the synthetic flow-channel tap — a `FlowRecorder` appended to the
     * flow-recorders list handed to the traverser. Same contract as
     * {@link buildScopeTap}.
     */
    buildFlowTap(): FlowRecorder | undefined;
    /**
     * Direct capture for executor-synthesized events that bypass the dispatch
     * sites (e.g. the synthetic `onResume` the executor fires on resume).
     */
    capture(channel: CaptureChannel, method: string, runtimeStageId: string, runId: string, payload: unknown): void;
    /**
     * Synchronously deliver everything still queued — called by the executor
     * at the OUTERMOST run boundary (resolve, reject, pause), BEFORE `run()`
     * returns / rethrows / the checkpoint becomes available. Inspects
     * `flushSync`'s `remaining` (reviewer N1): `flushSync` already loops
     * snapshot rounds up to its runaway-cascade cap, so a non-zero remainder
     * means a pathological self-enqueueing listener — counted in
     * `observerStats.terminalStranded` and dev-warned, never silent.
     */
    terminalFlush(): void;
    /**
     * Flush the backlog, then settle async listener continuations under a
     * deadline — the serverless / graceful-shutdown pattern (RFC-001 §11).
     *
     * @param opts - Optional settings; `timeoutMs` is presumably the settle
     *   deadline in milliseconds (confirm against the implementation).
     * @returns A promise for the dispatcher's `DrainResult` — the same shape
     *   exported from this module as `ObserverDrainResult`.
     */
    drain(opts?: {
        timeoutMs?: number;
    }): Promise<DrainResult>;
    /** The `snapshot.observerStats` payload — A4 stats + Block 8 stranding. */
    getStats(): ObserverStats;
    private byChannel;
    /** Normal property lookup — invocation parity with the inline tier. */
    private anyImplements;
    /** Scope/emit events carry their own ids (`runtimeStageId` + `pipelineId`). */
    private captureScopeEvent;
    /** Flow events carry ids on `traversalContext` (absent on a few events). */
    private captureFlowEvent;
    /**
     * Route a deferred listener failure into the existing recorder error
     * channel: every OTHER registered observer (deferred siblings first, in
     * registration order) receives a scope-shaped `onError` event — the same
     * contract the inline tier honors when a recorder throws mid-dispatch.
     * The error sink must never become an error source: sink throws are
     * swallowed (isolation is absolute).
     */
    private routeListenerError;
    /** Dispatcher-level options are first-attach-wins; differing later values dev-warn. */
    private warnOnConfigConflict;
    /** `isDevMode()`-gated, deduplicated warner — the bound warn seam. */
    private devWarnDeduped;
}