/**
 * TanStack Query v5 Adapter
 *
 * Provides integration between postgres.do queries and TanStack Query v5,
 * including query key generation, query options factories, and mutation patterns.
 *
 * @example
 * ```typescript
 * import { createQueryAdapter } from '@dotdo/tanstack'
 *
 * const adapter = createQueryAdapter({
 *   baseUrl: 'https://db.postgres.do',
 *   database: 'mydb',
 * })
 *
 * // Use with TanStack Query
 * const { data } = useQuery(adapter.queryOptions('SELECT * FROM users'))
 *
 * // Mutations
 * const { mutate } = useMutation(adapter.mutationOptions())
 * mutate({ sql: 'INSERT INTO users (name) VALUES ($1)', params: ['Alice'] })
 * ```
 */

import type { BaseRecord, Collection } from './types'
import {
  POSTGRES_IDENTIFIER_PATTERN,
  MAX_IDENTIFIER_LENGTH,
} from '@dotdo/postgres-shared/validation'

// ============================================================================
// Constants
// ============================================================================

/** Default stale time: data is always considered stale (refetch on every access) */
const DEFAULT_STALE_TIME_MS = 0

/** Default garbage collection time: cached data expires after 5 minutes */
const DEFAULT_GC_TIME_MS = 5 * 60 * 1000

/** Default base URL for the postgres.do API */
const DEFAULT_BASE_URL = 'https://db.postgres.do'

/** Number of consecutive failures before the circuit breaker opens */
const CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5

/** Default retry delay between query attempts in milliseconds */
const DEFAULT_RETRY_DELAY_MS = 100

/** Minimum HTTP status code considered a server error (retryable) */
const SERVER_ERROR_STATUS_THRESHOLD = 500

/**
 * Validates a PostgreSQL database name.
 *
 * The name must be non-empty, at most `MAX_IDENTIFIER_LENGTH` characters long
 * and match `POSTGRES_IDENTIFIER_PATTERN`. Uses the same rules as identifiers
 * but with database-specific error messages.
 *
 * @param name - The database name to validate
 * @throws Error if the database name is empty, too long, or does not match the pattern
 */
function validateDatabaseName(name: string): void {
  if (!name || name.length === 0) {
    throw new Error('Invalid database name: cannot be empty')
  }

  if (name.length > MAX_IDENTIFIER_LENGTH) {
    throw new Error(
      `Invalid database name "${name}": exceeds maximum length of ${MAX_IDENTIFIER_LENGTH} characters`
    )
  }

  if (!POSTGRES_IDENTIFIER_PATTERN.test(name)) {
    throw new Error(
      `Invalid database name "${name}": must match PostgreSQL identifier rules (start with letter or underscore, contain only letters, numbers, and underscores)`
    )
  }
}

// ============================================================================
// Types
// ============================================================================

/**
 * Query key structure for postgres.do queries
 */
export type PostgresQueryKey = readonly [
  'postgres',
  string, // database
  string, // normalized SQL
  ...unknown[], // params
]

/**
 * Configuration for the TanStack Query adapter
 */
export interface QueryAdapterConfig {
  /** Base URL for postgres.do API */
  baseUrl?: string
  /** Database identifier */
  database: string
  /** Default stale time in milliseconds */
  defaultStaleTime?: number
  /** Default cache time (gcTime) in milliseconds */
  defaultGcTime?: number
  /** Custom fetch function */
  fetch?: typeof globalThis.fetch
  /** Key prefix for multi-tenant isolation */
  keyPrefix?: string
}

/**
 * Query parameters for SELECT queries
 */
export interface QueryParams {
  /** SQL query string */
  sql: string
  /** Query parameters */
  params?: unknown[]
  /** Override stale time for this query */
  staleTime?: number
  /** Override gc time for this query */
  gcTime?: number
  /** Refetch interval */
  refetchInterval?: number | false
  /** Whether to enable the query */
  enabled?: boolean
  /** Timeout in milliseconds */
  timeout?: number
  /** AbortSignal for query cancellation */
  signal?: AbortSignal
  /** Number of retries on transient failures */
  retry?: number
  /** Delay between retries in milliseconds */
  retryDelay?: number
  /** Fetch policy: cache-first or network-only */
  fetchPolicy?: 'cache-first' | 'network-only'
  /** Refetch on window focus */
  refetchOnWindowFocus?: boolean
  /** Refetch on reconnect */
  refetchOnReconnect?: boolean
  /** Whether to refetch in background when tab is hidden */
  refetchIntervalInBackground?: boolean
  /** Retry backoff strategy */
  retryBackoff?: 'exponential' | 'linear'
  /** Maximum retry delay in milliseconds */
  maxRetryDelay?: number
  /** Placeholder data while loading */
  placeholderData?: unknown[] | (() => unknown[])
  /** Keep previous data while refetching */
  keepPreviousData?: boolean
  /** Transform/select function for results */
  select?: (data: unknown[]) => unknown[]
}

/**
 * Mutation parameters for INSERT/UPDATE/DELETE
 */
export interface MutationParams {
  /** SQL mutation string */
  sql: string
  /** Mutation parameters */
  params?: unknown[]
}

/**
 * Mutation context for optimistic updates
 *
 * @typeParam T - Shape of the snapshot kept for rollback (defaults to `unknown`)
 */
export interface MutationContext<T = unknown> {
  /** Previous data before mutation */
  previousData?: T
  /** Query keys that will be invalidated */
  invalidateKeys?: readonly PostgresQueryKey[]
}

