/** * TanStack Query Collection Adapter * * Provides TanStack Query patterns for data fetching with collections. * Supports caching, stale time, and automatic refetching. */ import type { BaseRecord, QueryCollectionOptions, Collection, SyncState, } from '../types' import { validateIdentifier } from '@dotdo/postgres-shared/validation' /** * Extended options for QueryCollection with memory management */ export interface QueryCollectionOptionsWithMemory extends QueryCollectionOptions { maxSize?: number evictionPolicy?: 'lru' | 'fifo' maxSubscribers?: number /** Initial data to populate the collection synchronously */ initialData?: T[] } /** * QueryCollection - A collection that fetches data via TanStack Query patterns */ export class QueryCollection implements Collection { readonly id: string readonly table: string private items: Map = new Map() private itemAccessOrder: Map = new Map() private accessCounter = 0 private subscribers: Set<(items: T[]) => void> = new Set() private options: QueryCollectionOptionsWithMemory private syncState: SyncState = { connected: false, initialized: false, pendingCount: 0, } private lastFetchAt: number | undefined private refetchIntervalId: ReturnType | undefined private nextId = 1 private gcTimeoutId: ReturnType | undefined private _finalizeCallbacks: Array<() => void> | undefined constructor(options: QueryCollectionOptionsWithMemory) { // Validate table name to prevent SQL injection validateIdentifier(options.table, 'table') this.id = options.id this.table = options.table this.options = options // Populate initial data if provided if (options.initialData) { for (const item of options.initialData) { this.items.set(item.id, item) this.itemAccessOrder.set(item.id, ++this.accessCounter) if (typeof item.id === 'number' && item.id >= this.nextId) { this.nextId = item.id + 1 } } this.syncState = { ...this.syncState, initialized: true, } } if (options.cacheTime) { this.scheduleGC() } } /** * Get the query function for direct invocation */ getQueryFn(): (() => Promise) | undefined { return this.options.queryFn } /** * Get all items in the collection */ getAll(): T[] { return Array.from(this.items.values()) } /** * Get item by ID */ get(id: string | number): T | undefined { const item = this.items.get(id) if (item) { this.itemAccessOrder.set(id, ++this.accessCounter) } return item } /** * Check if the cached data is stale */ isStale(): boolean { if (!this.lastFetchAt) return true const staleTime = this.getEffectiveStaleTime() if (staleTime === Infinity) return false if (staleTime === 0) return true return Date.now() - this.lastFetchAt >= staleTime } /** * Get the effective stale time (supports dynamic staleTime based on data) */ private getEffectiveStaleTime(): number { const staleTime = this.options.staleTime if (typeof staleTime === 'function') { return (staleTime as (data: T[]) => number)(this.getAll()) } return staleTime ?? 0 } /** * Get optimistic view of collection data merged with optimistic store */ getOptimisticData(store: { getState: () => { applied: Array<{ type: string; recordId: string | number; data: Partial }> } }): T[] { let items = this.getAll() const state = store.getState() for (const mutation of state.applied) { if (mutation.type === 'insert') { items = [...items, mutation.data as T] } else if (mutation.type === 'update') { items = items.map(item => item.id === mutation.recordId ? { ...item, ...mutation.data } : item ) } else if (mutation.type === 'delete') { items = items.filter(item => item.id !== mutation.recordId) } } return items } /** * Fetch data from the query function */ async fetch(): Promise { // Check if data is still fresh (not stale) if (this.lastFetchAt) { const staleTime = this.getEffectiveStaleTime() if (staleTime === Infinity) { return this.getAll() } if (staleTime > 0) { const elapsed = Date.now() - this.lastFetchAt if (elapsed < staleTime) { return this.getAll() } } // staleTime === 0 means always refetch } try { const data = await this.options.queryFn() this.items.clear() this.itemAccessOrder.clear() for (const item of data) { this.items.set(item.id, item) this.itemAccessOrder.set(item.id, ++this.accessCounter) // Track highest numeric ID for auto-generation if (typeof item.id === 'number' && item.id >= this.nextId) { this.nextId = item.id + 1 } } this.lastFetchAt = Date.now() const { lastError: _omitted, ...restSyncState } = this.syncState this.syncState = { ...restSyncState, initialized: true, lastSyncAt: this.lastFetchAt, } return data } catch (error) { this.syncState = { ...this.syncState, lastError: error as Error, } throw error } } /** * Insert a new item */ async insert(data: Omit & { id?: T['id'] }): Promise { const id = data.id ?? this.generateId() const item = { ...data, id } as T if (this.options.maxSize && this.items.size >= this.options.maxSize) { this.evictOne() } this.items.set(id, item) this.itemAccessOrder.set(id, ++this.accessCounter) this.notifySubscribers() return item } /** * Update an existing item */ async update(id: string | number, data: Partial): Promise { const existing = this.items.get(id) if (!existing) { throw new Error(`Update failed: item with id "${id}" not found in collection "${this.id}" (table: ${this.table})`) } const updated = { ...existing, ...data, id } as T this.items.set(id, updated) this.itemAccessOrder.set(id, ++this.accessCounter) this.notifySubscribers() return updated } /** * Delete an item */ async delete(id: string | number): Promise { if (!this.items.has(id)) { throw new Error(`Delete failed: item with id "${id}" not found in collection "${this.id}" (table: ${this.table})`) } this.items.delete(id) this.itemAccessOrder.delete(id) this.notifySubscribers() } /** * Subscribe to changes */ subscribe(callback: (items: T[]) => void): () => void { this.subscribers.add(callback) if ( this.options.maxSubscribers && this.subscribers.size > this.options.maxSubscribers ) { console.warn( `[QueryCollection:${this.id}] Subscriber threshold exceeded (${this.subscribers.size}). ` + `This may indicate a memory leak. Ensure components properly unsubscribe on unmount.` ) } return () => { this.subscribers.delete(callback) } } /** * Subscribe with automatic cleanup after inactivity */ subscribeWithTimeout(callback: (items: T[]) => void, options: { timeoutMs: number }): () => void { let timeoutId: ReturnType | undefined let unsubscribed = false const wrappedCallback = (items: T[]) => { if (unsubscribed) return callback(items) // Reset timeout on each activity if (timeoutId) clearTimeout(timeoutId) timeoutId = setTimeout(() => { unsubscribed = true this.subscribers.delete(wrappedCallback) }, options.timeoutMs) } this.subscribers.add(wrappedCallback) // Start the inactivity timer timeoutId = setTimeout(() => { unsubscribed = true this.subscribers.delete(wrappedCallback) }, options.timeoutMs) return () => { unsubscribed = true if (timeoutId) clearTimeout(timeoutId) this.subscribers.delete(wrappedCallback) } } /** * Subscribe with pause/resume capability */ pausableSubscribe(callback: (items: T[]) => void): { pause: () => void; resume: () => void; unsubscribe: () => void } { let paused = false const wrappedCallback = (items: T[]) => { if (!paused) { callback(items) } } this.subscribers.add(wrappedCallback) return { pause: () => { paused = true }, resume: () => { paused = false }, unsubscribe: () => { this.subscribers.delete(wrappedCallback) }, } } /** * Weak subscribe - registers a callback that can be GC'd */ weakSubscribe(callback: (items: T[]) => void): void { this.subscribe(callback) } /** * Get the current subscriber count for debugging */ getSubscriberCount(): number { return this.subscribers.size } /** * Detect orphaned subscriptions for memory debugging */ detectOrphanedSubscriptions(): { count: number; warning: string } { const count = this.subscribers.size const warning = count > 0 ? `${count} active subscriber(s) detected. Ensure all subscriptions are properly cleaned up.` : 'No active subscribers.' return { count, warning } } /** * Register a finalization callback */ onFinalize(callback: () => void): void { this._finalizeCallbacks = this._finalizeCallbacks || [] this._finalizeCallbacks.push(callback) } /** * Get current sync state */ getSyncState(): SyncState { return { ...this.syncState } } /** * Start automatic refetching at the configured interval */ startAutoRefetch(): void { if (!this.options.refetchInterval) return if (this.refetchIntervalId) return this.refetchIntervalId = setInterval(() => { this.fetch().catch(() => { // Errors are tracked in syncState }) }, this.options.refetchInterval) } /** * Stop automatic refetching */ stopAutoRefetch(): void { if (this.refetchIntervalId) { clearInterval(this.refetchIntervalId) this.refetchIntervalId = undefined } } /** * Get the count of active timers for debugging */ getActiveTimerCount(): number { let count = 0 if (this.refetchIntervalId) count++ if (this.gcTimeoutId) count++ return count } /** * Clear all cached data */ clear(): void { this.items.clear() this.itemAccessOrder.clear() this.lastFetchAt = undefined this.syncState = { ...this.syncState, initialized: false, } this.notifySubscribers() if (this._finalizeCallbacks) { for (const cb of this._finalizeCallbacks) { try { cb() } catch { // Ignore errors in cleanup callbacks } } } } /** * Garbage collect stale data */ gc(): void { this.clear() } /** * Dispose of all resources - call when the collection is no longer needed */ dispose(): void { this.stopAutoRefetch() if (this.gcTimeoutId) { clearTimeout(this.gcTimeoutId) this.gcTimeoutId = undefined } this.subscribers.clear() this.items.clear() this.itemAccessOrder.clear() this._finalizeCallbacks = undefined } private evictOne(): void { if (this.items.size === 0) return const policy = this.options.evictionPolicy || 'lru' let idToEvict: string | number | undefined if (policy === 'lru') { let oldestOrder = Infinity for (const [id, order] of this.itemAccessOrder) { if (order < oldestOrder) { oldestOrder = order idToEvict = id } } } else { idToEvict = this.items.keys().next().value } if (idToEvict !== undefined) { this.items.delete(idToEvict) this.itemAccessOrder.delete(idToEvict) } } private scheduleGC(): void { if (this.gcTimeoutId) { clearTimeout(this.gcTimeoutId) } if (this.options.cacheTime) { this.gcTimeoutId = setTimeout(() => { this.clear() }, this.options.cacheTime) } } private generateId(): string | number { // Determine ID type from existing items or default to number const existingIds = Array.from(this.items.keys()) if (existingIds.length > 0 && typeof existingIds[0] === 'string') { return `${this.id}-${Date.now()}-${Math.random().toString(36).slice(2, 9)}` } return this.nextId++ } private notifySubscribers(): void { const items = this.getAll() for (const callback of this.subscribers) { callback(items) } } } /** * Factory function to create a QueryCollection. * * @param options - Configuration including table name, query function, and caching parameters * @returns A new QueryCollection instance configured with the given options * * @example * ```typescript * const users = createQueryCollection({ * id: 'users', * table: 'users', * queryFn: () => fetch('/api/users').then(r => r.json()), * staleTime: 30000, * }) * ``` */ export function createQueryCollection( options: QueryCollectionOptions ): QueryCollection { return new QueryCollection(options) }