/**
 * TanStack Sync Collection
 *
 * Provides local-first sync with background synchronization.
 * Uses Electric-style shape subscriptions for real-time data.
 */

import type {
  BaseRecord,
  SyncCollectionOptions,
  Collection,
  SyncState,
  PendingMutation,
  Conflict,
  ConflictResolver,
} from '../types'
import { validateIdentifier } from '@dotdo/postgres-shared/validation'

/**
 * Extended SyncCollection options with additional features.
 *
 * Every field added here is optional, so a plain `SyncCollectionOptions`
 * object is also accepted by the `SyncCollection` constructor.
 */
interface ExtendedSyncCollectionOptions<T extends BaseRecord> extends SyncCollectionOptions<T> {
  /** When true, a successful `connect()` discards the pending-mutation list. */
  retryOnReconnect?: boolean
  /** Called during merge when a remote row collides with a pending local mutation. */
  onConflict?: ConflictResolver<T>
  /** When set, inserts made while connected are queued and POSTed in batches. */
  batchMutations?: {
    maxBatchSize: number
    batchDelayMs: number
  }
  /** Called after a successful `connect()` and on every `disconnect()`. */
  onConnectionChange?: (event: { connected: boolean; previousState: SyncState }) => void
  /** Exponential-backoff retry policy applied when `connect()` fails. */
  autoReconnect?: {
    enabled: boolean
    maxRetries: number
    initialDelay: number
    maxDelay: number
    backoffMultiplier: number
  }
  /** Stored and exposed via `getHealthCheckConfig()`; this class schedules no checks itself. */
  healthCheck?: {
    interval: number
    timeout: number
  }
  /** Enables offline detection in `insert()` and in the polling loop. */
  offlineSupport?: boolean
}

/**
 * SyncCollection - A collection that maintains real-time sync
 *
 * Items live in an in-memory map keyed by `id`. Local writes are applied to
 * the map immediately and recorded as pending mutations; each fetch of the
 * sync endpoint merges the remote rows in and drops the pending mutations
 * that the response confirms.
 */
export class SyncCollection<T extends BaseRecord> implements Collection<T> {
  readonly id: string
  readonly table: string
  readonly syncUrl: string

  private items: Map<string | number, T> = new Map()
  private subscribers: Set<(items: T[]) => void> = new Set()
  private options: ExtendedSyncCollectionOptions<T>
  private syncState: SyncState = {
    connected: false,
    initialized: false,
    pendingCount: 0,
  }
  private pollIntervalId: ReturnType<typeof setInterval> | undefined
  private pendingMutations: PendingMutation<T>[] = []
  private debug: boolean
  private retryOnReconnect: boolean
  private onConflict?: ConflictResolver<T>
  private batchConfig?: { maxBatchSize: number; batchDelayMs: number }
  private batchTimeout?: ReturnType<typeof setTimeout>
  private pendingBatch: PendingMutation<T>[] = []
  private onConnectionChange?: (event: { connected: boolean; previousState: SyncState }) => void
  private autoReconnectConfig?: {
    enabled: boolean
    maxRetries: number
    initialDelay: number
    maxDelay: number
    backoffMultiplier: number
  }
  // Handle of the pending auto-reconnect timer, kept so disconnect() can cancel it.
  private reconnectTimeout?: ReturnType<typeof setTimeout>
  // Number of the auto-reconnect attempt most recently scheduled (0 = none in progress).
  private reconnectAttempt = 0
  private _healthCheckConfig?: { interval: number; timeout: number }
  private offlineSupport: boolean = false
  private isCurrentlyOffline: boolean = false
  private syncStateChangeHandlers: Array<(event: string, state: SyncState) => void> = []
  private healthStatus: {
    status: 'healthy' | 'degraded' | 'unhealthy'
    lastCheckAt: number
    latencyMs: number
    consecutiveFailures: number
  } = {
    status: 'healthy',
    lastCheckAt: 0,
    latencyMs: 0,
    consecutiveFailures: 0,
  }

