import type { ChannelModel } from 'amqplib'; import type { IContainer } from 'node-cqrs'; import type { IMessage, IMessageMeta } from '../interfaces/index.ts'; import { EventEmitter } from 'events'; type MessageHandler = (m: IMessage, meta?: IMessageMeta) => Promise | unknown; /** * Represents a subscription to events from a RabbitMQ exchange. */ export type Subscription = { /** Name of the exchange to subscribe to */ exchange: string; /** Optional durable queue name; if omitted, an exclusive temporary queue is used */ queueName?: string; /** Specific event type (routing key) for filtering, defaults to all if omitted */ eventType?: string; /** Callback function to process received messages */ handler: MessageHandler; /** If true, messages originating from this instance are ignored */ ignoreOwn?: boolean; /** Optional limit for concurrent message handling */ concurrentLimit?: number; /** * If true, the broker won't expect an acknowledgement of messages delivered to this consumer; * i.e., it will dequeue messages as soon as they've been sent down the wire. * * Defaults to `false` - messages are acknowledged after successful handler completion or rejected on exception. */ noAck?: boolean; /** * Handler timeout in milliseconds; if the handler does not complete within this time, * the message is considered failed and is rejected. * * Defaults to 1h (`RabbitMqGateway.HANDLER_PROCESS_TIMEOUT`). * Set to `0` to disable the timeout entirely. */ handlerProcessTimeout?: number; /** * How long a durable queue may remain unused (no consumers, no new messages) before RabbitMQ * automatically deletes it, in milliseconds. * * When set, the `x-expires` argument is passed to RabbitMQ on queue assertion (requires RabbitMQ ≥ 3.10 * for quorum queues). Set to `0` or leave `undefined` to keep the queue indefinitely. */ queueExpires?: number; /** * Enables RabbitMQ single active consumer mode (`x-single-active-consumer`). */ singleActiveConsumer?: boolean; /** * Whether to create a dead letter queue for rejected or timed out messages. * Only applicable to durable queues (when `queueName` is set). * Defaults to `true` when `queueName` is set, `false` otherwise. */ deadLetterQueue?: boolean; /** * Queue-level message TTL in milliseconds (`x-message-ttl`). * Messages that remain in the queue longer than this duration are discarded (or dead-lettered). * Set to `0` or leave `undefined` to keep messages indefinitely. */ messageTtl?: number; }; export type SubscribeResult = { /** The actual queue name (may be auto-generated for exclusive queues) */ queue: string; /** True when a new consumer was created for the queue in this call */ consumerCreated: boolean; /** Number of ready messages in the queue at the time of subscription */ messageCount: number; /** Number of consumers on the queue at the time of subscription (before this one) */ consumerCount: number; }; interface RabbitMqGatewayConnected { get connection(): ChannelModel; } type GatewayEvents = { connected: []; disconnected: [reason?: string]; }; /** * RabbitMqGateway provides RabbitMQ-based publish/subscribe for ICommand/IEvent-style messages. * * It uses a fanout exchange to broadcast messages to all connected subscribers. * The `on` and `off` methods allow you to register and remove handlers for specific event types. * The `queue(name)` method creates or returns a durable queue with the given name, ensuring that * all messages delivered to the fanout exchange are also routed to this queue. */ export declare class RabbitMqGateway { #private; static HANDLER_PROCESS_TIMEOUT: number; static ALL_EVENTS_WILDCARD: string; static RECONNECT_DELAY: number; get connection(): ChannelModel | undefined; isConnected(): this is this & RabbitMqGatewayConnected; constructor({ rabbitMqConnectionFactory, rabbitMqAppId, eventEmitterFactory, logger, process, tracerFactory }: Partial> & { eventEmitterFactory?: () => EventEmitter; }); /** Returns this gateway's app id from `rabbitMqAppIdProvider` */ getAppId(): Promise; /** * Establishes a connection to RabbitMQ. * If a connection attempt is already in progress, it waits for it to complete. * If the connection is lost, it attempts to reconnect automatically. * Upon successful connection, it restores any previously active subscriptions. * * This method is called automatically by other methods if a connection is required but not yet established. * * @returns A promise that resolves with the ChannelModel representing the established connection. */ connect(): Promise; disconnect(): Promise; subscribeToQueue(exchange: string, queueName: string, handler: MessageHandler, options?: Omit): Promise; /** * Subscribes to a non-durable, exclusive queue without requiring acknowledgments. * The queue is deleted when the connection closes. * Messages are considered "delivered" upon receipt. * Failed message processing does not result in redelivery or dead-lettering. */ subscribeToFanout(exchange: string, handler: MessageHandler, options?: Omit): Promise; /** * Subscribes to events from a specified exchange. * * This method sets up the necessary RabbitMQ topology (exchange, queue, bindings) based on the provided details. * If a `queueName` is provided, it asserts a durable queue with a dead-letter queue for failed messages. * If `queueName` is omitted, it uses or creates a temporary, exclusive queue for the connection. * Then it starts consuming messages from the queue with the specified concurrency limit, if specified. * * @param subscription - The subscription details. * @param subscription.exchange - The name of the exchange to subscribe to. * @param subscription.queueName - Optional. The name of the durable queue. If omitted, an exclusive queue is used. * @param subscription.eventType - The routing key or pattern to bind the queue with. * @param subscription.concurrentLimit - Optional. The maximum number of concurrent messages to process. * @returns A promise that resolves when the subscription is successfully set up. */ subscribe(subscription: Subscription): Promise; unsubscribe(subscription: Pick): Promise; /** * Publishes an event to the fanout exchange. * The event will be delivered to all subscribers, except this instance's own consumer. */ publish(exchange: string, message: IMessage, meta?: IMessageMeta): Promise; on(event: K, fn: (...args: GatewayEvents[K]) => void): this; once(event: K, fn: (...args: GatewayEvents[K]) => void): this; off(event: K, fn: (...args: GatewayEvents[K]) => void): this; } export {};