import {
  DeleteOperationItemNotFoundError,
  DuplicateKeyInBatchError,
  SyncNotInitializedError,
  UpdateOperationItemNotFoundError,
} from './errors'
import type { QueryClient } from '@tanstack/query-core'
import type { ChangeMessage, Collection } from '@tanstack/db'

/**
 * Active `writeBatch` state, keyed by sync context so that operations queued
 * for one collection can never leak into a batch belonging to another.
 * The type arguments are left open (`any`) because a single map serves
 * contexts of every row/key type.
 */
const activeBatchContexts = new WeakMap<
  SyncContext<any, any>,
  {
    operations: Array<SyncOperation<any, any, any>>
    isActive: boolean
  }
>()

/**
 * A write request accepted by the manual sync utilities. Every variant takes
 * either a single item/key or an array of them.
 */
export type SyncOperation<
  TRow extends object,
  TKey extends string | number = string | number,
  TInsertInput extends object = TRow,
> =
  | { type: `insert`; data: TInsertInput | Array<TInsertInput> }
  | { type: `update`; data: Partial<TRow> | Array<Partial<TRow>> }
  | { type: `delete`; key: TKey | Array<TKey> }
  | { type: `upsert`; data: Partial<TRow> | Array<Partial<TRow>> }

/**
 * Everything the write utilities need in order to push changes into a
 * collection's synced store and mirror the result into the query cache.
 */
export interface SyncContext<
  TRow extends object,
  TKey extends string | number = string | number,
> {
  // Type arguments beyond TRow/TKey are left open so any collection over
  // these rows can be supplied.
  collection: Collection<TRow, TKey, any, any, any>
  queryClient: QueryClient
  queryKey: Array<unknown>
  getKey: (item: TRow) => TKey
  /**
   * Begin a new sync transaction.
   * @param options.immediate - When true, the transaction will be processed immediately
   *   even if there are persisting user transactions. Used by manual write operations.
   */
  begin: (options?: { immediate?: boolean }) => void
  write: (message: Omit<ChangeMessage<TRow>, `key`>) => void
  commit: () => void
  /**
   * Optional function to update the query cache with the latest synced data.
   * Handles both direct array caches and wrapped response formats (when `select` is used).
   * If not provided, falls back to directly setting the cache with the raw array.
   */
  updateCacheData?: (items: Array<TRow>) => void
}

/** A single-item operation with its key already resolved. */
interface NormalizedOperation<
  TRow extends object,
  TKey extends string | number = string | number,
> {
  type: `insert` | `update` | `delete` | `upsert`
  key: TKey
  data?: TRow | Partial<TRow>
}

/**
 * Flatten one operation or a list of operations (each of which may carry an
 * array of items/keys) into one entry per item, resolving the key of each.
 *
 * - `delete`: the supplied keys are used as-is.
 * - `update`: the key is read from the partial item via `ctx.getKey`.
 * - `insert` / `upsert`: the item is first passed through
 *   `ctx.collection.validateData` in `insert` mode and the key is read from
 *   the validated result. The original (unvalidated) item is what gets stored
 *   on the normalized entry.
 */
function normalizeOperations<
  TRow extends object,
  TKey extends string | number = string | number,
  TInsertInput extends object = TRow,
>(
  ops:
    | SyncOperation<TRow, TKey, TInsertInput>
    | Array<SyncOperation<TRow, TKey, TInsertInput>>,
  ctx: SyncContext<TRow, TKey>,
): Array<NormalizedOperation<TRow, TKey>> {
  const operations = Array.isArray(ops) ? ops : [ops]
  const normalized: Array<NormalizedOperation<TRow, TKey>> = []

  for (const op of operations) {
    if (op.type === `delete`) {
      const keys = Array.isArray(op.key) ? op.key : [op.key]
      for (const key of keys) {
        normalized.push({ type: `delete`, key })
      }
      continue
    }

    const items = Array.isArray(op.data) ? op.data : [op.data]
    for (const item of items) {
      let key: TKey
      if (op.type === `update`) {
        // For updates, we need to get the key from the partial data
        key = ctx.getKey(item as TRow)
      } else {
        // For insert/upsert, validate and resolve the full item first
        const resolved = ctx.collection.validateData(
          item,
          op.type === `upsert` ? `insert` : op.type,
        )
        key = ctx.getKey(resolved)
      }
      normalized.push({ type: op.type, key, data: item })
    }
  }

  return normalized
}