  /**
   * @param options - Collection configuration. `options.table` is passed
   *   through `validateIdentifier` before anything else is stored.
   */
  constructor(options: ExtendedSyncCollectionOptions<T>) {
    // Validate table name to prevent SQL injection
    validateIdentifier(options.table, 'table')

    this.id = options.id
    this.table = options.table
    this.syncUrl = options.syncUrl
    this.options = options
    this.debug = options.debug ?? false
    this.retryOnReconnect = options.retryOnReconnect ?? false
    this.onConflict = options.onConflict
    this.batchConfig = options.batchMutations
    this.onConnectionChange = options.onConnectionChange
    this.autoReconnectConfig = options.autoReconnect
    this._healthCheckConfig = options.healthCheck
    this.offlineSupport = options.offlineSupport ?? false
  }

  /**
   * Get the health check configuration
   *
   * @returns The `healthCheck` option given at construction, if any.
   */
  getHealthCheckConfig(): { interval: number; timeout: number } | undefined {
    return this._healthCheckConfig
  }

  /**
   * Fetch the current rows from the sync endpoint and merge them into the
   * local map. Exposed publicly for testing.
   *
   * @throws Error when the response status is not OK; rejections from
   *   `fetch` and from JSON parsing propagate unchanged.
   */
  async fetchData(): Promise<void> {
    const url = this.buildSyncUrl()
    const response = await fetch(url)

    if (!response.ok) {
      throw new Error(`Sync fetch failed for collection "${this.id}" (table: ${this.table}): HTTP ${response.status}`)
    }

    const data = await response.json() as T[]
    await this.mergeData(data)
  }

  /**
   * Debug log helper - only logs if debug mode is enabled
   */
  private debugLog(message: string, data?: unknown): void {
    if (!this.debug) return

    const timestamp = new Date().toISOString()
    const prefix = `[SyncCollection:${this.id}]`
    if (data !== undefined) {
      console.log(`${timestamp} ${prefix} ${message}`, data)
    } else {
      console.log(`${timestamp} ${prefix} ${message}`)
    }
  }

  /**
   * Get all items in the collection
   *
   * @returns A new array holding the current items.
   */
  getAll(): T[] {
    return Array.from(this.items.values())
  }

  /**
   * Get item by ID
   */
  get(id: string | number): T | undefined {
    return this.items.get(id)
  }

  /**
   * Connect to the sync endpoint and start syncing
   *
   * Performs one fetch-and-merge. On success the collection is marked
   * connected and initialized, subscribers are notified, polling starts when
   * `pollInterval` is set, and any queued batch is scheduled for flushing.
   *
   * @throws The original fetch/merge error. When auto-reconnect is enabled
   *   the next attempt is scheduled before the error is rethrown.
   */
  async connect(): Promise<void> {
    this.debugLog('Connecting to sync endpoint', { url: this.syncUrl, table: this.table })
    const previousState = { ...this.syncState }
    this.emitSyncStateEvent('connecting', this.syncState)

    try {
      await this.fetchData()

      // Drop lastError from the previous state now that a sync succeeded.
      const { lastError: _omitted, ...restSyncState } = this.syncState

      // If retryOnReconnect is enabled and sync succeeded, clear pending mutations
      if (this.retryOnReconnect && this.pendingMutations.length > 0) {
        this.debugLog('Clearing pending mutations on reconnect (retryOnReconnect)', {
          count: this.pendingMutations.length,
        })
        this.pendingMutations = []
      }

      this.syncState = {
        ...restSyncState,
        connected: true,
        initialized: true,
        lastSyncAt: Date.now(),
        pendingCount: this.pendingMutations.length,
      }
      this.isCurrentlyOffline = false

      // A successful connection ends the current retry cycle.
      this.cancelReconnect()

      this.healthStatus = {
        status: 'healthy',
        lastCheckAt: Date.now(),
        latencyMs: 0,
        consecutiveFailures: 0,
      }

      // Emit connection change
      if (this.onConnectionChange) {
        this.onConnectionChange({ connected: true, previousState })
      }
      this.emitSyncStateEvent('connected', this.syncState)

      this.debugLog('Sync connected successfully', {
        itemCount: this.items.size,
        pendingCount: this.pendingMutations.length,
      })
      this.notifySubscribers()

      // Start polling if configured
      if (this.options.pollInterval) {
        this.debugLog('Starting polling', { interval: this.options.pollInterval })
        this.startPolling()
      }

      // Start batch processing if configured
      if (this.batchConfig && this.pendingBatch.length > 0) {
        this.scheduleBatchFlush()
      }
    } catch (error) {
      const failure = this.toError(error)
      this.syncState = {
        ...this.syncState,
        connected: false,
        lastError: failure,
      }
      this.debugLog('Sync connection failed', { error: failure.message })

      // Auto-reconnect if configured. The attempt number advances with every
      // failure so the backoff grows and maxRetries is eventually reached.
      if (this.autoReconnectConfig?.enabled) {
        this.scheduleReconnect(this.reconnectAttempt + 1)
      }

      throw error
    }
  }

