import { Pool } from 'pg'; import { IsolationLevel, SQL, param, sql, transaction } from 'zapatos/db'; import { DbLogger } from '../common'; import { LogicalReplicationOperation } from './create-logical-replication-service'; /** * A set of parameters used by `ensureReplicationSlotAndPublicationExist` helper. */ export interface EnsureReplicationSlotAndPublicationExistParams { /** * The name of the replication slot associated with the current database to be * checked for existence and (re)created. * * The name must be a unique value across all databases present in the * PostgreSQL Server. */ replicationSlotName: string; /** * The name of the publication associated with the current database to be * checked for existence and (re)created. */ publicationName: string; /** * List of table names to be associated with publication to observe changes on. * `REPLICA IDENTITY FULL` is set for each passed table. */ tableNames: string[]; /** * Name of the schema for passed publication tables. */ schemaName: string; /** * PostgreSQL Pool instance. The role that is used to create this instance * must have a `REPLICATION` attribute assigned to it. */ replicationPgPool: Pool; /** * An array of database operations to be associated with (re)created * publication that will be observed. * @default - `insert`, `update`, `delete` */ operationsToWatch?: LogicalReplicationOperation[]; /** * Logical Replication output plugin to be used during (re)creation of the * replication slot. * @default - `pgoutput` */ pluginName?: string; /** * Mosaic logger instance to log info messages. If not provided, `console.log` * is used instead. */ logger?: DbLogger; } /** * Checks if specified replication slot and publication exist. If at least one * does not exists, or tables have changed - idempotently (re)creates both. * * It is preferable to run this helper on startup instead of defining the * replication slot and publication in context of migrations, because in cases * of database failover, the replication slots are not transferred between * primary and secondary databases, so the service needs to ensure that slot and * publication exist on its own. */ export const ensureReplicationSlotAndPublicationExist = async ({ replicationSlotName, publicationName, replicationPgPool, tableNames, schemaName, operationsToWatch = ['insert', 'update', 'delete'], pluginName = 'pgoutput', logger, }: EnsureReplicationSlotAndPublicationExistParams): Promise => { const slotName = param(replicationSlotName); const pubName = param(publicationName); const [{ slotExists, pubExists }] = await sql`SELECT EXISTS (SELECT FROM pg_replication_slots WHERE database = (SELECT current_database()) AND slot_name = ${slotName}) AS "slotExists", EXISTS (SELECT FROM pg_publication WHERE pubname = ${pubName}) AS "pubExists";`.run( replicationPgPool, ); const pubTables = await sql< SQL, { schemaname: string; tablename: string }[] >`SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ${pubName}`.run(replicationPgPool); const assignedTables = new Set( pubTables.map((t) => `${t.schemaname}.${t.tablename}`), ); const expectedTables = new Set( tableNames.map((name) => `${schemaName}.${name}`), ); const tablesRemainTheSame = assignedTables.size === expectedTables.size && [...assignedTables].every((assignedTable) => expectedTables.has(assignedTable), ); if (slotExists && pubExists && tablesRemainTheSame) { (logger ?? console).log( `The replication slot "${replicationSlotName}" and publication "${publicationName}" already exist.`, ); return; } const operations = operationsToWatch.join(','); const plugin = param(pluginName); await transaction( replicationPgPool, IsolationLevel.Serializable, async (txn) => { await sql`DROP PUBLICATION IF EXISTS ${publicationName};`.run(txn); await sql`CREATE PUBLICATION ${publicationName} WITH (publish = ${operations});`.run( txn, ); for (const tableName of tableNames) { await sql`ALTER PUBLICATION ${publicationName} ADD TABLE ${schemaName}.${tableName};`.run( txn, ); await sql`ALTER TABLE ${schemaName}.${tableName} REPLICA IDENTITY full;`.run( txn, ); } }, ); await transaction( replicationPgPool, IsolationLevel.Serializable, async (txn) => { await sql`SELECT pg_drop_replication_slot(${slotName}) FROM pg_replication_slots WHERE slot_name = ${slotName} AND database = (SELECT current_database());`.run(txn); await sql`SELECT pg_create_logical_replication_slot(${slotName}, ${plugin});`.run( txn, ); }, ); (logger ?? console).log( `The replication slot "${replicationSlotName}" and publication "${publicationName}" successfully (re)created.`, ); };