/**
 * Push-based subscribable primitives for the query system.
 *
 * Two primitives:
 * - Subscribable<T> — any value + "changed" notifications
 * - SubscribableArray<T> — array + incremental delta events (added/removed)
 *
 * Key property: **lazy subscribe** — upstream subscriptions only activate
 * when the first subscriber attaches, and deactivate when the last leaves.
 * This means one-off reads (.value / .items) have zero subscription overhead.
 */

/** Handle returned by `subscribe()`; calling it detaches that one subscription. */
export type Unsubscribe = () => void

// ═══════════════════════════════════════════════════════════════
// Subscribable<T> — generic single-value
// ═══════════════════════════════════════════════════════════════

export interface Subscribable<T> {
  /** Current value — plain read, no side effects */
  readonly value: T

  /**
   * Subscribe to change notifications.
   * - First call activates upstream subscriptions (lazy).
   * - Callback does NOT fire immediately — read .value for current state.
   * - Callback fires whenever .value changes.
   * - Last unsubscribe deactivates upstream.
   *
   * @param cb - Invoked (with no arguments) after the value has changed.
   * @param type - `'derived'` subscribers are notified before all others;
   *   any other value (including omitted) registers a regular subscriber.
   * @returns A handle that detaches this subscription.
   */
  subscribe(cb: () => void, type?: 'derived' | 'reaction'): Unsubscribe

  /** Tear down all internal subscriptions */
  dispose(): void
}

/**
 * Implementation of Subscribable<T> with lazy upstream activation.
 */
export class SubscribableImpl<T> implements Subscribable<T> {
  private _value: T
  private readonly _derivedSubscribers: (() => void)[] = []
  private readonly _subscribers: (() => void)[] = []
  private _upstreamActive = false
  private readonly _activateUpstream: (() => Unsubscribe) | null
  private _deactivateUpstream: Unsubscribe | null = null
  private readonly _equals: (a: T, b: T) => boolean

  /**
   * @param initialValue - Value returned by `.value` until `_set` replaces it.
   * @param activateUpstream - Optional hook run when the first subscriber
   *   attaches; the function it returns is run when the last one leaves
   *   (or on `dispose()`).
   * @param opts - `equals: false` makes every `_set` notify; a function
   *   replaces the default `===` comparison.
   */
  constructor(
    initialValue: T,
    activateUpstream?: () => Unsubscribe,
    opts?: { equals?: false | ((a: T, b: T) => boolean) },
  ) {
    this._value = initialValue
    this._activateUpstream = activateUpstream ?? null
    this._equals = opts?.equals === false
      ? () => false
      : (opts?.equals ?? ((a, b) => a === b))
  }

  get value(): T {
    return this._value
  }

  subscribe(cb: () => void, type?: 'derived' | 'reaction'): Unsubscribe {
    // Activate upstream on first subscriber (lazy)
    if (!this._upstreamActive && this._activateUpstream) {
      this._deactivateUpstream = this._activateUpstream()
      this._upstreamActive = true
    }

    const list = type === 'derived' ? this._derivedSubscribers : this._subscribers

    // Register a wrapper that is unique to this subscription. Removing by
    // wrapper identity (rather than by `cb`) makes the returned handle
    // idempotent and guarantees it can never detach a *different*
    // subscription that happens to use the same callback.
    const entry = () => cb()
    list.push(entry)

    // No immediate callback — subscriber reads .value for current state
    return () => {
      const idx = list.indexOf(entry)
      if (idx < 0) return // already unsubscribed, or cleared by dispose()
      list.splice(idx, 1)

      // Deactivate upstream when last subscriber leaves
      if (
        this._derivedSubscribers.length === 0 &&
        this._subscribers.length === 0 &&
        this._upstreamActive
      ) {
        this._deactivateUpstream?.()
        this._deactivateUpstream = null
        this._upstreamActive = false
      }
    }
  }

  /** Update value and notify subscribers (skips if equals check passes) */
  _set(value: T): void {
    if (this._equals(value, this._value)) return
    this._value = value
    this._notify()
  }

  private _notify(): void {
    // Snapshot each list: callbacks may synchronously (un)subscribe while
    // we iterate. Derived subscribers always run before regular ones.
    const derived = [...this._derivedSubscribers]
    for (const sub of derived) sub()
    const subs = [...this._subscribers]
    for (const sub of subs) sub()
  }

  /**
   * Deactivate upstream (if active) and drop every subscriber.
   * Handles returned by earlier `subscribe()` calls become no-ops.
   * The activation hook is retained, so a later `subscribe()` activates
   * upstream again.
   */
  dispose(): void {
    this._deactivateUpstream?.()
    this._deactivateUpstream = null
    this._derivedSubscribers.length = 0
    this._subscribers.length = 0
    this._upstreamActive = false
  }
}

// ═══════════════════════════════════════════════════════════════
// SubscribableArray<T> — array with delta events
// ═══════════════════════════════════════════════════════════════

/** Delta events — mirrors ThreadEvent shape */
export type ArrayEvent<T> =
  | { init: readonly T[] }
  | { added: readonly T[]; removed: readonly T[] | null }

/** Type guard for init events. Same logic as thread's isInitEvent. */
export function isArrayInitEvent<T>(event: ArrayEvent<T>): event is { init: readonly T[] } {
  return (event as { init?: readonly T[] }).init !== undefined
}

