///
import type Mysql from 'mysql';
import type Pg from 'pg';
import type PgConnectionString from 'pg-connection-string';
import type { Resolvable } from '../sbvr-api/common-types.js';
import { Engines } from '@balena/abstract-sql-compiler';
import { EventEmitter } from 'eventemitter3';
import _ from 'lodash';
import { TypedError } from 'typed-error';
import * as env from '../config-loader/env.js';
import { fromCallback, timeout } from '../sbvr-api/control-flow.js';
import fnv1a from '@sindresorhus/fnv1a';
/**
 * Event emitter used by this module to publish database metrics.
 * Within this file it emits `db_error` (from `wrapDatabaseError`) and
 * `db_query_time` (after every query run through `Tx.$executeSql`).
 */
export const metrics = new EventEmitter();
/** An `Error` that additionally carries a driver/engine supplied `code`. */
export interface CodedError extends Error {
code: number | string;
}
// Engine-specific factory that opens a new transaction; wrapped by `createTransaction`.
type CreateTransactionFn = (
stackTraceErr?: Error,
timeoutMS?: number,
) => Promise;
// Callback that hands the underlying connection back once a transaction is done (used by the mysql engine).
type CloseTransactionFn = () => void;
/** A single result row, keyed by field name. */
export interface Row {
[fieldName: string]: any;
}
/** Engine-independent shape every `_executeSql` implementation returns. */
export interface Result {
rows: Row[];
rowsAffected: number;
// Only populated when the engine is able to report an inserted id.
insertId?: number | undefined;
}
/** A SQL statement as text. */
export type Sql = string;
/** Positional bind values passed alongside a SQL statement. */
export type Bindings = any[];
/**
 * Type guard for `SQLError` values.
 *
 * The check is done on the constructor name rather than with `instanceof`,
 * so it works for any value whose `constructor.name` is `'SQLError'` and is
 * safe to call with `null`/`undefined`.
 */
const isSqlError = (value: any): value is SQLError => {
  const constructorName = value?.constructor?.name;
  return constructorName === 'SQLError';
};
/**
 * Base class for every error raised by the database layer.
 *
 * It can be built from a plain message, from another coded error, or from a
 * `SQLError`. When the source value carries a non-null `code` it is copied
 * onto this error.
 */
export class DatabaseError extends TypedError {
  public code?: number | string;
  constructor(message?: string | CodedError | SQLError) {
    // A SQLError is not actually an instance of Error, so only its message text
    // is forwarded to the parent constructor; anything else is passed through.
    super(isSqlError(message) ? (message.message as string) : message);
    if (message == null || typeof message === 'string') {
      // Nothing to copy a code from.
      return;
    }
    const { code } = message;
    if (code != null) {
      // If the source has a code then use that as our code.
      this.code = code;
    }
  }
}
/** Generic constraint failure; parent of the more specific constraint errors below. */
export class ConstraintError extends DatabaseError {}
/** Thrown by the postgres and mysql engines when the driver reports a unique violation. */
export class UniqueConstraintError extends ConstraintError {}
/** Thrown by the postgres and mysql engines when the driver reports a foreign key violation. */
export class ForeignKeyConstraintError extends ConstraintError {}
/** Thrown by the postgres and mysql engines when the driver reports a check constraint violation. */
export class CheckConstraintError extends ConstraintError {}
/** Thrown by the postgres engine when the driver reports an exclusion constraint violation. */
export class ExclusionConstraintError extends ConstraintError {}
/** Rejection value used by `executeSql`/`rollback`/`end` once a transaction has been closed. */
export class TransactionClosedError extends DatabaseError {}
/** Thrown when a write is attempted through a read-only transaction. */
export class ReadOnlyViolationError extends DatabaseError {}
/**
 * Reports an error on the `metrics` emitter (`db_error`) and makes sure the
 * value handed back is a `DatabaseError`.
 *
 * @param err The error raised by the driver or by this layer.
 * @returns `err` itself when it already is a `DatabaseError`, otherwise a new
 * `DatabaseError` wrapping it so callers can catch it more easily.
 */
const wrapDatabaseError = (err: CodedError): DatabaseError => {
  metrics.emit('db_error', err);
  return err instanceof DatabaseError ? err : new DatabaseError(err);
};
// Error classes that are spread onto every `Database` object returned by the engines,
// so consumers can reach them through the database instance.
const alwaysExport = {
DatabaseError,
ConstraintError,
UniqueConstraintError,
ForeignKeyConstraintError,
CheckConstraintError,
ExclusionConstraintError,
TransactionClosedError,
ReadOnlyViolationError,
};
// The part of the `Database` interface that is identical for every engine.
type BaseDatabase = typeof alwaysExport;
// Signature shared by `transaction` and `readTransaction`:
// - called with a callback, the transaction is ended after the callback succeeds and
//   rolled back if it throws (see `createTransaction`);
// - called without arguments, the open transaction itself is returned to the caller.
interface TransactionFn {
(
fn: (tx: Tx) => Resolvable,
options?: {
// Forwarded to the engine's transaction factory as the automatic close timeout.
timeoutMS?: number;
},
): Promise;
(): Promise;
}
/** A connected database as returned by one of the `engines` factories. */
export interface Database extends BaseDatabase {
engine: Engines;
// Runs a single statement inside its own transaction (see `atomicExecuteSql`).
executeSql: (
this: Database,
sql: Sql,
bindings?: Bindings,
) => Promise;
transaction: TransactionFn;
readTransaction: TransactionFn;
}
// Registry shape for engine factories; the engine-specific entries are added further
// down through interface declaration merging.
interface EngineParams {
[engine: string]: (options: unknown) => Database;
}
/** Engine factories keyed by engine name; only engines whose driver could be loaded are registered. */
export const engines = {} as EngineParams;
// Numeric limits for the values accepted as transaction lock parameters
// (the bounds of a signed 32-bit integer).
const types = {
  integer: { min: -2147483648, max: 2147483647 },
};
/**
 * Ensures a transaction lock parameter is an integer within `types.integer`.
 *
 * @param value The number to validate.
 * @param parameterName Name used in the error message.
 * @throws TypeError when `value` is not an integer or is out of range.
 */
