/**
 * Minified by jsDelivr using Terser v5.39.0.
 * Original file: /npm/bunnybus@7.0.0/lib/index.js
 *
 * Do NOT use SRI with dynamically generated files! More information: https://www.jsdelivr.com/using-sri-with-dynamic-files
 */
"use strict";

const EventEmitter = require("events").EventEmitter;
const Helpers = require("./helpers");
const Exceptions = require("./exceptions");
const { ChannelManager, ConnectionManager, SubscriptionManager } = require("./states");
const { SerialDispatcher, ConcurrentDispatcher } = require("./schedulers");
const { EventLogger } = require("./loggers");

// Module-private state. `handlers` is keyed by event name and filled in below
// the class definition; the constructor looks handlers up at instantiation time.
const internals = { handlers: {} };

// The single shared BunnyBus instance. Every `new BunnyBus()` returns this object.
let $;

/**
 * Singleton message bus facade over the connection, channel and subscription
 * managers. All instance methods operate on the shared instance `$`, not on
 * `this`, so they keep working when detached or bound to `null`.
 */
class BunnyBus extends EventEmitter {
    /**
     * Creates the shared instance on first use and returns it on every call.
     *
     * @param {object} [config] - When provided, merged into the current
     *   configuration through the `config` setter, on every construction.
     * @returns {BunnyBus} The shared instance.
     */
    constructor(config) {
        if (!$) {
            super();
            $ = this;
            $._state = { recoveryLock: false };
            $.logger = new EventLogger($);
            $._subscriptions = new SubscriptionManager();
            $._connections = new ConnectionManager();
            $._channels = new ChannelManager();
            $._dispatchers = {
                serial: new SerialDispatcher(),
                concurrent: new ConcurrentDispatcher()
            };
            $._subscriptions.on(
                SubscriptionManager.BLOCKED_EVENT,
                internals.handlers[SubscriptionManager.BLOCKED_EVENT]
            );
            $._subscriptions.on(
                SubscriptionManager.UNBLOCKED_EVENT,
                internals.handlers[SubscriptionManager.UNBLOCKED_EVENT]
            );
        }

        if (config) {
            $.config = config;
        }

        return $;
    }

    static get DEFAULT_SERVER_CONFIGURATION() {
        return Helpers.defaultConfiguration.server;
    }

    static get DEFAULT_QUEUE_CONFIGURATION() {
        return Helpers.defaultConfiguration.queue;
    }

    static get DEFAULT_EXCHANGE_CONFIGURATION() {
        return Helpers.defaultConfiguration.exchange;
    }

    static get LOG_DEBUG_EVENT() {
        return EventLogger.LOG_DEBUG_EVENT;
    }

    static get LOG_INFO_EVENT() {
        return EventLogger.LOG_INFO_EVENT;
    }

    static get LOG_WARN_EVENT() {
        return EventLogger.LOG_WARN_EVENT;
    }

    static get LOG_ERROR_EVENT() {
        return EventLogger.LOG_ERROR_EVENT;
    }

    static get LOG_FATAL_EVENT() {
        return EventLogger.LOG_FATAL_EVENT;
    }

    static get PUBLISHED_EVENT() {
        return "bunnybus.published";
    }

    static get SUBSCRIBED_EVENT() {
        return "bunnybus.subscribed";
    }

    static get UNSUBSCRIBED_EVENT() {
        return "bunnybus.unsubscribed";
    }

    static get MESSAGE_DISPATCHED_EVENT() {
        return "bunnybus.message-dispatched";
    }

    static get MESSAGE_ACKED_EVENT() {
        return "bunnybus.message-acked";
    }

    static get MESSAGE_REJECTED_EVENT() {
        return "bunnybus.message-rejected";
    }

    static get MESSAGE_REQUEUED_EVENT() {
        return "bunnybus.message-requeued";
    }

    static get RECOVERING_CONNECTION_EVENT() {
        return "bunnybus.recovering-connection";
    }

    static get RECOVERED_CONNECTION_EVENT() {
        return "bunnybus.recovered-connection";
    }

    static get RECOVERING_CHANNEL_EVENT() {
        return "bunnybus.recovering-channel";
    }

