{"version":3,"file":"channel-registry.cjs","names":["#rootBus","#entries","#thread","StreamStore"],"sources":["../../src/stream/channel-registry.ts"],"sourcesContent":["/**\n * Framework-agnostic ref-counted subscription cache.\n *\n * # What this module is\n *\n * Every framework binding (React, Vue, Svelte, Angular) owns one\n * {@link ChannelRegistry} per {@link StreamController}. The registry\n * is the single layer that:\n *\n *   1. Deduplicates server-side subscriptions across components — N\n *      hooks reading the same projection share one\n *      `thread.subscribe(...)` call and one {@link StreamStore}.\n *   2. Lazily opens / tears down subscriptions in step with mounting\n *      and unmounting consumers (ref counting on `spec.key`).\n *   3. Survives thread swaps — `controller.hydrate(newThreadId)`\n *      rebinds every live entry against the new thread without\n *      changing store identity, so React's\n *      `useSyncExternalStore` (and equivalents in other frameworks)\n *      keep working.\n *\n * # Why ref counting matters\n *\n * Most projections back at least one server subscription. Without\n * deduplication, every additional consumer of e.g. `useMessages(sub)`\n * would open its own SSE/WebSocket subscription, paying the same\n * payload N times. The registry guarantees we only ever pay once per\n * `spec.key`, regardless of how many consumers attach.\n *\n * # Why store identity is preserved on rebind\n *\n * Framework reactivity primitives subscribe to a store *instance* and\n * memoise their last seen snapshot. If we minted a new store on every\n * thread swap, every bound component would silently lose its\n * subscription. Instead, the registry keeps the same {@link StreamStore}\n * but resets its value to `spec.initial` and re-runs `spec.open()` —\n * consumers observe a clean slate without re-subscribing.\n *\n * @see ProjectionSpec - The contract every projection implements.\n * @see StreamStore - The observable store handed to consumers.\n */\nimport { StreamStore } from \"./store.js\";\nimport type {\n  AcquiredProjection,\n  ProjectionRuntime,\n  ProjectionSpec,\n  RootEventBus,\n  ThreadStream,\n} from \"./types.js\";\n\n/**\n * Internal record kept for each unique `spec.key` actively held by at\n * least one consumer.\n *\n * We intentionally store `initial` and `open` separately from `spec`\n * so the registry never depends on the spec object's identity — two\n * specs sharing the same `key` but produced from different factory\n * calls (e.g. fresh objects on each render) still collapse onto the\n * same entry.\n */\ninterface Entry {\n  /** Stable identity used for deduplication. */\n  readonly key: string;\n  /** Observable store handed back to every consumer of this key. */\n  readonly store: StreamStore<unknown>;\n  /** Initial snapshot reapplied on dispose / thread rebind. */\n  readonly initial: unknown;\n  /** Factory that opens the underlying subscription against a thread. */\n  readonly open: ProjectionSpec<unknown>[\"open\"];\n  /** Live consumers of this entry. Drops to 0 → entry is torn down. */\n  refCount: number;\n  /**\n   * Active runtime returned by `open()`. Undefined while detached\n   * (no thread bound yet, or a rebind is in progress).\n   */\n  runtime: ProjectionRuntime | undefined;\n}\n\n/**\n * Ref-counted, thread-aware projection registry.\n *\n * Owns the `spec.key → (store, runtime)` mapping for one\n * {@link StreamController}. Lifecycle:\n *\n *   - `acquire(spec)`  → +1 ref, returns `{ store, release }`. The\n *      first acquire opens the projection's runtime; subsequent\n *      acquires for the same key share both the store and the\n *      runtime.\n *   - `release()`      → -1 ref. When the last consumer releases,\n *      the entry is removed and its runtime disposed.\n *   - `bind(thread)`   → swap or detach the underlying thread; every\n *      live entry's runtime is recreated against the new thread,\n *      keeping the same store identity.\n *   - `dispose()`      → tear everything down (idempotent). Safe to\n *      call multiple times.\n *\n * The registry is intentionally not generic over a state shape —\n * different consumers can hold projections producing different\n * snapshot types, so the registry keys everything as `unknown` and\n * lets {@link acquire} reapply the caller's `T` at the boundary.\n */\nexport class ChannelRegistry {\n  /** Currently bound thread, or `undefined` while detached. */\n  #thread: ThreadStream | undefined;\n\n  /** Read-only fan-out of the controller's root subscription. */\n  readonly #rootBus: RootEventBus;\n\n  /** All live entries, keyed by `spec.key`. */\n  readonly #entries = new Map<string, Entry>();\n\n  /**\n   * Construct a registry bound to the controller's root event bus.\n   *\n   * The bus is forwarded to every projection's `open()` so root-scoped\n   * projections can avoid opening a second server subscription when\n   * their channel set is already covered by the root pump.\n   *\n   * @param rootBus - Read-only fan-out of the root subscription.\n   */\n  constructor(rootBus: RootEventBus) {\n    this.#rootBus = rootBus;\n  }\n\n  /**\n   * Rebind every live entry to a new {@link ThreadStream} (or detach\n   * when `thread == null`).\n   *\n   * Each live entry has its current runtime disposed (best-effort)\n   * and its store reset to `entry.initial` so consumers see a clean\n   * slate during the swap. When `thread != null`, a fresh runtime is\n   * opened against the new thread.\n   *\n   * Critically the {@link StreamStore} *instance* is preserved across\n   * the rebind: framework subscribers (e.g. React's\n   * `useSyncExternalStore`) keep observing the same store reference,\n   * so their subscriptions survive the swap.\n   *\n   * No-op when called with the currently bound thread.\n   *\n   * @param thread - The thread stream to bind, or `undefined` to detach.\n   */\n  bind(thread: ThreadStream | undefined): void {\n    if (this.#thread === thread) return;\n    const previous = this.#thread;\n    this.#thread = thread;\n    for (const entry of this.#entries.values()) {\n      // Tear down any active runtime from the previous thread.\n      if (entry.runtime != null && previous != null) {\n        void tryDispose(entry.runtime);\n      }\n      entry.runtime = undefined;\n      entry.store.setValue(entry.initial);\n      if (thread != null) {\n        entry.runtime = entry.open({\n          thread,\n          store: entry.store,\n          rootBus: this.#rootBus,\n        });\n      }\n    }\n  }\n\n  /** Currently bound thread (may be `undefined` pre-mount). */\n  get thread(): ThreadStream | undefined {\n    return this.#thread;\n  }\n\n  /**\n   * Acquire a ref-counted projection.\n   *\n   * If no entry exists for `spec.key`, one is created (allocating a\n   * {@link StreamStore} seeded with `spec.initial`) and — when a\n   * thread is currently bound — its runtime is opened immediately.\n   * If an entry already exists, its ref count is incremented and the\n   * existing store is returned.\n   *\n   * The returned `release()` is idempotent: calling it more than once\n   * is a no-op. When the ref count drops to zero, the entry is removed\n   * and its runtime disposed (best-effort, never throws into callers).\n   *\n   * Safe to call from any framework lifecycle hook. Subsequent calls\n   * for the same `spec.key` always return the same `store` reference\n   * for the lifetime of the controller, so consumers can rely on store\n   * identity.\n   *\n   * @typeParam T - Snapshot type produced by this projection.\n   * @param spec - Projection contract; the registry keys off `spec.key`.\n   * @returns A `{ store, release }` handle.\n   */\n  acquire<T>(spec: ProjectionSpec<T>): AcquiredProjection<T> {\n    let entry = this.#entries.get(spec.key);\n    if (entry == null) {\n      const store = new StreamStore<T>(spec.initial);\n      const newEntry: Entry = {\n        key: spec.key,\n        store: store as StreamStore<unknown>,\n        initial: spec.initial as unknown,\n        open: spec.open as ProjectionSpec<unknown>[\"open\"],\n        refCount: 0,\n        runtime: undefined,\n      };\n      // Open the runtime immediately when a thread is already bound.\n      // Otherwise it will be opened lazily by the next `bind()` call.\n      if (this.#thread != null) {\n        newEntry.runtime = spec.open({\n          thread: this.#thread,\n          store,\n          rootBus: this.#rootBus,\n        });\n      }\n      this.#entries.set(spec.key, newEntry);\n      entry = newEntry;\n    }\n    entry.refCount += 1;\n\n    let released = false;\n    return {\n      store: entry.store as StreamStore<T>,\n      release: () => {\n        if (released) return;\n        released = true;\n        const current = this.#entries.get(spec.key);\n        if (current == null) return;\n        current.refCount -= 1;\n        if (current.refCount <= 0) {\n          this.#entries.delete(spec.key);\n          if (current.runtime != null) void tryDispose(current.runtime);\n        }\n      },\n    };\n  }\n\n  /**\n   * Tear everything down.\n   *\n   * Detaches the bound thread (so no further `bind()` calls reopen\n   * runtimes) and disposes every live runtime in parallel. Safe to\n   * call multiple times — subsequent calls find an empty registry\n   * and resolve immediately.\n   */\n  async dispose(): Promise<void> {\n    this.#thread = undefined;\n    const entries = [...this.#entries.values()];\n    this.#entries.clear();\n    await Promise.all(\n      entries.map(async (entry) => {\n        if (entry.runtime != null) await tryDispose(entry.runtime);\n      })\n    );\n  }\n\n  /**\n   * Number of live entries. Diagnostic-only — callers should not\n   * branch on this value at runtime; it exists for tests asserting\n   * that consumers properly release their projections.\n   */\n  get size(): number {\n    return this.#entries.size;\n  }\n}\n\n/**\n * Best-effort runtime disposal.\n *\n * `dispose()` should never throw, but a misbehaving projection should\n * not be able to wedge the entire registry. We swallow disposal\n * errors so the surrounding `bind()` / `release()` / `dispose()`\n * paths always make progress.\n *\n * @param runtime - Runtime returned by {@link ProjectionSpec.open}.\n */\nasync function tryDispose(runtime: ProjectionRuntime): Promise<void> {\n  try {\n    await runtime.dispose();\n  } catch {\n    // Best-effort — dispose should never throw, but we don't want a\n    // bad projection to wedge the registry.\n  }\n}\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAoGA,IAAa,kBAAb,MAA6B;;CAE3B;;CAGA;;CAGA,2BAAoB,IAAI,KAAoB;;;;;;;;;;CAW5C,YAAY,SAAuB;AACjC,QAAA,UAAgB;;;;;;;;;;;;;;;;;;;;CAqBlB,KAAK,QAAwC;AAC3C,MAAI,MAAA,WAAiB,OAAQ;EAC7B,MAAM,WAAW,MAAA;AACjB,QAAA,SAAe;AACf,OAAK,MAAM,SAAS,MAAA,QAAc,QAAQ,EAAE;AAE1C,OAAI,MAAM,WAAW,QAAQ,YAAY,KAClC,YAAW,MAAM,QAAQ;AAEhC,SAAM,UAAU,KAAA;AAChB,SAAM,MAAM,SAAS,MAAM,QAAQ;AACnC,OAAI,UAAU,KACZ,OAAM,UAAU,MAAM,KAAK;IACzB;IACA,OAAO,MAAM;IACb,SAAS,MAAA;IACV,CAAC;;;;CAMR,IAAI,SAAmC;AACrC,SAAO,MAAA;;;;;;;;;;;;;;;;;;;;;;;;CAyBT,QAAW,MAAgD;EACzD,IAAI,QAAQ,MAAA,QAAc,IAAI,KAAK,IAAI;AACvC,MAAI,SAAS,MAAM;GACjB,MAAM,QAAQ,IAAIG,cAAAA,YAAe,KAAK,QAAQ;GAC9C,MAAM,WAAkB;IACtB,KAAK,KAAK;IACH;IACP,SAAS,KAAK;IACd,MAAM,KAAK;IACX,UAAU;IACV,SAAS,KAAA;IACV;AAGD,OAAI,MAAA,UAAgB,KAClB,UAAS,UAAU,KAAK,KAAK;IAC3B,QAAQ,MAAA;IACR;IACA,SAAS,MAAA;IACV,CAAC;AAEJ,SAAA,QAAc,IAAI,KAAK,KAAK,SAAS;AACrC,WAAQ;;AAEV,QAAM,YAAY;EAElB,IAAI,WAAW;AACf,SAAO;GACL,OAAO,MAAM;GACb,eAAe;AACb,QAAI,SAAU;AACd,eAAW;IACX,MAAM,UAAU,MAAA,QAAc,IAAI,KAAK,IAAI;AAC3C,QAAI,WAAW,KAAM;AACrB,YAAQ,YAAY;AACpB,QAAI,QAAQ,YAAY,GAAG;AACzB,WAAA,QAAc,OAAO,KAAK,IAAI;AAC9B,SAAI,QAAQ,WAAW,KAAW,YAAW,QAAQ,QAAQ;;;GAGlE;;;;;;;;;;CAWH,MAAM,UAAyB;AAC7B,QAAA,SAAe,KAAA;EACf,MAAM,UAAU,CAAC,GAAG,MAAA,QAAc,QAAQ,CAAC;AAC3C,QAAA,QAAc,OAAO;AACrB,QAAM,QAAQ,IACZ,QAAQ,IAAI,OAAO,UAAU;AAC3B,OAAI,MAAM,WAAW,KAAM,OAAM,WAAW,MAAM,QAAQ;IAC1D,CACH;;;;;;;CAQH,IAAI,OAAe;AACjB,SAAO,MAAA,QAAc;;;;;;;;;;;;;AAczB,eAAe,WAAW,SAA2C;AACnE,KAAI;AACF,QAAM,QAAQ,SAAS;SACjB"}