import type { ProducerRecord, RecordMetadata } from 'kafkajs';
import type { KafkaConnectionCredentials } from './types';
import { getKafkaConnection } from './connection';
import createLogger from '../log';

/**
 * Encodes a batch of payloads and sends them to a Kafka topic in one request.
 *
 * Each payload is encoded independently; payloads that fail to encode (or
 * whose key cannot be derived) are logged and skipped, and the remaining
 * payloads are still sent. Nothing is sent when no payload encodes.
 *
 * This function is best-effort: every failure (connection, encoding, send) is
 * reported through `logger.error` and the returned promise never rejects.
 *
 * @param topic - Topic the messages are produced to.
 * @param getId - Derives the Kafka message key from a payload.
 * @param messages - Payloads to encode and send.
 * @param credentials - Credentials handed to `getKafkaConnection`.
 * @param logger - Logger used for progress and error reporting.
 * @returns A promise that resolves once the batch was handled.
 */
export const kafkaClient = async (
  topic: string,
  getId: (payload: object) => string,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  messages: any[], // any according to SchemaRegistry.encode
  credentials: KafkaConnectionCredentials,
  logger = createLogger()
): Promise<void> => {
  try {
    const {
      send,
      encode,
    }: {
      send: (record: ProducerRecord) => Promise<RecordMetadata[]>;
      encode: (payload: object) => Promise<Buffer>;
    } = await getKafkaConnection({ topic, credentials });

    // Encode all payloads concurrently. The callback is async, so a throwing
    // getId() surfaces as a rejected promise instead of aborting the batch.
    const results = await Promise.allSettled(
      messages.map(async (message) => {
        const key = getId(message);
        const value = await encode(message);
        return { key, value };
      })
    );

    const encodedMessages: { key: string; value: Buffer }[] = [];
    for (const result of results) {
      if (result.status === 'fulfilled') {
        encodedMessages.push(result.value);
      } else {
        logger.error('Kafka message failed to encode', result.reason);
      }
    }

    if (encodedMessages.length === 0) {
      logger.error('No kafka messages to send');
      return;
    }

    const recordMetadataArray = await send({ topic, messages: encodedMessages });

    // NOTE(review): kafkajs documents RecordMetadata entries as one per
    // topic-partition rather than one per message, so pairing them with
    // encodedMessages by index may report the wrong key — confirm against
    // the `send` wrapper returned by getKafkaConnection. The lookup is
    // guarded so a length mismatch cannot throw after a successful send.
    recordMetadataArray.forEach((metadata, index) => {
      const key = encodedMessages[index]?.key ?? 'unknown';
      const errorSuffix = metadata.errorCode
        ? `, ErrorCode: ${metadata.errorCode}`
        : '';
      logger.info(`Message sent: Topic: ${topic}, Key:${key}${errorSuffix}`);
    });
  } catch (error) {
    // Deliberate best-effort: report the failure, never reject to the caller.
    logger.error(error);
  }
};