    static get RECOVERED_CHANNEL_EVENT() {
        return "bunnybus.recovered-channel";
    }

    static get RECOVERY_FAILED_EVENT() {
        return "bunnybus.recovery-failed";
    }

    static get DEFAULT_CONNECTION_NAME() {
        return "default";
    }

    // NOTE: the three channel-name members below are static *methods*, not
    // getters; callers must invoke them.

    /** @returns {string} Name of the channel used for management operations. */
    static MANAGEMENT_CHANNEL_NAME() {
        return "admin-channel";
    }

    /**
     * @param {string} queue - Queue name.
     * @returns {string} Name of the channel dedicated to that queue.
     */
    static QUEUE_CHANNEL_NAME(queue) {
        return `send-${queue}-channel`;
    }

    /** @returns {string} Name of the channel used for publishing. */
    static PUBLISH_CHANNEL_NAME() {
        return "publish-channel";
    }

    /** Current configuration, or the default server configuration if none was set. */
    get config() {
        return $._config || BunnyBus.DEFAULT_SERVER_CONFIGURATION;
    }

    /** Shallow-merges `value` over the current (or default) configuration. */
    set config(value) {
        $._config = Object.assign({}, $._config || BunnyBus.DEFAULT_SERVER_CONFIGURATION, value);
    }

    get subscriptions() {
        return $._subscriptions;
    }

    get connections() {
        return $._connections;
    }

    get channels() {
        return $._channels;
    }

    get logger() {
        return $._logger;
    }

    /**
     * @param {object} value - Logger to install.
     * @throws {Exceptions.IncompatibleLoggerError} When
     *   `Helpers.validateLoggerContract` rejects the logger.
     */
    set logger(value) {
        if (!Helpers.validateLoggerContract(value)) {
            throw new Exceptions.IncompatibleLoggerError();
        }

        $._logger = value;
    }

    get connectionString() {
        return Helpers.createConnectionString($.config);
    }

    /**
     * Asserts an exchange on the management channel.
     *
     * @param {string} name - Exchange name.
     * @param {string} type - Exchange type.
     * @param {object} [options] - Overrides merged over DEFAULT_EXCHANGE_CONFIGURATION.
     * @returns {Promise<*>} Result of `channel.assertExchange`.
     */
    async createExchange(name, type, options) {
        const channelContext = await $._autoBuildChannelContext(BunnyBus.MANAGEMENT_CHANNEL_NAME());

        return await channelContext.channel.assertExchange(
            name,
            type,
            Object.assign({}, BunnyBus.DEFAULT_EXCHANGE_CONFIGURATION, options)
        );
    }

    /**
     * Deletes an exchange on the management channel.
     *
     * @param {string} name - Exchange name.
     * @param {object} [options] - Passed through to `channel.deleteExchange`.
     * @returns {Promise<*>} Result of `channel.deleteExchange`.
     */
    async deleteExchange(name, options) {
        const channelContext = await $._autoBuildChannelContext(BunnyBus.MANAGEMENT_CHANNEL_NAME());

        return await channelContext.channel.deleteExchange(name, options);
    }

    /**
     * Checks whether an exchange exists.
     *
     * @param {string} name - Exchange name.
     * @returns {Promise<boolean>} `true` when the check succeeded, `false` when
     *   it failed with code 404.
     * @throws Any error from the check whose `code` is not 404.
     */
    async checkExchange(name) {
        return await internals.probeOnManagementChannel((channel) => channel.checkExchange(name));
    }

    /**
     * Asserts a queue on the management channel.
     *
     * @param {string} name - Queue name.
     * @param {object} [options] - Overrides merged over DEFAULT_QUEUE_CONFIGURATION.
     * @returns {Promise<*>} Result of `channel.assertQueue`.
     */
    async createQueue(name, options) {
        const channelContext = await $._autoBuildChannelContext(BunnyBus.MANAGEMENT_CHANNEL_NAME());

        return await channelContext.channel.assertQueue(
            name,
            Object.assign({}, BunnyBus.DEFAULT_QUEUE_CONFIGURATION, options)
        );
    }

