import { Kafka } from 'kafkajs'; import { readAVSCAsync, SchemaRegistry, } from '@kafkajs/confluent-schema-registry'; import { getKafkaConnection } from './connection'; import { KafkaConnectionCredentials } from './types'; jest.mock('kafkajs', () => ({ Kafka: jest.fn(), CompressionCodecs: {}, CompressionTypes: { Snappy: 'snappy' }, })); jest.mock('@kafkajs/confluent-schema-registry', () => ({ SchemaRegistry: jest.fn(), readAVSCAsync: jest.fn(), COMPATIBILITY: { FORWARD_TRANSITIVE: 'FORWARD_TRANSITIVE', BACKWARD_TRANSITIVE: 'BACKWARD_TRANSITIVE' }, })); describe('getKafkaConnection', () => { // const mockKafkaConnection = { send: jest.fn(), encode: jest.fn() }; const mockProducer = { connect: jest.fn(), send: jest.fn() }; const mockSchemaRegistry = { getLatestSchemaId: jest.fn(), encode: jest.fn() }; const mockCredentials = { ca: '', brokers: [''], cert: '', key: '', registry: '' } as KafkaConnectionCredentials; const mockTopic = 'test-topic'; beforeEach(() => { (Kafka as jest.Mock).mockImplementation(() => ({ producer: () => mockProducer, })); (SchemaRegistry as jest.Mock).mockImplementation(() => mockSchemaRegistry); (readAVSCAsync as jest.Mock).mockResolvedValue({}); mockSchemaRegistry.getLatestSchemaId.mockResolvedValue({ id: 1 }); mockSchemaRegistry.encode.mockReturnValue('encoded-payload'); }); it('returns a cached connection if one exists', async () => { const firstConnection = await getKafkaConnection({ topic: mockTopic, credentials: mockCredentials }); const secondConnection = await getKafkaConnection({ topic: mockTopic, credentials: mockCredentials }); expect(firstConnection).toBe(secondConnection); }); it('creates a new connection if one does not exist in the cache', async () => { const firstConnection = await getKafkaConnection({ topic: mockTopic, credentials: mockCredentials }); const secondConnection = await getKafkaConnection({ topic: 'another-topic', credentials: mockCredentials }); expect(firstConnection).not.toBe(secondConnection); }); it('correctly registers a schema with the schema registry', async () => { await getKafkaConnection({ topic: mockTopic, credentials: mockCredentials }); expect(mockSchemaRegistry.getLatestSchemaId.mock.calls).toEqual([ ['test-topic-value'], ['another-topic-value'] ]); }); it('correctly encodes a payload', async () => { const connection = await getKafkaConnection({ topic: mockTopic, credentials: mockCredentials }); const encodedPayload = connection.encode({ test: 'payload' }); expect(encodedPayload).toBe('encoded-payload'); }); });