/**
 * Reject a batch that cannot be applied.
 *
 * NOTE: existence checks look at the synced store only, not the combined
 * view (synced + optimistic). This allows write operations to work correctly
 * even when items are optimistically modified.
 *
 * @throws DuplicateKeyInBatchError when the same key appears twice in the batch.
 * @throws UpdateOperationItemNotFoundError when an `update` targets a key
 *   missing from the synced store.
 * @throws DeleteOperationItemNotFoundError when a `delete` targets a key
 *   missing from the synced store.
 */
function validateOperations<
  TRow extends object,
  TKey extends string | number = string | number,
>(
  operations: Array<NormalizedOperation<TRow, TKey>>,
  ctx: SyncContext<TRow, TKey>,
): void {
  const seenKeys = new Set<TKey>()

  for (const op of operations) {
    // Check for duplicate keys within the batch
    if (seenKeys.has(op.key)) {
      throw new DuplicateKeyInBatchError(op.key)
    }
    seenKeys.add(op.key)

    // Validate operation-specific requirements against the synced store
    if (op.type === `update`) {
      if (!ctx.collection._state.syncedData.has(op.key)) {
        throw new UpdateOperationItemNotFoundError(op.key)
      }
    } else if (op.type === `delete`) {
      if (!ctx.collection._state.syncedData.has(op.key)) {
        throw new DeleteOperationItemNotFoundError(op.key)
      }
    }
  }
}

/**
 * Turn one normalized operation into the change message handed to
 * `ctx.write`. Only reads from the synced store; writes nothing.
 *
 * Must be called after `validateOperations`, which guarantees that `update`
 * and `delete` keys exist in the synced store (hence the non-null assertions).
 */
function toChangeMessage<
  TRow extends object,
  TKey extends string | number = string | number,
>(
  op: NormalizedOperation<TRow, TKey>,
  ctx: SyncContext<TRow, TKey>,
): Omit<ChangeMessage<TRow>, `key`> {
  switch (op.type) {
    case `insert`: {
      const resolved = ctx.collection.validateData(op.data, `insert`)
      return { type: `insert`, value: resolved }
    }
    case `update`: {
      // Get from synced store only, not the combined view
      const currentItem = ctx.collection._state.syncedData.get(op.key)!
      const updatedItem = { ...currentItem, ...op.data }
      const resolved = ctx.collection.validateData(
        updatedItem,
        `update`,
        op.key,
      )
      return { type: `update`, value: resolved }
    }
    case `delete`: {
      // Get from synced store only, not the combined view
      const currentItem = ctx.collection._state.syncedData.get(op.key)!
      return { type: `delete`, value: currentItem }
    }
    case `upsert`: {
      // Check synced store only, not the combined view
      const mode = ctx.collection._state.syncedData.has(op.key)
        ? `update`
        : `insert`
      const resolved = ctx.collection.validateData(op.data, mode, op.key)
      return { type: mode, value: resolved }
    }
  }
}

/**
 * Apply one operation or a batch of operations to the synced store in a
 * single sync transaction, then mirror the full synced data set into the
 * query cache.
 *
 * All normalization and validation happens before `ctx.begin` is called, so
 * a failure in any operation throws without leaving a begun-but-uncommitted
 * transaction behind.
 *
 * @throws DuplicateKeyInBatchError, UpdateOperationItemNotFoundError,
 *   DeleteOperationItemNotFoundError from batch validation, plus whatever
 *   `collection.validateData` throws for invalid items.
 */
export function performWriteOperations<
  TRow extends object,
  TKey extends string | number = string | number,
  TInsertInput extends object = TRow,
