var amqp = require('amqplib/callback_api'); var util = require('./rabbit.util.js'); var Q = require('q'); var logger = {}; var registeredHandlers = {}; /** * The configuration object that must be passed for an amqp connection string to be properly built * @typedef {Object} customLogger * @property {function} error - custom implementation of customLogger.error * @property {function} info - custom implementation of customLogger.info * @property {function} debug - custom implementation of customLogger.debug * @property {function} fatal - custom implementation of customLogger.fatal * @property {function} trace - custom implementation of customLogger.trace * @property {function} warn - custom implementation of customLogger.warn */ /** * Creates a new Listener instance * @constructor * @param {customLogger} [customLogger = require('./loggerService.js')] - A custom logger object * @example * var subscriber = require('amqplib-lite'); * * // Custom logger passed in * let client = new RabbitClient(customLogObj); * client.handlers = handlers; // when a disconnect happens this handler property will be used to reconnect internally * client.connect(config).then((connection) => { * client.registerHandlers(handlers, connection); * }).catch(error => { * logger.error("Error occurred while bootstrapping queue handlers: ", error); * }); * * // No custom logger pass in * let client = new RabbitClient(); * client.handlers = handlers; // when a disconnect happens this handler property will be used to reconnect internally * client.connect(config).then((connection) => { * client.registerHandlers(handlers, connection); * }).catch(error => { * logger.error("Error occurred while bootstrapping queue handlers: ", error); * }); * */ function Connect(customLogger) { logger = customLogger || require('./loggerService.js'); } /** * The configuration object that must be passed for an amqp connection string to be properly built * @typedef {Object} RabbitHandler * @property {function} handlerFunction - 
The callback function that delivered messages are passed to for processing
 * @property {String} queueConfig - The name of the queue to consume from, e.g. "My.First.Queue"
 * @property {Number} messageRate - The number of messages that can be received at a time. Once that many messages have been acknowledged, more will be delivered (if available)
 */

/**
 * An array of RabbitHandlers; each rabbit handler holds the configuration for one queue to connect to
 * @typedef {Array<RabbitHandler>} RabbitHandlers
 */

/**
 * The configuration object that must be passed for an amqp connection string to be properly built
 * @typedef {Object} RabbitConfiguration
 * @property {String} rabbitmqserver - RabbitMqServer string IP or Domain.
 * @property {Number} rabbitmqport - RabbitMqServer Port.
 * @property {String} rabbitmqusername - RabbitMqServer username.
 * @property {String} rabbitmqpassword - RabbitMqServer password.
 * @property {Number} rabbitheartbeat - optional, sets the client heartbeat with the server. Helps prevent TCP timeouts if the rabbit server does not have the heartbeat service enabled
 * @property {String} vhost - RabbitMqServer vhost.
 */

/**
 * Opens and manages a single amqp connection on which channels can be opened.
* @memberof Listener * @param {RabbitHandlers} handlers - Array of callback handlers WITH configuration for those handlers, one handler per channel * @param {RabbitConfiguration} config - must pass a {@link RabbitConfiguration} object */ Connect.prototype.connect = function (config) { var context = this; return Q.ninvoke(amqp, "connect", util.buildRabbitMqUrl(config)).then(function (conn) { logger.info("Connection in progress..."); conn.on("error", function (err) { if (err.message !== "Connection closing") { console.error("[AMQP] conn error", err.message); logger.error("[AMQP] " + err.message); conn.close(); } }); conn.on("close", function () { console.error("[AMQP] reconnecting"); logger.error("[AMQP] reconnecting"); return setTimeout(function() { context.connect(config).then(function(conn){ context.registerHandlers(context.handlers, conn); }) }, 1000); }); console.log("[AMQP] connected"); logger.info("[AMQP] has connected successfully"); return conn; }).catch(function (err) { console.error("[AMQP]", err.message); logger.error("[AMQP] " + err.message); return setTimeout(function() { context.connect(config).then(function(conn){ context.registerHandlers(context.handlers, conn); }) }, 1000); }); }; /** * A Channel object, part of the amqplib. Search amqplib documentation for more information * @typedef {Object} Channel */ /** * Sets up a channel object to be used * @memberof Listener * @param {number} messageRate - number of messages that will be fetched at a time. server must receive ack before it will pass more. 
* @param {Connection} amqpConn - xxxxxx * @returns {Promise<Channel>} - channel object that can be used to request messages and response */ Connect.prototype.setUpListener = function(messageRate, amqpConn) { var context = this; return Q.ninvoke(amqpConn, 'createChannel').then(function (ch) { ch.on("error", function (err) { console.error("[AMQP] channel error", err); logger.error("[AMQP] channel error " + err); }); ch.on("close", function () { console.log("[AMQP] Channel closed"); logger.info("[AMQP] Channel closed"); context.registerHandlers(registeredHandlers,amqpConn); }); logger.info("[AMQP] Channel prefetch rate set to " + messageRate); ch.prefetch(messageRate); // limit the number of messages that are read to 1, once the server receives an acknowledgement back it will then send another return ch; }); } /** * This function should be fired when the main amqp connection has been fired * @memberof Listener * @param {array} handlers - Takes in an array of confuration settings to loop through and create queue connections for */ Connect.prototype.registerHandlers = function (handlers, amqpConn) { var context = this; logger.info("[AMQP] Beginning channel connections"); registeredHandlers = handlers; if(registeredHandlers){ console.log("[AMQP] Set handlers " ,JSON.stringify(registeredHandlers)); } registeredHandlers.forEach(function (handler) { logger.info("[AMQP] attempting queue listener handshake for " + handler.queueConfig); context.setUpListener(handler.messageRate, amqpConn) .then(function (ch) { logger.info("[AMQP] Success handshake complete, listening on " + handler.queueConfig); ch.consume(handler.queueConfig, handler.handlerFunction.bind(ch), {noAck: false}); }).catch(function (err) { if (err) { console.log(err); logger.fatal("[AMQP] " + err.message); } }); }); }; /** * Used to register new channels on connections that exist, it also checks that the publishing exchange is reachable * @param config * @param amqpConn */ Connect.prototype.registerPublisher = 
function(config, amqpConn){ return new Promise(function(resolve, reject) { logger.info("[AMQP] Beginning publisher connections"); logger.info("[AMQP] attempting publisher handshake for new channel to publish on " + config.publisherExchange); amqpConn.createChannel(function(err, ch) { if (err) { logger.error('no channel'); return reject(err); } ch.checkExchange(config.publisherExchange, function (err, ok) { if (err) { logger.error('[AMQP] error finding exchange ' + config.publisherExchange); } else { logger.info('[AMQP] success finding exchange ' + config.publisherExchange); resolve(ch); } }); }); }); }; module.exports = Connect;