import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { Agent } from 'https';
import SnappyCodec from 'kafkajs-snappy';
import type { KafkaConnection, KafkaConnectionCredentials, KafkaConnectionMap } from './types';

// Register the Snappy codec in kafkajs' codec table (module-level side effect).
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;

/** Fully initialised connections, keyed by topic name. */
const kafkaConnectionMap: KafkaConnectionMap = new Map();

/**
 * Connections that are still being established, keyed by topic name.
 * Concurrent callers for the same topic share one entry so that only a
 * single producer is connected per topic.
 */
const pendingKafkaConnections = new Map<string, Promise<KafkaConnection>>();

/**
 * Connects a producer and resolves the latest schema id for the topic's
 * `<topic>-value` subject, then bundles both into a `KafkaConnection`.
 *
 * @param topic - Topic name; also used to derive the schema subject.
 * @param credentials - Broker list, TLS material and schema registry host.
 * @returns An object exposing the producer's `send` and a schema-bound `encode`.
 * @throws Whatever the producer connect or the schema registry lookup throws.
 */
const createKafkaConnection = async (
  topic: string,
  credentials: KafkaConnectionCredentials
): Promise<KafkaConnection> => {
  const { ca, brokers, cert, key, registry: host } = credentials;

  const kafka = new Kafka({ clientId: 'chatbot', brokers, ssl: { ca, cert, key } });
  const producer = kafka.producer();
  await producer.connect();

  try {
    const schemaRegistry = new SchemaRegistry({ host, agent: new Agent({ ca }) });
    // The schema id is looked up once here and reused for every encode call.
    const schemaId = await schemaRegistry.getLatestSchemaId(`${topic}-value`);

    return {
      send: producer.send.bind(producer),
      encode: (payload: object) => schemaRegistry.encode(schemaId, payload),
    };
  } catch (error) {
    // The producer is already connected at this point; release it so a failed
    // schema lookup does not leave an orphaned broker connection behind.
    // A failure to disconnect is ignored so the original error is the one reported.
    await producer.disconnect().catch(() => undefined);
    throw error;
  }
};

/**
 * Returns the Kafka connection for a topic, creating and caching it on first use.
 *
 * The cache is keyed by `topic` only: once a connection exists for a topic,
 * the `credentials` passed on later calls are not consulted.
 *
 * @param topic - Topic to produce to.
 * @param credentials - Connection settings used when no cached connection exists.
 * @returns The cached or newly created connection.
 * @throws If connecting the producer or resolving the schema id fails; the
 *         failure is not cached, so a later call will retry.
 */
export const getKafkaConnection = async (
  { topic, credentials }: { topic: string, credentials: KafkaConnectionCredentials }
): Promise<KafkaConnection> => {
  const connectionKey = topic;

  const cached = kafkaConnectionMap.get(connectionKey);
  if (cached) {
    // from cache
    return cached;
  }

  let pending = pendingKafkaConnections.get(connectionKey);
  if (!pending) {
    pending = (async () => {
      try {
        const connection = await createKafkaConnection(topic, credentials);
        kafkaConnectionMap.set(connectionKey, connection);
        return connection;
      } finally {
        // Runs after the `set` below (the await above always yields first),
        // and clears the in-flight marker on both success and failure.
        pendingKafkaConnections.delete(connectionKey);
      }
    })();
    pendingKafkaConnections.set(connectionKey, pending);
  }

  return pending;
};