  /**
   * Schedule an auto-reconnect attempt
   *
   * The delay is `initialDelay * backoffMultiplier^(attempt - 1)`, capped at
   * `maxDelay`. Nothing is scheduled when a retry is already pending or when
   * `attempt` exceeds `maxRetries`; in the latter case the counter is reset
   * so a later manual `connect()` starts a fresh retry cycle.
   *
   * @param attempt - 1-based number of the attempt to schedule.
   */
  private scheduleReconnect(attempt: number): void {
    const config = this.autoReconnectConfig
    if (!config) return
    if (this.reconnectTimeout !== undefined) return

    if (attempt > config.maxRetries) {
      this.reconnectAttempt = 0
      return
    }

    this.reconnectAttempt = attempt
    const delay = Math.min(
      config.initialDelay * Math.pow(config.backoffMultiplier, attempt - 1),
      config.maxDelay
    )

    this.reconnectTimeout = setTimeout(() => {
      this.reconnectTimeout = undefined
      this.connect().catch(() => {
        // connect() already recorded the error and scheduled the next attempt.
      })
    }, delay)
  }

  /**
   * Cancel any pending auto-reconnect timer and reset the attempt counter.
   */
  private cancelReconnect(): void {
    if (this.reconnectTimeout !== undefined) {
      clearTimeout(this.reconnectTimeout)
      this.reconnectTimeout = undefined
    }
    this.reconnectAttempt = 0
  }

  /**
   * Register a sync state change handler
   *
   * @returns A function that removes the handler again.
   */
  onSyncStateChange(handler: (event: string, state: SyncState) => void): () => void {
    this.syncStateChangeHandlers.push(handler)
    return () => {
      const idx = this.syncStateChangeHandlers.indexOf(handler)
      if (idx >= 0) this.syncStateChangeHandlers.splice(idx, 1)
    }
  }

  /**
   * Get health status
   *
   * @returns A shallow copy of the current health record.
   */
  getHealthStatus(): {
    status: 'healthy' | 'degraded' | 'unhealthy'
    lastCheckAt: number
    latencyMs: number
    consecutiveFailures: number
  } {
    return { ...this.healthStatus }
  }

  /**
   * Check if the collection is currently offline
   */
  isOffline(): boolean {
    return this.isCurrentlyOffline
  }

  /**
   * Disconnect from sync
   *
   * Stops polling, cancels the pending batch flush and any scheduled
   * auto-reconnect, marks the state as disconnected, fires the connection
   * callbacks and removes all subscribers.
   */
  disconnect(): void {
    const previousState = { ...this.syncState }
    this.emitSyncStateEvent('disconnecting', this.syncState)

    this.stopPolling()
    // Without this a failed connect() could revive the collection later.
    this.cancelReconnect()
    if (this.batchTimeout) {
      clearTimeout(this.batchTimeout)
      this.batchTimeout = undefined
    }

    this.syncState = {
      ...this.syncState,
      connected: false,
    }

    if (this.onConnectionChange) {
      this.onConnectionChange({ connected: false, previousState })
    }
    this.emitSyncStateEvent('disconnected', this.syncState)

    this.subscribers.clear()
  }

  /**
   * Clear all local data and pending mutations
   */
  clear(): void {
    this.items.clear()
    this.pendingMutations = []
    this.updatePendingCount()
    this.debugLog('Cache cleared')
  }