/**
 * Result from a postgres.do query
 *
 * @typeParam T - Row shape (defaults to `unknown`)
 */
export interface QueryResult<T = unknown> {
  rows: T[]
  fields?: Array<{ name: string; dataTypeID: number }>
  rowCount?: number
  command?: string
}

/**
 * Query options compatible with TanStack Query v5
 * @see https://tanstack.com/query/v5/docs/framework/react/reference/queryOptions
 *
 * @typeParam TData - Value resolved by `queryFn` (defaults to `unknown[]`)
 */
export interface PostgresQueryOptions<TData = unknown[]> {
  queryKey: PostgresQueryKey
  queryFn: () => Promise<TData>
  staleTime?: number
  gcTime?: number
  refetchInterval?: number | false
  enabled?: boolean
  refetchOnWindowFocus?: boolean
  refetchOnReconnect?: boolean
  refetchIntervalInBackground?: boolean
  placeholderData?: TData | (() => TData)
  select?: (data: TData) => unknown
}

/**
 * Mutation options compatible with TanStack Query v5
 * @see https://tanstack.com/query/v5/docs/framework/react/reference/useMutation
 */
export interface PostgresMutationOptions<
  TData = QueryResult,
  TError = Error,
  TVariables = MutationParams,
  TContext = MutationContext,
> {
  mutationKey?: readonly unknown[]
  mutationFn: (variables: TVariables) => Promise<TData>
  onMutate?: (variables: TVariables) => Promise<TContext | undefined> | TContext | undefined
  onSuccess?: (data: TData, variables: TVariables, context: TContext | undefined) => Promise<void> | void
  onError?: (error: TError, variables: TVariables, context: TContext | undefined) => Promise<void> | void
  onSettled?: (
    data: TData | undefined,
    error: TError | null,
    variables: TVariables,
    context: TContext | undefined
  ) => Promise<void> | void
}

/**
 * Cache invalidation pattern
 */
export interface InvalidationPattern {
  /** Tables to invalidate (extracts from SQL) */
  tables?: string[]
  /** Exact query keys to invalidate */
  queryKeys?: readonly PostgresQueryKey[]
  /** Whether to invalidate all queries for the database */
  all?: boolean
}

// ============================================================================
// Query Key Generation
// ============================================================================

/**
 * Normalize query parameters: accepts either a SQL string or a QueryParams object.
 * Always returns a QueryParams object; an object argument is returned as-is
 * (not copied), a string is wrapped as `{ sql }`.
 */
export function normalizeQueryParams(params: QueryParams | string): QueryParams {
  return typeof params === 'string' ? { sql: params } : params
}
/**
 * Normalize SQL for consistent query key generation.
 *
 * Outside of quoted text, runs of whitespace collapse to a single space,
 * whitespace just inside parentheses is removed, and every comma is followed
 * by exactly one space. Single-quoted string literals and double-quoted
 * identifiers are copied through untouched, so two statements that differ only
 * inside a literal (`'a  b'` vs `'a b'`) keep distinct cache keys.
 *
 * @param sql - Raw SQL text
 * @returns The normalized SQL text
 */
export function normalizeSQL(sql: string): string {
  // The capturing group makes split() keep the quoted segments at odd indices.
  // A doubled quote ('' or "") is the SQL escape for a quote inside the segment.
  const quotedSegment = /('(?:[^']|'')*'|"(?:[^"]|"")*")/

  return sql
    .trim()
    .split(quotedSegment)
    .map((segment, index) =>
      index % 2 === 1
        ? segment
        : segment
            .replace(/\s+/g, ' ')
            .replace(/\(\s+/g, '(')
            .replace(/\s+\)/g, ')')
            .replace(/,\s*/g, ', ')
    )
    .join('')
}

/**
 * Generate a query key from SQL and parameters.
 * Follows TanStack Query best practices for key structure:
 * `['postgres', database, normalizedSQL, ...params]`.
 *
 * @param database - Database identifier placed in the second key slot
 * @param sql - SQL text; normalized with `normalizeSQL` before use
 * @param params - Optional bound parameters, appended to the key in order
 */
export function createQueryKey(
  database: string,
  sql: string,
  params?: unknown[]
): PostgresQueryKey {
  return ['postgres', database, normalizeSQL(sql), ...(params ?? [])]
}

/**
 * Keyword patterns that introduce a table reference, in the order their
 * matches are reported. Each captures an optionally schema-qualified name.
 *
 * - `\b` stops keywords from matching inside identifiers such as `valid_from`.
 * - The lookbehind on UPDATE skips `ON CONFLICT ... DO UPDATE SET` and the
 *   row-locking clauses `FOR UPDATE` / `FOR NO KEY UPDATE`, where the word
 *   after UPDATE is not a table.
 */
const TABLE_REFERENCE_PATTERNS: readonly RegExp[] = [
  /\bFROM\s+((?:[a-z_][a-z0-9_]*\.)*[a-z_][a-z0-9_]*)/gi,
  /\bINTO\s+((?:[a-z_][a-z0-9_]*\.)*[a-z_][a-z0-9_]*)/gi,
  /(?<!\b(?:DO|FOR|KEY)\s+)\bUPDATE\s+((?:[a-z_][a-z0-9_]*\.)*[a-z_][a-z0-9_]*)/gi,
  /\bJOIN\s+((?:[a-z_][a-z0-9_]*\.)*[a-z_][a-z0-9_]*)/gi,
]

