import type { ReceiveSyncDurableOp } from "@firtoz/db-helpers"; import { exhaustiveGuard } from "@firtoz/maybe-error"; import type { InferSchemaOutput } from "@tanstack/db"; import { and, eq, getTableColumns, sql, type SQL } from "drizzle-orm"; import type { SQLiteInsertValue, SQLiteUpdateSetSource, } from "drizzle-orm/sqlite-core"; import type { SelectSchema, SyncBackend } from "../collection-utils"; import type { TableWithRequiredFields } from "../syncableTable"; import { convertBasicExpressionToDrizzle, convertOrderByToDrizzle, } from "./convert-ir"; import type { SQLInterceptor } from "./types"; export type SqliteDriverMode = "async" | "sync"; /** * `ON CONFLICT DO UPDATE` set map using SQLite `excluded.*` so inserts are idempotent * (matches IndexedDB `put` / replayed partial-sync rows already present from `initialLoad`). */ function sqliteExcludedUpsertSet( table: TTable, ): SQLiteUpdateSetSource { const cols = getTableColumns(table); const set: Record = {}; for (const [jsName, col] of Object.entries(cols)) { if (jsName === "id") continue; set[jsName] = sql.raw(`excluded."${col.name}"`); } return set as SQLiteUpdateSetSource; } export interface SqliteTableSyncBackendConfig< TTable extends TableWithRequiredFields, > { /** drizzle-orm SQLite database (async WASM/libsql or sync Durable Object) */ // biome-ignore lint/suspicious/noExplicitAny: generic over sync/async Drizzle DB shapes drizzle: any; table: TTable; tableName: string; debug?: boolean; checkpoint?: () => Promise; interceptor?: SQLInterceptor; /** * `async`: libsql/WASM — use `await db.transaction(async (tx) => …)`. * `sync`: Cloudflare DO SQLite — `transactionSync` requires a **synchronous** callback; use `.all()` / `.run()` on builders inside `tx`. */ driverMode: SqliteDriverMode; } export function createSqliteTableSyncBackend< TTable extends TableWithRequiredFields, >(config: SqliteTableSyncBackendConfig): SyncBackend { type TItem = InferSchemaOutput>; const table = config.table; const driverMode = config.driverMode; let transactionQueue = Promise.resolve(); const queueTransaction = ( _label: string, fn: () => Promise, ): Promise => { const run = (): Promise => fn(); const result = transactionQueue.then(run, run); transactionQueue = result.then( () => {}, () => {}, ); return result; }; const backend: SyncBackend = { initialLoad: async () => { const items = (await config.drizzle .select() .from(table)) as unknown as InferSchemaOutput>[]; if (config.interceptor?.onOperation) { config.interceptor.onOperation({ type: "select-all", tableName: config.tableName, itemsReturned: items, itemCount: items.length, context: "Initial load (eager mode)", timestamp: Date.now(), }); } if (config.interceptor?.onOperation) { config.interceptor.onOperation({ type: "write", tableName: config.tableName, itemsWritten: items, writeCount: items.length, context: "Initial load (eager mode)", timestamp: Date.now(), }); } return items as unknown as InferSchemaOutput>[]; }, loadSubset: async (options) => { let query = config.drizzle.select().from(table).$dynamic(); let hasWhere = false; if (options.where || options.cursor?.whereFrom) { let drizzleWhere: SQL | undefined; if (options.where && options.cursor?.whereFrom) { const mainWhere = convertBasicExpressionToDrizzle( options.where, table, ); const cursorWhere = convertBasicExpressionToDrizzle( options.cursor.whereFrom, table, ); drizzleWhere = and(mainWhere, cursorWhere); } else if (options.where) { drizzleWhere = convertBasicExpressionToDrizzle(options.where, table); } else if (options.cursor?.whereFrom) { drizzleWhere = convertBasicExpressionToDrizzle( options.cursor.whereFrom, table, ); } if (drizzleWhere) { query = query.where(drizzleWhere); hasWhere = true; } } if (options.orderBy) { const drizzleOrderBy = convertOrderByToDrizzle(options.orderBy, table); query = query.orderBy(...drizzleOrderBy); } if (options.limit !== undefined) { query = query.limit(options.limit); } if (options.offset !== undefined && options.offset > 0) { query = query.offset(options.offset); } const items = (await query) as unknown as InferSchemaOutput< SelectSchema >[]; if (config.interceptor?.onOperation) { const contextParts: string[] = ["On-demand load"]; if (options.orderBy) contextParts.push("with sorting"); if (options.limit !== undefined) contextParts.push(`limit ${options.limit}`); if (options.offset !== undefined && options.offset > 0) contextParts.push(`offset ${options.offset}`); if (options.cursor) contextParts.push("with cursor pagination"); if (hasWhere) { config.interceptor.onOperation({ type: "select-where", tableName: config.tableName, whereClause: "WHERE clause applied", itemsReturned: items, itemCount: items.length, context: contextParts.join(", "), timestamp: Date.now(), }); } else { config.interceptor.onOperation({ type: "select-all", tableName: config.tableName, itemsReturned: items, itemCount: items.length, context: contextParts.join(", "), timestamp: Date.now(), }); } } if (config.interceptor?.onOperation) { const contextParts: string[] = ["On-demand load"]; if (hasWhere) contextParts.push("with WHERE clause"); if (options.orderBy) contextParts.push("with sorting"); if (options.limit !== undefined) contextParts.push(`limit ${options.limit}`); if (options.offset !== undefined && options.offset > 0) contextParts.push(`offset ${options.offset}`); config.interceptor.onOperation({ type: "write", tableName: config.tableName, itemsWritten: items, writeCount: items.length, context: contextParts.join(", "), timestamp: Date.now(), }); } return items as unknown as InferSchemaOutput>[]; }, handleInsert: async (items) => { const results: Array>> = []; await queueTransaction("handleInsert", async () => { if (driverMode === "sync") { config.drizzle.transaction((tx: typeof config.drizzle) => { for (const itemToInsert of items) { if (config.debug) { console.log( `[${new Date().toISOString()}] insertListener inserting`, itemToInsert, ); } const result = tx .insert(table) .values( itemToInsert as unknown as SQLiteInsertValue, ) .onConflictDoUpdate({ target: table.id, set: sqliteExcludedUpsertSet(table), }) .returning() .all() as Array>>; if (config.debug) { console.log( `[${new Date().toISOString()}] insertListener result`, result, ); } if (result.length > 0) { results.push(result[0]); } } }); } else { await config.drizzle.transaction( async (tx: typeof config.drizzle) => { for (const itemToInsert of items) { if (config.debug) { console.log( `[${new Date().toISOString()}] insertListener inserting`, itemToInsert, ); } const result = (await tx .insert(table) .values( itemToInsert as unknown as SQLiteInsertValue, ) .onConflictDoUpdate({ target: table.id, set: sqliteExcludedUpsertSet(table), }) .returning()) as Array< InferSchemaOutput> >; if (config.debug) { console.log( `[${new Date().toISOString()}] insertListener result`, result, ); } if (result.length > 0) { results.push(result[0]); } } }, ); } if (config.checkpoint) { await config.checkpoint(); } }); return results; }, handleUpdate: async (mutations) => { const results: Array>> = []; await queueTransaction("handleUpdate", async () => { if (driverMode === "sync") { config.drizzle.transaction((tx: typeof config.drizzle) => { for (const mutation of mutations) { if (config.debug) { console.log( `[${new Date().toISOString()}] updateListener updating`, mutation, ); } const updateTime = new Date(); const result = tx .update(table) .set({ ...mutation.changes, updatedAt: updateTime, } as SQLiteUpdateSetSource) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, mutation.key as any)) .returning() .all() as Array>>; if (config.debug) { console.log( `[${new Date().toISOString()}] updateListener result`, result, ); } results.push(...result); } }); } else { await config.drizzle.transaction( async (tx: typeof config.drizzle) => { for (const mutation of mutations) { if (config.debug) { console.log( `[${new Date().toISOString()}] updateListener updating`, mutation, ); } const updateTime = new Date(); const result = (await tx .update(table) .set({ ...mutation.changes, updatedAt: updateTime, } as SQLiteUpdateSetSource) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, mutation.key as any)) .returning()) as Array< InferSchemaOutput> >; if (config.debug) { console.log( `[${new Date().toISOString()}] updateListener result`, result, ); } results.push(...result); } }, ); } if (config.checkpoint) { await config.checkpoint(); } }); return results; }, handleDelete: async (mutations) => { await queueTransaction("handleDelete", async () => { if (driverMode === "sync") { config.drizzle.transaction((tx: typeof config.drizzle) => { for (const mutation of mutations) { tx.delete(table) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, mutation.key as any)) .run(); } }); } else { await config.drizzle.transaction( async (tx: typeof config.drizzle) => { for (const mutation of mutations) { await tx .delete(table) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, mutation.key as any)); } }, ); } if (config.checkpoint) { await config.checkpoint(); } }); }, handleTruncate: async () => { await queueTransaction("handleTruncate", async () => { if (driverMode === "sync") { config.drizzle.transaction((tx: typeof config.drizzle) => { tx.delete(table).run(); }); } else { await config.drizzle.transaction( async (tx: typeof config.drizzle) => { await tx.delete(table); }, ); } if (config.checkpoint) { await config.checkpoint(); } }); }, applyReceiveSyncDurableWrites: async ( ops: ReceiveSyncDurableOp[], ) => { if (ops.length === 0) return; await queueTransaction("applyReceiveSyncDurableWrites", async () => { if (driverMode === "sync") { config.drizzle.transaction((tx: typeof config.drizzle) => { for (const op of ops) { switch (op.type) { case "insert": { if (config.debug) { console.log( `[${new Date().toISOString()}] receiveSync batch insert`, op.value, ); } tx.insert(table) .values( op.value as unknown as SQLiteInsertValue, ) .onConflictDoUpdate({ target: table.id, set: sqliteExcludedUpsertSet(table), }) .run(); break; } case "update": { if (config.debug) { console.log( `[${new Date().toISOString()}] receiveSync batch update`, op, ); } const updateTime = new Date(); tx.update(table) .set({ ...op.changes, updatedAt: updateTime, } as SQLiteUpdateSetSource) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, op.key as any)) .run(); break; } case "delete": tx.delete(table) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, op.key as any)) .run(); break; case "truncate": tx.delete(table).run(); break; default: exhaustiveGuard(op); } } }); } else { await config.drizzle.transaction( async (tx: typeof config.drizzle) => { for (const op of ops) { switch (op.type) { case "insert": { if (config.debug) { console.log( `[${new Date().toISOString()}] receiveSync batch insert`, op.value, ); } await tx .insert(table) .values( op.value as unknown as SQLiteInsertValue, ) .onConflictDoUpdate({ target: table.id, set: sqliteExcludedUpsertSet(table), }); break; } case "update": { if (config.debug) { console.log( `[${new Date().toISOString()}] receiveSync batch update`, op, ); } const updateTime = new Date(); await tx .update(table) .set({ ...op.changes, updatedAt: updateTime, } as SQLiteUpdateSetSource) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, op.key as any)); break; } case "delete": await tx .delete(table) // biome-ignore lint/suspicious/noExplicitAny: branded id key .where(eq(table.id, op.key as any)); break; case "truncate": await tx.delete(table); break; default: exhaustiveGuard(op); } } }, ); } if (config.checkpoint) { await config.checkpoint(); } }); }, }; return backend; }