{"version":3,"sources":["../../../../src/events/types/rabbitmq.ts"],"names":["config","#client","channel","#columnName","topicName","options","data","topic","onMessage","queueName","Instance","Random"],"mappings":"AACA,yIAAwB,2DAKf,6DACoC,6CAOR,MAC7B,EACN,QAAmBA,oBAAAA,CAAO,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,WAC1B,CAAA,CAAA,CAAA,CAAKC,KAAkB,CAACD,CAAAA,CAAO,IAAI,CAAA,CAAE,CAAA,CAAA,CAAA,CAAA,eACpC,CAAM,IACN,CAAA,CAAA,CAAA,CAAA,4CAAO,CAAA,CAAA,CAAA,GAAOE,CAAAA,CAA4B,CACzC,aAAc,CAAA,CAAA,IAAA,CAAA,CAAA,CAAA,CAAe,KAAKC,CAAAA,MAAa,CAAA,EAAA,CAAA,MAAY,CAAA,CAAA,cACrDD,CAAQ,IAAA,CAAA,CAAA,CAAA,CAAA,QAKjB,CAAA,CAAA,OAAA,CAAA,CAAA,CAAiDE,CAAAA,CAA2BC,CAAAA,MAC3E,CAAA,CAAA,QAAsB,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,YAAuC,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,CAAA,MAAuB,CAAA,CACpF,CAAA,CAAA,SACC,CAAA,CAAA,CAAA,qBAAS,CAAA,GAAA,CAAA,CAAOC,CAAAA,aACJL,CAAAA,CAAAA,CAAQ,CAAA,MAAA,CAAQ,OAAKE,CAAaI,MAAO,CAAK,EAAA,MAAA,IAAc,CAAG,CAAE,CAAA,CAAA,OAAA,CAAA,IAAkB,CAAA,CAC/F,CAAA,CAAA,CAAA,CAAA,IAAA,CAAA,SAAkBC,CAAsD,CACvE,CAAA,CAAA,CAAA,UAAkB,CAAA,CAAA,CAAA,CAAY,CAC7B,CAAA,SAAM,CAAKP,MAAQ,CAAA,EAAA,CAAA,MAAS,CAAA,CAAOC,KAClC,CAAA,CAAA,EAAA,CAAMO,MAAoB,IAAA,CACvBC,CAAAA,CAAS,CAAA,QAAM,CAAA,MAAA,CAAA,EAAA,CAAc,MAAY,CAAA,CAAA,CAAI,CAAA,MAAI,CAAA,qBAAA,CAAA,GAAA,CAAA,CAAWC,CAAAA,aAAmB,CAAA,CAC/EJ,EAAAA","file":"/home/runner/work/equipped/equipped/dist/cjs/events/types/rabbitmq.min.cjs","sourcesContent":["import type { ChannelWrapper } from 'amqp-connection-manager'\nimport { connect } from 'amqp-connection-manager'\nimport type { ConfirmChannel } from 'amqplib'\n\nimport { Instance } from '../../instance'\nimport type { Events } from '../../types'\nimport { Random, parseJSONValue } from '../../utilities'\nimport { EventBus, type StreamOptions } from '../base'\nimport type { RabbitMQConfig } from '../pipes'\n\nexport class RabbitMQEventBus extends EventBus {\n\t#client: ChannelWrapper\n\t#columnName: string\n\n\tconstructor(config: RabbitMQConfig) {\n\t\tsuper()\n\t\tthis.#columnName = config.eventColumnName\n\t\tthis.#client = connect([config.uri]).createChannel({\n\t\t\tjson: false,\n\t\t\tsetup: async (channel: ConfirmChannel) => {\n\t\t\t\tawait channel.assertExchange(this.#columnName, 'direct', { durable: true })\n\t\t\t\tawait channel.prefetch(1)\n\t\t\t},\n\t\t})\n\t}\n\n\tcreateStream<Event extends Events[keyof Events]>(topicName: Event['topic'], options: Partial<StreamOptions> = {}) {\n\t\tconst topic = options.skipScope ? topicName : Instance.get().getScopedName(topicName)\n\t\treturn {\n\t\t\tpublish: async (data: Event['data']) =>\n\t\t\t\tawait this.#client.publish(this.#columnName, topic, JSON.stringify(data), { persistent: true }),\n\t\t\tsubscribe: async (onMessage: (data: Event['data']) => Promise<void>) => {\n\t\t\t\tconst subscribe = async () => {\n\t\t\t\t\tawait this.#client.addSetup(async (channel: ConfirmChannel) => {\n\t\t\t\t\t\tconst queueName = options.fanout\n\t\t\t\t\t\t\t? Instance.get().getScopedName(`${Instance.get().id}-fanout-${Random.string(10)}`)\n\t\t\t\t\t\t\t: topic\n\t\t\t\t\t\tconst { queue } = await channel.assertQueue(queueName, { durable: !options.fanout, exclusive: options.fanout })\n\t\t\t\t\t\tawait channel.bindQueue(queue, this.#columnName, topic)\n\t\t\t\t\t\tchannel.consume(\n\t\t\t\t\t\t\tqueue,\n\t\t\t\t\t\t\tasync (msg) => {\n\t\t\t\t\t\t\t\tawait Instance.resolveBeforeCrash(async () => {\n\t\t\t\t\t\t\t\t\tif (!msg) return\n\t\t\t\t\t\t\t\t\ttry {\n\t\t\t\t\t\t\t\t\t\tawait onMessage(parseJSONValue(msg.content.toString()))\n\t\t\t\t\t\t\t\t\t\tchannel.ack(msg)\n\t\t\t\t\t\t\t\t\t} catch {\n\t\t\t\t\t\t\t\t\t\tchannel.nack(msg)\n\t\t\t\t\t\t\t\t\t}\n\t\t\t\t\t\t\t\t})\n\t\t\t\t\t\t\t},\n\t\t\t\t\t\t\t{ noAck: false },\n\t\t\t\t\t\t)\n\t\t\t\t\t})\n\t\t\t\t}\n\n\t\t\t\tInstance.on('start', subscribe, 2)\n\t\t\t},\n\t\t}\n\t}\n}\n"]}