/**
 * Extract table names from SQL query for invalidation.
 * Supports SELECT, INSERT, UPDATE, DELETE, FROM, JOIN.
 *
 * Names are lower-cased, de-duplicated, and returned without their schema
 * qualifier (`public.users` yields `users`). This is a keyword scan, not a
 * parser: only the first table after each keyword is seen, and keywords inside
 * string literals or expressions such as `EXTRACT(YEAR FROM col)` still match.
 *
 * @param sql - SQL text to scan
 * @returns Unique table names, grouped by keyword in the order FROM, INTO, UPDATE, JOIN
 */
export function extractTablesFromSQL(sql: string): string[] {
  const tables = new Set<string>()

  for (const pattern of TABLE_REFERENCE_PATTERNS) {
    for (const match of sql.matchAll(pattern)) {
      const qualifiedName = match[1]
      if (!qualifiedName) continue
      // Keep only the last dotted segment: the table itself
      const tableName = qualifiedName.slice(qualifiedName.lastIndexOf('.') + 1)
      tables.add(tableName.toLowerCase())
    }
  }

  return Array.from(tables)
}

/**
 * Determine mutation type from SQL.
 * Looks only at the leading keyword (case-insensitive, leading whitespace
 * ignored); anything other than INSERT/UPDATE/DELETE yields `'unknown'`.
 */
export function getMutationType(sql: string): 'insert' | 'update' | 'delete' | 'unknown' {
  const statement = sql.trim().toUpperCase()

  if (statement.startsWith('INSERT')) return 'insert'
  if (statement.startsWith('UPDATE')) return 'update'
  if (statement.startsWith('DELETE')) return 'delete'
  return 'unknown'
}

/**
 * Check if SQL is a read-only query.
 *
 * A statement starting with SELECT is read-only. A statement starting with
 * WITH is read-only only when it contains no INSERT, UPDATE, DELETE or MERGE
 * keyword, because a CTE can wrap a data-modifying statement. The keyword
 * check is textual, so such a word inside a string literal also counts.
 */
export function isReadOnlyQuery(sql: string): boolean {
  const statement = sql.trim().toUpperCase()

  if (statement.startsWith('SELECT')) return true
  if (!statement.startsWith('WITH')) return false
  return !/\b(?:INSERT|UPDATE|DELETE|MERGE)\b/.test(statement)
}

// ============================================================================
// Query Adapter
// ============================================================================
/**
 * Structured error with query context.
 *
 * Carries the SQL, parameters and server-reported details of a failed query.
 * When `isRetryable` is not supplied it is derived from `status`: statuses at
 * or above `SERVER_ERROR_STATUS_THRESHOLD` are retryable, everything else
 * (including a missing status) is not.
 */
export class PostgresQueryError extends Error {
  sql?: string
  code?: string
  params?: unknown[]
  constraint?: string
  detail?: string
  /** HTTP status of the failed response, when one was available */
  status?: number
  isRetryable?: boolean

  constructor(message: string, context?: {
    sql?: string
    code?: string
    params?: unknown[]
    constraint?: string
    detail?: string
    isRetryable?: boolean
    status?: number
  }) {
    super(message)
    this.name = 'PostgresQueryError'
    this.sql = context?.sql
    this.code = context?.code
    this.params = context?.params
    this.constraint = context?.constraint
    this.detail = context?.detail
    this.status = context?.status
    // 4xx errors (client errors) are not retryable, 5xx (server errors) are retryable
    this.isRetryable = context?.isRetryable
      ?? (context?.status !== undefined ? context.status >= SERVER_ERROR_STATUS_THRESHOLD : false)
  }
}

/**
 * Circuit breaker state
 */
export interface CircuitState {
  state: 'closed' | 'open' | 'half-open'
  consecutiveFailures: number
  lastFailureAt?: number
}

/**
 * TanStack Query adapter for postgres.do
 *
 * Besides building TanStack option objects, the adapter keeps its own
 * per-instance result cache, a seed cache, a map of in-flight queries used for
 * de-duplication, and a failure counter exposed through `getCircuitState()`.
 * Nothing in this class evicts entries from the internal caches.
 */
export class QueryAdapter {
  private config: Required<Omit<QueryAdapterConfig, 'keyPrefix'>> & { keyPrefix?: string }
  private fetchFn: typeof globalThis.fetch
  private errorHandlers: Array<(event: { error: Error; sql: string; params?: unknown[] }) => void> = []
  private circuitState: CircuitState = { state: 'closed', consecutiveFailures: 0 }
  private queryCache: Map<string, { data: unknown[]; timestamp: number }> = new Map()
  private inflightQueries: Map<string, Promise<unknown[]>> = new Map()
  private seedCacheMap: Map<string, unknown[]> = new Map()

  /**
   * @param config - Adapter configuration; unset fields fall back to module defaults
   * @throws Error if `config.database` fails `validateDatabaseName`
   */
  constructor(config: QueryAdapterConfig) {
    // Validate database name to prevent SQL injection (it is interpolated into the request path)
    validateDatabaseName(config.database)

    this.config = {
      baseUrl: config.baseUrl ?? DEFAULT_BASE_URL,
      database: config.database,
      defaultStaleTime: config.defaultStaleTime ?? DEFAULT_STALE_TIME_MS,
      defaultGcTime: config.defaultGcTime ?? DEFAULT_GC_TIME_MS,
      fetch: config.fetch ?? globalThis.fetch,
      keyPrefix: config.keyPrefix,
    }
    this.fetchFn = this.config.fetch
  }