  /**
   * Get the current subscriber count for debugging
   */
  getSubscriberCount(): number {
    return this.subscribers.size
  }

  /**
   * Insert a new item
   *
   * The item is stored locally right away and recorded as a pending
   * mutation. While connected it is also queued for a batch flush (when
   * batching is configured) and/or POSTed directly (when offline support is
   * enabled); a rejected POST marks the collection offline instead of
   * failing the insert.
   *
   * @param data - Item fields; an id is generated when `data.id` is absent.
   * @returns The stored item, including its id.
   */
  async insert(data: Omit<T, 'id'> & { id?: T['id'] }): Promise<T> {
    const id = data.id ?? this.generateId()
    const item = { ...data, id } as T

    this.items.set(id, item)

    // Track as pending mutation
    const mutationId = this.generateMutationId()
    const mutation: PendingMutation<T> = {
      id: mutationId,
      type: 'insert',
      collectionId: this.id,
      data: item,
      timestamp: Date.now(),
    }
    this.pendingMutations.push(mutation)
    this.updatePendingCount()

    this.debugLog('Mutation added: insert', {
      mutationId,
      itemId: id,
      pendingCount: this.pendingMutations.length,
    })

    // Batch mutations if configured
    if (this.batchConfig && this.syncState.connected) {
      this.pendingBatch.push(mutation)
      this.scheduleBatchFlush()
    }

    // If offline support is enabled, try to sync the mutation and detect offline state
    if (this.offlineSupport && this.syncState.connected) {
      try {
        const url = this.buildSyncUrl()
        await fetch(url, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ type: 'insert', data: item }),
        })
      } catch {
        this.isCurrentlyOffline = true
        this.debugLog('Offline detected during insert', { itemId: id })
      }
    }

    this.notifySubscribers()
    return item
  }

  /**
   * Update an existing item
   *
   * Merges `data` over the stored item (the id is always kept) and records
   * a pending `update` mutation. No network request is made here.
   *
   * @throws Error when no item with `id` exists locally.
   */
  async update(id: string | number, data: Partial<T>): Promise<T> {
    const existing = this.items.get(id)
    if (!existing) {
      throw new Error(`Update failed: item with id "${id}" not found in collection "${this.id}" (table: ${this.table})`)
    }

    const updated = { ...existing, ...data, id } as T
    this.items.set(id, updated)

    // Track as pending mutation
    const mutationId = this.generateMutationId()
    this.pendingMutations.push({
      id: mutationId,
      type: 'update',
      collectionId: this.id,
      data: { id, ...data } as Partial<T>,
      timestamp: Date.now(),
    })
    this.updatePendingCount()

    this.debugLog('Mutation added: update', {
      mutationId,
      itemId: id,
      pendingCount: this.pendingMutations.length,
    })

    this.notifySubscribers()
    return updated
  }

  /**
   * Delete an item
   *
   * Removes the item locally and records a pending `delete` mutation. No
   * network request is made here.
   *
   * @throws Error when no item with `id` exists locally.
   */
  async delete(id: string | number): Promise<void> {
    if (!this.items.has(id)) {
      throw new Error(`Delete failed: item with id "${id}" not found in collection "${this.id}" (table: ${this.table})`)
    }

    this.items.delete(id)

    // Track as pending mutation
    const mutationId = this.generateMutationId()
    this.pendingMutations.push({
      id: mutationId,
      type: 'delete',
      collectionId: this.id,
      data: { id } as Partial<T>,
      timestamp: Date.now(),
    })
    this.updatePendingCount()

    this.debugLog('Mutation added: delete', {
      mutationId,
      itemId: id,
      pendingCount: this.pendingMutations.length,
    })

    this.notifySubscribers()
  }

  /**
   * Subscribe to changes
   *
   * @param callback - Receives the full item list after each change.
   * @returns A function that removes the subscription.
   */
  subscribe(callback: (items: T[]) => void): () => void {
    this.subscribers.add(callback)
    return () => {
      this.subscribers.delete(callback)
    }
  }

  /**
   * Get current sync state
   *
   * @returns A shallow copy of the sync state.
   */
  getSyncState(): SyncState {
    return { ...this.syncState }
  }

  /**
   * Get the pending mutation count
   */
  getPendingMutationCount(): number {
    return this.pendingMutations.length
  }

  /**
   * Dispose of all resources - call when the collection is no longer needed
   *
   * Disconnects, then drops the cached items, the pending mutations and any
   * batch entries still queued, and refreshes the reported pending count.
   */
  dispose(): void {
    this.disconnect()
    this.items.clear()
    this.pendingMutations = []
    this.pendingBatch = []
    // Keep getSyncState().pendingCount in step with the emptied list.
    this.updatePendingCount()
  }

  /**
   * Build the request URL: `syncUrl` plus a `table` query parameter and one
   * query parameter per entry of `options.shapeParams`.
   */
  private buildSyncUrl(): string {
    const url = new URL(this.syncUrl)
    url.searchParams.set('table', this.table)

    if (this.options.shapeParams) {
      for (const [key, value] of Object.entries(this.options.shapeParams)) {
        url.searchParams.set(key, value)
      }
    }

    return url.toString()
  }

  /**
   * Merge a server response into the local map.
   *
   * Remote rows replace local ones unless the id has a pending insert or
   * update. For those ids the `onConflict` resolver (when configured)
   * decides the stored value; without a resolver the local value is kept.
   * Local items missing from the response are removed unless they have a
   * pending insert/update. Confirmed mutations are pruned at the end.
   */
  private async mergeData(data: T[]): Promise<void> {
    // Simple merge strategy - remote data wins for non-pending items
    const pendingIds = new Set<string | number>(
      this.pendingMutations
        .filter(m => m.type !== 'delete')
        .map(m => (m.data as BaseRecord).id)
    )

    for (const item of data) {
      // Check for conflicts with pending mutations
      if (pendingIds.has(item.id) && this.onConflict) {
        const mutation = this.pendingMutations.find(
          m => (m.data as BaseRecord).id === item.id
        )
        if (mutation) {
          const conflictType: 'insert-insert' | 'update-update' =
            mutation.type === 'insert' ? 'insert-insert' : 'update-update'
          const conflict: Conflict<T> = {
            type: conflictType,
            collectionId: this.id,
            recordId: item.id,
            localValue: mutation.data,
            remoteValue: item as Partial<T>,
            localTimestamp: mutation.timestamp,
            remoteTimestamp: Date.now(),
          }
          const resolved = await this.onConflict(conflict)
          if (resolved) {
            this.items.set(item.id, resolved)
          }
          continue
        }
      }

      // Don't overwrite items with pending local mutations
      if (!pendingIds.has(item.id)) {
        this.items.set(item.id, item)
      }
    }

    // Handle remote deletes
    const remoteIds = new Set<string | number>(data.map(item => item.id))
    for (const localId of this.items.keys()) {
      if (!remoteIds.has(localId) && !pendingIds.has(localId)) {
        // Item was deleted remotely and no local pending changes
        // Keep it if it's a locally inserted item
        const isPendingInsert = this.pendingMutations.some(
          m => m.type === 'insert' && (m.data as BaseRecord).id === localId
        )
        if (!isPendingInsert) {
          this.items.delete(localId)
        }
      }
    }

    // Clear confirmed mutations to prevent memory leak
    this.clearConfirmedMutations(data)
  }

  /**
   * Clear pending mutations that have been confirmed by the server.
   * - Insert/Update mutations are confirmed when the item appears in sync response
   * - Delete mutations are confirmed when the item does NOT appear in sync response
   */
  private clearConfirmedMutations(serverData: T[]): void {
    const remoteIds = new Set<string | number>(serverData.map(item => item.id))
    const beforeCount = this.pendingMutations.length

    this.pendingMutations = this.pendingMutations.filter(mutation => {
      const itemId = (mutation.data as BaseRecord).id
      let shouldKeep = true

      if (mutation.type === 'insert' || mutation.type === 'update') {
        // Insert/Update is confirmed if the item exists on the server
        shouldKeep = !remoteIds.has(itemId)
      } else if (mutation.type === 'delete') {
        // Delete is confirmed if the item does NOT exist on the server
        shouldKeep = remoteIds.has(itemId)
      }

      if (!shouldKeep) {
        this.debugLog('Mutation confirmed and cleared', {
          mutationId: mutation.id,
          type: mutation.type,
          itemId,
        })
      }
      return shouldKeep
    })

    const afterCount = this.pendingMutations.length
    const clearedCount = beforeCount - afterCount
    if (clearedCount > 0) {
      this.debugLog('Mutations cleared after sync', {
        clearedCount,
        remainingCount: afterCount,
      })
    }

    this.updatePendingCount()
  }

  /**
   * Arm the batch timer unless one is already pending. When it fires, up to
   * `maxBatchSize` queued mutations are POSTed; a rejected request puts them
   * back at the front of the queue, and the timer is re-armed while entries
   * remain.
   */
  private scheduleBatchFlush(): void {
    if (this.batchTimeout) return
    const config = this.batchConfig
    if (!config) return

    this.batchTimeout = setTimeout(async () => {
      this.batchTimeout = undefined
      if (this.pendingBatch.length === 0) return

      const batch = this.pendingBatch.splice(0, config.maxBatchSize)
      const url = this.buildSyncUrl()

      try {
        await fetch(url, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ batch: batch.map(m => ({ type: m.type, data: m.data })) }),
        })
      } catch {
        // Re-add failed batch items
        this.pendingBatch.unshift(...batch)
      }

      // Schedule next flush if items remain
      if (this.pendingBatch.length > 0) {
        this.scheduleBatchFlush()
      }
    }, config.batchDelayMs)
  }

  /**
   * Start the polling interval (no-op when one is already running).
   */
  private startPolling(): void {
    if (this.pollIntervalId) return

    this.pollIntervalId = setInterval(() => {
      void this.pollOnce()
    }, this.options.pollInterval)
  }

  /**
   * Run one polling cycle: fetch, merge, update the sync state and notify
   * subscribers. Failures are recorded in `syncState.lastError` and, with
   * offline support enabled, flip the collection to offline; they are never
   * rethrown.
   */
  private async pollOnce(): Promise<void> {
    this.debugLog('Polling sync endpoint')
    try {
      await this.fetchData()
      const { lastError: _omitted, ...restSyncState } = this.syncState

      // If we were offline and successfully synced, clear pending mutations
      if (this.offlineSupport && this.isCurrentlyOffline) {
        this.isCurrentlyOffline = false
        this.pendingMutations = []
        this.debugLog('Came back online, cleared pending mutations')
      }

      this.syncState = {
        ...restSyncState,
        lastSyncAt: Date.now(),
        pendingCount: this.pendingMutations.length,
      }
      this.debugLog('Sync poll succeeded', {
        itemCount: this.items.size,
        pendingCount: this.pendingMutations.length,
      })
      this.notifySubscribers()
    } catch (error) {
      const failure = this.toError(error)
      if (this.offlineSupport) {
        this.isCurrentlyOffline = true
      }
      this.syncState = {
        ...this.syncState,
        lastError: failure,
      }
      this.debugLog('Sync poll failed', { error: failure.message })
    }
  }

  /**
   * Stop the polling interval if one is running.
   */
  private stopPolling(): void {
    if (this.pollIntervalId) {
      clearInterval(this.pollIntervalId)
      this.pollIntervalId = undefined
    }
  }

  /**
   * Generate an item id from the collection id, the current time and a
   * random base-36 suffix.
   */
  private generateId(): string {
    return `${this.id}-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`
  }

  /**
   * Generate a mutation id from the current time and a random base-36 suffix.
   */
  private generateMutationId(): string {
    return `mut-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`
  }

  /**
   * Copy the pending-mutation list length into `syncState.pendingCount`.
   */
  private updatePendingCount(): void {
    this.syncState = {
      ...this.syncState,
      pendingCount: this.pendingMutations.length,
    }
  }

  /**
   * Normalise a caught value to an Error so `.message` is always available.
   */
  private toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value))
  }

  /**
   * Emit a sync state change event to all registered handlers
   *
   * Each handler receives its own shallow copy of `state`.
   */
  private emitSyncStateEvent(event: string, state: SyncState): void {
    // Iterate over a snapshot: a handler that unsubscribes while being called
    // splices the live array, which would otherwise skip the next handler.
    for (const handler of [...this.syncStateChangeHandlers]) {
      handler(event, { ...state })
    }
  }

  /**
   * Call every subscriber with the current item list.
   */
  private notifySubscribers(): void {
    const items = this.getAll()
    for (const callback of this.subscribers) {
      callback(items)
    }
  }
}

