Source: rabbit.client.js

var amqp = require('amqplib/callback_api');
var util = require('./rabbit.util.js');
var Q = require('q');
var logger = {};
var registeredHandlers = {};
/**
 * The configuration object that must be passed for an amqp connection string to be properly built
 * @typedef {Object} customLogger
 * @property {function} error - custom implementation of customLogger.error
 * @property {function} info - custom implementation of customLogger.info
 * @property {function} debug - custom implementation of customLogger.debug
 * @property {function} fatal - custom implementation of customLogger.fatal
 * @property {function} trace - custom implementation of customLogger.trace
 * @property {function} warn - custom implementation of customLogger.warn
 */

/**
 * Creates a new Listener instance
 * @constructor
 * @param {customLogger} [customLogger = require('./loggerService.js')] - A custom logger object
 * @example
 * var subscriber = require('amqplib-lite');
 *
 * // Custom logger passed in
 * let client = new RabbitClient(customLogObj);
 * client.handlers = handlers; // when a disconnect happens this handler property will be used to reconnect internally
 * client.connect(config).then((connection) => {
 *    client.registerHandlers(handlers, connection);
 * }).catch(error => {
 *   logger.error("Error occurred while bootstrapping queue handlers: ", error);
 * });
 *
 * // No custom logger pass in
 * let client = new RabbitClient();
 * client.handlers = handlers; // when a disconnect happens this handler property will be used to reconnect internally
 * client.connect(config).then((connection) => {
 *    client.registerHandlers(handlers, connection);
 * }).catch(error => {
 *   logger.error("Error occurred while bootstrapping queue handlers: ", error);
 * });
 *
 */
function Connect(customLogger) {
    logger = customLogger || require('./loggerService.js');

}

/**
 * The configuration object that must be passed for an amqp connection string to be properly built
 * @typedef {Object} RabbitHandler
 * @property {function} handlerFunction - The callback function that messages will be returned and processed on
 * @property {String} queueConfig - The queue that it will connect to ex "My.First.Queue"
 * @property {Number} messageRate - The amount of messages that can be received at a time. Once this amount of messages is ack more will come in (if available)
 */

/**
 * An array of RabbitHandlers, each rabbit handler has a configuration for a queue to connect to
 * @typedef {Array<RabbitHandler>} RabbitHandlers
 */

/**
 * The configuration object that must be passed for an amqp connection string to be properly built
 * @typedef {Object} RabbitConfiguration
 * @property {String} rabbitmqserver - RabbitMqServer string IP or Domain.
 * @property {Number} rabbitmqport - RabbitMqServer Port.
 * @property {String} rabbitmqusername - RabbitMqServer username.
 * @property {String} rabbitmqpassword - RabbitMqServer password.
 * @property {Number} rabbitheartbeat - optional, sets the client heartbeat with the server. Helps prevent TCP timeouts if rabbit server does not have heartbeat service enabled
 * @property {String} vhost - RabbitMqServer vhost.
 */

/**
 * Generates and processes a single amqp connection for channels to be opened on.
 * @memberof Listener
 * @param {RabbitHandlers} handlers - Array of callback handlers WITH configuration for those handlers, one handler per channel
 * @param {RabbitConfiguration} config - must pass a {@link RabbitConfiguration} object
 */

Connect.prototype.connect = function (config) {
    var context = this;

    return Q.ninvoke(amqp, "connect", util.buildRabbitMqUrl(config)).then(function (conn) {
        logger.info("Connection in progress...");

        conn.on("error", function (err) {
            if (err.message !== "Connection closing") {
                console.error("[AMQP] conn error", err.message);
                logger.error("[AMQP] " + err.message);
                conn.close();
            }
        });

        conn.on("close", function () {
            console.error("[AMQP] reconnecting");
            logger.error("[AMQP] reconnecting");

            return setTimeout(function() {
                context.connect(config).then(function(conn){
                    context.registerHandlers(context.handlers, conn);
                })
            }, 1000);
        });

        console.log("[AMQP] connected");
        logger.info("[AMQP] has connected successfully");
        return conn;

    }).catch(function (err) {
        console.error("[AMQP]", err.message);
        logger.error("[AMQP] " + err.message);
        return setTimeout(function() {
            context.connect(config).then(function(conn){
                context.registerHandlers(context.handlers, conn);
            })
        }, 1000);

    });
};

/**
 * A Channel object, part of the amqplib. Search amqplib documentation for more information
 * @typedef {Object} Channel
 */

/**
 * Sets up a channel object to be used
 * @memberof Listener
 * @param {number} messageRate - number of messages that will be fetched at a time. server must receive ack before it will pass more.
 * @param {Connection} amqpConn - xxxxxx
 * @returns {Promise<Channel>} - channel object that can be used to request messages and response
 */
Connect.prototype.setUpListener = function(messageRate, amqpConn) {
    var context = this;
    return Q.ninvoke(amqpConn, 'createChannel').then(function (ch) {

        ch.on("error", function (err) {
            console.error("[AMQP] channel error", err);
            logger.error("[AMQP] channel error " + err);
        });
        ch.on("close", function () {
            console.log("[AMQP] Channel closed");
            logger.info("[AMQP] Channel closed");

            context.registerHandlers(registeredHandlers,amqpConn);
        });
        logger.info("[AMQP] Channel prefetch rate set to " + messageRate);
        ch.prefetch(messageRate); // limit the number of messages that are read to 1, once the server receives an acknowledgement back it will then send another
        return ch;
    });
}

/**
 * This function should be fired when the main amqp connection has been fired
 * @memberof Listener
 * @param {array} handlers - Takes in an array of confuration settings to loop through and create queue connections for
 */
Connect.prototype.registerHandlers = function (handlers, amqpConn) {
    var context = this;
    logger.info("[AMQP] Beginning channel connections");
    registeredHandlers = handlers;
    if(registeredHandlers){
        console.log("[AMQP] Set handlers " ,JSON.stringify(registeredHandlers));
    }

    registeredHandlers.forEach(function (handler) {
        logger.info("[AMQP] attempting queue listener handshake for " + handler.queueConfig);
        context.setUpListener(handler.messageRate, amqpConn)
            .then(function (ch) {
                logger.info("[AMQP] Success handshake complete, listening on " + handler.queueConfig);
                ch.consume(handler.queueConfig, handler.handlerFunction.bind(ch), {noAck: false});
            }).catch(function (err) {
            if (err) {
                console.log(err);
                logger.fatal("[AMQP] " + err.message);

            }
        });
    });

};

/**
 * Used to register new channels on connections that exist, it also checks that the publishing exchange is reachable
 * @param config
 * @param amqpConn
 */
Connect.prototype.registerPublisher = function(config, amqpConn){
    return new Promise(function(resolve, reject) {
        logger.info("[AMQP] Beginning publisher connections");
        logger.info("[AMQP] attempting publisher handshake for new channel to publish on " + config.publisherExchange);
        amqpConn.createChannel(function(err, ch) {
            if (err) {
            logger.error('no channel');
            return reject(err);
        }

        ch.checkExchange(config.publisherExchange, function (err, ok) {
            if (err) {
                logger.error('[AMQP] error finding exchange ' + config.publisherExchange);
            } else {
                logger.info('[AMQP] success finding exchange ' + config.publisherExchange);
                resolve(ch);

            }
        });
    });
    });
};

module.exports = Connect;