{"version":3,"sources":["../../../../src/events/types/rabbitmq.ts","/home/runner/work/equipped/equipped/dist/cjs/events/types/rabbitmq.cjs"],"names":[],"mappings":"AACA,yIAAwB;AAGxB,oDAAyB;AAEzB,sDAAuC;AACvC,sCAA6C;AAGtC,MAAM,iBAAA,QAAyB,kBAAS;AAAA,EAC9C,CAAA,MAAA;AAAA,EACA,CAAA,UAAA;AAAA,EAEA,WAAA,CAAY,MAAA,EAAwB;AACnC,IAAA,KAAA,CAAM,CAAA;AACN,IAAA,IAAA,CAAK,CAAA,WAAA,EAAc,MAAA,CAAO,eAAA;AAC1B,IAAA,IAAA,CAAK,CAAA,OAAA,EAAU,4CAAA,CAAS,MAAA,CAAO,GAAG,CAAC,CAAA,CAAE,aAAA,CAAc;AAAA,MAClD,IAAA,EAAM,KAAA;AAAA,MACN,KAAA,EAAO,MAAA,CAAO,OAAA,EAAA,GAA4B;AACzC,QAAA,MAAM,OAAA,CAAQ,cAAA,CAAe,IAAA,CAAK,CAAA,UAAA,EAAa,QAAA,EAAU,EAAE,OAAA,EAAS,KAAK,CAAC,CAAA;AAC1E,QAAA,MAAM,OAAA,CAAQ,QAAA,CAAS,CAAC,CAAA;AAAA,MACzB;AAAA,IACD,CAAC,CAAA;AAAA,EACF;AAAA,EAEA,YAAA,CAAiD,SAAA,EAA2B,QAAA,EAAkC,CAAC,CAAA,EAAG;AACjH,IAAA,MAAM,MAAA,EAAQ,OAAA,CAAQ,UAAA,EAAY,UAAA,EAAY,kBAAA,CAAS,GAAA,CAAI,CAAA,CAAE,aAAA,CAAc,SAAS,CAAA;AACpF,IAAA,OAAO;AAAA,MACN,OAAA,EAAS,MAAA,CAAO,IAAA,EAAA,GACf,MAAM,IAAA,CAAK,CAAA,MAAA,CAAQ,OAAA,CAAQ,IAAA,CAAK,CAAA,UAAA,EAAa,KAAA,EAAO,IAAA,CAAK,SAAA,CAAU,IAAI,CAAA,EAAG,EAAE,UAAA,EAAY,KAAK,CAAC,CAAA;AAAA,MAC/F,SAAA,EAAW,MAAA,CAAO,SAAA,EAAA,GAAsD;AACvE,QAAA,MAAM,UAAA,EAAY,MAAA,CAAA,EAAA,GAAY;AAC7B,UAAA,MAAM,IAAA,CAAK,CAAA,MAAA,CAAQ,QAAA,CAAS,MAAA,CAAO,OAAA,EAAA,GAA4B;AAC9D,YAAA,MAAM,UAAA,EAAY,OAAA,CAAQ,OAAA,EACvB,kBAAA,CAAS,GAAA,CAAI,CAAA,CAAE,aAAA,CAAc,CAAA,EAAA;AAEyB,YAAA;AACH,YAAA;AAC9C,YAAA;AACP,cAAA;AACe,cAAA;AACgC,gBAAA;AACnC,kBAAA;AACN,kBAAA;AACmD,oBAAA;AACvC,oBAAA;AACR,kBAAA;AACS,oBAAA;AACjB,kBAAA;AACA,gBAAA;AACF,cAAA;AACe,cAAA;AAChB,YAAA;AACA,UAAA;AACF,QAAA;AAEiC,QAAA;AAClC,MAAA;AACD,IAAA;AACD,EAAA;AACD;ACX6E;AACA;AACA","file":"/home/runner/work/equipped/equipped/dist/cjs/events/types/rabbitmq.cjs","sourcesContent":["import type { ChannelWrapper } from 'amqp-connection-manager'\nimport { connect } from 'amqp-connection-manager'\nimport type { ConfirmChannel } from 'amqplib'\n\nimport { Instance } from '../../instance'\nimport type { Events } from '../../types'\nimport { Random, parseJSONValue } from '../../utilities'\nimport { EventBus, type StreamOptions } from '../base'\nimport type { RabbitMQConfig } from '../pipes'\n\nexport class RabbitMQEventBus extends EventBus {\n\t#client: ChannelWrapper\n\t#columnName: string\n\n\tconstructor(config: RabbitMQConfig) {\n\t\tsuper()\n\t\tthis.#columnName = config.eventColumnName\n\t\tthis.#client = connect([config.uri]).createChannel({\n\t\t\tjson: false,\n\t\t\tsetup: async (channel: ConfirmChannel) => {\n\t\t\t\tawait channel.assertExchange(this.#columnName, 'direct', { durable: true })\n\t\t\t\tawait channel.prefetch(1)\n\t\t\t},\n\t\t})\n\t}\n\n\tcreateStream<Event extends Events[keyof Events]>(topicName: Event['topic'], options: Partial<StreamOptions> = {}) {\n\t\tconst topic = options.skipScope ? topicName : Instance.get().getScopedName(topicName)\n\t\treturn {\n\t\t\tpublish: async (data: Event['data']) =>\n\t\t\t\tawait this.#client.publish(this.#columnName, topic, JSON.stringify(data), { persistent: true }),\n\t\t\tsubscribe: async (onMessage: (data: Event['data']) => Promise<void>) => {\n\t\t\t\tconst subscribe = async () => {\n\t\t\t\t\tawait this.#client.addSetup(async (channel: ConfirmChannel) => {\n\t\t\t\t\t\tconst queueName = options.fanout\n\t\t\t\t\t\t\t? Instance.get().getScopedName(`${Instance.get().id}-fanout-${Random.string(10)}`)\n\t\t\t\t\t\t\t: topic\n\t\t\t\t\t\tconst { queue } = await channel.assertQueue(queueName, { durable: !options.fanout, exclusive: options.fanout })\n\t\t\t\t\t\tawait channel.bindQueue(queue, this.#columnName, topic)\n\t\t\t\t\t\tchannel.consume(\n\t\t\t\t\t\t\tqueue,\n\t\t\t\t\t\t\tasync (msg) => {\n\t\t\t\t\t\t\t\tawait Instance.resolveBeforeCrash(async () => {\n\t\t\t\t\t\t\t\t\tif (!msg) return\n\t\t\t\t\t\t\t\t\ttry {\n\t\t\t\t\t\t\t\t\t\tawait onMessage(parseJSONValue(msg.content.toString()))\n\t\t\t\t\t\t\t\t\t\tchannel.ack(msg)\n\t\t\t\t\t\t\t\t\t} catch {\n\t\t\t\t\t\t\t\t\t\tchannel.nack(msg)\n\t\t\t\t\t\t\t\t\t}\n\t\t\t\t\t\t\t\t})\n\t\t\t\t\t\t\t},\n\t\t\t\t\t\t\t{ noAck: false },\n\t\t\t\t\t\t)\n\t\t\t\t\t})\n\t\t\t\t}\n\n\t\t\t\tInstance.on('start', subscribe, 2)\n\t\t\t},\n\t\t}\n\t}\n}\n",null]}