import assert from 'node:assert' import SqliteDB from 'better-sqlite3' import { Kysely, KyselyPlugin, PluginTransformQueryArgs, PluginTransformResultArgs, QueryResult, RootOperationNode, SqliteDialect, UnknownRow, sql, } from 'kysely' import { dbLogger } from '../logger' import { retrySqlite } from './util' const DEFAULT_PRAGMAS = { // strict: 'ON', // @TODO strictness should live on table defs instead } export class Database { destroyed = false commitHooks: CommitHook[] = [] constructor(public db: Kysely) {} static sqlite( location: string, opts?: { pragmas?: Record }, ): Database { const sqliteDb = new SqliteDB(location, { timeout: 0, // handled by application }) const pragmas = { ...DEFAULT_PRAGMAS, ...(opts?.pragmas ?? {}), } for (const pragma of Object.keys(pragmas)) { sqliteDb.pragma(`${pragma} = ${pragmas[pragma]}`) } const db = new Kysely({ dialect: new SqliteDialect({ database: sqliteDb, }), }) return new Database(db) } async ensureWal() { await sql`PRAGMA journal_mode = WAL`.execute(this.db) } async transactionNoRetry( fn: (db: Database) => T | PromiseLike, ): Promise { this.assertNotTransaction() const leakyTxPlugin = new LeakyTxPlugin() const { hooks, txRes } = await this.db .withPlugin(leakyTxPlugin) .transaction() .execute(async (txn) => { const dbTxn = new Database(txn) try { const txRes = await fn(dbTxn) leakyTxPlugin.endTx() const hooks = dbTxn.commitHooks return { hooks, txRes } } catch (err) { leakyTxPlugin.endTx() // ensure that all in-flight queries are flushed & the connection is open await txn.getExecutor().provideConnection(async () => {}) throw err } }) hooks.map((hook) => hook()) return txRes } async transaction( fn: (db: Database) => T | PromiseLike, ): Promise { return retrySqlite(() => this.transactionNoRetry(fn)) } async executeWithRetry(query: { execute: () => Promise }) { if (this.isTransaction) { // transaction() ensures retry on entire transaction, no need to retry individual statements. return query.execute() } return retrySqlite(() => query.execute()) } onCommit(fn: () => void) { this.assertTransaction() this.commitHooks.push(fn) } get isTransaction() { return this.db.isTransaction } assertTransaction() { assert(this.isTransaction, 'Transaction required') } assertNotTransaction() { assert(!this.isTransaction, 'Cannot be in a transaction') } close(): void { if (this.destroyed) return this.db .destroy() .then(() => (this.destroyed = true)) .catch((err) => dbLogger.error({ err }, 'error closing db')) } } type CommitHook = () => void class LeakyTxPlugin implements KyselyPlugin { private txOver = false endTx() { this.txOver = true } transformQuery(args: PluginTransformQueryArgs): RootOperationNode { if (this.txOver) { throw new Error('tx already failed') } return args.node } async transformResult( args: PluginTransformResultArgs, ): Promise> { return args.result } }