const validateTransactionLockParameter = (
  value: number,
  parameterName: string,
) => {
  const { min, max } = types.integer;
  const isValid = Number.isInteger(value) && value >= min && value <= max;
  if (isValid) {
    return;
  }
  throw new TypeError(
    `Invalid parameter '${parameterName}' provided for transaction lock`,
  );
};
// Registered lock namespaces: namespace key -> numeric namespace id.
const transactionLockNamespaceMap = new Map();
/**
 * Registers a namespace for transaction level locks. Both the key and the id
 * must be unique across all registrations.
 *
 * @param namespaceKey Any string representing a namespace
 * @param namespaceId For application level locks positive values should be used
 * as internal / low level locks are recommended to use negative values
 * @throws TypeError when `namespaceId` is not a valid lock parameter.
 * @throws Error when the key or the id has already been registered.
 */
export function registerTransactionLockNamespace(
  namespaceKey: string,
  namespaceId: number,
) {
  validateTransactionLockParameter(namespaceId, 'namespaceId');

  if (transactionLockNamespaceMap.has(namespaceKey)) {
    throw new Error(
      `Error while registering transaction lock namespace '${namespaceKey}'. Namespace key is already registered.`,
    );
  }

  // The id must not be in use by a different namespace key either.
  for (const [registeredKey, registeredId] of transactionLockNamespaceMap) {
    if (registeredId !== namespaceId) {
      continue;
    }
    throw new Error(
      `Error while registering transaction lock namespace '${namespaceKey}'. Transaction lock namespace id '${namespaceId}' already registered for namespace ${registeredKey}.`,
    );
  }

  transactionLockNamespaceMap.set(namespaceKey, namespaceId);
}
// Counts how often each hashed statement has been seen, until it reaches the prepare threshold.
const maybePrepareCache = new Map();
// Pre-allocate a buffer to improve performance when hashing sql / reduce allocations
const fnv1aOpts = { utf8Buffer: new Uint8Array(256) };
/**
 * Decides whether a statement should be run as a named prepared statement.
 *
 * Driven by `env.db.prepareAfterN`:
 * - `false`: never prepare, always returns `undefined`;
 * - `true`: always prepare, always returns the name;
 * - a number: returns `undefined` (and bumps the counter) until the statement
 *   has been seen that many times, then returns the name.
 *
 * @param sql The statement text.
 * @returns The prepared statement name, or `undefined` to run it unprepared.
 */
const getPreparedName = (sql: Sql): string | undefined => {
  const { prepareAfterN } = env.db;
  if (prepareAfterN === false) {
    return undefined;
  }
  // Use a hash (plus the statement length) to minimize the size of the cache keys
  const hashed = fnv1a(sql, fnv1aOpts).toString(36);
  const preparedName = `${hashed}${sql.length}`;
  if (prepareAfterN === true) {
    return preparedName;
  }
  const timesSeen = maybePrepareCache.get(preparedName) ?? 0;
  if (timesSeen >= prepareAfterN) {
    return preparedName;
  }
  // Only increment if we haven't already reached the threshold as we don't care past that point
  maybePrepareCache.set(preparedName, timesSeen + 1);
  return undefined;
};
/**
 * Shared `Database.executeSql` implementation: runs a single statement inside
 * its own transaction obtained from `this.transaction`.
 *
 * @param sql The statement to run.
 * @param bindings Optional bind values for the statement.
 * @returns Whatever the transaction's `executeSql` resolves with.
 */
const atomicExecuteSql: Database['executeSql'] = async function (
  sql,
  bindings,
) {
  const runStatement = async (tx: Tx) => {
    const result = await tx.executeSql(sql, bindings);
    return result;
  };
  return await this.transaction(runStatement);
};
/**
 * Schedules `fn` to run asynchronously from a resolved promise, so the
 * caller is never affected by `fn` throwing synchronously.
 *
 * @param fn The callback to invoke on a later microtask.
 */
const asyncTryFn = (fn: () => any) => {
  const alreadyResolved = Promise.resolve();
  void alreadyResolved.then(fn);
};
// Builds the replacement `executeSql`/`rollback` functions installed on a closed transaction.
type RejectedFunctions = (message: string) => {
  executeSql: Tx['executeSql'];
  rollback: Tx['rollback'];
};
/**
 * Returns functions that always reject with a `TransactionClosedError`
 * carrying `message`. Which variant is used is decided once, from `env.DEBUG`.
 */
const getRejectedFunctions: RejectedFunctions = env.DEBUG
  ? (message) => {
      // In debug mode we create the error here to give the stack trace of where we first closed the transaction,
      // but it adds significant overhead for a production environment
      const closedError = new TransactionClosedError(message);
      // We return a new rejected promise on each call so that errors are automatically logged if the
      // rejection is not handled (but only if it is not handled)
      const alwaysReject = (): Promise<never> => Promise.reject(closedError);
      return { executeSql: alwaysReject, rollback: alwaysReject };
    }
  : (message) => {
      // Outside of debug mode the error is only created when a call is actually rejected.
      const alwaysReject = (): Promise<never> =>
        Promise.reject(new TransactionClosedError(message));
      return { executeSql: alwaysReject, rollback: alwaysReject };
    };