    /**
     * Purges all messages from a queue.
     *
     * @param {string} name - Queue name.
     * @returns {Promise<boolean>} `true` when the purge succeeded, `false` when
     *   it failed with code 404.
     * @throws Any error from the purge whose `code` is not 404.
     */
    async purgeQueue(name) {
        return await internals.probeOnManagementChannel((channel) => channel.purgeQueue(name));
    }

    /**
     * Deletes a queue on the management channel.
     *
     * @param {string} name - Queue name.
     * @param {object} [options] - Passed through to `channel.deleteQueue`.
     * @returns {Promise<*>} Result of `channel.deleteQueue`.
     */
    async deleteQueue(name, options) {
        const channelContext = await $._autoBuildChannelContext(BunnyBus.MANAGEMENT_CHANNEL_NAME());

        return await channelContext.channel.deleteQueue(name, options);
    }

    /**
     * Checks whether a queue exists.
     *
     * @param {string} name - Queue name.
     * @returns {Promise<boolean>} `true` when the check succeeded, `false` when
     *   it failed with code 404.
     * @throws Any error from the check whose `code` is not 404.
     */
    async checkQueue(name) {
        return await internals.probeOnManagementChannel((channel) => channel.checkQueue(name));
    }

    /**
     * Sends a message directly to a queue (creating the queue if needed) and
     * waits for the broker confirm.
     *
     * @param {*} message - Message to send; converted with `Helpers.convertToBuffer`.
     * @param {string} queue - Destination queue name.
     * @param {object} [options] - May carry `source` and `transactionId`; also
     *   passed to `Helpers.reduceRouteKey` and `Helpers.buildPublishOrSendOptions`.
     * @returns {Promise<void>}
     */
    async send(message, queue, options) {
        const routeKey = Helpers.reduceRouteKey(null, options, message);
        const source = options && options.source;
        const converted = Helpers.convertToBuffer(message);
        const transactionId =
            options && options.transactionId ? options.transactionId : Helpers.createTransactionId();
        const channelContext = await $._autoBuildChannelContext(BunnyBus.QUEUE_CHANNEL_NAME(queue), queue);

        await $.createQueue(queue);

        const headers = {
            transactionId,
            isBuffer: converted.isBuffer,
            source,
            routeKey,
            createdAt: new Date().toISOString(),
            bunnyBus: Helpers.getPackageData().version
        };
        const sendOptions = Helpers.buildPublishOrSendOptions(options, headers);

        channelContext.channel.sendToQueue(queue, converted.buffer, sendOptions);
        await channelContext.channel.waitForConfirms();
    }

    /**
     * Fetches a single message from a queue.
     *
     * @param {string} queue - Queue name.
     * @param {object} [options] - Passed through to `channel.get`.
     * @returns {Promise<*>} Result of `channel.get` (falsy when nothing was fetched,
     *   as relied upon by `getAll`).
     */
    async get(queue, options) {
        const channelContext = await $._autoBuildChannelContext(BunnyBus.QUEUE_CHANNEL_NAME(queue), queue);

        return await channelContext.channel.get(queue, options);
    }

    /**
     * Drains a queue, invoking `handler` sequentially for each message until
     * `get` returns a falsy value.
     *
     * @param {string} queue - Queue name.
     * @param {Function} handler - Called as `(message, ack)`, or as
     *   `(message, metaData, ack)` when `options.meta` is truthy.
     * @param {object} [options] - `get` is forwarded to `get()`; `meta` selects
     *   the handler signature.
     * @returns {Promise<void>}
     */
    async getAll(queue, handler, options) {
        const getOptions = options && options.get;
        const withMeta = options && options.meta;
        const channelName = BunnyBus.QUEUE_CHANNEL_NAME(queue);
        let processing = true;

        do {
            const payload = await $.get(queue, getOptions);

            if (payload) {
                const parsed = Helpers.parsePayload(payload);
                const ack = $._ack.bind(null, payload, channelName);

                if (withMeta) {
                    await handler(parsed.message, parsed.metaData, ack);
                } else {
                    await handler(parsed.message, ack);
                }
            } else {
                processing = false;
            }
        } while (processing);
    }

