{"version":3,"sources":["../../../../src/events/types/kafka.ts","/home/runner/work/equipped/equipped/dist/cjs/events/types/kafka.cjs"],"names":[],"mappings":"AAAA,0IAAwB;AAExB,kDAA8B;AAC9B,qDAAyB;AAEzB,sDAAuC;AACvC,sCAA6C;AAGtC,MAAM,cAAA,QAAsB,kBAAS;AAAA,EAC3C,CAAA,MAAA;AAAA,EACA,CAAA,KAAA;AAAA,EACA,WAAA,CAAY,MAAA,EAAqB;AAChC,IAAA,KAAA,CAAM,CAAA;AACN,IAAA,IAAA,CAAK,CAAA,OAAA,EAAU,IAAI,wBAAA,CAAQ,KAAA,CAAM;AAAA,MAChC,OAAA,EAAS,EAAE,GAAG,MAAA,EAAQ,QAAA,EAAU,wBAAA,CAAQ,QAAA,CAAS,QAAQ;AAAA,IAC1D,CAAC,CAAA;AAAA,EACF;AAAA,EAEA,YAAA,CAAiD,SAAA,EAA2B,QAAA,EAAkC,CAAC,CAAA,EAAG;AACjH,IAAA,MAAM,MAAA,EAAQ,OAAA,CAAQ,UAAA,EAAY,UAAA,EAAY,mBAAA,CAAS,GAAA,CAAI,CAAA,CAAE,aAAA,CAAc,SAAS,CAAA;AACpF,IAAA,OAAO;AAAA,MACN,OAAA,EAAS,MAAA,CAAO,IAAA,EAAA,GAAwB;AACvC,QAAA,MAAM,SAAA,EAAW,IAAA,CAAK,CAAA,MAAA,CAAQ,QAAA,CAAS,CAAA;AACvC,QAAA,MAAM,QAAA,CAAS,OAAA,CAAQ,CAAA;AACvB,QAAA,MAAM,QAAA,CAAS,IAAA,CAAK;AAAA,UACnB,KAAA;AAAA,UACA,QAAA,EAAU,CAAC,EAAE,KAAA,EAAO,IAAA,CAAK,SAAA,CAAU,IAAI,EAAE,CAAC;AAAA,QAC3C,CAAC,CAAA;AACD,QAAA,OAAO,IAAA;AAAA,MACR,CAAA;AAAA,MACA,SAAA,EAAW,CAAC,SAAA,EAAA,GAAsD;AACjE,QAAA,MAAM,UAAA,EAAY,MAAA,CAAA,EAAA,GAAY;AAC7B,UAAA,MAAM,IAAA,CAAK,CAAA,WAAA,CAAa,KAAK,CAAA;AAC7B,UAAA,MAAM,QAAA,EAAU,OAAA,CAAQ,OAAA,EACrB,mBAAA,CAAS,GAAA,CAAI,CAAA,CAAE,aAAA,CAAc,CAAA,EAAA;AAE+B,UAAA;AAExC,UAAA;AACW,UAAA;AAEf,UAAA;AACkB,YAAA;AACW,cAAA;AACzB,gBAAA;AACoC,gBAAA;AACtD,cAAA;AAC+B,gBAAA;AAClC,cAAA;AACD,YAAA;AACA,UAAA;AAEW,UAAA;AACF,YAAA;AACR,cAAA;AACY,cAAA;AACe,gBAAA;AACK,gBAAA;AAChC,cAAA;AACA,cAAA;AACD,YAAA;AACF,QAAA;AACiC,QAAA;AAClC,MAAA;AACD,IAAA;AACD,EAAA;AAEkB,EAAA;AACP,IAAA;AACkB,MAAA;AACO,QAAA;AACb,QAAA;AACb,QAAA;AACL,MAAA;AACQ,IAAA;AACb,EAAA;AAEkC,EAAA;AACE,IAAA;AAC4B,IAAA;AAChE,EAAA;AAEoC,EAAA;AACA,IAAA;AACa,IAAA;AAAE,IAAA;AACnD,EAAA;AACD;ACXyE;AACA;AACA","file":"/home/runner/work/equipped/equipped/dist/cjs/events/types/kafka.cjs","sourcesContent":["import { KafkaJS } from '@confluentinc/kafka-javascript'\n\nimport { EquippedError } from '../../errors'\nimport { Instance } from '../../instance'\nimport type { Events } from '../../types'\nimport { Random, parseJSONValue } from '../../utilities'\nimport { EventBus, type StreamOptions } from '../base'\nimport type { KafkaConfig } from '../pipes'\n\nexport class KafkaEventBus extends EventBus {\n\t#client: KafkaJS.Kafka\n\t#admin: Promise<KafkaJS.Admin> | undefined\n\tconstructor(config: KafkaConfig) {\n\t\tsuper()\n\t\tthis.#client = new KafkaJS.Kafka({\n\t\t\tkafkaJS: { ...config, logLevel: KafkaJS.logLevel.NOTHING },\n\t\t})\n\t}\n\n\tcreateStream<Event extends Events[keyof Events]>(topicName: Event['topic'], options: Partial<StreamOptions> = {}) {\n\t\tconst topic = options.skipScope ? topicName : Instance.get().getScopedName(topicName)\n\t\treturn {\n\t\t\tpublish: async (data: Event['data']) => {\n\t\t\t\tconst producer = this.#client.producer()\n\t\t\t\tawait producer.connect()\n\t\t\t\tawait producer.send({\n\t\t\t\t\ttopic,\n\t\t\t\t\tmessages: [{ value: JSON.stringify(data) }],\n\t\t\t\t})\n\t\t\t\treturn true\n\t\t\t},\n\t\t\tsubscribe: (onMessage: (data: Event['data']) => Promise<void>) => {\n\t\t\t\tconst subscribe = async () => {\n\t\t\t\t\tawait this.#createTopic(topic)\n\t\t\t\t\tconst groupId = options.fanout\n\t\t\t\t\t\t? Instance.get().getScopedName(`${Instance.get().id}-fanout-${Random.string(10)}`)\n\t\t\t\t\t\t: topic\n\t\t\t\t\tconst consumer = this.#client.consumer({ kafkaJS: { groupId } })\n\n\t\t\t\t\tawait consumer.connect()\n\t\t\t\t\tawait consumer.subscribe({ topic })\n\n\t\t\t\t\tawait consumer.run({\n\t\t\t\t\t\teachMessage: async ({ message }) => {\n\t\t\t\t\t\t\tawait Instance.resolveBeforeCrash(async () => {\n\t\t\t\t\t\t\t\tif (!message.value) return\n\t\t\t\t\t\t\t\tawait onMessage(parseJSONValue(message.value.toString()))\n\t\t\t\t\t\t\t}).catch((error) =>\n\t\t\t\t\t\t\t\tInstance.crash(new EquippedError('Error processing kafka event', { topic, groupId, options }, error)),\n\t\t\t\t\t\t\t)\n\t\t\t\t\t\t},\n\t\t\t\t\t})\n\n\t\t\t\t\tif (options.fanout)\n\t\t\t\t\t\tInstance.on(\n\t\t\t\t\t\t\t'close',\n\t\t\t\t\t\t\tasync () => {\n\t\t\t\t\t\t\t\tawait consumer.disconnect()\n\t\t\t\t\t\t\t\tawait this.#deleteGroup(groupId)\n\t\t\t\t\t\t\t},\n\t\t\t\t\t\t\t10,\n\t\t\t\t\t\t)\n\t\t\t\t}\n\t\t\t\tInstance.on('start', subscribe, 2)\n\t\t\t},\n\t\t}\n\t}\n\n\tasync #getAdmin() {\n\t\tif (!this.#admin)\n\t\t\tthis.#admin = (async () => {\n\t\t\t\tconst admin = this.#client.admin()\n\t\t\t\tawait admin.connect()\n\t\t\t\treturn admin\n\t\t\t})()\n\t\treturn this.#admin\n\t}\n\n\tasync #createTopic(topic: string) {\n\t\tconst admin = await this.#getAdmin()\n\t\tawait admin.createTopics({ topics: [{ topic }], timeout: 5000 })\n\t}\n\n\tasync #deleteGroup(groupId: string) {\n\t\tconst admin = await this.#getAdmin()\n\t\tawait admin.deleteGroups([groupId]).catch(() => {})\n\t}\n}\n",null]}