// Builds an `on` replacement for a transaction that has already settled: a listener
// registered for `settledEvent` is scheduled straight away, any other event is ignored.
const listenAfterSettled =
  (settledEvent: string): Tx['on'] =>
  (name: string, fn: () => void) => {
    if (name !== settledEvent) {
      return;
    }
    asyncTryFn(fn);
  };
// Installed as `tx.on` once the transaction has ended.
const onEnd: Tx['on'] = listenAfterSettled('end');
// Installed as `tx.on` once the transaction has been rolled back.
const onRollback: Tx['on'] = listenAfterSettled('rollback');
/**
 * Watchdog that rolls a transaction back when it stays open for `timeoutMS`
 * without any query in flight.
 *
 * The timer is started on construction, stopped while at least one query is
 * pending and restarted when the pending count drops back to zero.
 */
class AutomaticClose {
  private automaticCloseTimeout: ReturnType;
  private automaticClose: () => void;
  // Number of in-flight queries, or `false` once the watchdog has been cancelled.
  private pending: false | number = 0;
  private timeoutMS = env.db.timeoutMS;
  /**
   * @param tx The transaction to roll back when the timer fires.
   * @param stackTraceErr Optional error whose stack is logged when the timer fires.
   * @param timeoutMS Overrides `env.db.timeoutMS` when it is a positive number.
   */
  constructor(
    tx: Tx,
    private stackTraceErr?: Error,
    timeoutMS?: number,
  ) {
    const hasCustomTimeout = timeoutMS != null && timeoutMS > 0;
    if (hasCustomTimeout) {
      this.timeoutMS = timeoutMS;
    }
    this.automaticClose = () => {
      console.error(
        `Transaction still open after ${this.timeoutMS}ms without an execute call.`,
      );
      const creationTrace = this.stackTraceErr;
      if (creationTrace) {
        console.error(creationTrace.stack);
      }
      void tx.rollback();
    };
    this.automaticCloseTimeout = setTimeout(
      this.automaticClose,
      this.timeoutMS,
    );
  }

  /** Records that a query has started and pauses the timer. */
  public incrementPending() {
    if (this.pending === false) {
      return;
    }
    this.pending += 1;
    clearTimeout(this.automaticCloseTimeout);
  }

  /** Records that a query has finished and restarts the timer once nothing is pending. */
  public decrementPending() {
    if (this.pending === false) {
      return;
    }
    const remaining = this.pending - 1;
    this.pending = remaining;
    if (remaining < 0) {
      console.error('Pending transactions is less than 0, wtf?');
      this.pending = 0;
      return;
    }
    // We only ever want one timeout running at a time, hence only restarting at exactly 0
    if (remaining === 0) {
      this.automaticCloseTimeout = setTimeout(
        this.automaticClose,
        this.timeoutMS,
      );
    }
  }

  /** Permanently disables the watchdog and stops any running timer. */
  public cancelPending() {
    // Set pending to false to cancel all pending.
    this.pending = false;
    clearTimeout(this.automaticCloseTimeout);
  }
}
/**
 * Engine-agnostic base class for a database transaction.
 *
 * Engines implement `_executeSql`, `_rollback`, `_commit`, `clone` and
 * `tableList`; this class adds the read-only guard, query metrics, error
 * wrapping, the automatic close watchdog and the `end`/`rollback`/`preCommit`
 * listeners. Once the transaction is closed, `executeSql`, `rollback` and
 * `end` are replaced by functions that reject with a `TransactionClosedError`.
 */
export abstract class Tx {
  private closed = false;
  protected automaticClose: AutomaticClose | undefined;

  /**
   * @param readOnly Whether this transaction object may only run SELECT statements.
   * @param stackTraceErr Either an error capturing where the transaction was
   * created, or an existing `AutomaticClose` to share (used when cloning).
   * @param timeoutMS Automatic close timeout; `0` disables automatic closing,
   * leaving it out lets `AutomaticClose` fall back to its default.
   */
  constructor(
    protected readOnly: boolean,
    stackTraceErr?: Error | AutomaticClose,
    timeoutMS?: number,
  ) {
    if (stackTraceErr instanceof AutomaticClose) {
      this.automaticClose = stackTraceErr;
    } else if (timeoutMS !== 0) {
      // Disable automatic closing if timeoutMS is 0.
      // An omitted timeout must still create the watchdog: `AutomaticClose` then uses its
      // own default, otherwise transactions opened without options would never be closed
      // automatically.
      this.automaticClose = new AutomaticClose(this, stackTraceErr, timeoutMS);
    }
  }

  // Marks the transaction as closed: stops the watchdog and swaps the public entry points
  // for functions that reject with `message`.
  private closeTransaction(message: string): void {
    this.automaticClose?.cancelPending();
    const { executeSql, rollback } = getRejectedFunctions(message);
    this.executeSql = executeSql;
    this.rollback = this.end = rollback;
    this.closed = true;
  }

  /** Whether `end` or `rollback` has already been called on this transaction object. */
  public isClosed() {
    return this.closed;
  }

  // Creates another transaction object over the same underlying connection.
  protected abstract clone(readOnly?: boolean): Tx;

  /** Returns a read-only view of this transaction (itself if it already is read-only). */
  public asReadOnly() {
    if (this.readOnly) {
      return this;
    }
    return this.clone(true);
  }

  /** Whether this transaction object is read-only. */
  public isReadOnly() {
    return this.readOnly;
  }