    /**
     * Publishes a message to the global topic exchange (asserting the exchange
     * first), waits for the broker confirm and emits PUBLISHED_EVENT.
     *
     * @param {*} message - Message to publish.
     * @param {object} [options] - May carry `globalExchange`, `source` and
     *   `transactionId`; also passed to `Helpers.reduceRouteKey` and
     *   `Helpers.buildPublishOrSendOptions`.
     * @returns {Promise<void>}
     * @throws {Exceptions.NoRouteKeyError} When no route key can be resolved.
     */
    async publish(message, options) {
        const globalExchange = (options && options.globalExchange) || $.config.globalExchange;
        const routeKey = Helpers.reduceRouteKey(null, options, message);
        const source = options && options.source;

        if (!routeKey) {
            throw new Exceptions.NoRouteKeyError();
        }

        const converted = Helpers.convertToBuffer(message);
        const transactionId =
            options && options.transactionId ? options.transactionId : Helpers.createTransactionId();
        const channelContext = await $._autoBuildChannelContext(BunnyBus.PUBLISH_CHANNEL_NAME());

        await $.createExchange(globalExchange, "topic", null);

        const headers = {
            transactionId,
            isBuffer: converted.isBuffer,
            source,
            routeKey,
            createdAt: new Date().toISOString(),
            bunnyBus: Helpers.getPackageData().version
        };
        const publishOptions = Helpers.buildPublishOrSendOptions(options, headers);

        await channelContext.channel.publish(globalExchange, routeKey, converted.buffer, publishOptions);
        await channelContext.channel.waitForConfirms();
        $.emit(BunnyBus.PUBLISHED_EVENT, publishOptions, message);
    }