>(
  operations:
    | SyncOperation<TRow, TKey, TInsertInput>
    | Array<SyncOperation<TRow, TKey, TInsertInput>>,
  ctx: SyncContext<TRow, TKey>,
): void {
  const normalized = normalizeOperations(operations, ctx)
  validateOperations(normalized, ctx)

  // Resolve every message up front: validateData may throw, and that must
  // happen before the transaction is opened rather than halfway through it.
  // Keys are unique within the batch, so each message depends only on the
  // pre-transaction synced state.
  const messages = normalized.map((op) => toChangeMessage(op, ctx))

  // Use immediate: true to ensure syncedData is updated synchronously,
  // even when called from within a mutationFn with an active persisting transaction
  ctx.begin({ immediate: true })
  for (const message of messages) {
    ctx.write(message)
  }
  ctx.commit()

  // Update query cache after successful commit
  const updatedData = Array.from(ctx.collection._state.syncedData.values())
  if (ctx.updateCacheData) {
    ctx.updateCacheData(updatedData)
  } else {
    // Fallback: directly set the cache with raw array (for non-Query Collection consumers)
    ctx.queryClient.setQueryData(ctx.queryKey, updatedData)
  }
}

/** Runtime check for a promise-like value (a non-null object with a callable `then`). */
function isThenable(value: unknown): value is PromiseLike<unknown> {
  return (
    typeof value === `object` &&
    value !== null &&
    `then` in value &&
    typeof (value as { then?: unknown }).then === `function`
  )
}

/**
 * Build the manual write helpers (`writeInsert`, `writeUpdate`, `writeDelete`,
 * `writeUpsert`, `writeBatch`) for a collection.
 *
 * @param getContext - Returns the current sync context, or `null` when sync
 *   has not been initialized yet. It is called on every write.
 * @returns An object of write helpers. Each single-write helper applies its
 *   operation immediately, unless a `writeBatch` is active for the same
 *   context, in which case the operation is queued until the batch completes.
 *   Every helper throws SyncNotInitializedError when `getContext` returns `null`.
 */
export function createWriteUtils<
  TRow extends object,
  TKey extends string | number = string | number,
  TInsertInput extends object = TRow,
>(getContext: () => SyncContext<TRow, TKey> | null) {
  function ensureContext(): SyncContext<TRow, TKey> {
    const context = getContext()
    if (!context) {
      throw new SyncNotInitializedError()
    }
    return context
  }

  // Queue the operation when a batch is open for this context; otherwise run it now.
  function dispatch(operation: SyncOperation<TRow, TKey, TInsertInput>): void {
    const ctx = ensureContext()
    const batchContext = activeBatchContexts.get(ctx)

    if (batchContext?.isActive) {
      batchContext.operations.push(operation)
      return
    }

    performWriteOperations(operation, ctx)
  }

  return {
    writeInsert(data: TInsertInput | Array<TInsertInput>) {
      dispatch({ type: `insert`, data })
    },

    writeUpdate(data: Partial<TRow> | Array<Partial<TRow>>) {
      dispatch({ type: `update`, data })
    },

    writeDelete(key: TKey | Array<TKey>) {
      dispatch({ type: `delete`, key })
    },

    writeUpsert(data: Partial<TRow> | Array<Partial<TRow>>) {
      dispatch({ type: `upsert`, data })
    },

    /**
     * Collect every write issued synchronously inside `callback` and apply
     * them together in one transaction. Nested batches and async callbacks
     * are rejected with an Error.
     */
    writeBatch(callback: () => void) {
      const ctx = ensureContext()

      // Check if we're already in a batch (nested batch)
      if (activeBatchContexts.get(ctx)?.isActive) {
        throw new Error(
          `Cannot nest writeBatch calls. Complete the current batch before starting a new one.`,
        )
      }

      // Set up the batch context for this specific collection
      const batchContext = {
        operations: [] as Array<SyncOperation<TRow, TKey, TInsertInput>>,
        isActive: true,
      }
      activeBatchContexts.set(ctx, batchContext)

      try {
        // Execute the callback - any write operations will be collected.
        // The callback is typed as () => void, but a caller may still pass an
        // async function, so the result is inspected as unknown at runtime.
        const result: unknown = callback()

        if (isThenable(result)) {
          throw new Error(
            `writeBatch does not support async callbacks. The callback must be synchronous.`,
          )
        }

        // Perform all collected operations
        if (batchContext.operations.length > 0) {
          performWriteOperations(batchContext.operations, ctx)
        }
      } finally {
        // Always clear the batch context
        batchContext.isActive = false
        activeBatchContexts.delete(ctx)
      }
    },
  }
}