import { DatabaseError } from 'pg';
import {
  LogicalReplicationService,
  Pgoutput,
  PgoutputPlugin,
} from 'pg-logical-replication';
import { DbLogger } from '../common';

/** Resolves after `ms` milliseconds. */
const sleep = (ms: number): Promise<void> =>
  new Promise((res) => setTimeout(res, ms));

/** pgoutput message tags that can be selected via `operationsToWatch`. */
export type LogicalReplicationOperation =
  | 'begin'
  | 'commit'
  | 'delete'
  | 'insert'
  | 'message'
  | 'origin'
  | 'relation'
  | 'truncate'
  | 'type'
  | 'update';

/**
 * Simplified view of a pgoutput message, passed to the message handler
 * alongside the full message.
 */
export interface PgOutputScopedMessage {
  /** The message's tag. */
  operation: LogicalReplicationOperation;
  /** Name of the message's relation, when the message has one. */
  tableName?: string;
  /** Schema of the message's relation, when the message has one. */
  schemaName?: string;
  /** Row values from the message's `new` field, when present. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  new?: Record<string, any>;
  /** Row values from the message's `old` field, when present and non-null. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  old?: Record<string, any>;
}

/**
 * Parameters passed to a `LogicalReplicationMessageHandler`.
 * (The misspelled name is kept for backward compatibility.)
 */
export interface LogicalReplicatonMessageHandlerParams {
  scopedMessage: PgOutputScopedMessage;
  fullMessage: Pgoutput.Message;
}

/**
 * Handles a single watched replication message. The message's LSN is
 * acknowledged only after the returned promise resolves; a rejection is
 * treated as a service failure and triggers a reconnect.
 */
export type LogicalReplicationMessageHandler = (
  params: LogicalReplicatonMessageHandlerParams,
) => Promise<void>;

/**
 * Configuration object to set up a logical replication service.
 */
export interface LogicalReplicationServiceConfig {
  /** Database connection string for a user with the `REPLICATION` role. */
  connectionString: string;
  /**
   * Names of the publications whose tables' changes will be watched.
   * They must already exist in the database, e.g. created using
   * `ensureReplicationSlotAndPublicationExist`.
   */
  publicationNames: string[];
  /**
   * Name of the replication slot which will be streaming table changes.
   * It must already exist in the database, e.g. created using
   * `ensureReplicationSlotAndPublicationExist`.
   */
  replicationSlotName: string;
  /**
   * Decides how to react to detected table changes.
   */
  messageHandler: LogicalReplicationMessageHandler;
  /**
   * Operations that will be handled and redirected to the `messageHandler`.
   * Messages for other operations are skipped (and their LSNs are not
   * acknowledged). Defaults to `['insert', 'update', 'delete']`; an empty
   * array is rejected.
   */
  operationsToWatch?: LogicalReplicationOperation[];
  /**
   * Produces logs during the logical replication service runtime. If not
   * passed, console logs are used instead.
   */
  logger?: DbLogger;
  /**
   * When the logical replication service connects to a replication slot, the
   * slot may already be in use, e.g. by another instance of the service. The
   * service then retries once every 10 seconds for the first 30 attempts
   * (about 5 minutes).
   *
   * After those first attempts, the retry delay is increased from 10 seconds
   * to the value of this parameter. Defaults to 300000 (every 5 minutes).
   */
  reconnectionIntervalInMs?: number;
}

/**
 * Projects a pgoutput message onto the simplified scoped shape, or returns
 * `undefined` when the message's tag is not in `operationsToWatch`.
 */
const getScopedMessage = (
  message: Pgoutput.Message,
  operationsToWatch: string[],
): PgOutputScopedMessage | undefined => {
  if (!operationsToWatch.includes(message.tag)) {
    return undefined;
  }
  return {
    operation: message.tag,
    tableName: 'relation' in message ? message.relation.name : undefined,
    schemaName: 'relation' in message ? message.relation.schema : undefined,
    new: 'new' in message ? message.new : undefined,
    old: 'old' in message && message.old ? message.old : undefined,
  };
};

/**
 * Detects the "replication slot already in use" error (SQLSTATE 55006 raised
 * by `ReplicationSlotAcquire`). For that error it logs, waits, and returns
 * `true` so the caller retries: 10 seconds for the first 30 attempts (logged
 * at TRACE), `reconnectionIntervalInMs` afterwards (logged at LOG level).
 * Returns `false` immediately for any other error.
 */
const handleReconnection = async (
  error: DatabaseError,
  reconnectionAttempts: number,
  replicationSlotName: string,
  reconnectionIntervalInMs: number,
  logger: DbLogger,
): Promise<boolean> => {
  if (error?.code === '55006' && error?.routine === 'ReplicationSlotAcquire') {
    const isFirstFiveMinutesAfterStartup = reconnectionAttempts < 30;
    const log = isFirstFiveMinutesAfterStartup
      ? logger.trace.bind(logger)
      : logger.log.bind(logger);
    const waitingTimeInMs = isFirstFiveMinutesAfterStartup
      ? 10000
      : reconnectionIntervalInMs;
    log(
      `The logical replication slot '${replicationSlotName}' is already in use. Waiting for ${
        waitingTimeInMs / 1000
      } seconds to try connecting again. (Attempt ${reconnectionAttempts + 1})`,
    );
    await sleep(waitingTimeInMs);
    return true;
  }
  return false;
};

/**
 * Handles a failure that is not a slot-in-use conflict. Rethrows `error` once
 * more than 20 failures have accumulated; otherwise logs it and waits with a
 * linearly increasing back-off (1.5 seconds per failed attempt).
 *
 * Resetting the counter is the caller's job, because this function only
 * receives a copy of it.
 */
const handleFailure = async (
  error: Error,
  failedAttempts: number,
  logger: DbLogger,
): Promise<void> => {
  if (failedAttempts > 20) {
    throw error;
  }
  logger.error(
    error,
    `Logical replication service failure has occurred. (Attempt ${failedAttempts})`,
  );
  await sleep(1500 * failedAttempts);
};

/**
 * Creates a logical replication service and keeps it running in the
 * background until stopped. Watches table changes for the given publications
 * through the given replication slot and passes each watched message to
 * `messageHandler`. By default watches the "insert", "update", and "delete"
 * operations.
 *
 * A message's LSN is acknowledged only after `messageHandler` resolves.
 * Heartbeats that request a reply are acknowledged after a 5 second delay;
 * handling a watched message cancels the most recently scheduled heartbeat
 * acknowledgement.
 *
 * If the replication slot is already in use, the service retries every 10
 * seconds for the first 30 attempts (~5 minutes) and every
 * `reconnectionIntervalInMs` afterwards.
 *
 * On any other error, the service retries with a linearly increasing delay
 * (1.5 seconds times the attempt number, ~5 minutes in total over 20
 * attempts). Each failure is logged with the attempt counter. The counter is
 * reset after a watched message is handled successfully, or after 5 minutes
 * pass without another failure. If it exceeds 20, the causing error is
 * rethrown from the background loop as an unhandled promise rejection. This
 * is intended to crash the process so that a service restart can resolve the
 * issue.
 *
 * @param config Connection and processing config for the logical replication
 * service.
 * @returns a function that stops the service. Its promise resolves once the
 * current replication connection has been stopped. A back-off delay that is
 * already in progress is not interrupted; the background loop exits after it
 * elapses.
 * @throws Error when `operationsToWatch` is an empty array.
 */
export const createLogicalReplicationService = async ({
  connectionString,
  publicationNames,
  replicationSlotName,
  messageHandler,
  operationsToWatch = ['insert', 'update', 'delete'],
  logger: passedLogger,
  reconnectionIntervalInMs = 300000,
}: LogicalReplicationServiceConfig): Promise<{ (): Promise<void> }> => {
  if (operationsToWatch.length === 0) {
    throw new Error(
      'Unable to start the logical replication service when operationsToWatch is an empty array.',
    );
  }
  const logger = passedLogger ?? console;
  const plugin = new PgoutputPlugin({ protoVersion: 1, publicationNames });

  // The most recently created service; used by the returned stop function.
  let service: LogicalReplicationService | undefined;
  let stopped = false;
  let failedAttempts = 0;
  let reconnectionAttempts = 0;
  let failedAttemptsResetTimer: NodeJS.Timeout | undefined;
  // Latest scheduled heartbeat acknowledgement of the current connection.
  let heartbeatAckTimer: NodeJS.Timeout | undefined;

  const clearFailedAttemptsResetTimer = (): void => {
    clearTimeout(failedAttemptsResetTimer);
    failedAttemptsResetTimer = undefined;
  };

  const clearHeartbeatAckTimer = (): void => {
    clearTimeout(heartbeatAckTimer);
    heartbeatAckTimer = undefined;
  };

  /**
   * Runs one replication connection until it ends. Resolves when the
   * subscription finishes normally and rejects with the first error. The
   * connection is torn down before the promise settles in both cases.
   */
  const runReplicationConnection = (): Promise<void> =>
    new Promise<void>((resolve, reject) => {
      // Captured locally so delayed callbacks never act on a service created
      // by a later iteration of the background loop.
      const replication = new LogicalReplicationService(
        { connectionString },
        { acknowledge: { auto: false, timeoutSeconds: 0 } },
      );
      service = replication;

      // Detaches our handlers and stops the service. Never rejects, so the
      // surrounding promise is always settled even if `stop()` fails.
      const teardown = async (): Promise<void> => {
        clearHeartbeatAckTimer();
        replication.removeAllListeners();
        await replication
          .stop()
          .catch((e) =>
            logger.error(e, 'Error on logical replication service teardown.'),
          );
      };

      replication.on('start', () => {
        logger.debug(
          `Started the logical replication service to watch the database operations (${operationsToWatch.join(
            ', ',
          )}) on table(s) associated with the following publication(s): ${publicationNames.join(
            ', ',
          )}`,
        );
      });

      replication.on(
        'data',
        async (lsn: string, message: Pgoutput.Message) => {
          try {
            if (replication.isStop()) {
              logger.error('Received data even though the service is stopped.');
              return;
            }
            const scopedMessage = getScopedMessage(message, operationsToWatch);
            if (!scopedMessage) {
              // Unwatched operations are skipped and not acknowledged.
              return;
            }
            await messageHandler({ scopedMessage, fullMessage: message });
            // A successful message ends the current failure streak.
            failedAttempts = 0;
            clearFailedAttemptsResetTimer();
            // The data acknowledgement below supersedes the pending heartbeat one.
            clearHeartbeatAckTimer();
            await replication.acknowledge(lsn);
          } catch (error) {
            if (!replication.isStop()) {
              replication.emit('error', error);
            }
          }
        },
      );

      replication.on('error', (err: Error) => {
        void teardown().then(() => reject(err));
      });

      replication.on('heartbeat', (lsn, _timestamp, shouldRespond) => {
        if (!shouldRespond) {
          return;
        }
        heartbeatAckTimer = setTimeout(() => {
          // The connection may have been torn down while the timer was pending.
          if (replication.isStop()) {
            return;
          }
          logger.trace(`${lsn}: acknowledged heartbeat`);
          void replication.acknowledge(lsn).catch((ackError) => {
            if (!replication.isStop()) {
              replication.emit('error', ackError);
            }
          });
        }, 5000);
      });

      void replication.subscribe(plugin, replicationSlotName).then(
        async () => {
          await teardown();
          resolve();
        },
        async (err) => {
          await teardown();
          reject(err);
        },
      );
    });

  // Run the service in an endless background loop until it gets stopped. An
  // error rethrown by handleFailure escapes this loop as an unhandled promise
  // rejection, which is the intended way to crash the process.
  void (async () => {
    while (!stopped) {
      try {
        await runReplicationConnection();
      } catch (err) {
        if (stopped) {
          // Errors after shutdown are not treated as failures.
          break;
        }
        const error = err as DatabaseError;
        if (
          await handleReconnection(
            error,
            reconnectionAttempts,
            replicationSlotName,
            reconnectionIntervalInMs,
            logger,
          )
        ) {
          reconnectionAttempts++;
          continue;
        }
        // A new failure cancels any pending reset of the failure counter.
        clearFailedAttemptsResetTimer();
        failedAttempts++;
        await handleFailure(error, failedAttempts, logger);
        if (stopped) {
          break;
        }
        const failuresBeforeReset = failedAttempts;
        failedAttemptsResetTimer = setTimeout(() => {
          failedAttemptsResetTimer = undefined;
          logger.trace(
            `No errors have occurred within 5 minutes. Resetting failures counter after ${failuresBeforeReset} failure(s).`,
          );
          // Resets the shared counter, not a copy of it.
          failedAttempts = 0;
        }, 300000);
      }
    }
  })();

  return async () => {
    logger.debug('Shutting down the logical replication service.');
    stopped = true;
    // Pending timers would otherwise keep the process alive after shutdown.
    clearFailedAttemptsResetTimer();
    clearHeartbeatAckTimer();
    const current = service;
    if (!current) {
      return;
    }
    current.removeAllListeners();
    await current
      .stop()
      .catch((e) =>
        logger.error(e, 'Error on logical replicationservice shutdown.'),
      );
  };
};