    /**
     * Registers handlers for a queue and starts consuming from it.
     *
     * Each consumed message is matched to handlers by route key. Messages with
     * no matching handler, failing publisher/version validation, or past the
     * retry limit are rejected to `<queue>_error` (or simply acked when there
     * is no matching handler and `rejectUnroutedMessages` is falsy).
     *
     * @param {string} queue - Queue name.
     * @param {object} handlers - Map of route-key pattern to handler function.
     * @param {object} [options] - Per-subscription overrides: `queue`,
     *   `globalExchange`, `maxRetryCount`, `validatePublisher`,
     *   `validateVersion`, `disableQueueBind`, `rejectUnroutedMessages`,
     *   `normalizeMessages`, `meta`.
     * @returns {Promise<void>}
     * @throws {Exceptions.SubscriptionExistError} When the queue is already subscribed.
     * @throws {Exceptions.SubscriptionBlockedError} When the queue is blocked.
     */
    async subscribe(queue, handlers, options) {
        const subscriptions = $.subscriptions;

        if (subscriptions.contains(queue)) {
            throw new Exceptions.SubscriptionExistError(queue);
        }

        if (subscriptions.isBlocked(queue)) {
            throw new Exceptions.SubscriptionBlockedError(queue);
        }

        subscriptions.create(queue, handlers, options);

        // An option explicitly present on `options` (even when falsy) wins over
        // the bus-level configuration. The prototype call keeps this working for
        // option objects that do not inherit from Object.prototype.
        const resolveFlag = (key) =>
            options && Object.prototype.hasOwnProperty.call(options, key) ? options[key] : $.config[key];

        const queueOptions = options && options.queue ? options.queue : null;
        const globalExchange = (options && options.globalExchange) || $.config.globalExchange;
        const maxRetryCount = (options && options.maxRetryCount) || $.config.maxRetryCount;
        const validatePublisher = resolveFlag("validatePublisher");
        const validateVersion = resolveFlag("validateVersion");
        const disableQueueBind = resolveFlag("disableQueueBind");
        const rejectUnroutedMessages = resolveFlag("rejectUnroutedMessages");
        const normalizeMessages = resolveFlag("normalizeMessages");
        const withMeta = options && options.meta;
        const channelName = BunnyBus.QUEUE_CHANNEL_NAME(queue);
        const channelContext = await $._autoBuildChannelContext(channelName, queue);

        await Promise.all([
            $.createQueue(queue, queueOptions),
            $.createExchange(globalExchange, "topic", null)
        ]);

        if (!disableQueueBind) {
            await Promise.all(
                Object.keys(handlers).map((pattern) =>
                    channelContext.channel.bindQueue(queue, globalExchange, pattern)
                )
            );
        }

        const consumeResult = await channelContext.channel.consume(queue, async (payload) => {
            if (!payload) {
                return;
            }

            const parsed = Helpers.parsePayload(payload, normalizeMessages);
            const routeKey = Helpers.reduceRouteKey(payload, null, parsed.message);
            // A missing (or zero) retry count is treated as -1.
            const currentRetryCount = payload.properties.headers.retryCount || -1;
            const errorQueue = `${queue}_error`;
            const matchedHandlers = Helpers.handlerMatcher(handlers, routeKey);

            if (matchedHandlers.length > 0) {
                if (
                    !validatePublisher ||
                    (payload.properties && payload.properties.headers && payload.properties.headers.bunnyBus)
                ) {
                    if (
                        validatePublisher &&
                        validateVersion &&
                        !Helpers.isMajorCompatible(payload.properties.headers.bunnyBus)
                    ) {
                        const reason = `message came from older bunnyBus version (${payload.properties.headers.bunnyBus})`;
                        $.logger.warn(reason);
                        await $._reject(payload, channelName, errorQueue, { reason });
                    } else if (currentRetryCount < maxRetryCount) {
                        matchedHandlers.forEach(async (handler) => {
                            $._dispatchers[$.config.dispatchType].push(queue, async () => {
                                $.emit(BunnyBus.MESSAGE_DISPATCHED_EVENT, parsed.metaData, parsed.message);

                                if (withMeta) {
                                    await handler(
                                        parsed.message,
                                        parsed.metaData,
                                        $._ack.bind(null, payload, channelName),
                                        $._reject.bind(null, payload, channelName, errorQueue),
                                        $._requeue.bind(null, payload, channelName, queue, { routeKey })
                                    );
                                } else {
                                    await handler(
                                        parsed.message,
                                        $._ack.bind(null, payload, channelName),
                                        $._reject.bind(null, payload, channelName, errorQueue),
                                        $._requeue.bind(null, payload, channelName, queue)
                                    );
                                }
                            });
                        });
                    } else {
                        const reason = `message passed retry limit of ${maxRetryCount} for routeKey (${routeKey})`;
                        $.logger.warn(reason);
                        await $._reject(payload, channelName, errorQueue, { reason });
                    }
                } else {
                    const reason = "message not of BunnyBus origin";
                    $.logger.warn(reason);
                    await $._reject(payload, channelName, errorQueue, { reason });
                }
            } else {
                const reason = `message consumed with no matching routeKey (${routeKey}) handler`;
                $.logger.warn(reason);

                if (rejectUnroutedMessages) {
                    await $._reject(payload, channelName, errorQueue, { reason });
                } else {
                    await channelContext.channel.ack(payload);
                }
            }
        });

        if (consumeResult && consumeResult.consumerTag) {
            subscriptions.tag(queue, consumeResult.consumerTag);
            $.emit(BunnyBus.SUBSCRIBED_EVENT, queue);
        }
    }

    /**
     * Cancels the consumer for a queue, clears its subscription and emits
     * UNSUBSCRIBED_EVENT. Does nothing beyond building the channel context
     * when the queue has no subscription.
     *
     * @param {string} queue - Queue name.
     * @returns {Promise<void>}
     */
    async unsubscribe(queue) {
        const channelContext = await $._autoBuildChannelContext(BunnyBus.QUEUE_CHANNEL_NAME(queue));
        const subscriptions = $.subscriptions;

        if (subscriptions.contains(queue)) {
            await channelContext.channel.cancel(subscriptions.get(queue).consumerTag);
            subscriptions.clear(queue);
            $.emit(BunnyBus.UNSUBSCRIBED_EVENT, queue);
        }
    }