  /**
   * Register an error event handler.
   * Handlers are called, in registration order, for every failed HTTP response.
   */
  onError(handler: (event: { error: Error; sql: string; params?: unknown[] }) => void): void {
    this.errorHandlers.push(handler)
  }

  /**
   * Get the circuit breaker state (a shallow copy; mutating it has no effect)
   */
  getCircuitState(): CircuitState {
    return { ...this.circuitState }
  }

  /**
   * Seed the cache with known data.
   * The data is stored under the normalized SQL alone (no parameters) in both
   * the seed cache and the result cache.
   */
  seedCache<T>(sql: string, data: T[]): void {
    this.seedCacheMap.set(normalizeSQL(sql), data as unknown[])
    this.queryCache.set(this.getCacheKey(sql), { data, timestamp: Date.now() })
  }

  /**
   * Create a mutation queue that runs statements in FIFO order with at most
   * `options.concurrency` of them in flight.
   *
   * A concurrency below 1 (or NaN) is treated as 1; otherwise nothing would
   * ever be dequeued and every `add()` promise would stay pending forever.
   *
   * @returns An object whose `add` settles with the outcome of that statement
   */
  createMutationQueue(options: { concurrency: number }): { add: (params: MutationParams) => Promise<unknown> } {
    const limit = Math.max(1, Math.floor(options.concurrency) || 1)
    const pending: Array<{
      params: MutationParams
      resolve: (value: unknown) => void
      reject: (reason: unknown) => void
    }> = []
    let running = 0

    const drain = (): void => {
      while (running < limit && pending.length > 0) {
        const item = pending.shift()
        if (!item) break

        running++
        void this.query(item.params.sql, item.params.params)
          .then(item.resolve, item.reject)
          .finally(() => {
            running--
            drain()
          })
      }
    }

    return {
      add: (params: MutationParams) =>
        new Promise<unknown>((resolve, reject) => {
          pending.push({ params, resolve, reject })
          drain()
        }),
    }
  }

  /** Key used by the adapter's internal caches: normalized SQL plus parameters, JSON-encoded */
  private getCacheKey(sql: string, params?: unknown[]): string {
    return JSON.stringify([normalizeSQL(sql), ...(params ?? [])])
  }

  /**
   * Build the TanStack query key for a statement. When a `keyPrefix` is
   * configured it is inserted right after `'postgres'`, so every key this
   * adapter hands out shares the same tenant-scoped prefix.
   */
  private buildQueryKey(sql: string, params?: unknown[]): PostgresQueryKey {
    const { keyPrefix, database } = this.config
    if (!keyPrefix) {
      return createQueryKey(database, sql, params)
    }
    return ['postgres', keyPrefix, database, normalizeSQL(sql), ...(params ?? [])]
  }

  /**
   * Delay before the retry that follows failed attempt number `failedAttempt`
   * (0-based). Without a backoff strategy the delay is constant; `linear`
   * grows it by one base delay per attempt, `exponential` doubles it per
   * attempt. `maxDelay`, when given, caps the result.
   */
  private computeRetryDelay(
    failedAttempt: number,
    baseDelay: number,
    backoff?: 'exponential' | 'linear',
    maxDelay?: number
  ): number {
    let delay = baseDelay
    if (backoff === 'exponential') {
      delay = baseDelay * 2 ** failedAttempt
    } else if (backoff === 'linear') {
      delay = baseDelay * (failedAttempt + 1)
    }
    return maxDelay === undefined ? delay : Math.min(delay, maxDelay)
  }

  /** Record a failure, open the circuit once the threshold is reached, and notify handlers */
  private emitError(error: Error, sql: string, params?: unknown[]): void {
    this.circuitState.consecutiveFailures++
    this.circuitState.lastFailureAt = Date.now()
    if (this.circuitState.consecutiveFailures >= CIRCUIT_BREAKER_FAILURE_THRESHOLD) {
      this.circuitState.state = 'open'
    }
    for (const handler of this.errorHandlers) {
      handler({ error, sql, params })
    }
  }

  /** Clear the failure counter and close the circuit after a successful response */
  private resetCircuit(): void {
    this.circuitState.consecutiveFailures = 0
    this.circuitState.state = 'closed'
  }

