// sqlite-kv.worker.ts import { parentPort, isMainThread } from 'worker_threads'; import type { Database, Transaction, Statement } from 'better-sqlite3'; import sqlite from 'better-sqlite3'; import type { WorkerRequest, WorkerResponse, InitPayload, GetPayload, SetPayload, DeletePayload, ScanStartPayload, ScanValuesStartPayload, CountPayload, ClearPayload, ApplyEditsPayload, } from './kv-worker-protocol.js'; // Adjust path import { parseSqliteKvStoreOptions, SQLiteKVStoreOptions, STATEMENTS, } from '../../utils/sqlite.js'; import { decodeTuple, encodeTuple, Tuple } from '../../../codec.js'; import { walSizeGuard } from '../../utils/sqlite-node.js'; // if (isMainThread) { // throw new Error('This script is intended to be run as a worker thread.'); // } // if (!parentPort) { // throw new Error('This script must be run as a worker thread.'); // } let db: Database | null = null; let statements: { get: Statement; set: Statement; delete: Statement; deleteRange: Statement; scan: Statement; scanValues: Statement; count: Statement; countRange: Statement; truncate: Statement; } | null = null; let transactions: { write: Transaction; } | null = null; // Store active iterators managed by this worker // Key: iteratorId (generated by worker), Value: Iterator object from better-sqlite3 const activeIterators = new Map>(); let nextIteratorId = 1; let walGuard: NodeJS.Timer | undefined; function initializeDatabase(payload: InitPayload): WorkerResponse { try { if (db) { // Optional: Handle re-initialization attempts? Close old one? console.warn( 'Worker received init request but database already initialized.' ); db.close(); // Close previous DB if any } db = sqlite(payload.databasePath); const parsedOptions = parseSqliteKvStoreOptions(payload.options || {}); db.unsafeMode(true); // Keep as per original code, evaluate later if needed db.exec(parsedOptions.pragma); // Create table db.prepare(STATEMENTS.createTable).run(); // Prepare statements statements = { get: db.prepare(STATEMENTS.get), set: db.prepare(STATEMENTS.set), delete: db.prepare(STATEMENTS.delete), deleteRange: db.prepare(STATEMENTS.deleteRange), scan: db.prepare(STATEMENTS.scan), scanValues: db.prepare(STATEMENTS.scanValues), count: db.prepare(STATEMENTS.count), countRange: db.prepare(STATEMENTS.countRange), truncate: db.prepare(STATEMENTS.truncate), }; // Prepare transaction transactions = { write: db.transaction((sets: [string, string][], deletes: string[]) => { // Statements are guaranteed to exist if transactions exists const delStmt = statements!.delete; const setStmt = statements!.set; for (const encodedKey of deletes) { delStmt.run(encodedKey); } for (const [encodedKey, encodedValue] of sets) { setStmt.run(encodedKey, encodedValue); } }), }; // Start WAL guard startWalGuard(db, parsedOptions); console.log(`Worker initialized DB: ${payload.databasePath}`); return { id: 0, type: 'initSuccess' }; // Use ID 0 for init confirmation } catch (error: any) { console.error('Worker DB Initialization failed:', error); return { id: 0, type: 'error', payload: error.message || 'Initialization failed', }; } } function startWalGuard(db: Database, options: Required) { if (walGuard) { clearInterval(walGuard); } const dbPath = db.name; const walFile = `${dbPath}-wal`; walGuard = setInterval(() => { walSizeGuard(db, walFile, { restartMax: options.checkpointRestart, truncateMax: options.checkpointTruncate, }); }, 60_000); } function getFullKey(key: Tuple, scope?: Tuple): Tuple { return scope ? [...scope, ...key] : key; } function encodeKey(key: Tuple, scope?: Tuple): string { return encodeTuple(getFullKey(key, scope)); } // --- Message Handler --- addEventListener('message', (msg: MessageEvent) => { const request = msg.data; // The incoming request from the main thread if (msg.data.operation === 'init') { const response = initializeDatabase(request.payload as InitPayload); postMessage(response); // Send init confirmation/error return; } // Ensure DB is initialized for all other operations if (!db || !statements || !transactions) { postMessage({ id: request.id, type: 'error', payload: 'Worker database not initialized.', } as WorkerResponse); return; } // Wrap operations in try-catch to send errors back try { let responsePayload: any = undefined; let responseType: WorkerResponse['type'] = 'result'; let iteratorId: number | undefined = undefined; // Specifically for scan responses switch (request.operation) { case 'get': { const { key, scope } = request.payload as GetPayload; const encodedKey = encodeKey(key, scope); const result: any = statements.get.get(encodedKey); responsePayload = result ? JSON.parse(result.value) : undefined; break; } case 'set': { const { key, value, scope } = request.payload as SetPayload; const encodedKey = encodeKey(key, scope); const encodedValue = JSON.stringify(value); statements.set.run(encodedKey, encodedValue); responsePayload = undefined; // Success, no data payload needed break; } case 'delete': { const { key, scope } = request.payload as DeletePayload; const encodedKey = encodeKey(key, scope); statements.delete.run(encodedKey); responsePayload = undefined; break; } case 'scanStart': { const { options, scope } = request.payload as ScanStartPayload; const low = scope ? encodeTuple([...scope, ...(options.prefix ?? [])]) : encodeTuple(options.prefix ?? []); const high = low + '\uffff'; // Assuming prefix scan is most common // TODO: Handle options.gt, options.gte, options.lt, options.lte if needed by adjusting low/high & query // TODO: Handle options.limit if needed const iterator = statements.scan.all(low, high); iteratorId = nextIteratorId++; activeIterators.set(iteratorId, iterator[Symbol.iterator]()); responseType = 'scanIteratorId'; responsePayload = iteratorId; // Send back the ID // No data sent yet, main thread needs to request `scanNext` break; } case 'scanNext': { const { iteratorId: reqIteratorId } = request.payload as { iteratorId: number; }; const iterator = activeIterators.get(reqIteratorId); iteratorId = reqIteratorId; // Keep iteratorId in the response for routing if (!iterator) { responseType = 'error'; responsePayload = `Scan iterator ${reqIteratorId} not found or already closed.`; } else { const result = iterator.next(); if (result.done) { responseType = 'scanComplete'; responsePayload = undefined; activeIterators.delete(reqIteratorId); // Clean up completed iterator } else { // result.value will be { key: string, value: string } const row = result.value; const decodedKey = decodeTuple(row.key); // We need scope/prefix info to remove it correctly. // This info was part of scanStart, not stored with iterator. // Simplification: Assume main thread handles prefix stripping based on original options. // If prefix stripping MUST happen in worker, we need to store scope/prefix with iterator. const keyWithoutPrefix = decodedKey; // Let main thread handle for now const value = JSON.parse(row.value); responseType = 'scanData'; responsePayload = [keyWithoutPrefix, value]; } } break; } case 'scanDispose': { const { iteratorId: reqIteratorId } = request.payload as { iteratorId: number; }; if (activeIterators.has(reqIteratorId)) { // No explicit dispose needed for better-sqlite3 iterators, just remove from map activeIterators.delete(reqIteratorId); console.debug(`Worker disposed scan iterator ${reqIteratorId}`); } // No response needed for dispose unless confirming success/failure return; // Don't send a standard response message } // --- scanValues variants (similar to scan) --- case 'scanValuesStart': { const { options, scope } = request.payload as ScanValuesStartPayload; const low = scope ? encodeTuple([...scope, ...(options.prefix ?? [])]) : encodeTuple(options.prefix ?? []); const high = low + '\uffff'; const iterator = statements.scanValues.pluck().all(low, high); // Pluck here iteratorId = nextIteratorId++; activeIterators.set(iteratorId, iterator[Symbol.iterator]()); // Store the iterator for future calls responseType = 'scanValuesIteratorId'; responsePayload = iteratorId; break; } case 'scanValuesNext': { const { iteratorId: reqIteratorId } = request.payload as { iteratorId: number; }; const iterator = activeIterators.get(reqIteratorId); iteratorId = reqIteratorId; if (!iterator) { responseType = 'error'; responsePayload = `ScanValues iterator ${reqIteratorId} not found or already closed.`; } else { const result = iterator.next(); if (result.done) { responseType = 'scanValuesComplete'; responsePayload = undefined; activeIterators.delete(reqIteratorId); } else { // result.value will be the string value due to pluck() const value = JSON.parse(result.value); responseType = 'scanValuesData'; responsePayload = value; } } break; } case 'scanValuesDispose': { const { iteratorId: reqIteratorId } = request.payload as { iteratorId: number; }; if (activeIterators.has(reqIteratorId)) { activeIterators.delete(reqIteratorId); console.debug(`Worker disposed scanValues iterator ${reqIteratorId}`); } // No response needed return; } // --- End scanValues --- case 'count': { const { options, scope } = request.payload as CountPayload; const fullPrefix = scope ? [...scope, ...(options.prefix ?? [])] : (options.prefix ?? []); if (!fullPrefix.length) { responsePayload = statements.count.pluck().get(); } else { const low = encodeTuple(fullPrefix); const high = low + '\uffff'; responsePayload = statements.countRange.pluck().get(low, high); } break; } case 'clear': { const { scope } = request.payload as ClearPayload; if (!scope?.length) { statements.truncate.run(); } else { const low = encodeTuple(scope); const high = low + '\uffff'; statements.deleteRange.run(low, high); } responsePayload = undefined; break; } case 'applyEdits': { const { sets, deletes, scope } = request.payload as ApplyEditsPayload; // Encode keys/values within the worker before passing to transaction const encodedDeletes = deletes.map((key) => encodeKey(key, scope)); const encodedSets: [string, string][] = sets.map(([key, value]) => [ encodeKey(key, scope), JSON.stringify(value), ]); transactions.write(encodedSets, encodedDeletes); responsePayload = undefined; break; } case 'close': { // Clean up all active iterators activeIterators.clear(); // Close the database if (db) { db.close(); db = null; statements = null; transactions = null; console.log('Worker closed database.'); } responsePayload = undefined; // Optionally exit the worker? Or just confirm close? // parentPort!.close(); // Could terminate the worker break; } default: responseType = 'error'; responsePayload = `Unknown operation: ${request.operation}`; } // Send the successful result back const response: WorkerResponse = { id: request.id, type: responseType, payload: responsePayload, iteratorId, }; postMessage(response); } catch (error: any) { // Send error back to the main thread console.error( `Worker error processing operation ${request.operation} (ID: ${request.id}):`, error ); const response: WorkerResponse = { id: request.id, type: 'error', payload: error.message || 'An unknown error occurred in the worker.', iteratorId: activeIterators.has( // @ts-expect-error request.payload?.iteratorId ) ? // @ts-expect-error request.payload.iteratorId : undefined, // Include iteratorId if error happened during scanNext }; // If error occurred during scanNext/scanValuesNext, clean up the iterator if ( (request.operation === 'scanNext' || request.operation === 'scanValuesNext') && // @ts-expect-error request.payload?.iteratorId ) { activeIterators.delete( // @ts-expect-error request.payload.iteratorId ); } postMessage(response); } }); console.log('SQLite KV Worker started.');