/**
 * Factory function to create a SyncCollection.
*
 * @param options - Configuration including table name, sync URL, and polling parameters
 * @returns A new SyncCollection instance configured for real-time sync
 *
 * @example
 * ```typescript
 * const todos = createSyncCollection({
 *   id: 'todos',
 *   table: 'todos',
 *   syncUrl: 'https://db.postgres.do/v1/shape',
 *   pollInterval: 5000,
 * })
 * await todos.connect()
 * ```
 */
export function createSyncCollection<T extends BaseRecord>(
  options: SyncCollectionOptions<T>
): SyncCollection<T> {
  return new SyncCollection<T>(options)
}

/**
 * Options for constructing a SyncEngine.
 */
export interface SyncEngineOptions {
  /** Base URL used to derive the default sync endpoint (`<baseUrl>/v1/shape`). */
  baseUrl: string
}

/**
 * SyncEngine - Manages multiple sync collections
 *
 * Collections are kept in a map keyed by their id; registering a second
 * collection under an existing id replaces the map entry.
 */
export class SyncEngine {
  private baseUrl: string
  private collections: Map<string, SyncCollection<BaseRecord>> = new Map()

  constructor(options: SyncEngineOptions) {
    this.baseUrl = options.baseUrl
  }

  /**
   * Register a collection for syncing
   *
   * @param options - Collection options. When `syncUrl` is omitted (or
   *   empty) it defaults to `<baseUrl>/v1/shape`.
   * @returns The newly created collection. It is not connected yet.
   */
  registerCollection<T extends BaseRecord>(
    options: Omit<SyncCollectionOptions<T>, 'syncUrl'> & { syncUrl?: string }
  ): SyncCollection<T> {
    // Trim trailing slashes so a baseUrl such as "https://host/" does not
    // produce "https://host//v1/shape".
    const defaultSyncUrl = `${this.baseUrl.replace(/\/+$/, '')}/v1/shape`
    const syncUrl = options.syncUrl || defaultSyncUrl

    const collection = new SyncCollection<T>({
      ...options,
      syncUrl,
    })

    // The map stores collections under the base record type.
    this.collections.set(options.id, collection as unknown as SyncCollection<BaseRecord>)
    return collection
  }