  /**
   * Execute a query against postgres.do.
   *
   * Sends `POST {baseUrl}/{database}/query` with a JSON body `{ sql, params }`.
   * `options.timeout` and `options.signal` may be combined; whichever fires
   * first rejects the call and also aborts the underlying request through an
   * `AbortController` whose signal is passed to `fetch`. Calls made without
   * either option send exactly the same `fetch` arguments as before.
   *
   * @param sql - SQL text
   * @param params - Bound parameters
   * @param options - Optional cancellation signal and timeout in milliseconds
   * @returns The parsed JSON body of a successful response
   * @throws Error `'Query aborted'` if the signal is (or becomes) aborted
   * @throws Error `'Query timeout exceeded'` if the timeout elapses first
   * @throws PostgresQueryError for a non-OK HTTP response
   */
  async query<T = unknown>(
    sql: string,
    params?: unknown[],
    options?: { signal?: AbortSignal; timeout?: number }
  ): Promise<QueryResult<T>> {
    const signal = options?.signal
    const timeout = options?.timeout

    // Fail fast: never put a request on the wire for an already-cancelled query
    if (signal?.aborted) {
      throw new Error('Query aborted')
    }

    const url = new URL(`/${this.config.database}/query`, this.config.baseUrl)
    const init: RequestInit = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ sql, params }),
    }

    // Call through a local binding: a native fetch invoked as `this.fetchFn(...)`
    // would receive the adapter as `this` and throw "Illegal invocation".
    const send = this.fetchFn

    let response: Response
    if (!timeout && !signal) {
      response = await send(url.toString(), init)
    } else {
      const controller = new AbortController()
      let timer: ReturnType<typeof setTimeout> | undefined
      let onAbort: (() => void) | undefined

      // Reject first, then abort, so callers see our message rather than fetch's AbortError
      const cancelled = new Promise<never>((_resolve, reject) => {
        if (timeout) {
          timer = setTimeout(() => {
            reject(new Error('Query timeout exceeded'))
            controller.abort()
          }, timeout)
        }
        if (signal) {
          onAbort = () => {
            reject(new Error('Query aborted'))
            controller.abort()
          }
          signal.addEventListener('abort', onAbort, { once: true })
        }
      })

      try {
        response = await Promise.race([
          send(url.toString(), { ...init, signal: controller.signal }),
          cancelled,
        ])
      } finally {
        // Always release the timer and listener so a settled query leaves nothing behind
        if (timer !== undefined) clearTimeout(timer)
        if (signal && onAbort) signal.removeEventListener('abort', onAbort)
      }
    }

    if (!response.ok) {
      const status = (response as { status?: number }).status
      // The error body is optional: fall back to a generic message when it is missing or not JSON
      const errorData = ((await response.json().catch(() => ({ message: 'Query failed' }))) ?? {}) as {
        message?: string
        code?: string
        constraint?: string
        detail?: string
      }
      const error = new PostgresQueryError(errorData.message ?? 'Query failed', {
        sql,
        code: errorData.code,
        params,
        constraint: errorData.constraint,
        detail: errorData.detail,
        status,
      })
      this.emitError(error, sql, params)
      throw error
    }

    this.resetCircuit()
    return response.json() as Promise<QueryResult<T>>
  }

  /**
   * Create query options for TanStack Query v5
   *
   * The returned `queryFn` resolves to the result rows. Before going to the
   * network it consults, in order: the seed cache, the internal result cache
   * (only for `fetchPolicy: 'cache-first'`), and the in-flight map so that
   * identical concurrent calls share one request. Failed attempts are retried
   * up to `retry` times, waiting `retryDelay` ms (shaped by `retryBackoff` and
   * capped by `maxRetryDelay`); retrying stops as soon as `signal` is aborted.
   *
   * `placeholderData`, `keepPreviousData` and `select` from `QueryParams` are
   * not read by this method.
   *
   * @example
   * ```typescript
   * const queryOptions = adapter.queryOptions({
   *   sql: 'SELECT * FROM users WHERE active = $1',
   *   params: [true],
   *   staleTime: 30000,
   * })
   *
   * // Use with useQuery
   * const { data } = useQuery(queryOptions)
   * ```
   */
  queryOptions<T = unknown>(params: QueryParams | string): PostgresQueryOptions<T[]> {
    const {
      sql,
      params: queryParams,
      staleTime,
      gcTime,
      refetchInterval,
      enabled,
      timeout,
      signal,
      retry,
      retryDelay,
      retryBackoff,
      maxRetryDelay,
      fetchPolicy,
      refetchOnWindowFocus,
      refetchOnReconnect,
      refetchIntervalInBackground,
    } = normalizeQueryParams(params)

    if (!isReadOnlyQuery(sql)) {
      console.warn('queryOptions should be used for SELECT queries. Use mutationOptions for INSERT/UPDATE/DELETE.')
    }

    const queryKey = this.buildQueryKey(sql, queryParams)
    const cacheKey = this.getCacheKey(sql, queryParams)

    // Runs the request with retries and records the rows in the internal cache
    const fetchRows = async (): Promise<T[]> => {
      const maxAttempts = Math.max(0, retry ?? 0) + 1
      const startTime = Date.now()
      let lastError: unknown

      for (let attempt = 0; attempt < maxAttempts; attempt++) {
        try {
          if (signal?.aborted) {
            throw new Error('Query aborted')
          }

          const result = await this.query<T>(sql, queryParams, { signal, timeout })
          const rows = result.rows

          // Cache the result
          this.queryCache.set(cacheKey, { data: rows, timestamp: Date.now() })

          // Attach non-enumerable metadata so it never shows up when the rows are iterated or serialized
          Object.defineProperty(rows, '__meta', {
            value: {
              executionTimeMs: Date.now() - startTime,
              queryKey,
            },
            enumerable: false,
            configurable: true,
            writable: true,
          })

          return rows
        } catch (error) {
          lastError = error
          // A cancelled query can never succeed: do not sleep and retry it
          if (attempt === maxAttempts - 1 || signal?.aborted) break

          const delay = this.computeRetryDelay(
            attempt,
            retryDelay ?? DEFAULT_RETRY_DELAY_MS,
            retryBackoff,
            maxRetryDelay
          )
          await new Promise<void>(resolve => setTimeout(resolve, delay))
        }
      }

      throw lastError
    }

    const queryFn = async (): Promise<T[]> => {
      // Check seeded cache
      const seeded = this.seedCacheMap.get(normalizeSQL(sql))
      if (seeded && (staleTime === Infinity || this.queryCache.has(cacheKey))) {
        return seeded as T[]
      }

      // Handle cache-first policy
      if (fetchPolicy === 'cache-first') {
        const cached = this.queryCache.get(cacheKey)
        if (cached) {
          const elapsed = Date.now() - cached.timestamp
          const effectiveStaleTime = staleTime ?? this.config.defaultStaleTime
          if (elapsed < effectiveStaleTime) {
            return cached.data as T[]
          }
        }
      }

      // Deduplication: if there's an inflight query with the same key, reuse it
      const inflight = this.inflightQueries.get(cacheKey)
      if (inflight) {
        return inflight as Promise<T[]>
      }

      const promise = fetchRows()
      this.inflightQueries.set(cacheKey, promise)
      try {
        return await promise
      } finally {
        this.inflightQueries.delete(cacheKey)
      }
    }

    const options: PostgresQueryOptions<T[]> = {
      queryKey,
      queryFn,
      staleTime: staleTime ?? this.config.defaultStaleTime,
      gcTime: gcTime ?? this.config.defaultGcTime,
    }

    // Optional flags are copied only when set, so TanStack's own defaults apply otherwise
    if (refetchInterval !== undefined) {
      options.refetchInterval = refetchInterval
    }
    if (enabled !== undefined) {
      options.enabled = enabled
    }
    if (refetchOnWindowFocus !== undefined) {
      options.refetchOnWindowFocus = refetchOnWindowFocus
    }
    if (refetchOnReconnect !== undefined) {
      options.refetchOnReconnect = refetchOnReconnect
    }
    if (refetchIntervalInBackground !== undefined) {
      options.refetchIntervalInBackground = refetchIntervalInBackground
    }

    return options
  }

  /**
   * Create mutation options for TanStack Query v5
   *
   * With `deduplicate: true`, a mutation whose SQL and parameters equal those
   * of one still in flight (from the same options object) receives that
   * mutation's promise instead of sending a second request. `onProgress` is
   * called once, after completion, when the result reports a non-zero
   * `rowCount`. The `invalidate` option is accepted but not read here.
   *
   * @example
   * ```typescript
   * const mutationOptions = adapter.mutationOptions({
   *   invalidate: { tables: ['users'] },
   *   onSuccess: (data) => console.log('Inserted:', data),
   * })
   *
   * // Use with useMutation
   * const { mutate } = useMutation(mutationOptions)
   * mutate({ sql: 'INSERT INTO users (name) VALUES ($1)', params: ['Alice'] })
   * ```
   */
  mutationOptions<T = unknown>(options?: {
    /** Invalidation pattern after mutation */
    invalidate?: InvalidationPattern
    /** Called before mutation for optimistic updates */
    onMutate?: (variables: MutationParams) => Promise<MutationContext | undefined> | MutationContext | undefined
    /** Called on successful mutation */
    onSuccess?: (data: QueryResult<T>, variables: MutationParams, context: MutationContext | undefined) => void
    /** Called on mutation error */
    onError?: (error: Error, variables: MutationParams, context: MutationContext | undefined) => void
    /** Called when mutation settles (success or error) */
    onSettled?: (
      data: QueryResult<T> | undefined,
      error: Error | null,
      variables: MutationParams,
      context: MutationContext | undefined
    ) => void
    /** Called with progress for bulk operations */
    onProgress?: (progress: { processed: number; total: number }) => void
    /** Called when optimistic update is applied */
    onOptimisticUpdate?: (event: { type: string; variables: MutationParams }) => void
    /** Deduplication: prevent duplicate concurrent mutations */
    deduplicate?: boolean
  }): PostgresMutationOptions<QueryResult<T>, Error, MutationParams, MutationContext> {
    const inflightMutations = new Map<string, Promise<QueryResult<T>>>()

    // Run the statement and report progress for bulk operations
    const execute = async (variables: MutationParams): Promise<QueryResult<T>> => {
      const result = await this.query<T>(variables.sql, variables.params)
      if (options?.onProgress && result.rowCount) {
        options.onProgress({ processed: result.rowCount, total: result.rowCount })
      }
      return result
    }

    const mutationOpts: PostgresMutationOptions<QueryResult<T>, Error, MutationParams, MutationContext> = {
      mutationKey: ['postgres', this.config.database, 'mutation'],
      mutationFn: (variables: MutationParams) => {
        if (!options?.deduplicate) {
          return execute(variables)
        }

        const dedupKey = JSON.stringify([variables.sql, variables.params])
        const existing = inflightMutations.get(dedupKey)
        if (existing) {
          return existing
        }

        const pending = execute(variables).finally(() => {
          inflightMutations.delete(dedupKey)
        })
        inflightMutations.set(dedupKey, pending)
        return pending
      },
    }

    if (options?.onMutate) {
      const onMutate = options.onMutate
      const notifyOptimistic = options.onOptimisticUpdate
      if (notifyOptimistic) {
        // Announce the optimistic update only after the caller's onMutate has run
        mutationOpts.onMutate = async (variables) => {
          const context = await onMutate(variables)
          notifyOptimistic({ type: getMutationType(variables.sql), variables })
          return context
        }
      } else {
        mutationOpts.onMutate = onMutate
      }
    }

    if (options?.onSuccess) {
      mutationOpts.onSuccess = options.onSuccess
    }

    if (options?.onError) {
      mutationOpts.onError = options.onError
    }

    if (options?.onSettled) {
      mutationOpts.onSettled = options.onSettled
    }

    return mutationOpts
  }

  /**
   * Create query key for a SQL query.
   * Returns the same key `queryOptions()` produces for this statement,
   * including the configured `keyPrefix` when there is one.
   */
  getQueryKey(sql: string, params?: unknown[]): PostgresQueryKey {
    return this.buildQueryKey(sql, params)
  }

  /**
   * Get invalidation keys for tables.
   *
   * The table list is currently ignored: the single key returned is the
   * database-wide prefix (tenant-scoped when `keyPrefix` is configured), so
   * using it with prefix matching invalidates every query of this adapter.
   */
  getInvalidationKeysForTables(_tables: string[]): Array<readonly unknown[]> {
    const { keyPrefix, database } = this.config
    return [keyPrefix ? ['postgres', keyPrefix, database] : ['postgres', database]]
  }

  /**
   * Get the database name
   */
  get database(): string {
    return this.config.database
  }
}