    /**
     * Acknowledges a message and emits MESSAGE_ACKED_EVENT.
     *
     * @param {object} payload - Raw message as delivered by the channel.
     * @param {string} channelName - Channel to ack on.
     * @param {object} [options] - Accepted for signature compatibility; unused.
     * @returns {Promise<void>}
     */
    async _ack(payload, channelName, options) {
        const channelContext = await $._autoBuildChannelContext(channelName);

        await channelContext.channel.ack(payload);

        const parsed = Helpers.parsePayload(payload);
        parsed.metaData.headers.ackedAt = new Date().toISOString();
        $.emit(BunnyBus.MESSAGE_ACKED_EVENT, parsed.metaData, parsed.message);
    }

    /**
     * Re-sends a message to `queue` with an incremented retry count, acks the
     * original and emits MESSAGE_REQUEUED_EVENT.
     *
     * @param {object} payload - Raw message as delivered by the channel.
     * @param {string} channelName - Channel to operate on.
     * @param {string} queue - Queue to re-send to.
     * @param {object} [options] - Passed to `Helpers.reduceRouteKey` and
     *   `Helpers.buildPublishOrSendOptions`.
     * @returns {Promise<void>}
     */
    async _requeue(payload, channelName, queue, options) {
        const channelContext = await $._autoBuildChannelContext(channelName);
        const routeKey = Helpers.reduceRouteKey(payload, options);
        const headers = {
            transactionId: payload.properties.headers.transactionId,
            isBuffer: payload.properties.headers.isBuffer,
            source: payload.properties.headers.source,
            createdAt: payload.properties.headers.createdAt,
            requeuedAt: new Date().toISOString(),
            retryCount: payload.properties.headers.retryCount || 0,
            bunnyBus: Helpers.getPackageData().version,
            routeKey
        };
        const sendOptions = Helpers.buildPublishOrSendOptions(options, headers);

        ++sendOptions.headers.retryCount;

        channelContext.channel.sendToQueue(queue, payload.content, sendOptions);
        await channelContext.channel.waitForConfirms();
        await channelContext.channel.ack(payload);

        const parsed = Helpers.parsePayload(payload);
        parsed.metaData.headers = Object.assign(parsed.metaData.headers, headers);
        $.emit(BunnyBus.MESSAGE_REQUEUED_EVENT, parsed.metaData, parsed.message);
    }

    /**
     * Forwards a message to an error queue (creating it if needed), acks the
     * original and emits MESSAGE_REJECTED_EVENT.
     *
     * @param {object} payload - Raw message as delivered by the channel.
     * @param {string} channelName - Channel to operate on.
     * @param {string} [errorQueue] - Destination error queue.
     * @param {object} [options] - `reason` is recorded in the headers; the
     *   object is also passed to `Helpers.buildPublishOrSendOptions`.
     * @returns {Promise<void>}
     */
    async _reject(payload, channelName, errorQueue, options) {
        const channelContext = await $._autoBuildChannelContext(channelName);
        // NOTE(review): the `config` getter falls back to DEFAULT_SERVER_CONFIGURATION,
        // which is already the `server` section, so `$.config.server` looks like it
        // may be undefined when `errorQueue` is omitted - confirm before relying on
        // this fallback.
        const targetQueue = errorQueue || $.config.server.errorQueue;
        const headers = {
            transactionId: payload.properties.headers.transactionId,
            isBuffer: payload.properties.headers.isBuffer,
            source: payload.properties.headers.source,
            createdAt: payload.properties.headers.createdAt,
            requeuedAt: payload.properties.headers.requeuedAt,
            erroredAt: new Date().toISOString(),
            retryCount: payload.properties.headers.retryCount || 0,
            bunnyBus: Helpers.getPackageData().version,
            reason: options && options.reason
        };
        const sendOptions = Helpers.buildPublishOrSendOptions(options, headers);

        await $.createQueue(targetQueue);
        channelContext.channel.sendToQueue(targetQueue, payload.content, sendOptions);
        await channelContext.channel.waitForConfirms();
        await channelContext.channel.ack(payload);

        const parsed = Helpers.parsePayload(payload);
        parsed.metaData.headers = Object.assign(parsed.metaData.headers, headers);
        $.emit(BunnyBus.MESSAGE_REJECTED_EVENT, parsed.metaData, parsed.message);
    }

