import fs from 'fs/promises'; import { PoolClient } from 'pg'; import path from 'path'; import os from 'os'; import { Logger, getLogger } from '@squiz/dx-logger-lib'; // Be carful about changing this value. This needs to be consistent across deployments // this ID is used to signal other running instances that a migration is executing. const MIGRATION_ADVISORY_LOCK = 4569465; const logger = getLogger({ name: 'db-migrator', meta: { pid: process.pid, hostname: os.hostname() } }); export type Migration = { (db: PoolClient, logger: Logger): Promise; }; export class Migrator { constructor(protected migrationDir: string, protected migrationList: string[], protected pool: PoolClient) {} protected async ensureMigrationTableExists() { return this.pool.query('create table if not exists "__migrations__" (id varchar(128) NOT NULL)'); } protected async getAppliedMigrations() { await this.ensureMigrationTableExists(); const result = await this.pool.query('select * from __migrations__'); return result.rows.map(function (row) { return row.id; }); } protected async doSqlMigration(migration: string, sql: string) { try { const result = await this.pool.query(sql); logger.info('Applying ' + migration); if (result.rowCount !== undefined) { logger.info('affected rows', result.rowCount); } } catch (e) { logger.info('error occurred running migration', migration, e); throw e; } } protected async getPending(migrationsList: string[], appliedMigrations: string[]) { const pending: string[] = []; // get all migrations for (let i = 0; i < migrationsList.length; i++) { if (migrationsList[i] !== appliedMigrations[i]) { pending.push(migrationsList[i]); } } // validate order for (let i = 0; i < pending.length; i++) { if (appliedMigrations.includes(pending[i])) { throw new Error(`${pending[i]} has already run. Are you sure your migrations are in the correct order.`); } } // check the migration list is the right one (based on applied migrations on the db) for (let i = 0; i < migrationsList.length; i++) { if (migrationsList[i] !== appliedMigrations[i] && appliedMigrations[i] !== undefined) { throw new Error( `Migration #${i + 1} ${migrationsList[i]} on the migration list does not match with the applied migration ${ appliedMigrations[i] }. Are you sure your migrations are running in the correct DB? Migration list: [${migrationsList.join( ', ', )}], Applied migrations: [${appliedMigrations.join(', ')}]`, ); } } return pending; } protected async getSql(migration: string) { const sql = await fs.readFile(path.join(this.migrationDir, migration), { encoding: 'utf-8' }); return sql; } protected async tryToObtainLock(): Promise { const result = await this.pool.query(`SELECT pg_try_advisory_lock(${MIGRATION_ADVISORY_LOCK}) as lockobtained`); return result.rows[0].lockobtained; } protected async releaseLock(): Promise { await this.pool.query(`SELECT pg_advisory_unlock(${MIGRATION_ADVISORY_LOCK}) lockreleased`); } public async migrate(): Promise { try { const lockObtained = await this.tryToObtainLock(); if (lockObtained === false) { logger.info('migration already running'); await sleep(500); return await this.migrate(); } await this.runMigrations(); logger.info('completed migration'); await this.releaseLock(); this.dispose(); } catch (e) { logger.info('migration failed releasing lock'); await this.releaseLock(); throw e; } } protected async runMigrations() { const appliedMigrations = await this.getAppliedMigrations(); const pending = await this.getPending(this.migrationList, appliedMigrations); if (pending.length === 0) { logger.info('No pending migrations'); return; } logger.info('Pending migrations:\n\t' + pending.join('\n\t')); for (const migration of pending) { await this.runMigration(migration); } } protected async doScriptMigration(migration: string) { const migrationScript = path.join(this.migrationDir, migration); // eslint-disable-next-line @typescript-eslint/no-var-requires const migrationFunc = require(migrationScript); // eslint-disable-next-line @typescript-eslint/ban-types let callable: Function; if (migrationFunc instanceof Function) { callable = migrationFunc; } else if (migrationFunc.default instanceof Function) { callable = migrationFunc.default; } else { throw new Error(`${migrationScript} isn't callable`); } await callable(this.pool, logger); } protected async runMigration(migration: string) { try { await this.pool.query('BEGIN'); await this.doMigrationWork(migration); await this.pool.query('insert into __migrations__ (id) values ($1)', [migration]); await this.pool.query('COMMIT'); } catch (e) { logger.error('migration failed', migration, e); await this.pool.query('ROLLBACK'); throw e; } } protected async doMigrationWork(migration: string) { if (migration.endsWith('.sql')) { const sql = await this.getSql(migration); await this.doSqlMigration(migration, sql); } else if (migration.endsWith('.js')) { await this.doScriptMigration(migration); } else { throw new Error(`${migration} as an invalid migration extension`); } } protected dispose() { return this.pool.release(); } } async function sleep(timeMs: number): Promise { return new Promise((resolve) => { setTimeout(resolve, timeMs); }); }