/**
 * Create a TanStack Query adapter for postgres.do
 *
 * @throws Error if `config.database` is not a valid database name
 */
export function createQueryAdapter(config: QueryAdapterConfig): QueryAdapter {
  return new QueryAdapter(config)
}

// ============================================================================
// Collection Utilities (shared filtering/sorting logic)
// ============================================================================

/**
 * Filter items by a partial match object.
 * Each defined key in the filter must match the corresponding value in the
 * item (strict equality); keys whose filter value is `undefined` are ignored.
 */
export function filterByPartialMatch<T extends object>(items: T[], where: Partial<T>): T[] {
  const criteria = Object.entries(where).filter(([, expected]) => expected !== undefined)

  return items.filter(item =>
    criteria.every(([key, expected]) => item[key as keyof T] === expected)
  )
}

/**
 * Sort items by a field and direction.
 * Returns a new sorted array (does not mutate the input). Values are compared
 * with `<` / `>`; pairs that compare neither way keep their relative order.
 */
export function sortByField<T>(
  items: T[],
  field: keyof T,
  direction: 'asc' | 'desc'
): T[] {
  const sign = direction === 'asc' ? 1 : -1

  return [...items].sort((a, b) => {
    const left = a[field]
    const right = b[field]
    if (left < right) return -sign
    if (left > right) return sign
    return 0
  })
}
/**
 * Apply offset and limit pagination to an array of items.
 *
 * Always returns a new array, even when nothing is skipped or cut, so callers
 * can mutate the result without touching the input. An `offset` or `limit`
 * that is undefined, zero or negative is ignored.
 *
 * @param items - Source items (never mutated)
 * @param offset - Number of leading items to skip
 * @param limit - Maximum number of items to keep after the offset
 */