  /**
   * Runs a statement in this transaction.
   *
   * @param sql The statement text.
   * @param bindings Bind values for the statement.
   * @param args Extra engine-specific arguments forwarded to `_executeSql`.
   * @throws ReadOnlyViolationError when `env.db.checkReadOnlyQueries` is set, the
   * transaction is read-only and the statement is not made up of SELECTs only.
   */
  public async executeSql(
    sql: Sql,
    bindings: Bindings = [],
    ...args: any[]
  ): Promise {
    if (
      env.db.checkReadOnlyQueries &&
      this.readOnly &&
      !/^\s*SELECT\s(?:[^;]|;\s*SELECT\s)*$/.test(sql)
    ) {
      throw new ReadOnlyViolationError(
        `Attempted to run a non-SELECT statement in a read-only tx: ${sql}`,
      );
    }
    return await this.$executeSql(sql, bindings, ...args);
  }

  // Runs a statement without the read-only guard, while pausing the watchdog, wrapping any
  // failure in a `DatabaseError` and emitting a `db_query_time` metric.
  protected async $executeSql(
    sql: Sql,
    bindings: Bindings = [],
    ...args: any[]
  ): Promise {
    this.automaticClose?.incrementPending();
    const t0 = Date.now();
    try {
      return await this._executeSql(sql, bindings, ...args);
    } catch (err: any) {
      throw wrapDatabaseError(err);
    } finally {
      this.automaticClose?.decrementPending();
      const queryTime = Date.now() - t0;
      metrics.emit('db_query_time', {
        queryTime,
        // metrics-TODO: statistics on query types (SELECT, INSERT)
        // themselves should be gathered by postgres, while at this
        // scope in pine, we should report the overall query time as
        // being associated with an HTTP method on the given model
        // (eg. [PUT, Device])
        //
        // metrics-TODO: evaluate whether a request to a model can,
        // with hooks, make multiple DB queries in such a way that
        // it would be a statistically significant difference in the
        // "query time" metric if we were to report them individually
        // by attaching here, vs. aggregating all query times for a
        // given request as one figure.
        //
        // Grab the first word of the query and regard that as the
        // "query type" (to be improved in line with the above
        // TODO's)
        queryType: sql.split(' ', 1)[0],
      });
    }
  }

  /**
   * Rolls the transaction back and closes it. The `rollback` listeners are
   * scheduled whether or not the engine level rollback succeeded.
   */
  public async rollback(): Promise {
    try {
      // Start the rollback before closing so the engine can still issue its statement,
      // then close straight away so no further queries can be queued behind it.
      const promise = this._rollback();
      this.closeTransaction('Transaction has been rolled back.');
      await promise;
    } finally {
      this.listeners.rollback.forEach(asyncTryFn);
      // Listeners added from now on are run immediately.
      this.on = onRollback;
      this.clearListeners();
    }
  }

  /**
   * Commits the transaction and closes it. All `preCommit` listeners are
   * awaited before the commit is issued; `end` listeners are scheduled only
   * after the commit succeeded.
   */
  public async end(): Promise {
    await Promise.all(
      this.listeners.preCommit.map(async (hook) => {
        await hook();
      }),
    );
    // Start the commit before closing so the engine can still issue its statement.
    const promise = this._commit();
    this.closeTransaction('Transaction has been ended.');
    await promise;
    this.listeners.end.forEach(asyncTryFn);
    // Listeners added from now on are run immediately.
    this.on = onEnd;
    this.clearListeners();
  }

  /** Permanently stops the automatic close watchdog for this transaction. */
  public disableAutomaticClose(): void {
    this.automaticClose?.cancelPending();
  }

  private listeners: {
    end: Array<() => void>;
    rollback: Array<() => void>;
    preCommit: Array<() => void | Promise>;
  } = {
    end: [],
    rollback: [],
    preCommit: [],
  };

  /** Registers a listener for one of the `end`, `rollback` or `preCommit` events. */
  public on(
    name: T,
    fn: Tx['listeners'][T][number],
  ): void {
    this.listeners[name].push(fn);
  }

  // Empties the listener arrays in place.
  private clearListeners() {
    this.listeners.end.length = 0;
    this.listeners.rollback.length = 0;
    this.listeners.preCommit.length = 0;
  }

  // Engine hook: actually runs a statement and converts the driver result into a `Result`.
  protected abstract _executeSql(
    sql: Sql,
    bindings: Bindings,
    addReturning?: false | string,
  ): Promise;
  // Engine hook: rolls back and releases the underlying connection/transaction.
  protected abstract _rollback(): Promise;
  // Engine hook: commits and releases the underlying connection/transaction.
  protected abstract _commit(): Promise;

  /**
   * Acquires a transaction level lock. This base implementation always throws;
   * engines that support such locks override it.
   *
   * @throws Error always, as the method is not implemented at this level.
   */
  // eslint-disable-next-line @typescript-eslint/require-await -- We need to return a promise for compatibility reasons.
  public async getTxLevelLock(
    // TODO: Re-enable the lint rule once eslint properly supports abstract class base implementations
    /* eslint-disable @typescript-eslint/no-unused-vars */
    _namespaceKey: string,
    _key: number,
    _blocking = true,
    /* eslint-enable @typescript-eslint/no-unused-vars */
  ): Promise {
    throw new Error(
      'The getTxLevelLock method is not implemented for the current engine.',
    );
  }

  /** Lists the tables of the database, optionally filtered by an extra SQL condition. */
  public abstract tableList(extraWhereClause?: string): Promise;

