import { QueryObserver, hashKey } from '@tanstack/query-core' import { deepEquals } from '@tanstack/db' import { GetKeyRequiredError, QueryClientRequiredError, QueryFnRequiredError, QueryKeyRequiredError, } from './errors' import { createWriteUtils } from './manual-sync' import { serializeLoadSubsetOptions } from './serialization' import type { BaseCollectionConfig, ChangeMessage, CollectionConfig, DeleteMutationFnParams, InsertMutationFnParams, LoadSubsetOptions, SyncConfig, SyncMetadataApi, UpdateMutationFnParams, UtilsRecord, } from '@tanstack/db' import type { FetchStatus, QueryClient, QueryFunctionContext, QueryKey, QueryObserverOptions, QueryObserverResult, } from '@tanstack/query-core' import type { StandardSchemaV1 } from '@standard-schema/spec' // Re-export for external use export type { SyncOperation } from './manual-sync' // Schema output type inference helper (matches electric.ts pattern) type InferSchemaOutput = T extends StandardSchemaV1 ? StandardSchemaV1.InferOutput extends object ? StandardSchemaV1.InferOutput : Record : Record // Schema input type inference helper (matches electric.ts pattern) type InferSchemaInput = T extends StandardSchemaV1 ? StandardSchemaV1.InferInput extends object ? StandardSchemaV1.InferInput : Record : Record type TQueryKeyBuilder = (opts: LoadSubsetOptions) => TQueryKey /** * Configuration options for creating a Query Collection * @template T - The explicit type of items stored in the collection * @template TQueryFn - The queryFn type * @template TError - The type of errors that can occur during queries * @template TQueryKey - The type of the query key * @template TKey - The type of the item keys * @template TSchema - The schema type for validation */ export interface QueryCollectionConfig< T extends object = object, TQueryFn extends (context: QueryFunctionContext) => any = ( context: QueryFunctionContext, ) => any, TError = unknown, TQueryKey extends QueryKey = QueryKey, TKey extends string | number = string | number, TSchema extends StandardSchemaV1 = never, TQueryData = Awaited>, > extends BaseCollectionConfig { /** The query key used by TanStack Query to identify this query */ queryKey: TQueryKey | TQueryKeyBuilder /** Function that fetches data from the server. Must return the complete collection state */ queryFn: TQueryFn extends ( context: QueryFunctionContext, ) => Promise> | Array ? (context: QueryFunctionContext) => Promise> | Array : TQueryFn /* Function that extracts array items from wrapped API responses (e.g metadata, pagination) */ select?: (data: TQueryData) => Array /** The TanStack Query client instance */ queryClient: QueryClient // Query-specific options /** Whether the query should automatically run (default: true) */ enabled?: QueryObserverOptions< TQueryData, TError, Array, TQueryData, TQueryKey >[`enabled`] refetchInterval?: QueryObserverOptions< TQueryData, TError, Array, TQueryData, TQueryKey >[`refetchInterval`] retry?: QueryObserverOptions< TQueryData, TError, Array, TQueryData, TQueryKey >[`retry`] retryDelay?: QueryObserverOptions< TQueryData, TError, Array, TQueryData, TQueryKey >[`retryDelay`] staleTime?: QueryObserverOptions< TQueryData, TError, Array, TQueryData, TQueryKey >[`staleTime`] gcTime?: QueryObserverOptions< TQueryData, TError, Array, TQueryData, TQueryKey >[`gcTime`] persistedGcTime?: number /** * Metadata to pass to the query. * Available in queryFn via context.meta * * @example * // Using meta for error context * queryFn: async (context) => { * try { * return await api.getTodos(userId) * } catch (error) { * // Use meta for better error messages * throw new Error( * context.meta?.errorMessage || 'Failed to load todos' * ) * } * }, * meta: { * errorMessage: `Failed to load todos for user ${userId}` * } */ meta?: Record } /** * Type for the refetch utility function * Returns the QueryObserverResult from TanStack Query */ export type RefetchFn = (opts?: { throwOnError?: boolean }) => Promise | void>> /** * Utility methods available on Query Collections for direct writes and manual operations. * Direct writes bypass the normal query/mutation flow and write directly to the synced data store. * @template TItem - The type of items stored in the collection * @template TKey - The type of the item keys * @template TInsertInput - The type accepted for insert operations * @template TError - The type of errors that can occur during queries */ export interface QueryCollectionUtils< TItem extends object = Record, TKey extends string | number = string | number, TInsertInput extends object = TItem, TError = unknown, > extends UtilsRecord { /** Manually trigger a refetch of the query */ refetch: RefetchFn /** Insert one or more items directly into the synced data store without triggering a query refetch or optimistic update */ writeInsert: (data: TInsertInput | Array) => void /** Update one or more items directly in the synced data store without triggering a query refetch or optimistic update */ writeUpdate: (updates: Partial | Array>) => void /** Delete one or more items directly from the synced data store without triggering a query refetch or optimistic update */ writeDelete: (keys: TKey | Array) => void /** Insert or update one or more items directly in the synced data store without triggering a query refetch or optimistic update */ writeUpsert: (data: Partial | Array>) => void /** Execute multiple write operations as a single atomic batch to the synced data store */ writeBatch: (callback: () => void) => void // Query Observer State (getters) /** Get the last error encountered by the query (if any); reset on success */ lastError: TError | undefined /** Check if the collection is in an error state */ isError: boolean /** * Get the number of consecutive sync failures. * Incremented only when query fails completely (not per retry attempt); reset on success. */ errorCount: number /** Check if query is currently fetching (initial or background) */ isFetching: boolean /** Check if query is refetching in background (not initial fetch) */ isRefetching: boolean /** Check if query is loading for the first time (no data yet) */ isLoading: boolean /** Get timestamp of last successful data update (in milliseconds) */ dataUpdatedAt: number /** Get current fetch status */ fetchStatus: `fetching` | `paused` | `idle` /** * Clear the error state and trigger a refetch of the query * @returns Promise that resolves when the refetch completes successfully * @throws Error if the refetch fails */ clearError: () => Promise } /** * Internal state object for tracking query observer and errors */ interface QueryCollectionState { lastError: any errorCount: number lastErrorUpdatedAt: number observers: Map< string, QueryObserver, any, Array, Array, any> > } type PersistedQueryRetentionEntry = | { queryHash: string mode: `ttl` expiresAt: number } | { queryHash: string mode: `until-revalidated` } const QUERY_COLLECTION_GC_PREFIX = `queryCollection:gc:` type PersistedScannedRowForQuery = { key: string | number value: TItem metadata?: unknown } type QuerySyncMetadataWithPersistedScan = SyncMetadataApi< string | number > & { row: SyncMetadataApi[`row`] & { scanPersisted?: (options?: { metadataOnly?: boolean }) => Promise>> } } /** * Implementation class for QueryCollectionUtils with explicit dependency injection * for better testability and architectural clarity */ class QueryCollectionUtilsImpl { private state: QueryCollectionState private refetchFn: RefetchFn // Write methods public refetch: RefetchFn public writeInsert: any public writeUpdate: any public writeDelete: any public writeUpsert: any public writeBatch: any constructor( state: QueryCollectionState, refetch: RefetchFn, writeUtils: ReturnType, ) { this.state = state this.refetchFn = refetch // Initialize methods to use passed dependencies this.refetch = refetch this.writeInsert = writeUtils.writeInsert this.writeUpdate = writeUtils.writeUpdate this.writeDelete = writeUtils.writeDelete this.writeUpsert = writeUtils.writeUpsert this.writeBatch = writeUtils.writeBatch } public async clearError() { this.state.lastError = undefined this.state.errorCount = 0 this.state.lastErrorUpdatedAt = 0 await this.refetchFn({ throwOnError: true }) } // Getters for error state public get lastError() { return this.state.lastError } public get isError() { return !!this.state.lastError } public get errorCount() { return this.state.errorCount } // Getters for QueryObserver state public get isFetching() { // check if any observer is fetching return Array.from(this.state.observers.values()).some( (observer) => observer.getCurrentResult().isFetching, ) } public get isRefetching() { // check if any observer is refetching return Array.from(this.state.observers.values()).some( (observer) => observer.getCurrentResult().isRefetching, ) } public get isLoading() { // check if any observer is loading return Array.from(this.state.observers.values()).some( (observer) => observer.getCurrentResult().isLoading, ) } public get dataUpdatedAt() { // compute the max dataUpdatedAt of all observers return Math.max( 0, ...Array.from(this.state.observers.values()).map( (observer) => observer.getCurrentResult().dataUpdatedAt, ), ) } public get fetchStatus(): Array { return Array.from(this.state.observers.values()).map( (observer) => observer.getCurrentResult().fetchStatus, ) } } /** * Creates query collection options for use with a standard Collection. * This integrates TanStack Query with TanStack DB for automatic synchronization. * * Supports automatic type inference following the priority order: * 1. Schema inference (highest priority) * 2. QueryFn return type inference (second priority) * * @template T - Type of the schema if a schema is provided otherwise it is the type of the values returned by the queryFn * @template TError - The type of errors that can occur during queries * @template TQueryKey - The type of the query key * @template TKey - The type of the item keys * @param config - Configuration options for the Query collection * @returns Collection options with utilities for direct writes and manual operations * * @example * // Type inferred from queryFn return type (NEW!) * const todosCollection = createCollection( * queryCollectionOptions({ * queryKey: ['todos'], * queryFn: async () => { * const response = await fetch('/api/todos') * return response.json() as Todo[] // Type automatically inferred! * }, * queryClient, * getKey: (item) => item.id, // item is typed as Todo * }) * ) * * @example * // Explicit type * const todosCollection = createCollection( * queryCollectionOptions({ * queryKey: ['todos'], * queryFn: async () => fetch('/api/todos').then(r => r.json()), * queryClient, * getKey: (item) => item.id, * }) * ) * * @example * // Schema inference * const todosCollection = createCollection( * queryCollectionOptions({ * queryKey: ['todos'], * queryFn: async () => fetch('/api/todos').then(r => r.json()), * queryClient, * schema: todoSchema, // Type inferred from schema * getKey: (item) => item.id, * }) * ) * * @example * // With persistence handlers * const todosCollection = createCollection( * queryCollectionOptions({ * queryKey: ['todos'], * queryFn: fetchTodos, * queryClient, * getKey: (item) => item.id, * onInsert: async ({ transaction }) => { * await api.createTodos(transaction.mutations.map(m => m.modified)) * }, * onUpdate: async ({ transaction }) => { * await api.updateTodos(transaction.mutations) * }, * onDelete: async ({ transaction }) => { * await api.deleteTodos(transaction.mutations.map(m => m.key)) * } * }) * ) * * @example * // The select option extracts the items array from a response with metadata * const todosCollection = createCollection( * queryCollectionOptions({ * queryKey: ['todos'], * queryFn: async () => fetch('/api/todos').then(r => r.json()), * select: (data) => data.items, // Extract the array of items * queryClient, * schema: todoSchema, * getKey: (item) => item.id, * }) * ) */ // Overload for when schema is provided and select present export function queryCollectionOptions< T extends StandardSchemaV1, TQueryFn extends (context: QueryFunctionContext) => any, TError = unknown, TQueryKey extends QueryKey = QueryKey, TKey extends string | number = string | number, TQueryData = Awaited>, >( config: QueryCollectionConfig< InferSchemaOutput, TQueryFn, TError, TQueryKey, TKey, T > & { schema: T select: (data: TQueryData) => Array> }, ): CollectionConfig< InferSchemaOutput, TKey, T, QueryCollectionUtils, TKey, InferSchemaInput, TError> > & { schema: T utils: QueryCollectionUtils< InferSchemaOutput, TKey, InferSchemaInput, TError > } // Overload for when no schema is provided and select present export function queryCollectionOptions< T extends object, TQueryFn extends (context: QueryFunctionContext) => any = ( context: QueryFunctionContext, ) => any, TError = unknown, TQueryKey extends QueryKey = QueryKey, TKey extends string | number = string | number, TQueryData = Awaited>, >( config: QueryCollectionConfig< T, TQueryFn, TError, TQueryKey, TKey, never, TQueryData > & { schema?: never // prohibit schema select: (data: TQueryData) => Array }, ): CollectionConfig< T, TKey, never, QueryCollectionUtils > & { schema?: never // no schema in the result utils: QueryCollectionUtils } // Overload for when schema is provided export function queryCollectionOptions< T extends StandardSchemaV1, TError = unknown, TQueryKey extends QueryKey = QueryKey, TKey extends string | number = string | number, >( config: QueryCollectionConfig< InferSchemaOutput, ( context: QueryFunctionContext, ) => Array> | Promise>>, TError, TQueryKey, TKey, T > & { schema: T }, ): CollectionConfig< InferSchemaOutput, TKey, T, QueryCollectionUtils, TKey, InferSchemaInput, TError> > & { schema: T utils: QueryCollectionUtils< InferSchemaOutput, TKey, InferSchemaInput, TError > } // Overload for when no schema is provided export function queryCollectionOptions< T extends object, TError = unknown, TQueryKey extends QueryKey = QueryKey, TKey extends string | number = string | number, >( config: QueryCollectionConfig< T, (context: QueryFunctionContext) => Array | Promise>, TError, TQueryKey, TKey > & { schema?: never // prohibit schema }, ): CollectionConfig< T, TKey, never, QueryCollectionUtils > & { schema?: never // no schema in the result utils: QueryCollectionUtils } export function queryCollectionOptions( config: QueryCollectionConfig< Record, (context: QueryFunctionContext) => any >, ): CollectionConfig< Record, string | number, never, QueryCollectionUtils > & { utils: QueryCollectionUtils } { const { queryKey, queryFn, select, queryClient, enabled, refetchInterval, retry, retryDelay, staleTime, gcTime, persistedGcTime, getKey, onInsert, onUpdate, onDelete, meta, ...baseCollectionConfig } = config // Default to eager sync mode if not provided const syncMode = baseCollectionConfig.syncMode ?? `eager` // Compute the base query key once for cache lookups. // All derived keys (from on-demand predicates or function-based queryKey) must // share this prefix so that queryCache.findAll({ queryKey: baseKey }) can find them. const baseKey: QueryKey = typeof queryKey === `function` ? (queryKey({}) as unknown as QueryKey) : (queryKey as unknown as QueryKey) /** * Validates that a derived query key extends the base key prefix. * TanStack Query uses prefix matching in findAll(), so all keys for this collection * must start with baseKey for stale cache updates to work correctly. */ const validateQueryKeyPrefix = (key: QueryKey): void => { if (typeof queryKey !== `function`) return const isValidPrefix = key.length >= baseKey.length && baseKey.every((segment, i) => deepEquals(segment, key[i])) if (!isValidPrefix) { console.warn( `[QueryCollection] queryKey function must return keys that extend the base key prefix. ` + `Base: ${JSON.stringify(baseKey)}, Got: ${JSON.stringify(key)}. ` + `This can cause stale cache issues.`, ) } } // Validate required parameters // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (!queryKey) { throw new QueryKeyRequiredError() } // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (!queryFn) { throw new QueryFnRequiredError() } // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (!queryClient) { throw new QueryClientRequiredError() } // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition if (!getKey) { throw new GetKeyRequiredError() } /** State object to hold error tracking and observer reference */ const state: QueryCollectionState = { lastError: undefined as any, errorCount: 0, lastErrorUpdatedAt: 0, observers: new Map< string, QueryObserver, any, Array, Array, any> >(), } // hashedQueryKey → queryKey const hashToQueryKey = new Map() // queryKey → Set const queryToRows = new Map>() // RowKey → Set const rowToQueries = new Map>() // queryKey → QueryObserver's unsubscribe function const unsubscribes = new Map void>() // queryKey → reference count (how many loadSubset calls are active) // Reference counting for QueryObserver lifecycle management // ========================================================= // Tracks how many live query subscriptions are using each QueryObserver. // Multiple live queries with identical predicates share the same QueryObserver for efficiency. // // Lifecycle: // - Increment: when createQueryFromOpts creates or reuses an observer // - Decrement: when subscription.unsubscribe() passes predicates to collection._sync.unloadSubset() // - Reset: when cleanupQuery() is triggered by TanStack Query's cache GC // // When refcount reaches 0, unloadSubset(): // 1. Computes the same queryKey from the predicates // 2. Uses existing machinery (queryToRows map) to find rows that query loaded // 3. Decrements refcount and GCs rows where count reaches 0 const queryRefCounts = new Map() // Helper function to add a row to the internal state const addRow = (rowKey: string | number, hashedQueryKey: string) => { const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() rowToQueriesSet.add(hashedQueryKey) rowToQueries.set(rowKey, rowToQueriesSet) const queryToRowsSet = queryToRows.get(hashedQueryKey) || new Set() queryToRowsSet.add(rowKey) queryToRows.set(hashedQueryKey, queryToRowsSet) } // Helper function to remove a row from the internal state const removeRow = (rowKey: string | number, hashedQuerKey: string) => { const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() rowToQueriesSet.delete(hashedQuerKey) rowToQueries.set(rowKey, rowToQueriesSet) const queryToRowsSet = queryToRows.get(hashedQuerKey) || new Set() queryToRowsSet.delete(rowKey) queryToRows.set(hashedQuerKey, queryToRowsSet) return rowToQueriesSet.size === 0 } const internalSync: SyncConfig[`sync`] = (params) => { const { begin, write, commit, markReady, collection, metadata } = params const persistedMetadata = metadata as | QuerySyncMetadataWithPersistedScan | undefined // Track whether sync has been started let syncStarted = false let startupRetentionSettled = false const retainedQueriesPendingRevalidation = new Set() const effectivePersistedGcTimes = new Map() const persistedRetentionTimers = new Map< string, ReturnType >() let persistedRetentionMaintenance = Promise.resolve() const getRowMetadata = (rowKey: string | number) => { return (metadata?.row.get(rowKey) ?? collection._state.syncedMetadata.get(rowKey)) as | Record | undefined } const getPersistedOwners = (rowKey: string | number) => { const rowMetadata = getRowMetadata(rowKey) const queryMetadata = rowMetadata?.queryCollection if (!queryMetadata || typeof queryMetadata !== `object`) { return new Set() } const owners = (queryMetadata as Record).owners if (!owners || typeof owners !== `object`) { return new Set() } return new Set(Object.keys(owners as Record)) } const setPersistedOwners = ( rowKey: string | number, owners: Set, ) => { if (!metadata) { return } const currentMetadata = { ...(getRowMetadata(rowKey) ?? {}) } if (owners.size === 0) { delete currentMetadata.queryCollection if (Object.keys(currentMetadata).length === 0) { metadata.row.delete(rowKey) } else { metadata.row.set(rowKey, currentMetadata) } return } metadata.row.set(rowKey, { ...currentMetadata, queryCollection: { owners: Object.fromEntries( Array.from(owners.values()).map((owner) => [owner, true]), ), }, }) } const parsePersistedQueryRetentionEntry = ( value: unknown, expectedHash: string, ): PersistedQueryRetentionEntry | undefined => { if (!value || typeof value !== `object`) { return undefined } const record = value as Record if (record.queryHash !== expectedHash) { return undefined } if (record.mode === `until-revalidated`) { return { queryHash: expectedHash, mode: `until-revalidated`, } } if ( record.mode === `ttl` && typeof record.expiresAt === `number` && Number.isFinite(record.expiresAt) ) { return { queryHash: expectedHash, mode: `ttl`, expiresAt: record.expiresAt, } } return undefined } const runPersistedRetentionMaintenance = (task: () => Promise) => { persistedRetentionMaintenance = persistedRetentionMaintenance.then( task, task, ) return persistedRetentionMaintenance } const cancelPersistedRetentionExpiry = (hashedQueryKey: string) => { const timer = persistedRetentionTimers.get(hashedQueryKey) if (timer) { clearTimeout(timer) persistedRetentionTimers.delete(hashedQueryKey) } } const getHydratedOwnedRowsForQueryBaseline = (hashedQueryKey: string) => { const knownRows = queryToRows.get(hashedQueryKey) if (knownRows) { return new Set(knownRows) } const ownedRows = new Set() for (const [rowKey] of collection._state.syncedData.entries()) { const owners = getPersistedOwners(rowKey) if (owners.size === 0) { continue } rowToQueries.set(rowKey, new Set(owners)) owners.forEach((owner) => { const queryToRowsSet = queryToRows.get(owner) || new Set() queryToRowsSet.add(rowKey) queryToRows.set(owner, queryToRowsSet) }) if (owners.has(hashedQueryKey)) { ownedRows.add(rowKey) } } return ownedRows } const loadPersistedBaselineForQuery = async ( hashedQueryKey: string, ): Promise< Map< string | number, { value: any owners: Set } > > => { const knownRows = queryToRows.get(hashedQueryKey) if ( knownRows && Array.from(knownRows).every((rowKey) => collection.has(rowKey)) ) { const baseline = new Map< string | number, { value: any; owners: Set } >() knownRows.forEach((rowKey) => { const value = collection.get(rowKey) const owners = rowToQueries.get(rowKey) if (value && owners) { baseline.set(rowKey, { value, owners: new Set(owners), }) } }) return baseline } const scanPersisted = persistedMetadata?.row.scanPersisted if (!scanPersisted) { const baseline = new Map< string | number, { value: any; owners: Set } >() getHydratedOwnedRowsForQueryBaseline(hashedQueryKey).forEach( (rowKey) => { const value = collection.get(rowKey) const owners = rowToQueries.get(rowKey) if (value && owners) { baseline.set(rowKey, { value, owners: new Set(owners), }) } }, ) return baseline } const baseline = new Map< string | number, { value: any; owners: Set } >() const scannedRows = await scanPersisted() scannedRows.forEach((row) => { const rowMetadata = row.metadata as Record | undefined const queryMetadata = rowMetadata?.queryCollection if (!queryMetadata || typeof queryMetadata !== `object`) { return } const owners = (queryMetadata as Record).owners if (!owners || typeof owners !== `object`) { return } const ownerSet = new Set(Object.keys(owners as Record)) if (ownerSet.size === 0) { return } rowToQueries.set(row.key, new Set(ownerSet)) ownerSet.forEach((owner) => { const queryToRowsSet = queryToRows.get(owner) || new Set() queryToRowsSet.add(row.key) queryToRows.set(owner, queryToRowsSet) }) if (ownerSet.has(hashedQueryKey)) { baseline.set(row.key, { value: row.value, owners: ownerSet, }) } }) return baseline } const cleanupPersistedPlaceholder = async (hashedQueryKey: string) => { if (!metadata) { return } const baseline = await loadPersistedBaselineForQuery(hashedQueryKey) const rowsToDelete: Array = [] begin() baseline.forEach(({ value: oldItem, owners }, rowKey) => { owners.delete(hashedQueryKey) setPersistedOwners(rowKey, owners) const needToRemove = removeRow(rowKey, hashedQueryKey) if (needToRemove) { rowsToDelete.push(oldItem) } }) rowsToDelete.forEach((row) => { write({ type: `delete`, value: row }) }) metadata.collection.delete( `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, ) commit() } const schedulePersistedRetentionExpiry = ( entry: PersistedQueryRetentionEntry, ) => { if (entry.mode !== `ttl`) { return } cancelPersistedRetentionExpiry(entry.queryHash) const delay = Math.max(0, entry.expiresAt - Date.now()) const timer = setTimeout(() => { persistedRetentionTimers.delete(entry.queryHash) void runPersistedRetentionMaintenance(async () => { const currentEntry = metadata?.collection.get( `${QUERY_COLLECTION_GC_PREFIX}${entry.queryHash}`, ) const parsedCurrentEntry = parsePersistedQueryRetentionEntry( currentEntry, entry.queryHash, ) if ( !parsedCurrentEntry || parsedCurrentEntry.mode !== `ttl` || parsedCurrentEntry.expiresAt > Date.now() ) { return } await cleanupPersistedPlaceholder(entry.queryHash) }) }, delay) persistedRetentionTimers.set(entry.queryHash, timer) } const consumePersistedQueryRetentionAtStartup = async () => { if (!metadata) { return } const retentionEntries = metadata.collection.list( QUERY_COLLECTION_GC_PREFIX, ) const now = Date.now() for (const { key, value } of retentionEntries) { const hashedQueryKey = key.slice(QUERY_COLLECTION_GC_PREFIX.length) const parsed = parsePersistedQueryRetentionEntry(value, hashedQueryKey) if (!parsed) { continue } if (parsed.mode === `ttl` && parsed.expiresAt <= now) { await cleanupPersistedPlaceholder(parsed.queryHash) } else if (parsed.mode === `ttl`) { schedulePersistedRetentionExpiry(parsed) } } } /** * Generate a consistent query key from LoadSubsetOptions. * CRITICAL: Must use identical logic in both createQueryFromOpts and unloadSubset * so that refcount increment/decrement operations target the same hashedQueryKey. * Inconsistent keys would cause refcount leaks and prevent proper cleanup. */ const generateQueryKeyFromOptions = (opts: LoadSubsetOptions): QueryKey => { if (typeof queryKey === `function`) { // Function-based queryKey: use it to build the key from opts return queryKey(opts) } else if (syncMode === `on-demand`) { // Static queryKey in on-demand mode: automatically append serialized predicates // to create separate cache entries for different predicate combinations const serialized = serializeLoadSubsetOptions(opts) return serialized !== undefined ? [...queryKey, serialized] : queryKey } else { // Static queryKey in eager mode: use as-is return queryKey } } const startupRetentionEntries = metadata?.collection.list( QUERY_COLLECTION_GC_PREFIX, ) const startupRetentionMaintenancePromise = !startupRetentionEntries || startupRetentionEntries.length === 0 ? (() => { startupRetentionSettled = true return Promise.resolve() })() : runPersistedRetentionMaintenance(async () => { try { await consumePersistedQueryRetentionAtStartup() } finally { startupRetentionSettled = true } }) const createQueryFromOpts = ( opts: LoadSubsetOptions = {}, queryFunction: typeof queryFn = queryFn, ): true | Promise => { if (!startupRetentionSettled) { return startupRetentionMaintenancePromise.then(() => { const resumed = createQueryFromOpts(opts, queryFunction) return resumed === true ? undefined : resumed }) } // Generate key using common function const key = generateQueryKeyFromOptions(opts) const hashedQueryKey = hashKey(key) const extendedMeta = { ...meta, loadSubsetOptions: opts } const retainedEntry = metadata?.collection.get( `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, ) if ( parsePersistedQueryRetentionEntry(retainedEntry, hashedQueryKey) !== undefined ) { retainedQueriesPendingRevalidation.add(hashedQueryKey) } cancelPersistedRetentionExpiry(hashedQueryKey) validateQueryKeyPrefix(key) if (state.observers.has(hashedQueryKey)) { // We already have a query for this queryKey // Increment reference count since another consumer is using this observer queryRefCounts.set( hashedQueryKey, (queryRefCounts.get(hashedQueryKey) || 0) + 1, ) // Get the current result and return based on its state const observer = state.observers.get(hashedQueryKey)! const currentResult = observer.getCurrentResult() if (currentResult.isSuccess) { // Data is already available, return true synchronously return true } else if (currentResult.isError) { // Error already occurred, reject immediately return Promise.reject(currentResult.error) } else { // Check QueryClient cache directly - observer's getCurrentResult() may show // a loading state even when data exists in cache. This happens because observer // state can lag behind the QueryClient cache during unsubscribe/resubscribe // cycles (e.g., when a live query is cleaned up and recreated). const cachedData = queryClient.getQueryData(key) if (cachedData !== undefined) { return true } // Query is still loading, wait for the first result return new Promise((resolve, reject) => { const unsubscribe = observer.subscribe((result) => { // Use a microtask in case `subscribe` is called synchronously, before `unsubscribe` is initialized queueMicrotask(() => { if (result.isSuccess) { unsubscribe() resolve() } else if (result.isError) { unsubscribe() reject(result.error) } }) }) }) } } const observerOptions: QueryObserverOptions< Array, any, Array, Array, any > = { queryKey: key, queryFn: queryFunction, meta: extendedMeta, structuralSharing: true, notifyOnChangeProps: `all`, // Only include options that are explicitly defined to allow QueryClient defaultOptions to be used ...(enabled !== undefined && { enabled }), ...(refetchInterval !== undefined && { refetchInterval }), ...(retry !== undefined && { retry }), ...(retryDelay !== undefined && { retryDelay }), ...(staleTime !== undefined && { staleTime }), ...(gcTime !== undefined && { gcTime }), } const localObserver = new QueryObserver< Array, any, Array, Array, any >(queryClient, observerOptions) const resolvedQueryGcTime = queryClient.getQueryCache().find({ queryKey: key, exact: true, })?.gcTime const effectivePersistedGcTime = persistedGcTime ?? resolvedQueryGcTime hashToQueryKey.set(hashedQueryKey, key) state.observers.set(hashedQueryKey, localObserver) if (effectivePersistedGcTime !== undefined) { effectivePersistedGcTimes.set(hashedQueryKey, effectivePersistedGcTime) } else { effectivePersistedGcTimes.delete(hashedQueryKey) } // Increment reference count for this query queryRefCounts.set( hashedQueryKey, (queryRefCounts.get(hashedQueryKey) || 0) + 1, ) // Check if data already exists in QueryClient cache (persisted within gcTime from // a previous observer). This avoids creating unnecessary promises and subscription // delays when recreating an observer for data that's already cached. const cachedData = queryClient.getQueryData(key) if (cachedData !== undefined) { // Still subscribe if sync is active so we receive future updates if (syncStarted || collection.subscriberCount > 0) { subscribeToQuery(localObserver, hashedQueryKey) } return true } // Create a promise that resolves when the query result is first available const readyPromise = new Promise((resolve, reject) => { const unsubscribe = localObserver.subscribe((result) => { // Use a microtask in case `subscribe` is called synchronously, before `unsubscribe` is initialized queueMicrotask(() => { if (result.isSuccess) { unsubscribe() resolve() } else if (result.isError) { unsubscribe() reject(result.error) } }) }) }) // If sync has started or there are subscribers to the collection, subscribe to the query straight away // This creates the main subscription that handles data updates if (syncStarted || collection.subscriberCount > 0) { subscribeToQuery(localObserver, hashedQueryKey) } return readyPromise } type UpdateHandler = Parameters[0] const applySuccessfulResult = ( queryKey: QueryKey, result: QueryObserverResult, persistedBaseline?: Map< string | number, { value: any owners: Set } >, ) => { const hashedQueryKey = hashKey(queryKey) if (collection.status === `cleaned-up`) { return } // Clear error state state.lastError = undefined state.errorCount = 0 const rawData = result.data const newItemsArray = select ? select(rawData) : rawData if ( !Array.isArray(newItemsArray) || newItemsArray.some((item) => typeof item !== `object`) ) { const errorMessage = select ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` : `@tanstack/query-db-collection: queryFn must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` console.error(errorMessage) return } const currentSyncedItems: Map = new Map( collection._state.syncedData.entries(), ) const shouldUsePersistedBaseline = persistedBaseline !== undefined const previouslyOwnedRows = shouldUsePersistedBaseline ? new Set(persistedBaseline.keys()) : getHydratedOwnedRowsForQueryBaseline(hashedQueryKey) const newItemsMap = new Map() newItemsArray.forEach((item) => { const key = getKey(item) newItemsMap.set(key, item) }) begin() if (metadata) { metadata.collection.delete( `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, ) } previouslyOwnedRows.forEach((key) => { const oldItem = shouldUsePersistedBaseline ? persistedBaseline.get(key)?.value : currentSyncedItems.get(key) if (!oldItem) { return } const newItem = newItemsMap.get(key) if (!newItem) { const owners = getPersistedOwners(key) owners.delete(hashedQueryKey) setPersistedOwners(key, owners) const needToRemove = removeRow(key, hashedQueryKey) if (needToRemove) { write({ type: `delete`, value: oldItem }) } } else if (!deepEquals(oldItem, newItem)) { write({ type: `update`, value: newItem }) } }) newItemsMap.forEach((newItem, key) => { const owners = getPersistedOwners(key) if (!owners.has(hashedQueryKey)) { owners.add(hashedQueryKey) setPersistedOwners(key, owners) } addRow(key, hashedQueryKey) if (!currentSyncedItems.has(key)) { write({ type: `insert`, value: newItem }) } }) commit() retainedQueriesPendingRevalidation.delete(hashedQueryKey) cancelPersistedRetentionExpiry(hashedQueryKey) // Mark collection as ready after first successful query result markReady() } const reconcileSuccessfulResult = async ( queryKey: QueryKey, result: QueryObserverResult, ) => { const hashedQueryKey = hashKey(queryKey) const persistedBaseline = await loadPersistedBaselineForQuery(hashedQueryKey) if (collection.status === `cleaned-up`) { return } applySuccessfulResult(queryKey, result, persistedBaseline) } // eslint-disable-next-line no-shadow const makeQueryResultHandler = (queryKey: QueryKey) => { const hashedQueryKey = hashKey(queryKey) const handleQueryResult: UpdateHandler = (result) => { if (result.isSuccess) { // Skip processing this result while data refreshes are deferred. // Optimistic state covers the gap. Once the barrier resolves, // trigger a fresh refetch to get authoritative data. if (collection.deferDataRefresh) { collection.deferDataRefresh.then(() => { const observer = state.observers.get(hashedQueryKey) if (observer) { observer.refetch().catch(() => { // Errors handled by the next handleQueryResult invocation }) } }) return } if (retainedQueriesPendingRevalidation.has(hashedQueryKey)) { void reconcileSuccessfulResult(queryKey, result).catch((error) => { console.error( `[QueryCollection] Error reconciling query ${String(queryKey)}:`, error, ) }) } else { applySuccessfulResult(queryKey, result) } } else if (result.isError) { const isNewError = result.errorUpdatedAt !== state.lastErrorUpdatedAt || result.error !== state.lastError if (isNewError) { state.lastError = result.error state.errorCount++ state.lastErrorUpdatedAt = result.errorUpdatedAt } console.error( `[QueryCollection] Error observing query ${String(queryKey)}:`, result.error, ) // Mark collection as ready even on error to avoid blocking apps markReady() } } return handleQueryResult } const isSubscribed = (hashedQueryKey: string) => { return unsubscribes.has(hashedQueryKey) } const subscribeToQuery = ( observer: QueryObserver, any, Array, Array, any>, hashedQueryKey: string, ) => { if (!isSubscribed(hashedQueryKey)) { const cachedQueryKey = hashToQueryKey.get(hashedQueryKey)! const handleQueryResult = makeQueryResultHandler(cachedQueryKey) const unsubscribeFn = observer.subscribe(handleQueryResult) unsubscribes.set(hashedQueryKey, unsubscribeFn) // Process the current result immediately if available // This ensures data is synced when resubscribing to a query with cached data const currentResult = observer.getCurrentResult() if (currentResult.isSuccess || currentResult.isError) { handleQueryResult(currentResult) } } } const subscribeToQueries = () => { state.observers.forEach(subscribeToQuery) } const unsubscribeFromQueries = () => { unsubscribes.forEach((unsubscribeFn) => { unsubscribeFn() }) unsubscribes.clear() } // Mark that sync has started syncStarted = true // Set up event listener for subscriber changes const unsubscribeFromCollectionEvents = collection.on( `subscribers:change`, ({ subscriberCount }) => { if (subscriberCount > 0) { subscribeToQueries() } else if (subscriberCount === 0) { unsubscribeFromQueries() } }, ) // If syncMode is eager, create the initial query without any predicates if (syncMode === `eager`) { // Catch any errors to prevent unhandled rejections const initialResult = createQueryFromOpts({}) if (initialResult instanceof Promise) { initialResult.catch(() => { // Errors are already handled by the query result handler }) } } else { if (startupRetentionSettled) { markReady() } else { // In on-demand mode, there is no initial query, but retained-placeholder // maintenance still needs to finish before the collection is treated as ready. void startupRetentionMaintenancePromise.then(() => { markReady() }) } } // Always subscribe when sync starts (this could be from preload(), startSync config, or first subscriber) // We'll dynamically unsubscribe/resubscribe based on subscriber count to maintain staleTime behavior subscribeToQueries() // Ensure we process any existing query data (QueryObserver doesn't invoke its callback automatically with initial state) state.observers.forEach((observer, hashedQueryKey) => { const cachedQueryKey = hashToQueryKey.get(hashedQueryKey)! const handleQueryResult = makeQueryResultHandler(cachedQueryKey) handleQueryResult(observer.getCurrentResult()) }) /** * Perform row-level cleanup and remove all tracking for a query. * Callers are responsible for ensuring the query is safe to cleanup. */ const cleanupQueryInternal = (hashedQueryKey: string) => { unsubscribes.get(hashedQueryKey)?.() unsubscribes.delete(hashedQueryKey) cancelPersistedRetentionExpiry(hashedQueryKey) retainedQueriesPendingRevalidation.delete(hashedQueryKey) const rowKeys = queryToRows.get(hashedQueryKey) ?? new Set() const nextOwnersByRow = new Map>() const rowsToDelete: Array = [] rowKeys.forEach((rowKey) => { const queries = rowToQueries.get(rowKey) if (!queries) { return } const nextOwners = new Set(queries) nextOwners.delete(hashedQueryKey) nextOwnersByRow.set(rowKey, nextOwners) if (nextOwners.size === 0 && collection.has(rowKey)) { rowsToDelete.push(collection.get(rowKey)) } }) const shouldWriteMetadata = metadata !== undefined && nextOwnersByRow.size > 0 const needsTransaction = shouldWriteMetadata || rowsToDelete.length > 0 if (needsTransaction) { begin() } nextOwnersByRow.forEach((owners, rowKey) => { if (owners.size === 0) { rowToQueries.delete(rowKey) } else { rowToQueries.set(rowKey, owners) } if (shouldWriteMetadata) { setPersistedOwners(rowKey, owners) } }) if (rowsToDelete.length > 0) { rowsToDelete.forEach((row) => { write({ type: `delete`, value: row }) }) } if (needsTransaction) { commit() } state.observers.delete(hashedQueryKey) queryToRows.delete(hashedQueryKey) hashToQueryKey.delete(hashedQueryKey) queryRefCounts.delete(hashedQueryKey) effectivePersistedGcTimes.delete(hashedQueryKey) } /** * Attempt to cleanup a query when it appears unused. * Respects refcounts and invalidateQueries cycles via hasListeners(). */ const cleanupQueryIfIdle = (hashedQueryKey: string) => { const refcount = queryRefCounts.get(hashedQueryKey) || 0 const observer = state.observers.get(hashedQueryKey) const effectivePersistedGcTime = effectivePersistedGcTimes.get(hashedQueryKey) if (refcount <= 0) { // Drop our subscription so hasListeners reflects only active consumers unsubscribes.get(hashedQueryKey)?.() unsubscribes.delete(hashedQueryKey) } const hasListeners = observer?.hasListeners() ?? false if (hasListeners) { // During invalidateQueries, TanStack Query keeps internal listeners alive. // Leave refcount at 0 but keep observer so it can resubscribe. queryRefCounts.set(hashedQueryKey, 0) return } // No listeners means the query is truly idle. // Even if refcount > 0, we treat hasListeners as authoritative to prevent leaks. // This can happen if subscriptions are GC'd without calling unloadSubset. if (refcount > 0) { console.warn( `[cleanupQueryIfIdle] Invariant violation: refcount=${refcount} but no listeners. Cleaning up to prevent leak.`, { hashedQueryKey }, ) } if ( effectivePersistedGcTime !== undefined && metadata && persistedMetadata?.row.scanPersisted ) { begin() metadata.collection.set( `${QUERY_COLLECTION_GC_PREFIX}${hashedQueryKey}`, { queryHash: hashedQueryKey, mode: effectivePersistedGcTime === Number.POSITIVE_INFINITY ? `until-revalidated` : `ttl`, ...(effectivePersistedGcTime === Number.POSITIVE_INFINITY ? {} : { expiresAt: Date.now() + effectivePersistedGcTime }), }, ) commit() if (effectivePersistedGcTime !== Number.POSITIVE_INFINITY) { schedulePersistedRetentionExpiry({ queryHash: hashedQueryKey, mode: `ttl`, expiresAt: Date.now() + effectivePersistedGcTime, }) } unsubscribes.get(hashedQueryKey)?.() unsubscribes.delete(hashedQueryKey) state.observers.delete(hashedQueryKey) hashToQueryKey.delete(hashedQueryKey) queryRefCounts.set(hashedQueryKey, 0) return } cleanupQueryInternal(hashedQueryKey) } /** * Force cleanup used by explicit collection cleanup. * Ignores refcounts/hasListeners and removes everything. */ const forceCleanupQuery = (hashedQueryKey: string) => { cleanupQueryInternal(hashedQueryKey) } // Subscribe to the query client's cache to handle queries that are GCed by tanstack query const unsubscribeQueryCache = queryClient .getQueryCache() .subscribe((event) => { const hashedKey = event.query.queryHash if (event.type === `removed`) { // Only cleanup if this is OUR query (we track it) if (hashToQueryKey.has(hashedKey)) { // TanStack Query GC'd this query after gcTime expired. // Use the guarded cleanup path to avoid deleting rows for active queries. cleanupQueryIfIdle(hashedKey) } } }) const cleanup = async () => { unsubscribeFromCollectionEvents() unsubscribeFromQueries() persistedRetentionTimers.forEach((timer) => { clearTimeout(timer) }) persistedRetentionTimers.clear() const allQueryKeys = [...hashToQueryKey.values()] const allHashedKeys = [...state.observers.keys()] // Force cleanup all queries (explicit cleanup path) // This ignores hasListeners and always cleans up for (const hashedKey of allHashedKeys) { forceCleanupQuery(hashedKey) } // Unsubscribe from cache events (cleanup already happened above) unsubscribeQueryCache() // Remove queries from TanStack Query cache await Promise.all( allQueryKeys.map(async (qKey) => { await queryClient.cancelQueries({ queryKey: qKey, exact: true }) queryClient.removeQueries({ queryKey: qKey, exact: true }) }), ) } /** * Unload a query subset - the subscription-based cleanup path (on-demand mode). * * Called when a live query subscription unsubscribes (via collection._sync.unloadSubset()). * * Flow: * 1. Receives the same predicates that were passed to loadSubset * 2. Computes the queryKey using generateQueryKeyFromOptions (same logic as loadSubset) * 3. Decrements refcount * 4. If refcount reaches 0: * - Checks hasListeners() to detect invalidateQueries cycles * - If hasListeners is true: resets refcount (TanStack Query keeping observer alive) * - If hasListeners is false: calls forceCleanupQuery() to perform row-level GC * * The hasListeners() check prevents premature cleanup during invalidateQueries: * - invalidateQueries causes temporary unsubscribe/resubscribe * - During unsubscribe, our refcount drops to 0 * - But observer.hasListeners() is still true (TanStack Query's internal listeners) * - We skip cleanup and reset refcount, allowing resubscribe to succeed * * We don't cancel in-flight requests. Unsubscribing from the observer is sufficient * to prevent late-arriving data from being processed. The request completes and is cached * by TanStack Query, allowing quick remounts to restore data without refetching. */ const unloadSubset = (options: LoadSubsetOptions) => { // 1. Same predicates → 2. Same queryKey const key = generateQueryKeyFromOptions(options) const hashedQueryKey = hashKey(key) // 3. Decrement refcount const currentCount = queryRefCounts.get(hashedQueryKey) || 0 const newCount = currentCount - 1 // Update refcount if (newCount <= 0) { queryRefCounts.set(hashedQueryKey, 0) cleanupQueryIfIdle(hashedQueryKey) } else { // Still have other references, just decrement queryRefCounts.set(hashedQueryKey, newCount) } } // Create deduplicated loadSubset wrapper for non-eager modes // This prevents redundant snapshot requests when multiple concurrent // live queries request overlapping or subset predicates const loadSubsetDedupe = syncMode === `eager` ? undefined : createQueryFromOpts return { loadSubset: loadSubsetDedupe, unloadSubset: syncMode === `eager` ? undefined : unloadSubset, cleanup, } } /** * Refetch the query data * * Uses queryObserver.refetch() because: * - Bypasses `enabled: false` to support manual/imperative refetch patterns (e.g., button-triggered fetch) * - Ensures clearError() works even when enabled: false * - Always refetches THIS specific collection (exact targeting via observer) * - Respects retry, retryDelay, and other observer options * * This matches TanStack Query's hook behavior where refetch() bypasses enabled: false. * See: https://tanstack.com/query/latest/docs/framework/react/guides/disabling-queries * * Used by both: * - utils.refetch() - for explicit user-triggered refetches * - Internal handlers (onInsert/onUpdate/onDelete) - after mutations to get fresh data * * @returns Promise that resolves when the refetch is complete, with QueryObserverResult */ const refetch: RefetchFn = async (opts) => { const allQueryKeys = [...hashToQueryKey.values()] const refetchPromises = allQueryKeys.map((qKey) => { const queryObserver = state.observers.get(hashKey(qKey))! return queryObserver.refetch({ throwOnError: opts?.throwOnError, }) }) return Promise.all(refetchPromises) } /** * Updates a single query key in the cache with new items, handling both direct arrays * and wrapped response formats (when `select` is used). */ const updateCacheDataForKey = (key: QueryKey, items: Array): void => { if (select) { // When `select` is used, the cache contains a wrapped response (e.g., { data: [...], meta: {...} }) // We need to update the cache while preserving the wrapper structure queryClient.setQueryData(key, (oldData: any) => { if (!oldData || typeof oldData !== `object`) { // No existing cache or not an object - don't corrupt the cache return oldData } if (Array.isArray(oldData)) { // Cache is already a raw array (shouldn't happen with select, but handle it) return items } // Use the select function to identify which property contains the items array. // This is more robust than guessing based on property order. const selectedArray = select(oldData) if (Array.isArray(selectedArray)) { // Find the property that matches the selected array by reference equality for (const propKey of Object.keys(oldData)) { if (oldData[propKey] === selectedArray) { // Found the exact property - create a shallow copy with updated items return { ...oldData, [propKey]: items } } } } // Fallback: check common property names used for data arrays if (Array.isArray(oldData.data)) { return { ...oldData, data: items } } if (Array.isArray(oldData.items)) { return { ...oldData, items: items } } if (Array.isArray(oldData.results)) { return { ...oldData, results: items } } // Last resort: find first array property for (const propKey of Object.keys(oldData)) { if (Array.isArray(oldData[propKey])) { return { ...oldData, [propKey]: items } } } // Couldn't safely identify the array property - don't corrupt the cache // Return oldData unchanged to avoid breaking select return oldData }) } else { // No select - cache contains raw array, just set it directly queryClient.setQueryData(key, items) } } /** * Updates the query cache with new items for ALL query keys matching this collection, * including stale/inactive cache entries from destroyed observers. * * This prevents ghost items: when an observer is destroyed but gcTime > 0, TanStack Query * keeps the cached data. If syncedData changes (via writeDelete/writeInsert/writeUpdate) * after the observer is destroyed, the stale cache becomes inconsistent. When a new observer * later picks up this stale cache, makeQueryResultHandler would create spurious sync * operations (re-inserting deleted items, reverting updated values, etc). * * By updating all cache entries (active and stale), we ensure the cache always reflects * the current syncedData state. */ const updateCacheData = (items: Array): void => { const allCached = queryClient.getQueryCache().findAll({ queryKey: baseKey }) if (allCached.length > 0) { for (const query of allCached) { updateCacheDataForKey(query.queryKey, items) } } else { // Fallback: no queries in cache yet, seed the base query key. // This handles the case where updateCacheData is called before any queries are created. updateCacheDataForKey(baseKey, items) } } // Create write context for manual write operations let writeContext: { collection: any queryClient: QueryClient queryKey: Array getKey: (item: any) => string | number begin: () => void write: (message: Omit, `key`>) => void commit: () => void updateCacheData?: (items: Array) => void } | null = null // Enhanced internalSync that captures write functions for manual use const enhancedInternalSync: SyncConfig[`sync`] = (params) => { const { begin, write, commit, collection } = params // Get the base query key for the context (handle both static and function-based keys) const contextQueryKey = typeof queryKey === `function` ? (queryKey({}) as unknown as Array) : (queryKey as unknown as Array) // Store references for manual write operations writeContext = { collection, queryClient, queryKey: contextQueryKey, getKey: getKey as (item: any) => string | number, begin, write, commit, updateCacheData, } // Call the original internalSync logic return internalSync(params) } // Create write utils using the manual-sync module const writeUtils = createWriteUtils( () => writeContext, ) // Create wrapper handlers for direct persistence operations that handle refetching const wrappedOnInsert = onInsert ? async (params: InsertMutationFnParams) => { const handlerResult = (await onInsert(params)) ?? {} const shouldRefetch = (handlerResult as { refetch?: boolean }).refetch !== false if (shouldRefetch) { await refetch() } return handlerResult } : undefined const wrappedOnUpdate = onUpdate ? async (params: UpdateMutationFnParams) => { const handlerResult = (await onUpdate(params)) ?? {} const shouldRefetch = (handlerResult as { refetch?: boolean }).refetch !== false if (shouldRefetch) { await refetch() } return handlerResult } : undefined const wrappedOnDelete = onDelete ? async (params: DeleteMutationFnParams) => { const handlerResult = (await onDelete(params)) ?? {} const shouldRefetch = (handlerResult as { refetch?: boolean }).refetch !== false if (shouldRefetch) { await refetch() } return handlerResult } : undefined // Create utils instance with state and dependencies passed explicitly const utils: any = new QueryCollectionUtilsImpl(state, refetch, writeUtils) return { ...baseCollectionConfig, getKey, syncMode, sync: { sync: enhancedInternalSync }, onInsert: wrappedOnInsert, onUpdate: wrappedOnUpdate, onDelete: wrappedOnDelete, utils, } }