export function applyPagination<T>(items: T[], offset?: number, limit?: number): T[] {
  const skipped = offset !== undefined && offset > 0 ? items.slice(offset) : items.slice()
  return limit !== undefined && limit > 0 ? skipped.slice(0, limit) : skipped
}

// ============================================================================
// Collection Adapter
// ============================================================================

/**
 * Options for creating collection query options
 */
export interface CollectionQueryOptions<T extends BaseRecord> {
  /** The collection to query */
  collection: Collection<T>
  /** Stale time in milliseconds */
  staleTime?: number
  /** GC time in milliseconds */
  gcTime?: number
  /** Whether the query is enabled */
  enabled?: boolean
  /** Filter conditions */
  where?: Partial<T>
  /** Sort configuration */
  orderBy?: { field: keyof T; direction: 'asc' | 'desc' }
  /** Limit results */
  limit?: number
  /** Offset results */
  offset?: number
}

/**
 * Create query options from a collection.
 * Bridges the Collection API with TanStack Query.
 *
 * The query key is `['postgres', 'collection', collection.id]`. When any of
 * `where`, `orderBy`, `limit` or `offset` is given, a fourth element describing
 * that view is appended, so differently filtered or paginated views of one
 * collection no longer overwrite each other in the cache while still sharing
 * the three-element prefix for invalidation.
 *
 * The `queryFn` reads `collection.getAll()` and then filters, sorts and
 * paginates, in that order.
 */
export function createCollectionQueryOptions<T extends BaseRecord>(
  options: CollectionQueryOptions<T>
): PostgresQueryOptions<T[]> {
  const {
    collection,
    staleTime = DEFAULT_STALE_TIME_MS,
    gcTime = DEFAULT_GC_TIME_MS,
    enabled = true,
    where,
    orderBy,
    limit,
    offset,
  } = options

  const hasView =
    where !== undefined || orderBy !== undefined || limit !== undefined || offset !== undefined
  const keyParts: readonly unknown[] = hasView
    ? ['postgres', 'collection', collection.id, { where, orderBy, limit, offset }]
    : ['postgres', 'collection', collection.id]

  return {
    queryKey: keyParts as PostgresQueryKey,
    queryFn: async () => {
      let items = collection.getAll()

      if (where) {
        items = filterByPartialMatch(items, where)
      }

      if (orderBy) {
        items = sortByField(items, orderBy.field, orderBy.direction)
      }

      return applyPagination(items, offset, limit)
    },
    staleTime,
    gcTime,
    enabled,
  }
}

/**
 * Collection mutation variable types
 *
 * `id` is required at runtime for `update` and `delete`; `operations` is only
 * read for `batch`.
 */
