Source: router.js

'use strict';

var util = require('util');
var events = require('events');
var net = require('net');
var _ = require('lodash');
var semver = require('semver');
var debug = require('debug')('dw:router');
var Promise = require('bluebird');
//var safeCyclicIncrement = require('./common').safeCyclicIncrement;
var SafeCyclicCounter = require('./common').SafeCyclicCounter;

var defaults = {
    socTimeout: 30000
};

/**
 * An enumeration of {@link Broker} states
 * @private
 * @memberof Router
 * @enum {number}
 * @constant
 */
var ConnectionState = {
    CLOSED: 0,
    OPENING: 1,
    OPEN: 2,
    CLOSING: 3
};

////////////////////////////////////////////////////////////////////////////////
// Public section

/**
 * Router class. Route messages between brokers over direct TCP connections.
 * @constructor
 * @param {string} id A unique broker ID.
 * @param {object} [option] Options.
 * @param {object} [option.socTimeout] Socket timeout in milliseconds.
 */
function Router(option) {
    this._option = _.defaults(option || {}, defaults);
    this._server = null;
    this._serverSockets = {};
    this._clientSockets = {};
    this._requesterIdCounter = new SafeCyclicCounter();
    this._nClientSocketsOpened = 0;
    this._nClientSocketsClosed = 0;
    this._nClientSocketsFailed = 0;
    this._nServerSocketsOpened = 0;
    this._nServerSocketsClosed = 0;
    this._nServerSocketsFailed = 0;
    this._peakBufferSize = -1;

    // Logger
    var self = this;
    this.logger = {};
    ['debug', 'warn', 'info', 'error'].forEach(function (level) {
        self.logger[level] = function () {
            self.emit('log', { level: level, message: util.format.apply(self, arguments) });
        };
    });
}

util.inherits(Router, events.EventEmitter);

/**
 * Start listening for incoming connections.
 * @param {string} [host] Local IP address. If not given, the router will listen
 * on '0.0.0.0'.
 * @returns {Promise} Returns a promise. Resolved value will be the local port
 * number assigned by underlying OS. Therefore, EADDRINUSE will not occur.
 */
Router.prototype.listen = function (host) {
    var self = this;
    return new Promise(function (resolve, reject) {
        self._server = net.createServer(self._onNewConnection.bind(self));

        if (semver.lt(process.version, '0.12.0')) {
            debug('dw:router: node < 0.12.0, host=%s', host);
            self._server.listen(0, host, function () {
                resolve(self._server.address());
            });
        } else {
            debug('dw:router: node >= 0.12.0, host=%s', host);
            self._server.listen({
                host: host || '0.0.0.0',
                port: 0,
                exclusive: true
            }, function () {
                resolve(self._server.address());
            });
        }
        self._server.on('error', function (err) {
            self._server = null;
            reject(err);
        });
    });
};

/**
 * Send a request to remote broker.
 * If no connection is available with the specified remote broker, it will
 * first establish a TCP connection, then send the data. If the connection
 * exists and the specified remote address/port match, it will reuse the
 * connection to send the data.
 * @param {object} address Destination address of the broker.
 * @param {string} address.host Remote IP address.
 * @param {number} address.port Remote port number.
 * @returns {Promise} Returns a Promise. The promise will be resolved
 * when the operation successfully writes the data into the underlying
 * socket.
 */
Router.prototype.request = function (address, data) {
    if (!address || (typeof address !== 'object')) {
        return Promise.reject(new Error('address must be a valid object'));
    }
    if (typeof address.host !== 'string') {
        return Promise.reject(new Error('address.host must be a string'));
    }
    if (typeof address.port !== 'number') {
        return Promise.reject(new Error('address.port must be a number'));
    }
    var self = this;
    var remoteId = address.host + ':' + address.port;
    return new Promise(function (resolve, reject) {
        var info = self._clientSockets[remoteId];
        if (info) {
            if (info.state === ConnectionState.CLOSING ||
                info.state === ConnectionState.CLOSED) {
                info.soc.destroy();
                delete self._clientSockets[remoteId];
                info = null;
            }
        }

        if (info) {
            if (info.state === ConnectionState.OPENING) {
                // Put the data in the pending queue
                info.pending.push({
                    resolve: resolve,
                    reject: reject,
                    data: data
                });
            } else {
                self.logger.debug('dw:router: send request to remote=%s', remoteId);
                self._send(info.soc, data);
                resolve();
            }

            return;
        }

        var since = Date.now();

        // Create a new connection.
        // Client socket lifetime is managed in this closure.
        info = {
            state: ConnectionState.OPENING,
            pending: [{
                resolve: resolve,
                reject: reject,
                data: data
            }]
        };
        info.soc = new net.createConnection(address.port, address.host);
        info.soc.setTimeout(self._option.socTimeout);
        info.soc.on('connect', function () {
            info.state = ConnectionState.OPEN;
            info.frax = require('frax').create();
            info.frax.on('data', function (frame) {
                var msg;
                try {
                    msg = JSON.parse(frame.toString());
                } catch (e) {
                    debug('JSON parse error:', e);
                    info.lastError = e;
                    info.state = ConnectionState.CLOSING;
                    info.soc.end();
                    return;
                }
                self.logger.debug('dw:router: received response from %s', remoteId);
                self.emit('response', msg);
            });
            // Send all pending data.
            info.pending.forEach(function (item) {
                self.logger.debug('dw:router: send request (in pending) to remote=%s', remoteId);
                self._send(info.soc, item.data);
                item.resolve();
            });
            info.pending = [];
            self._nClientSocketsOpened++;
            self.logger.info('dw:router: client socket connect: remote=%s elapsed=%d', remoteId, Date.now() - since);
        });
        info.soc.on('data', function (buf) {
            info.frax.input(buf);
        });
        info.soc.on('end', function () {
            self.logger.info('dw:router: client socket ended: remote=%s', remoteId);
            self.emit('disconnect', address);
            info.state = ConnectionState.CLOSING;
            info.soc.end();
        });
        info.soc.on('timeout', function () {
            self.logger.info('dw:router: client socket timeout: remote=%s', remoteId);
            info.state = ConnectionState.CLOSING;
            info.soc.end();
        });
        info.soc.on('error', function (err) {
            self.logger.info('dw:router: client socket error: remote=%s err=%s', remoteId, err.message);
            self.emit('disconnect', address);
            info.state = ConnectionState.CLOSING;
            info.lastError = err;
            self._nClientSocketsFailed++;
        });
        info.soc.on('close', function (had_error) {
            info.state = ConnectionState.CLOSED; // just in case
            // Reject promise if any in pending queue
            info.pending.forEach(function (item) {
                /* istanbul ignore else */
                if (had_error) {
                    item.reject(info.lastError);
                } else {
                    self.logger.warn(
                        'dw:router: unexpected pending item on close with no error: %s',
                        JSON.stringify(item)
                    );
                }
            });
            info.pending = [];
            delete self._clientSockets[remoteId];
            self._nClientSocketsClosed++;
            self.logger.info(
                'dw:router: client socket: close event: remote=%s' +
                ' had_error=%s opened=%d closed=%d failed=%d elapsed=%d',
                remoteId,
                had_error,
                self._nClientSocketsOpened,
                self._nClientSocketsClosed,
                self._nClientSocketsFailed,
                Date.now() - since
            );
        });
        self._clientSockets[remoteId] = info;
    });
};