  /**
   * Drops a table.
   *
   * @param tableName Name of the table; must be a string without double quotes
   * since it is interpolated into a quoted identifier.
   * @param ifExists When `true` (the default) `IF EXISTS` is added to the statement.
   * @throws TypeError when `tableName` is not a string or contains a double quote.
   * @throws ReadOnlyViolationError when the transaction is read-only.
   */
  public async dropTable(tableName: string, ifExists = true) {
    if (typeof tableName !== 'string') {
      throw new TypeError('"tableName" must be a string');
    }
    if (tableName.includes('"')) {
      throw new TypeError('"tableName" cannot include double quotes');
    }
    if (this.readOnly) {
      throw new ReadOnlyViolationError(
        'Cannot drop tables in a read-only transaction',
      );
    }
    const ifExistsStr = ifExists === true ? ' IF EXISTS' : '';
    return await this.$executeSql(`DROP TABLE${ifExistsStr} "${tableName}";`);
  }
}
/**
 * Captures where a transaction was requested. Only produces an `Error` (for
 * its stack) when `env.DEBUG` is set; otherwise it is a no-op returning
 * `undefined`.
 */
const getStackTraceErr: () => Error | undefined = env.DEBUG
  ? () => new Error()
  : () => undefined;
/**
 * Turns an engine's transaction factory into the public `TransactionFn`.
 *
 * Without a callback the opened transaction is handed to the caller. With a
 * callback the transaction is ended after the callback succeeds; if the
 * callback or the commit fails the transaction is rolled back and the
 * original error is rethrown.
 *
 * @param createFunc Engine-specific factory that opens a transaction.
 * @returns The wrapped transaction function.
 */