export type CollectionMutationVariables<T extends BaseRecord> = {
  type: 'insert' | 'update' | 'delete' | 'batch' | 'upsert'
  data: Partial<T>
  id?: string | number
  operations?: Array<{ type: 'insert' | 'update' | 'delete'; data: Partial<T>; id?: string | number }>
}
/**
 * Create mutation options from a collection.
 *
 * The returned `mutationFn` dispatches on `variables.type`:
 * - `insert` / `update` / `delete` call the matching collection method
 *   (`update` and `delete` throw when `id` is missing);
 * - `upsert` updates when `data.id` is set and `collection.get` finds it,
 *   otherwise inserts;
 * - `batch` runs `operations` in order. With `options.transactional`, a
 *   failure rolls back the operations already completed, in reverse order and
 *   on a best-effort basis, before the original error is rethrown.
 *
 * Rollback of an insert uses the id of the record returned by
 * `collection.insert`, falling back to `data.id`, so inserts whose id was
 * generated by the collection are undone as well.
 *
 * @param collection - Collection the mutations are applied to
 * @param options - Optional lifecycle callbacks and the `transactional` flag
 */
export function createCollectionMutationOptions<T extends BaseRecord>(
  collection: Collection<T>,
  options?: {
    onMutate?: (variables: CollectionMutationVariables<T>) => MutationContext<T[]> | undefined
    onSuccess?: (data: T | void, variables: CollectionMutationVariables<T>) => void
    onError?: (error: Error, variables: CollectionMutationVariables<T>) => void
    transactional?: boolean
  }
): PostgresMutationOptions<T | void, Error, CollectionMutationVariables<T>, MutationContext<T[]>> {
  type Operation = NonNullable<CollectionMutationVariables<T>['operations']>[number]
  type InsertInput = Omit<T, 'id'> & { id?: T['id'] }
  type MissingIdMessage = (verb: 'update' | 'delete') => string

  const singleMissingId: MissingIdMessage = verb =>
    `Collection mutation failed: 'id' is required for ${verb} operations on collection "${collection.id}"`
  const batchMissingId = (mode: 'transactional batch' | 'batch'): MissingIdMessage => verb =>
    `Batch operation failed: 'id' is required for ${verb} in ${mode} on collection "${collection.id}"`

  // Apply one insert/update/delete; `missingId` supplies the context-specific error text
  const applyOperation = async (op: Operation, missingId: MissingIdMessage): Promise<T | void> => {
    switch (op.type) {
      case 'insert':
        return collection.insert(op.data as InsertInput)
      case 'update':
        if (op.id === undefined) throw new Error(missingId('update'))
        return collection.update(op.id, op.data)
      case 'delete':
        if (op.id === undefined) throw new Error(missingId('delete'))
        await collection.delete(op.id)
        return
    }
  }

  // Run a batch, undoing completed operations in reverse order if one of them fails
  const runTransactionalBatch = async (operations: Operation[]): Promise<void> => {
    const completed: Array<{ op: Operation; previousItem?: T; insertedId?: T['id'] }> = []

    try {
      for (const op of operations) {
        // Snapshot taken before the operation so update/delete can be reverted
        const previousItem = op.id !== undefined ? collection.get(op.id) : undefined
        const result = await applyOperation(op, batchMissingId('transactional batch'))

        const insertedId = op.type === 'insert'
          ? ((result as Partial<BaseRecord> | undefined)?.id ?? (op.data as Partial<BaseRecord>).id) as T['id'] | undefined
          : undefined
        completed.push({ op, previousItem, insertedId })
      }
    } catch (error) {
      for (const { op, previousItem, insertedId } of [...completed].reverse()) {
        try {
          switch (op.type) {
            case 'insert':
              if (insertedId !== undefined) {
                await collection.delete(insertedId)
              }
              break
            case 'update':
              if (op.id !== undefined && previousItem) {
                await collection.update(op.id, previousItem)
              }
              break
            case 'delete':
              if (previousItem) {
                await collection.insert(previousItem as InsertInput)
              }
              break
          }
        } catch {
          // Best-effort rollback: keep undoing the remaining operations
        }
      }
      throw error
    }
  }

  const mutationOpts: PostgresMutationOptions<T | void, Error, CollectionMutationVariables<T>, MutationContext<T[]>> = {
    mutationKey: ['postgres', 'collection', collection.id, 'mutation'],
    mutationFn: async (variables) => {
      switch (variables.type) {
        case 'insert':
        case 'update':
        case 'delete':
          return applyOperation(
            { type: variables.type, data: variables.data, id: variables.id },
            singleMissingId
          )

        case 'upsert': {
          const id = (variables.data as BaseRecord).id
          if (id !== undefined && collection.get(id)) {
            return collection.update(id, variables.data)
          }
          return collection.insert(variables.data as InsertInput)
        }

        case 'batch': {
          const operations = variables.operations
          if (!operations || operations.length === 0) {
            return
          }

          if (options?.transactional) {
            await runTransactionalBatch(operations)
          } else {
            // Non-transactional batch: execute all operations, stopping at the first failure
            for (const op of operations) {
              await applyOperation(op, batchMissingId('batch'))
            }
          }
          return
        }
      }
    },
  }

  if (options?.onMutate) {
    mutationOpts.onMutate = options.onMutate
  }

  if (options?.onSuccess) {
    const onSuccess = options.onSuccess
    // The caller's callback does not take the context argument
    mutationOpts.onSuccess = (data, variables, _context) => onSuccess(data, variables)
  }

  if (options?.onError) {
    const onError = options.onError
    mutationOpts.onError = (error, variables, _context) => onError(error, variables)
  }

  return mutationOpts
}