Router.prototype.respond = function (requesterId, data) {
    var soc = this._serverSockets[requesterId];
    if (soc) {
        this.logger.debug('dw:router: respond on requesterId=%d', requesterId);
        this._send(soc, data);
    } else {
        this.logger.warn('dw:router: socket not found for requesterId=%d', requesterId);
    }
};

Router.prototype.close = function () {
    var self = this;

    // Close all client sockets.
    Object.keys(this._clientSockets).forEach(function (brokerId) {
        var info = self._clientSockets[brokerId];
        /* istanbul ignore else */
        if (info.soc && info.state !== ConnectionState.CLOSED) {
            info.soc.destroy();
        }
    });
    this._clientSockets = {};

    // Close all server sockets.
    Object.keys(this._serverSockets).forEach(function (requesterId) {
        self._serverSockets[requesterId].destroy();
    });
    this._serverSockets = {};

    // Close listening socket
    if (this._server) {
        try {
            this._server.close();
        } catch (e) {
            /* istanbul ignore next */
            debug('Error:', e);
        }
        this._server = null;
    }
};

////////////////////////////////////////////////////////////////////////////////
// Private section

Router.prototype._send = function (soc, data) {
    var pl = JSON.stringify(data);
    var plBytes = Buffer.byteLength(pl);
    var buf = new Buffer(2 + plBytes);
    buf.writeUInt16BE(plBytes, 0);
    buf.write(pl, 2);
    soc.write(buf);

    if (this._peakBufferSize < soc.bufferSize) {
        this._peakBufferSize = soc.bufferSize;
        this.logger.info('dw:router: updated peak buffer size: %d', this._peakBufferSize);
    }
};

Router.prototype._onNewConnection = function (soc) {
    var self = this;
    var since = Date.now();
    var requesterId = this._requesterIdCounter.get();
    var lastError;
    var frax = require('frax').create();
    frax.on('data', function (frame) {
        var msg;
        try {
            msg = JSON.parse(frame.toString());
        } catch (e) {
            debug('JSON parse error:', e);
            lastError = e;
            soc.end();
            return;
        }
        self.logger.debug('dw:router: received request on requesterId=%d', requesterId);
        self.emit('request', msg, requesterId);
    });

    self._nServerSocketsOpened++;
    self.logger.info('dw:router: server socket connect: requesterId=%d', requesterId);

    // Server socket has timeout twice as long as that of client socket
    // to avoid simultaneous close, or race condition.
    soc.setTimeout(this._option.socTimeout * 2);

    soc.on('data', function (buf) {
        frax.input(buf);
    });
    soc.on('end', function () {
        self.logger.info('dw:router: server socket ended: requesterId=%d', requesterId);
        soc.end();
    });
    soc.on('timeout', function () {
        self.logger.info('dw:router: server socket timeout: requesterId=%d', requesterId);
        soc.end();
    });
    soc.on('error', /* istanbul ignore next */ function (err) {
        self.logger.info('dw:router: server socket error: requesterId=%d err=%s', requesterId, err.message);
        lastError = err;
        self._nServerSocketsFailed++;
    });
    soc.on('close', function (had_error) {
        delete self._serverSockets[requesterId];
        self._nServerSocketsClosed++;
        self.logger.info(
            'dw:router: server socket: close event: requesterId=%d had_error=%s' +
            ' opened=%d closed=%d failed=%d elapsed=%d',
            requesterId,
            had_error,
            self._nServerSocketsOpened,
            self._nServerSocketsClosed,
            self._nServerSocketsFailed,
            Date.now() - since
        );
    });
    this._serverSockets[requesterId] = soc;
};

module.exports.Router = Router;