{"version":3,"sources":["../../../../src/events/types/kafka.ts"],"names":["KafkaEventBus","#client","KafkaJS","options","topicName","Instance","producer","data","onMessage","subscribe","#createTopic","topic","Random"],"mappings":"AAAA,0IAAwB,yDAGf,4DAEQ,6DAIJA,6CAKNC,MAAcC,EAAQ,QAC1B,oBAAA,CAAA,CAAA,CAAA,CAAA,CAAS,CAAE,CAAA,WAAW,CAAA,CAAUA,CAAAA,CAAQ,KAAA,CAAA,CAAA,CAAA,IAAS,CAAA,CAAA,CAAA,CAAQ,IAI3D,wBAAA,CAAA,KAAA,CAAA,CAAA,OAA4EC,CAAkC,CAAC,GAC9G,CAAA,CAAA,QAAsB,CAAA,wBAAA,CAAA,QAAYC,CAAYC,OAAa,CAAE,CAAA,CAAA,CAAA,YAAuB,CAAA,CACpF,CAAA,CAAA,CAAA,CAAA,CAAA,CAAO,CACN,MAAA,CAAA,CAAS,CAAA,CAAA,SACR,CAAA,CAAA,CAAA,sBAAA,CAAMC,GAAW,CAAA,CAAA,CAAA,aAAsB,CAAA,CACvC,CAAA,CAAA,MAAA,CAAA,OAAe,CAAA,MAAQ,CAAA,EACvB,CAAA,MAAe,CAAA,CAAA,IACd,CAAA,CAAA,CAAA,CAAA,QACA,CAAA,CAAA,CAAA,OAAa,MAAO,CAAA,CAAK,OAAA,CAAA,CAAA,CAAUC,MACnC,CACM,CAAA,IAER,CAAA,CAAA,KAAA,CAAA,CAAYC,CAAAA,QACX,CAAMC,CAAAA,CAAY,KAAA,CAAA,IACjB,CAAA,SAAM,CAAKC,CAAAA,CAAAA,CAAaC,CAAK,CAAA,CAC7B,CAAA,CAAA,CAAA,CAAA,CAAA,SAAwB,CAAA,CAAA,EACrBN,CAAS,MAAM,CAAA,CAAA,KAAA,CAAA,CAAA,EAAA,CAAA,MAAiBA,IAAS,CAAI,CAAA,CAAE,CAAA,CAAE,CAAA,CAAA,MAAA,CAAA,CAAA,CAAWO,CAAAA,MAAO,CAAA,sBAAO,CAAA,GAAG,CAAE,CAAA,CAC/ED,aAC2B,CAAA,CAAA,EAAA","file":"/home/runner/work/equipped/equipped/dist/cjs/events/types/kafka.min.cjs","sourcesContent":["import { KafkaJS } from '@confluentinc/kafka-javascript'\n\nimport { EquippedError } from '../../errors'\nimport { Instance } from '../../instance'\nimport type { Events } from '../../types'\nimport { Random, parseJSONValue } from '../../utilities'\nimport { EventBus, type StreamOptions } from '../base'\nimport type { KafkaConfig } from '../pipes'\n\nexport class KafkaEventBus extends EventBus {\n\t#client: KafkaJS.Kafka\n\t#admin: Promise<KafkaJS.Admin> | undefined\n\tconstructor(config: KafkaConfig) {\n\t\tsuper()\n\t\tthis.#client = new KafkaJS.Kafka({\n\t\t\tkafkaJS: { ...config, logLevel: KafkaJS.logLevel.NOTHING },\n\t\t})\n\t}\n\n\tcreateStream<Event extends Events[keyof Events]>(topicName: Event['topic'], options: Partial<StreamOptions> = {}) {\n\t\tconst topic = options.skipScope ? topicName : Instance.get().getScopedName(topicName)\n\t\treturn {\n\t\t\tpublish: async (data: Event['data']) => {\n\t\t\t\tconst producer = this.#client.producer()\n\t\t\t\tawait producer.connect()\n\t\t\t\tawait producer.send({\n\t\t\t\t\ttopic,\n\t\t\t\t\tmessages: [{ value: JSON.stringify(data) }],\n\t\t\t\t})\n\t\t\t\treturn true\n\t\t\t},\n\t\t\tsubscribe: (onMessage: (data: Event['data']) => Promise<void>) => {\n\t\t\t\tconst subscribe = async () => {\n\t\t\t\t\tawait this.#createTopic(topic)\n\t\t\t\t\tconst groupId = options.fanout\n\t\t\t\t\t\t? Instance.get().getScopedName(`${Instance.get().id}-fanout-${Random.string(10)}`)\n\t\t\t\t\t\t: topic\n\t\t\t\t\tconst consumer = this.#client.consumer({ kafkaJS: { groupId } })\n\n\t\t\t\t\tawait consumer.connect()\n\t\t\t\t\tawait consumer.subscribe({ topic })\n\n\t\t\t\t\tawait consumer.run({\n\t\t\t\t\t\teachMessage: async ({ message }) => {\n\t\t\t\t\t\t\tawait Instance.resolveBeforeCrash(async () => {\n\t\t\t\t\t\t\t\tif (!message.value) return\n\t\t\t\t\t\t\t\tawait onMessage(parseJSONValue(message.value.toString()))\n\t\t\t\t\t\t\t}).catch((error) =>\n\t\t\t\t\t\t\t\tInstance.crash(new EquippedError('Error processing kafka event', { topic, groupId, options }, error)),\n\t\t\t\t\t\t\t)\n\t\t\t\t\t\t},\n\t\t\t\t\t})\n\n\t\t\t\t\tif (options.fanout)\n\t\t\t\t\t\tInstance.on(\n\t\t\t\t\t\t\t'close',\n\t\t\t\t\t\t\tasync () => {\n\t\t\t\t\t\t\t\tawait consumer.disconnect()\n\t\t\t\t\t\t\t\tawait this.#deleteGroup(groupId)\n\t\t\t\t\t\t\t},\n\t\t\t\t\t\t\t10,\n\t\t\t\t\t\t)\n\t\t\t\t}\n\t\t\t\tInstance.on('start', subscribe, 2)\n\t\t\t},\n\t\t}\n\t}\n\n\tasync #getAdmin() {\n\t\tif (!this.#admin)\n\t\t\tthis.#admin = (async () => {\n\t\t\t\tconst admin = this.#client.admin()\n\t\t\t\tawait admin.connect()\n\t\t\t\treturn admin\n\t\t\t})()\n\t\treturn this.#admin\n\t}\n\n\tasync #createTopic(topic: string) {\n\t\tconst admin = await this.#getAdmin()\n\t\tawait admin.createTopics({ topics: [{ topic }], timeout: 5000 })\n\t}\n\n\tasync #deleteGroup(groupId: string) {\n\t\tconst admin = await this.#getAdmin()\n\t\tawait admin.deleteGroups([groupId]).catch(() => {})\n\t}\n}\n"]}