import { NegativeActiveSubscribersError } from '../errors' import { createSingleRowRefProxy, toExpression, } from '../query/builder/ref-proxy.js' import { CollectionSubscription } from './subscription.js' import type { StandardSchemaV1 } from '@standard-schema/spec' import type { ChangeMessage, SubscribeChangesOptions } from '../types' import type { CollectionLifecycleManager } from './lifecycle.js' import type { CollectionSyncManager } from './sync.js' import type { CollectionEventsManager } from './events.js' import type { CollectionImpl } from './index.js' import type { CollectionStateManager } from './state.js' import type { WithVirtualProps } from '../virtual-props.js' export class CollectionChangesManager< TOutput extends object = Record, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = StandardSchemaV1, TInput extends object = TOutput, > { private lifecycle!: CollectionLifecycleManager private sync!: CollectionSyncManager private events!: CollectionEventsManager private collection!: CollectionImpl private state!: CollectionStateManager public activeSubscribersCount = 0 public changeSubscriptions = new Set() public batchedEvents: Array> = [] public shouldBatchEvents = false /** * Creates a new CollectionChangesManager instance */ constructor() {} public setDeps(deps: { lifecycle: CollectionLifecycleManager sync: CollectionSyncManager events: CollectionEventsManager collection: CollectionImpl state: CollectionStateManager }) { this.lifecycle = deps.lifecycle this.sync = deps.sync this.events = deps.events this.collection = deps.collection this.state = deps.state } /** * Emit an empty ready event to notify subscribers that the collection is ready * This bypasses the normal empty array check in emitEvents */ public emitEmptyReadyEvent(): void { // Emit empty array directly to all subscribers for (const subscription of this.changeSubscriptions) { subscription.emitEvents([]) } } /** * Enriches a change message with virtual properties ($synced, $origin, $key, $collectionId). * Uses the "add-if-missing" pattern to preserve virtual properties from upstream collections. */ private enrichChangeWithVirtualProps( change: ChangeMessage, ): ChangeMessage, TKey> { return this.state.enrichChangeMessage(change) } /** * Emit events either immediately or batch them for later emission */ public emitEvents( changes: Array>, forceEmit = false, ): void { // Skip batching for user actions (forceEmit=true) to keep UI responsive if (this.shouldBatchEvents && !forceEmit) { // Add events to the batch this.batchedEvents.push(...changes) return } // Either we're not batching, or we're forcing emission (user action or ending batch cycle) let rawEvents = changes if (forceEmit) { // Force emit is used to end a batch (e.g. after a sync commit). Combine any // buffered optimistic events with the final changes so subscribers see the // whole picture, even if the sync diff is empty. if (this.batchedEvents.length > 0) { rawEvents = [...this.batchedEvents, ...changes] } this.batchedEvents = [] this.shouldBatchEvents = false } if (rawEvents.length === 0) { return } // Enrich all change messages with virtual properties // This uses the "add-if-missing" pattern to preserve pass-through semantics const enrichedEvents: Array< ChangeMessage, TKey> > = rawEvents.map((change) => this.enrichChangeWithVirtualProps(change)) // Emit to all listeners for (const subscription of this.changeSubscriptions) { subscription.emitEvents(enrichedEvents) } } /** * Subscribe to changes in the collection */ public subscribeChanges( callback: ( changes: Array>>, ) => void, options: SubscribeChangesOptions = {}, ): CollectionSubscription { // Start sync and track subscriber this.addSubscriber() // Compile where callback to whereExpression if provided if (options.where && options.whereExpression) { throw new Error( `Cannot specify both 'where' and 'whereExpression' options. Use one or the other.`, ) } const { where, ...opts } = options let whereExpression = opts.whereExpression if (where) { const proxy = createSingleRowRefProxy>() const result = where(proxy) whereExpression = toExpression(result) } const subscription = new CollectionSubscription(this.collection, callback, { ...opts, whereExpression, onUnsubscribe: () => { this.removeSubscriber() this.changeSubscriptions.delete(subscription) }, }) // Register status listener BEFORE requesting snapshot to avoid race condition. // This ensures the listener catches all status transitions, even if the // loadSubset promise resolves synchronously or very quickly. if (options.onStatusChange) { subscription.on(`status:change`, options.onStatusChange) } if (options.includeInitialState) { subscription.requestSnapshot({ trackLoadSubsetPromise: false, orderBy: options.orderBy, limit: options.limit, onLoadSubsetResult: options.onLoadSubsetResult, }) } else if (options.includeInitialState === false) { // When explicitly set to false (not just undefined), mark all state as "seen" // so that all future changes (including deletes) pass through unfiltered. subscription.markAllStateAsSeen() } // Add to batched listeners this.changeSubscriptions.add(subscription) return subscription } /** * Increment the active subscribers count and start sync if needed */ private addSubscriber(): void { const previousSubscriberCount = this.activeSubscribersCount this.activeSubscribersCount++ this.lifecycle.cancelGCTimer() // Start sync if collection was cleaned up if ( this.lifecycle.status === `cleaned-up` || this.lifecycle.status === `idle` ) { this.sync.startSync() } this.events.emitSubscribersChange( this.activeSubscribersCount, previousSubscriberCount, ) } /** * Decrement the active subscribers count and start GC timer if needed */ private removeSubscriber(): void { const previousSubscriberCount = this.activeSubscribersCount this.activeSubscribersCount-- if (this.activeSubscribersCount === 0) { this.lifecycle.startGCTimer() } else if (this.activeSubscribersCount < 0) { throw new NegativeActiveSubscribersError() } this.events.emitSubscribersChange( this.activeSubscribersCount, previousSubscriberCount, ) } /** * Clean up the collection by stopping sync and clearing data * This can be called manually or automatically by garbage collection */ public cleanup(): void { this.batchedEvents = [] this.shouldBatchEvents = false } }