export interface SubscribableArray<T> {
  /**
   * Current snapshot — plain readonly array.
   *
   * NOTE: only stays current while at least one subscriber is active
   * (upstream is lazily activated on first `.subscribe()`). With no
   * subscribers this returns the initial snapshot from construction
   * and does NOT reflect later mutations. Tests/consumers that want
   * to observe updates must hold a subscription (`subscribe(() => {})`
   * is enough).
   */
  readonly items: readonly T[]

  /** Length shortcut */
  readonly length: number

  /**
   * Subscribe to delta events.
   * - First call activates upstream subscriptions (lazy).
   * - No init event on subscribe — read .items for current state.
   * - Receives `{ init }` only on genuine resets (triggerRemap).
   * - Last unsubscribe deactivates upstream.
   *
   * @param cb - Invoked with each delta or init event.
   * @param type - `'derived'` subscribers are notified before all others;
   *   any other value (including omitted) registers a regular subscriber.
   * @returns A handle that detaches this subscription.
   */
  subscribe(cb: (event: ArrayEvent<T>) => void, type?: 'derived' | 'reaction'): Unsubscribe

  /** Tear down all internal subscriptions */
  dispose(): void
}

/**
 * Implementation of SubscribableArray<T> with lazy upstream activation.
 *
 * Constructor takes initial items (computed synchronously at query time)
 * and an optional activation function that sets up upstream subscriptions.
 * The activation function is only called on first `.subscribe()`.
 */
export class SubscribableArrayImpl<T> implements SubscribableArray<T> {
  private _items: T[]
  private readonly _derivedSubscribers: ((event: ArrayEvent<T>) => void)[] = []
  private readonly _subscribers: ((event: ArrayEvent<T>) => void)[] = []
  private _upstreamActive = false
  private readonly _activateUpstream: (() => Unsubscribe) | null
  private _deactivateUpstream: Unsubscribe | null = null

  /**
   * @param initialItems - Backing array; it is stored by reference and
   *   mutated in place by `_push` / `_remove`.
   * @param activateUpstream - Optional hook run when the first subscriber
   *   attaches; the function it returns is run when the last one leaves
   *   (or on `dispose()`).
   */
  constructor(
    initialItems: T[],
    activateUpstream?: () => Unsubscribe,
  ) {
    this._items = initialItems
    this._activateUpstream = activateUpstream ?? null
  }

  get items(): readonly T[] {
    return this._items
  }

  get length(): number {
    return this._items.length
  }

  subscribe(cb: (event: ArrayEvent<T>) => void, type?: 'derived' | 'reaction'): Unsubscribe {
    // Activate upstream on first subscriber (lazy)
    if (!this._upstreamActive && this._activateUpstream) {
      this._deactivateUpstream = this._activateUpstream()
      this._upstreamActive = true
    }

    const list = type === 'derived' ? this._derivedSubscribers : this._subscribers

    // Register a wrapper that is unique to this subscription. Removing by
    // wrapper identity (rather than by `cb`) makes the returned handle
    // idempotent and guarantees it can never detach a *different*
    // subscription that happens to use the same callback.
    const entry = (event: ArrayEvent<T>) => cb(event)
    list.push(entry)

    // No init event — subscriber reads .items for current state
    return () => {
      const idx = list.indexOf(entry)
      if (idx < 0) return // already unsubscribed, or cleared by dispose()
      list.splice(idx, 1)

      // Deactivate upstream when last subscriber leaves
      if (
        this._derivedSubscribers.length === 0 &&
        this._subscribers.length === 0 &&
        this._upstreamActive
      ) {
        this._deactivateUpstream?.()
        this._deactivateUpstream = null
        this._upstreamActive = false
      }
    }
  }

  /** Push items and notify subscribers */
  _push(...items: T[]): void {
    this._items.push(...items)
    this._notify({ added: items, removed: null })
  }

  /**
   * Remove items and notify subscribers.
   * Each given item removes its first `indexOf` match, if any; the event
   * carries the requested `items` list as `removed`.
   */
  _remove(items: readonly T[]): void {
    for (const item of items) {
      const idx = this._items.indexOf(item)
      if (idx >= 0) this._items.splice(idx, 1)
    }
    this._notify({ added: [], removed: items })
  }

  /** Full reset — replace all items and emit an `{ init }` event with a copy */
  _reset(items: T[]): void {
    this._items = items
    this._notify({ init: [...this._items] })
  }

  private _notify(event: ArrayEvent<T>): void {
    // Snapshot: subscriber callbacks may synchronously unsubscribe during iteration.
    // Derived subscribers always run before regular ones.
    const derived = [...this._derivedSubscribers]
    for (const sub of derived) sub(event)
    const subs = [...this._subscribers]
    for (const sub of subs) sub(event)
  }

  /**
   * Deactivate upstream (if active) and drop every subscriber.
   * Handles returned by earlier `subscribe()` calls become no-ops.
   * The activation hook is retained, so a later `subscribe()` activates
   * upstream again.
   */
  dispose(): void {
    this._deactivateUpstream?.()
    this._deactivateUpstream = null
    this._derivedSubscribers.length = 0
    this._subscribers.length = 0
    this._upstreamActive = false
  }
}