    /**
     * Returns the channel context registered under `channelName`, (re)creating
     * the default connection and the channel when the context is missing or no
     * longer has a live channel/connection.
     *
     * @param {string} channelName - Channel name.
     * @param {?string} [queue=null] - Queue to associate with a newly created channel.
     * @returns {Promise<object>} The channel context.
     */
    async _autoBuildChannelContext(channelName, queue = null) {
        let channelContext = $.channels.get(channelName);

        const isUsable =
            channelContext && channelContext.channel && channelContext.connectionContext.connection;

        if (!isUsable) {
            const connectionContext = await $.connections.create(BunnyBus.DEFAULT_CONNECTION_NAME, $.config);

            // Remove-then-add keeps exactly one copy of each handler attached,
            // however many times the context is rebuilt.
            connectionContext
                .removeListener(
                    ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT,
                    internals.handlers[ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT]
                )
                .on(
                    ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT,
                    internals.handlers[ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT]
                )
                .removeListener(
                    ConnectionManager.AMQP_CONNECTION_ERROR_EVENT,
                    internals.handlers[ConnectionManager.AMQP_CONNECTION_ERROR_EVENT]
                )
                .on(
                    ConnectionManager.AMQP_CONNECTION_ERROR_EVENT,
                    internals.handlers[ConnectionManager.AMQP_CONNECTION_ERROR_EVENT]
                );

            channelContext = await $.channels.create(channelName, queue, connectionContext, $.config);

            channelContext
                .removeListener(
                    ChannelManager.AMQP_CHANNEL_CLOSE_EVENT,
                    internals.handlers[ChannelManager.AMQP_CHANNEL_CLOSE_EVENT]
                )
                .on(
                    ChannelManager.AMQP_CHANNEL_CLOSE_EVENT,
                    internals.handlers[ChannelManager.AMQP_CHANNEL_CLOSE_EVENT]
                )
                .removeListener(
                    ChannelManager.AMQP_CHANNEL_ERROR_EVENT,
                    internals.handlers[ChannelManager.AMQP_CHANNEL_ERROR_EVENT]
                )
                .on(
                    ChannelManager.AMQP_CHANNEL_ERROR_EVENT,
                    internals.handlers[ChannelManager.AMQP_CHANNEL_ERROR_EVENT]
                );
        }

        return channelContext;
    }

    /**
     * Recovers, one after another, every known channel that has a queue
     * associated with it.
     *
     * @returns {Promise<void>}
     */
    async _recoverConnection() {
        for (const { name, queue } of $.channels.list()) {
            if (queue) {
                await $._recoverChannel(name);
            }
        }
    }

    /**
     * Re-subscribes the queue tied to `channelName` when it has a non-blocked
     * subscription. Guarded by `recoveryLock`: a call made while another
     * recovery is in progress returns immediately without doing anything.
     *
     * @param {string} channelName - Name of the channel to recover.
     * @returns {Promise<void>}
     * @throws The underlying error, after logging it as fatal and emitting
     *   RECOVERY_FAILED_EVENT.
     */
    async _recoverChannel(channelName) {
        if ($._state.recoveryLock) {
            return;
        }

        $._state.recoveryLock = true;

        try {
            const { queue } = $.channels.get(channelName);

            if (queue && $.subscriptions.contains(queue, false)) {
                const { handlers, options } = $.subscriptions.get(queue);

                if (!$.subscriptions.isBlocked(queue)) {
                    $.subscriptions.clear(queue);
                    await $.subscribe(queue, handlers, options);
                }
            }
        } catch (err) {
            err.bunnyBusMessage = "failed to recover, exiting process";
            $.logger.fatal(err);
            $.emit(BunnyBus.RECOVERY_FAILED_EVENT, err);
            throw err;
        } finally {
            $._state.recoveryLock = false;
        }
    }
}

/**
 * Tells whether `err` is the timeout error expected while waiting for a
 * channel-close event.
 *
 * The previous code compared against `err.Message`, a property JavaScript
 * errors do not have, so the comparison could never succeed. `message` is the
 * standard property; `Message` is still honoured in case a custom error sets it.
 */
