import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

/**
 * Shared handle to the internals of the mocked `LogicalReplicationService`.
 *
 * It is created via `vi.hoisted` so that it already exists when the (also
 * hoisted) `vi.mock` factory below runs.
 */
const repService = vi.hoisted(() => ({
  // We expose the callback methods that are normally called from the "on"
  // "data/error/heartbeat" emitted events to be able to manually call them.
  handleData: undefined as
    | ((lsn: string, log: any) => Promise<void> | void)
    | undefined,
  handleError: undefined as ((err: Error) => void) | undefined,
  handleHeartbeat: undefined as
    | ((
        lsn: string,
        timestamp: number,
        shouldRespond: boolean,
      ) => Promise<void> | void)
    | undefined,
  // Spies of the most recently constructed mock service instance.
  acknowledge: undefined as ReturnType<typeof vi.fn> | undefined,
  stop: undefined as ReturnType<typeof vi.fn> | undefined,
}));

vi.mock('pg-logical-replication', async () => {
  const actual = await vi.importActual<typeof import('pg-logical-replication')>(
    'pg-logical-replication',
  );

  /**
   * Stand-in for the real replication service: it never connects to a
   * database, it only records the registered listeners and exposes spies for
   * `acknowledge` and `stop` through `repService`.
   */
  class MockLogicalReplicationService {
    constructor(_config: unknown, _ackConfig: unknown) {
      // Class fields are initialised before the constructor body runs, so the
      // spies already exist at this point.
      repService.acknowledge = this.acknowledge;
      repService.stop = this.stop;
    }

    on = (event: 'data' | 'error' | 'heartbeat', listener: any) => {
      switch (event) {
        case 'data':
          repService.handleData = listener;
          break;
        case 'error':
          repService.handleError = listener;
          break;
        case 'heartbeat':
          repService.handleHeartbeat = listener;
          break;
      }
    };

    acknowledge = vi.fn();

    removeAllListeners = vi.fn();

    emit = vi.fn();

    stop = vi.fn(() => Promise.resolve());

    subscribe = () =>
      new Promise(() => {
        /** never return */
      });

    isStop = () => false;
  }

  return {
    ...actual,
    LogicalReplicationService: MockLogicalReplicationService,
  };
});

import { Pgoutput } from 'pg-logical-replication';
import {
  LogicalReplicationMessageHandler,
  createLogicalReplicationService,
} from './create-logical-replication-service';

/** Relation fixture (`test_schema.test_table`) attached to the test messages. */
const relation: Pgoutput.MessageRelation = {
  tag: 'relation',
  relationOid: 1,
  schema: 'test_schema',
  name: 'test_table',
  replicaIdentity: 'default',
  columns: [
    {
      name: 'id',
      flags: 0,
      typeOid: 23,
      typeMod: -1,
      typeSchema: 'pg_catalog',
      typeName: 'int4',
      parser: (raw: any) => raw,
    },
    {
      name: 'name',
      flags: 0,
      typeOid: 25,
      typeMod: -1,
      typeSchema: 'pg_catalog',
      typeName: 'text',
      parser: (raw: any) => raw,
    },
  ],
  keyColumns: ['id'],
};