const createTransaction = (createFunc: CreateTransactionFn): TransactionFn => {
  return async (
    fn?: (tx: Tx) => Resolvable,
    options?: {
      timeoutMS?: number;
    },
  ): Promise => {
    const stackTraceErr = getStackTraceErr();
    let tx;
    try {
      tx = await createFunc(stackTraceErr, options?.timeoutMS);
    } catch (err: any) {
      // Failures to even open the transaction are reported as database errors.
      throw wrapDatabaseError(err);
    }

    if (!fn) {
      // No callback: the caller is responsible for ending / rolling back.
      return tx;
    }

    let result;
    try {
      result = await fn(tx);
      await tx.end();
    } catch (err: any) {
      try {
        await tx.rollback();
      } catch {
        // Ignore rollback errors as we want to throw the original error
      }
      throw err;
    }
    return result;
  };
};
// `pg` and `pg-connection-string` are loaded dynamically so that this module still loads
// when they are not installed; in that case the postgres engine is simply not registered.
let maybePg: typeof Pg | undefined;
let maybePgConnectionString: typeof PgConnectionString | undefined;
try {
maybePg = (await import('pg')).default;
maybePgConnectionString = (await import('pg-connection-string')).default;
} catch {
// Ignore errors
}
// Adds the typed `postgres` entry to the engine registry. It accepts a connection string,
// a single pool config, or a primary config with an optional replica config.
interface EngineParams {
postgres: (
options:
| string
| Pg.PoolConfig
| { primary: Pg.PoolConfig; replica?: Pg.PoolConfig },
) => Database;
}
// Registers the postgres engine, but only when the `pg` driver could be loaded.
if (maybePg != null) {
const pg = maybePg;
engines.postgres = (connectString) => {
// Driver error codes that are mapped onto the specific constraint error classes.
const PG_UNIQUE_VIOLATION = '23505';
const PG_FOREIGN_KEY_VIOLATION = '23503';
const PG_CHECK_CONSTRAINT_VIOLATION = '23514';
const PG_EXCLUSION_CONSTRAINT_VIOLATION = '23P01';
const { PG_SCHEMA } = process.env;
// Creates a pool from `config`, filling any unset option from `env.db` and attaching
// the connect/error handlers. Note that `config` is mutated in place.
const initPool = (config: Pg.PoolConfig) => {
config.max ??= env.db.poolSize;
config.idleTimeoutMillis ??= env.db.idleTimeoutMillis;
config.statement_timeout ??= env.db.statementTimeout;
config.query_timeout ??= env.db.queryTimeout;
config.connectionTimeoutMillis ??= env.db.connectionTimeoutMillis;
config.keepAlive ??= env.db.keepAlive;
config.maxLifetimeSeconds ??= env.db.maxLifetimeSeconds;
config.maxUses ??= env.db.maxUses;
const p = new pg.Pool(config);
if (PG_SCHEMA != null) {
// Point every new connection at the schema named by the PG_SCHEMA environment variable.
p.on('connect', (client) => {
void client.query({ text: `SET search_path TO "${PG_SCHEMA}"` });
});
}
p.on('connect', (client) => {
// Release a client that emits an error, passing the error to `release`;
// a failure of the release itself is only logged.
client.on('error', (err) => {
try {
console.error('Releasing client on error:', err);
client.release(err);
} catch (e) {
console.error('Error releasing client on error:', e);
}
});
});
// Pool level errors are only logged.
p.on('error', (err) => {
console.error('Pool error:', err.message);
});
return p;
};
// `pool` serves `transaction`, `replica` serves `readTransaction`.
let pool: Pg.Pool;
let replica: Pg.Pool;
if (typeof connectString === 'string') {
if (maybePgConnectionString == null) {
throw new Error(
'pg-connection-string is required for string connection strings',
);
}
// We have to cast because of the use of null vs undefined
const config = maybePgConnectionString.parse(
connectString,
) as Pg.PoolConfig;
pool = initPool(config);
} else {
const config = connectString;
if ('primary' in config) {
pool = initPool(config.primary);
if (config.replica) {
replica = initPool(config.replica);
}
} else {
pool = initPool(config);
}
}
// Without a dedicated replica, read transactions use the primary pool.
replica ??= pool;
// Converts a driver result into the engine-independent `Result`.
// `insertId` is taken from the `id` column of the first returned row, if there is one.
const createResult = ({
rowCount,
rows,
}: {
rowCount: number | null;
rows: Row[];
}): Result => {
return {
rows,
rowsAffected: rowCount ?? 0,
insertId: rows?.[0]?.id,
};
};
// Transaction implementation on top of a pooled `pg` client.
class PostgresTx extends Tx {
constructor(
private db: Pg.PoolClient,
readOnly: boolean,
stackTraceErr?: Error | AutomaticClose,
timeoutMS?: number,
) {
super(readOnly, stackTraceErr, timeoutMS);
}
// The clone shares both the client and the automatic close watchdog with this transaction.
protected clone(readOnly = this.readOnly) {
return new PostgresTx(this.db, readOnly, this.automaticClose);
}
// Runs a statement on the client. When `addReturning` names a column and the statement is
// an INSERT/UPDATE/DELETE, a `RETURNING "<column>"` clause is appended to it.
protected async _executeSql(
sql: Sql,
bindings: Bindings,
addReturning: false | string = false,
) {
if (addReturning && /^\s*(?:INSERT\s+INTO|UPDATE|DELETE)/i.test(sql)) {
sql = sql.replace(/;?$/, ' RETURNING "' + addReturning + '";');
}
let result;
try {
// `undefined` means the statement is run unprepared (see `getPreparedName`).
const preparedName = getPreparedName(sql);
result = await this.db.query({
text: sql,
values: bindings,
name: preparedName,
});
} catch (err: any) {
// Translate the known constraint violation codes into their dedicated error classes.
if (err.code === PG_UNIQUE_VIOLATION) {
throw new UniqueConstraintError(err);
}
if (err.code === PG_FOREIGN_KEY_VIOLATION) {
throw new ForeignKeyConstraintError(err);
}
if (err.code === PG_CHECK_CONSTRAINT_VIOLATION) {
throw new CheckConstraintError(err);
}
if (err.code === PG_EXCLUSION_CONSTRAINT_VIOLATION) {
throw new ExclusionConstraintError(err);
}
throw err;
}
return createResult(result);
}
// Fails every query still queued on the client, issues ROLLBACK (bounded by
// `env.db.rollbackTimeout`) and releases the client. On failure the client is
// released with the error instead.
protected async _rollback() {
try {
// Error/dequeue all queued up queries on a rollback, this will not cancel an in-progress query however
// @ts-expect-error typings do not include this queryQueue
const queryQueue = this.db.queryQueue as Pg.Query[];
if (queryQueue.length > 0) {
const err = new DatabaseError('Rolling back transaction');
for (const query of queryQueue) {
process.nextTick(() => {
// @ts-expect-error typings do not include this function
query.handleError(err, this.db.connection);
});
}
queryQueue.length = 0;
}
await timeout(
this.$executeSql('ROLLBACK;'),
env.db.rollbackTimeout,
'Rolling back transaction timed out',
);
this.db.release();
} catch (err: any) {
const errorToReturn = wrapDatabaseError(err);
this.db.release(errorToReturn);
throw errorToReturn;
}
}
// Issues COMMIT and releases the client; on failure the client is released with the error.
protected async _commit() {
try {
await this.$executeSql('COMMIT;');
this.db.release();
} catch (err: any) {
this.db.release(err);
throw err;
}
}
// Takes a transaction level advisory lock for (`namespaceKey`, `key`).
// With `blocking` the call waits for the lock and resolves to `true`; otherwise it
// resolves to whether the lock could be taken immediately.
// Throws when `key` is invalid, when the namespace is not registered, or when the
// lock query fails (the latter is rethrown as a plain `Error`).
public override async getTxLevelLock(
namespaceKey: string,
key: number,
blocking = true,
) {
validateTransactionLockParameter(key, 'key');
const namespaceId = transactionLockNamespaceMap.get(namespaceKey);
if (namespaceId == null) {
throw new Error(
`Transaction lock namespace ${namespaceKey} not registered.`,
);
}
try {
if (blocking) {
await this.executeSql(`SELECT pg_advisory_xact_lock($1, $2);`, [
namespaceId,
key,
]);
return true;
} else {
const { rows } = await this.executeSql(
`SELECT pg_try_advisory_xact_lock($1, $2);`,
[namespaceId, key],
);
return rows[0].pg_try_advisory_xact_lock === true;
}
} catch (err) {
throw new Error(
`getTxLevelLock error during getting lock from postgres db layer ${err}`,
);
}
}
// Lists the tables of the `public` schema; `extraWhereClause` is interpolated verbatim
// as the WHERE condition of the outer query.
public async tableList(extraWhereClause = '') {
if (extraWhereClause !== '') {
extraWhereClause = 'WHERE ' + extraWhereClause;
}
return await this.executeSql(`
SELECT *
FROM (
SELECT tablename as name
FROM pg_tables
WHERE schemaname = 'public'
) t ${extraWhereClause};
`);
}
}
return {
engine: Engines.postgres,
executeSql: atomicExecuteSql,
transaction: createTransaction(async (stackTraceErr, timeoutMS) => {
const client = await pool.connect();
const tx = new PostgresTx(client, false, stackTraceErr, timeoutMS);
// The start statement is not awaited before the transaction is handed out.
void tx.executeSql('START TRANSACTION;');
return tx;
}),
readTransaction: createTransaction(async (stackTraceErr, timeoutMS) => {
const client = await replica.connect();
// Created as writable so the START statement is not subject to the read-only query
// check; only the read-only clone is handed out.
const tx = new PostgresTx(client, false, stackTraceErr, timeoutMS);
void tx.executeSql('START TRANSACTION READ ONLY;');
return tx.asReadOnly();
}),
...alwaysExport,
};
};
}
// `mysql` is loaded dynamically so that this module still loads when it is not installed;
// in that case the mysql engine is simply not registered.
let maybeMysql: typeof Mysql | undefined;
try {
maybeMysql = (await import('mysql')).default;
} catch {
// Ignore errors
}
// Adds the typed `mysql` entry to the engine registry.
interface EngineParams {
mysql: (options: Mysql.PoolConfig) => Database;
}
// Registers the mysql engine, but only when the `mysql` driver could be loaded.
if (maybeMysql != null) {
const mysql = maybeMysql;
engines.mysql = (options) => {
// Driver error codes that are mapped onto the specific constraint error classes.
const MYSQL_UNIQUE_VIOLATION = 'ER_DUP_ENTRY';
const MYSQL_FOREIGN_KEY_VIOLATION = 'ER_ROW_IS_REFERENCED';
const MYSQL_CHECK_CONSTRAINT_VIOLATION = 'ER_CHECK_CONSTRAINT_VIOLATED';
const pool = mysql.createPool(options);
// Every new connection is switched to the ANSI sql_mode.
pool.on('connection', (db) => {
db.query("SET sql_mode='ANSI';");
});
// Promise wrapper around the callback based `pool.getConnection`.
const getConnectionAsync = () =>
fromCallback((callback) => {
pool.getConnection(callback);
});
// Shape of the driver result as it is read by `createResult`.
interface MysqlRowArray extends Array {
affectedRows: number;
insertId?: number;
}
// Converts a driver result into the engine-independent `Result`.
const createResult = (rows: MysqlRowArray): Result => {
return {
rows,
rowsAffected: rows.affectedRows,
insertId: rows.insertId,
};
};
// Transaction implementation on top of a pooled mysql connection.
class MySqlTx extends Tx {
constructor(
private db: Mysql.Connection,
private close: CloseTransactionFn,
readOnly: boolean,
stackTraceErr?: Error | AutomaticClose,
timeoutMS?: number,
) {
super(readOnly, stackTraceErr, timeoutMS);
}
// The clone shares the connection, the close callback and the automatic close watchdog.
protected clone(readOnly = this.readOnly) {
return new MySqlTx(this.db, this.close, readOnly, this.automaticClose);
}
// Runs a statement on the connection and translates the known constraint violation codes.
protected async _executeSql(sql: Sql, bindings: Bindings) {
let result;
try {
result = await fromCallback((callback) => {
this.db.query(sql, bindings, callback);
});
} catch (err: any) {
if (err.code === MYSQL_UNIQUE_VIOLATION) {
// We know that the type is an IError for mysql, but typescript doesn't like the catch obj sugar
throw new UniqueConstraintError(err as Mysql.MysqlError);
}
if (err.code === MYSQL_FOREIGN_KEY_VIOLATION) {
throw new ForeignKeyConstraintError(err as Mysql.MysqlError);
}
if (err.code === MYSQL_CHECK_CONSTRAINT_VIOLATION) {
throw new CheckConstraintError(err as Mysql.MysqlError);
}
throw err;
}
return createResult(result);
}
// ROLLBACK is issued first and `close` is called right after, before the statement's
// promise is awaited.
protected async _rollback() {
const promise = this.$executeSql('ROLLBACK;');
this.close();
await promise;
}
// COMMIT is issued first and `close` is called right after, before the statement's
// promise is awaited.
protected async _commit() {
const promise = this.$executeSql('COMMIT;');
this.close();
await promise;
}
// Lists the tables of the schema named by `options.database`; `extraWhereClause` is
// interpolated verbatim as the WHERE condition of the outer query.
public async tableList(extraWhereClause = '') {
if (extraWhereClause !== '') {
extraWhereClause = ' WHERE ' + extraWhereClause;
}
return await this.executeSql(
`
SELECT name
FROM (
SELECT table_name AS name
FROM information_schema.tables
WHERE table_schema = ?
) t ${extraWhereClause};
`,
[options.database],
);
}
}
return {
engine: Engines.mysql,
executeSql: atomicExecuteSql,
transaction: createTransaction(async (stackTraceErr, timeoutMS) => {
const client = await getConnectionAsync();
const close = () => {
client.release();
};
const tx = new MySqlTx(client, close, false, stackTraceErr, timeoutMS);
// The start statement is not awaited before the transaction is handed out.
void tx.executeSql('START TRANSACTION;');
return tx;
}),
readTransaction: createTransaction(async (stackTraceErr, timeoutMS) => {
const client = await getConnectionAsync();
const close = () => {
client.release();
};
// Created as writable so the START statement is not subject to the read-only query
// check; only the read-only clone is handed out.
const tx = new MySqlTx(client, close, false, stackTraceErr, timeoutMS);
void tx.executeSql('START TRANSACTION READ ONLY;');
return tx.asReadOnly();
}),
...alwaysExport,
};
};
}
// Adds the typed `websql` entry to the engine registry.
interface EngineParams {
websql: (databaseName: string) => Database;
}
// Registers the websql engine, but only when `window.openDatabase` is available.
if (typeof window !== 'undefined' && window.openDatabase != null) {
// Shape of a WebSQL statement result as it is read by `createResult`.
interface WebSqlResult {
insertId?: number;
rowsAffected: number;
rows: {
item: (i: number) => Row;
length: number;
};
}
// A queued statement: sql, bindings, success callback, error callback.
type AsyncQuery = [
Sql,
Bindings,
SQLStatementCallback,
SQLStatementErrorCallback,
];
engines.websql = (databaseName) => {
// Error code that is mapped onto `ConstraintError`.
const WEBSQL_CONSTRAINT_ERR = 6;
const db = window.openDatabase(
databaseName,
'1.0',
'rulemotion',
2 * 1024 * 1024,
);
// Reading `insertId` may throw, in which case `undefined` is returned.
const getInsertId = (result: WebSqlResult) => {
try {
return result.insertId;
} catch {
// Ignore the potential DOM exception.
}
};
// Converts a WebSQL result into the engine-independent `Result`.
const createResult = (result: WebSqlResult): Result => {
const { length } = result.rows;
// We convert `result.rows` to a real array to make it easier to work with
const rows: Row[] = Array(length);
for (let i = 0; i < length; i++) {
rows[i] = result.rows.item(i);
}
return {
rows,
rowsAffected: result.rowsAffected,
insertId: getInsertId(result),
};
};
// Transaction implementation on top of a `WebSqlWrapper`.
class WebSqlTx extends Tx {
constructor(
private tx: WebSqlWrapper,
readOnly: boolean,
stackTraceErr?: Error | AutomaticClose,
) {
// No `timeoutMS` is forwarded to the base class for this engine.
super(readOnly, stackTraceErr);
}
// The clone shares the wrapper and the automatic close watchdog with this transaction.
protected clone(readOnly = this.readOnly) {
return new WebSqlTx(this.tx, readOnly, this.automaticClose);
}
// Runs a statement through the wrapper; constraint failures become a `ConstraintError`.
protected async _executeSql(sql: Sql, bindings: Bindings) {
let result;
try {
result = await this.tx.executeSql(sql, bindings);
} catch (err: any) {
if (err.code === WEBSQL_CONSTRAINT_ERR) {
throw new ConstraintError('Constraint failed.');
}
throw err;
}
return createResult(result);
}
protected async _rollback(): Promise {
await this.tx.rollback();
}
// eslint-disable-next-line @typescript-eslint/require-await -- We need to return a promise for compatibility reasons.
protected async _commit() {
this.tx.commit();
}
// Lists the tables found in `sqlite_master`, excluding two internal tables;
// `extraWhereClause` is interpolated verbatim as an additional AND condition.
public async tableList(extraWhereClause = '') {
if (extraWhereClause !== '') {
extraWhereClause = ' AND ' + extraWhereClause;
}
return await this.executeSql(`
SELECT name, sql
FROM sqlite_master
WHERE type='table'
AND name NOT IN (
'__WebKitDatabaseInfoTable__',
'sqlite_sequence'
)
${extraWhereClause};
`);
}
}
// Keeps a WebSQL transaction usable across async calls by continuously re-issuing a
// dummy statement and draining a queue of real statements on each pass.
class WebSqlWrapper {
// While `true` the keep-alive loop continues; cleared by `rollback` and `commit`.
private running = true;
private queue: AsyncQuery[] = [];
constructor(private tx: SQLTransaction) {
this.asyncRecurse();
}
// This function is used to recurse executeSql calls and keep the transaction open,
// allowing us to use async calls within the API.
private asyncRecurse = () => {
let args: AsyncQuery | undefined;
// NOTE(review): `pop()` takes the most recently queued statement first; confirm that
// this ordering is intended when several statements are queued in the same pass.
while ((args = this.queue.pop())) {
console.debug('Running', args[0]);
this.tx.executeSql(args[0], args[1], args[2], args[3]);
}
if (this.running) {
console.debug('Looping');
this.tx.executeSql('SELECT 0', [], this.asyncRecurse);
}
};
// Queues a statement; the returned promise settles from the statement's callbacks.
public async executeSql(sql: Sql, bindings: Bindings) {
return await new Promise((resolve, reject) => {
const successCallback: SQLStatementCallback = (_tx, results) => {
resolve(results);
};
const errorCallback: SQLStatementErrorCallback = (_tx, err) => {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion -- we need to do the cast for another rule
reject(err as unknown as Error);
return false;
};
this.queue.push([sql, bindings, successCallback, errorCallback]);
});
}
// Replaces the whole queue with a single deliberately invalid statement and stops the
// keep-alive loop. The promise resolves from either callback: the error callback
// returns `true` and the success callback throws after resolving.
public async rollback(): Promise {
await new Promise((resolve) => {
const successCallback: SQLStatementCallback = () => {
resolve();
throw new Error('Rollback');
};
const errorCallback: SQLStatementErrorCallback = () => {
resolve();
return true;
};
this.queue = [
[
'RUN A FAILING STATEMENT TO ROLLBACK',
[],
successCallback,
errorCallback,
],
];
this.running = false;
});
}
// Stops the keep-alive loop so no further dummy statements are issued.
public commit() {
this.running = false;
}
}
return {
engine: Engines.websql,
executeSql: atomicExecuteSql,
transaction: createTransaction(
(stackTraceErr) =>
new Promise((resolve) => {
db.transaction((tx) => {
resolve(
new WebSqlTx(new WebSqlWrapper(tx), false, stackTraceErr),
);
});
}),
),
// Opened through `db.transaction` as well, but the `WebSqlTx` is created read-only.
readTransaction: createTransaction(
(stackTraceErr) =>
new Promise((resolve) => {
db.transaction((tx) => {
resolve(new WebSqlTx(new WebSqlWrapper(tx), true, stackTraceErr));
});
}),
),
...alwaysExport,
};
};
}
/** Selects an engine by name together with the parameters for that engine's factory. */
export type DatabaseOptions = {
  engine: T;
  params: Parameters[0];
};
/**
 * Creates a `Database` using the engine named in `databaseOptions`.
 *
 * @param databaseOptions The engine name and the parameters for its factory.
 * @returns The database created by the selected engine.
 * @throws Error when no engine is registered under that name (for example
 * because its driver could not be loaded).
 */
export const connect = (
  databaseOptions: DatabaseOptions,
) => {
  const { engine, params } = databaseOptions;
  const isSupported = engines[engine] != null;
  if (!isSupported) {
    throw new Error('Unsupported database engine: ' + engine);
  }
  return engines[engine](params);
};