  /**
   * Get a registered collection
   *
   * @returns The collection registered under `id`, or undefined. The record
   *   type `T` is asserted by the caller, not checked.
   */
  getCollection<T extends BaseRecord>(id: string): SyncCollection<T> | undefined {
    return this.collections.get(id) as SyncCollection<T> | undefined
  }

  /**
   * Get all collection IDs
   */
  getCollectionIds(): string[] {
    return Array.from(this.collections.keys())
  }

  /**
   * Connect all collections
   *
   * All `connect()` calls are started together; the returned promise rejects
   * with the first failure.
   */
  async connectAll(): Promise<void> {
    await Promise.all(
      Array.from(this.collections.values()).map(c => c.connect())
    )
  }

  /**
   * Disconnect all collections
   */
  disconnectAll(): void {
    for (const collection of this.collections.values()) {
      collection.disconnect()
    }
  }

  /**
   * Get total pending mutations across all collections
   */
  getTotalPendingMutations(): number {
    let total = 0
    for (const collection of this.collections.values()) {
      // Read the live list length rather than the syncState snapshot.
      total += collection.getPendingMutationCount()
    }
    return total
  }

  /**
   * Dispose all resources
   *
   * Disposes every registered collection (which disconnects it and drops its
   * cached items and pending mutations), then empties the registry.
   */
  async dispose(): Promise<void> {
    for (const collection of this.collections.values()) {
      collection.dispose()
    }
    this.collections.clear()
  }
}