describe('createLogicalReplicationService', () => {
  afterEach(() => {
    // Some tests switch to fake timers; always restore the real ones.
    vi.useRealTimers();
  });

  beforeEach(() => {
    // Drop everything captured from the mock instance of the previous test.
    repService.handleData = undefined;
    repService.handleError = undefined;
    repService.handleHeartbeat = undefined;
    repService.acknowledge = undefined;
    repService.stop = undefined;
  });

  it('initialization throws an error if empty operations array is passed', async () => {
    // Act & Assert
    await expect(
      createLogicalReplicationService({
        connectionString: 'test-valid-connection-string',
        publicationNames: ['test_publication'],
        replicationSlotName: 'test_slot',
        messageHandler: vi.fn(),
        operationsToWatch: [],
      }),
    ).rejects.toThrow(
      'Unable to start the logical replication service when operationsToWatch is an empty array.',
    );
  });

  it('should call messageHandler and acknowledge the message when no errors are thrown', async () => {
    // Arrange
    const messageHandler: LogicalReplicationMessageHandler = vi.fn();
    const cleanup = await createLogicalReplicationService({
      connectionString: 'test-valid-connection-string',
      publicationNames: ['test_publication'],
      replicationSlotName: 'test_slot',
      messageHandler,
    });
    expect(repService.handleData).toBeDefined();
    const fullMessage = {
      tag: 'insert',
      relation,
      old: {
        id: 'test_id',
        aggregate_type: 'test_type_old',
        aggregate_id: 'test_aggregate_id_old',
        event_type: 'test_event_type_old',
        payload: { result: 'in_progress' },
        created_at: new Date('2023-01-18T21:02:27.000Z'),
      },
      new: {
        id: 'test_id',
        aggregate_type: 'test_type',
        aggregate_id: 'test_aggregate_id',
        event_type: 'test_event_type',
        payload: { result: 'success' },
        created_at: new Date('2023-01-18T21:02:27.000Z'),
      },
    };

    // Act
    await repService.handleData!('0/00000001', fullMessage);

    // Assert
    expect(repService.handleError).toBeDefined();
    expect(repService.handleHeartbeat).toBeDefined();
    expect(messageHandler).toHaveBeenCalledWith({
      scopedMessage: {
        new: fullMessage.new,
        old: fullMessage.old,
        operation: 'insert',
        schemaName: 'test_schema',
        tableName: 'test_table',
      },
      fullMessage,
    });
    expect(repService.acknowledge).toHaveBeenCalledWith('0/00000001');
    expect(repService.stop).not.toHaveBeenCalled();
    await cleanup();
    expect(repService.stop).toHaveBeenCalledTimes(1);
  });

  it('should call messageHandler with minimal scoped message and acknowledge the message when no errors are thrown', async () => {
    // Arrange
    const messageHandler: LogicalReplicationMessageHandler = vi.fn();
    const cleanup = await createLogicalReplicationService({
      connectionString: 'test-valid-connection-string',
      publicationNames: ['test_publication'],
      replicationSlotName: 'test_slot',
      messageHandler,
    });
    expect(repService.handleData).toBeDefined();
    // A message without schema/table names and without row data.
    const fullMessage = {
      tag: 'insert',
      relation: {
        tag: 'relation',
      },
      old: null,
      new: undefined,
    };

    // Act
    await repService.handleData!('0/00000001', fullMessage);

    // Assert
    expect(repService.handleError).toBeDefined();
    expect(repService.handleHeartbeat).toBeDefined();
    expect(messageHandler).toHaveBeenCalledWith({
      scopedMessage: {
        new: undefined,
        old: undefined,
        operation: 'insert',
        schemaName: undefined,
        tableName: undefined,
      },
      fullMessage,
    });
    expect(repService.acknowledge).toHaveBeenCalledWith('0/00000001');
    expect(repService.stop).not.toHaveBeenCalled();
    await cleanup();
    expect(repService.stop).toHaveBeenCalledTimes(1);
  });

  it('should call messageHandler but not acknowledge the message when an error is thrown', async () => {
    // Arrange
    const testError = new Error('Transient error');
    // Wrapped in a spy so the "should call messageHandler" part of the test
    // title is actually verified.
    const messageHandler = vi.fn(async () => {
      throw testError;
    });
    const cleanup = await createLogicalReplicationService({
      connectionString: 'test-valid-connection-string',
      publicationNames: ['test_publication'],
      replicationSlotName: 'test_slot',
      messageHandler,
    });
    expect(repService.handleData).toBeDefined();

    // Act
    await repService.handleData!('0/00000001', {
      tag: 'insert',
      relation,
      new: {
        id: 'test_id',
        aggregate_type: 'test_type',
        aggregate_id: 'test_aggregate_id',
        event_type: 'test_event_type',
        payload: { result: 'success' },
        created_at: new Date('2023-01-18T21:02:27.000Z'),
      },
    });

    // Assert
    expect(repService.handleError).toBeDefined();
    expect(repService.handleHeartbeat).toBeDefined();
    expect(messageHandler).toHaveBeenCalled();
    expect(repService.acknowledge).not.toHaveBeenCalled();
    expect(repService.stop).not.toHaveBeenCalled();
    await cleanup();
    expect(repService.stop).toHaveBeenCalledTimes(1);
  });

  it('A heartbeat should be acknowledged after 5 seconds', async () => {
    // Arrange
    vi.useFakeTimers();
    const cleanup = await createLogicalReplicationService({
      connectionString: 'test-valid-connection-string',
      publicationNames: ['test_publication'],
      replicationSlotName: 'test_slot',
      messageHandler: vi.fn(),
    });
    expect(repService.handleHeartbeat).toBeDefined();

    // Act
    await repService.handleHeartbeat!('0/00000001', 123, true);

    // Assert
    expect(repService.handleData).toBeDefined();
    expect(repService.handleError).toBeDefined();
    expect(repService.acknowledge).not.toHaveBeenCalled();
    // 4000 ms in: still below the 5 seconds from the test title.
    await vi.advanceTimersByTimeAsync(4000);
    expect(repService.acknowledge).not.toHaveBeenCalled();
    // 5010 ms in total: just past the 5 seconds.
    await vi.advanceTimersByTimeAsync(1010);
    expect(repService.acknowledge).toHaveBeenCalled();
    expect(repService.stop).not.toHaveBeenCalled();
    await cleanup();
    expect(repService.stop).toHaveBeenCalledTimes(1);
  }, 10_000);

  it('A heartbeat should not be acknowledged after 5 seconds when a message acknowledgement comes in between', async () => {
    // Arrange
    vi.useFakeTimers();
    const cleanup = await createLogicalReplicationService({
      connectionString: 'test-valid-connection-string',
      publicationNames: ['test_publication'],
      replicationSlotName: 'test_slot',
      messageHandler: vi.fn(),
    });
    expect(repService.handleData).toBeDefined();
    expect(repService.handleHeartbeat).toBeDefined();

    // Act
    await repService.handleHeartbeat!('0/00000001', 123, true);
    await vi.advanceTimersByTimeAsync(1000);
    await repService.handleData!('0/00000002', {
      tag: 'insert',
      relation,
      new: {
        id: 'test_id',
        aggregate_type: 'test_type',
        aggregate_id: 'test_aggregate_id',
        event_type: 'test_event_type',
        payload: { result: 'success' },
        created_at: new Date('2023-01-18T21:02:27.000Z'),
      },
    });

    // Assert
    expect(repService.handleError).toBeDefined();
    expect(repService.acknowledge).toHaveBeenCalledWith('0/00000002');
    // 5010 ms after the heartbeat: only the data acknowledgement may exist.
    await vi.advanceTimersByTimeAsync(4010);
    expect(repService.acknowledge).toHaveBeenCalledTimes(1);
    expect(repService.stop).not.toHaveBeenCalled();
    await cleanup();
    expect(repService.stop).toHaveBeenCalledTimes(1);
  }, 10_000);
});