///
export = Broker;
declare namespace Broker {
// Connection management
export function addConnection(options: ConnectionOptions): void;
export function close(
connectionName?: string,
reset?: boolean
): Promise;
export function closeAll(reset?: boolean): Promise;
export function retry(): Promise;
export function shutdown(): Promise;
// Managing topology
export function configure(options: ConfigurationOptions): Promise;
export function addExchange(
exchangeName: string,
exchangeType: ExchangeType,
options?: Omit,
connectionName?: string
): Promise;
export function addQueue(
queueName: string,
options?: Omit,
connectionName?: string
): Promise;
export function bindExchange(
sourceExchange: string,
targetExchange: string,
routingKeys?: string,
connectionName?: string
): Promise;
export function bindQueue(
sourceExchange: string,
targetQueue: string,
routingKeys?: string | string[],
connectionName?: string
): Promise;
export function purgeQueue(
queueName: string,
connectionName?: string
): Promise;
// Publishing
export function publish(
exchangeName: string,
options: PublishOptions,
connectionName?: string
): Promise;
export function request(
exchangeName: string,
options: PublishOptions,
connectionName?: string
): Promise>;
export function bulkPublish(
set:
| BulkPublishSet
| Array>,
connectionName?: string
): Promise;
// Receiving
export function handle(
options: HandlerOptions,
handler: (message: Message) => any
): Promise;
export function handle(
typeName: string,
handler: (message: Message) => any,
queueName?: string,
context?: string
): Promise;
export function startSubscription(
queueName: string,
exclusive?: boolean,
connectionName?: string
): void;
export function stopSubscription(): void;
// Custom serializers
export function serialize(object: any): Buffer;
export function deserialize(bytes: Buffer, encoding: string): any;
export function addSerializer(
contentType: string,
serializer: {
deserialize: (bytes: Buffer, encoding: string) => any;
serialize: (object: any) => any;
}
): void;
// Event handler
export function on(event: string, handler: (...args: any[]) => void): any;
/** Remove all event subscriptions */
export function off(): void;
/** Emit an event*/
export function emit(topic: string, data?: any, timestamp?: Date): void;
// Unhandled messages
export function onUnhandled(handler: (msg: Message) => void): void;
export function nackUnhandled(handler: (msg: Message) => void): void;
export function rejectUnhandled(handler: (msg: Message) => void): void;
export function onReturned(handler: (msg: Message) => void): void;
// Undocumented
export function reset(): void;
export function setAckInterval(interval: number): void;
export function clearAckInterval(): void;
export function nackOnError(): void;
export function ignoreHandlerErrors(): void;
export function getExchange(name: string, connectionName?: string): any;
export function batchAck(): void;
export function unbindExchange(
source: string,
target: string,
keys: string | string[],
connectionName?: string
): Promise;
export function unbindQueue(
source: string,
target: string,
keys: string | string[],
connectionName?: string
): Promise;
export function log(
loggers: Array<{
level: string;
stream: {
write(data: string): void;
};
}>
): void;
export const connections: Record;
export interface Message {
ack(): Promise;
nack(): Promise;
reject(): Promise;
reply(
message: ReplyBodyType,
options?: {
more: string;
replyType: string;
contentType: string;
headers: {
[key: string]: string;
};
}
): Promise;
fields: MessageFields;
properties: MessageProperties;
body: BodyType;
content: {
type: string;
data: Buffer;
};
type: string;
quarantine: boolean;
}
export interface MessageFields {
consumerTag: string;
deliveryTag: string;
redelivered: boolean;
exchange: string;
routingKey: string;
}
export interface MessageProperties {
contentType: string;
contentEncoding: string;
headers: {
[key: string]: any;
};
correlationId: string;
replyTo: string;
messageId: string;
type: string;
appId: string;
}
export type ExchangeType = "fanout" | "topic" | "direct";
export interface ConfigurationOptions {
connection: ConnectionOptions;
exchanges?: Array;
queues?: Array;
bindings?: Array;
}
export interface ConnectionOptions {
uri?: string;
name?: string;
host?: string;
port?: number;
server?: string | string[];
vhost?: string;
protocol?: "amqp" | "amqps";
user?: string;
pass?: string;
timeout?: number;
heartbeat?: number;
frameMax?: number;
replyQueue?:
| boolean
| string
| {
name: string;
autoDelete?: boolean;
subscribe?: boolean;
};
publishTimeout?: number;
replyTimeout?: number;
failAfter?: number;
retryLimit?: number;
waitMin?: number;
waitMax?: number;
waitIncrement?: number;
clientProperties?: any;
caPath?: string;
certPath?: string;
keyPath?: string;
passphrase?: string;
pfxPath?: string;
}
export interface QueueOptions {
name: string;
limit?: number;
queueLimit?: number;
queueVersion?: 1 | 2;
deadLetter?: string;
deadLetterRoutingKey?: string;
deadLetterStrategy?: "at-most-once" | "at-least-once";
subscribe?: boolean;
autoDelete?: boolean;
passive?: boolean;
messageTtl?: number;
type?: "classic" | "quorum";
overflow?: "drop-head" | "reject-publish";
}
export interface BindingOptions {
exchange: string;
target: string;
keys?: string | string[];
}
export interface ExchangeOptions {
name: string;
type: ExchangeType;
publishTimeout?: number;
alternate?: string;
persistent?: boolean;
durable?: boolean;
}
export interface PublishOptions {
routingKey?: string;
type?: string;
correlationId?: string;
contentType?: string;
body?: MessageBodyType;
messageId?: string;
expiresAfter?: number;
timestamp?: number;
mandatory?: boolean;
persistent?: boolean;
headers?: {
[key: string]: string;
};
timeout?: number;
}
export interface BulkPublishSet {
[exchangeName: string]: Array>;
}
export interface HandlerOptions {
queue: string;
type: string;
autoNack?: boolean;
context?: any;
handler?(msg: Message): any;
}
export interface Handler {
(msg: Message): Promise;
remove(): void;
catch(errorHandler: (err: any, msg: Message) => void): void;
}
}