internals.isTimeoutError = (err) =>
    Boolean(err) && (err.message === "Timeout occurred" || err.Message === "Timeout occurred");

/**
 * Runs `operation(channel)` against the management channel and reports whether
 * it succeeded. Shared by checkExchange, checkQueue and purgeQueue.
 *
 * A failure with `code === 404` is reported as `false`; any other failure is
 * rethrown immediately. Afterwards the channel-close event is given a bounded
 * chance to arrive and the management channel context is rebuilt if necessary
 * (presumably because the broker closes the channel after a failed check -
 * see the amqplib channel documentation).
 *
 * @param {Function} operation - Receives the channel, returns a promise.
 * @returns {Promise<boolean>}
 */
internals.probeOnManagementChannel = async (operation) => {
    let succeeded = false;
    const channelContext = await $._autoBuildChannelContext(BunnyBus.MANAGEMENT_CHANNEL_NAME());
    const channelClosed = new Promise((resolve) => {
        channelContext.once(ChannelManager.AMQP_CHANNEL_CLOSE_EVENT, resolve);
    });

    try {
        await operation(channelContext.channel);
        succeeded = true;
    } catch (err) {
        if (err.code !== 404) {
            throw err;
        }
    }

    try {
        await Helpers.timeoutAsync(async () => await channelClosed, 500);
    } catch (err) {
        if (!internals.isTimeoutError(err)) {
            throw err;
        }
    } finally {
        await $._autoBuildChannelContext(BunnyBus.MANAGEMENT_CHANNEL_NAME());
    }

    return succeeded;
};

// A queue was blocked: stop consuming from it. Failures are logged, not thrown.
internals.handlers[SubscriptionManager.BLOCKED_EVENT] = async (queue) => {
    $.logger.info(`blocking queue ${queue}`);

    try {
        await $.unsubscribe(queue);
    } catch (err) {
        err.bunnyBusMessage = "blocked event handling failed";
        $.logger.error(err);
    }
};

// A queue was unblocked: resume consuming with the stored handlers and options.
internals.handlers[SubscriptionManager.UNBLOCKED_EVENT] = async (queue) => {
    const subscription = $._subscriptions.get(queue);

    $.logger.info(`unblocking queue ${queue}`);

    try {
        await $.subscribe(queue, subscription.handlers, subscription.options);
    } catch (err) {
        err.bunnyBusMessage = "unblocked event handling failed";
        $.logger.error(err);
    }
};

internals.handlers[ConnectionManager.AMQP_CONNECTION_ERROR_EVENT] = (err, context) => {
    err.bunnyBusMessage = "connection error";
    err.context = context;
    $.logger.error(err);
};

internals.handlers[ConnectionManager.AMQP_CONNECTION_CLOSE_EVENT] = async (context) => {
    $.logger.info(`${context.name} connection closed`);

    try {
        $.emit(BunnyBus.RECOVERING_CONNECTION_EVENT, context.name);
        await $._recoverConnection();
        $.emit(BunnyBus.RECOVERED_CONNECTION_EVENT, context.name);
    } catch (err) {
        // Deliberately not rethrown: _recoverChannel logs and emits
        // RECOVERY_FAILED_EVENT for its own failures before throwing.
    }
};

internals.handlers[ChannelManager.AMQP_CHANNEL_ERROR_EVENT] = (err, context) => {
    err.bunnyBusMessage = "channel errored";
    err.context = context;
    $.logger.error(err);
};

internals.handlers[ChannelManager.AMQP_CHANNEL_CLOSE_EVENT] = async (context) => {
    $.logger.info(`${context.name} channel closed`);

    try {
        $.emit(BunnyBus.RECOVERING_CHANNEL_EVENT, context.name);
        await $._recoverChannel(context.name);
        $.emit(BunnyBus.RECOVERED_CHANNEL_EVENT, context.name);
    } catch (err) {
        // Deliberately not rethrown: _recoverChannel has already logged the
        // failure and emitted RECOVERY_FAILED_EVENT.
    }
};

module.exports = BunnyBus;
//# sourceMappingURL=/sm/36cf491ec0dfd2b49b4789da4fa7a66c54c4f38f4f6eb9aa98214524edc1f739.map