'use strict';
var util = require('util');
var events = require('events');
var _ = require('lodash');
var scripts = require('./scripts');
var debug = require('debug')('dw:broker');
var redis = require('redis');
var Worker = require('./worker').Worker;
var Agent = require('./agent').Agent;
var Router = require('./router').Router;
var Const = require('./common').Const;
var SafeCyclicCounter = require('./common').SafeCyclicCounter;
var generateHashKey = require('./common').generateHashKey;
var NOOP = require('./common').NOOP;
var Promise = require('bluebird');
/**
* An enumeration of {@link Broker} states.
* `start()` is accepted only while the broker is INACTIVE or DESTROYED;
* `destroy()` does actual work only while the broker is ACTIVE.
* @memberof Broker
* @enum {number}
* @constant
*/
var State = {
/** Broker is inactive (initial state, also restored after a failed start) */
INACTIVE: 0,
/** Broker is activating (start() is in progress) */
ACTIVATING: 1,
/** Broker is active (start() completed successfully) */
ACTIVE: 2,
/** Broker is being destroyed (destroy() is in progress) */
DESTROYING: 3,
/** Broker has been destroyed (it may be started again) */
DESTROYED: 4
};
// Signal IDs carried in the `sig` field of JSON messages sent via redis pubsub.
var SIG_RECOVER = 'recover';
var SIG_SALVAGE = 'salvage';
var SIG_RESTART = 'restart';
// Max number of messages to read from the queue at a time.
var DEFAULT_BATCH_READ_SIZE = 1000;
// Delay (msec) used to schedule the #_onTimer() call.
var TIMER_INTERVAL = 1000;
// Redis time sync interval.
var TIME_SYNC_INTERVAL = 30000;
// Default values for the options accepted by the Broker constructor.
var defaults = {
ns: 'dw',
rpcTimeout: 3000, // in milliseconds
batchReadSize: DEFAULT_BATCH_READ_SIZE,
clustername: 'main',
redis: {
host: '127.0.0.1',
port: 6379
},
brokerCache: {
max: 1000,
maxAge: 30000
},
ttl: 0, // Worker's max lifetime. Defaults to 0 (unlimited)
retries: {
initialInterval: 40,
maxInterval: 800,
duration: 2000
},
socTimeout: 15000,
healthCheckInterval: 1 // in seconds, 0 to disable.
};
/**
 * Creates the default in-memory worker class registry.
 * The returned object exposes `add(name, ctor)` and `get(name)`; `get`
 * returns `undefined` for a name that was never added.
 * @private
 * @returns {object} A registry with `add` and `get` methods.
 */
function defaultWorkerRegistry() {
    // A prototype-less store: names such as 'constructor' or 'toString' must
    // not resolve to members inherited from Object.prototype.
    var _registry = Object.create(null);
    return {
        add: function add(name, ctor) {
            _registry[name] = ctor;
        },
        get: function get(name) {
            return _registry[name];
        }
    };
}
/**
 * Wraps a function that reports completion either through a trailing
 * node-style callback `(err, res)` or through a returned thenable, and
 * returns a function that always returns a promise.
 * A synchronous throw from `func` rejects the returned promise.
 * @private
 * @param {function} func Function to wrap. The completion callback is appended
 * to whatever arguments the wrapper is called with.
 * @param {object} thisObj Value used as `this` when calling `func`.
 * @returns {function} Wrapper returning a promise.
 */
function promisifyCb(func, thisObj) {
    return function () {
        // Copy the arguments rather than growing the `arguments` object in place.
        var args = Array.prototype.slice.call(arguments);
        return new Promise(function (resolve, reject) {
            function tmpCb (err, res) {
                if (err) {
                    return reject(err);
                }
                resolve(res);
            }
            args.push(tmpCb);
            var p;
            try {
                p = func.apply(thisObj, args);
            } catch (e) {
                return reject(e);
            }
            if (p) {
                /* istanbul ignore else */
                if (typeof p.then === 'function') {
                    // Only `then` is relied upon, so any thenable works here
                    // (not just promises that provide `nodeify`).
                    p.then(resolve, reject);
                }
                else {
                    debug('promisifyCb: unexpected non-promise value returned');
                }
            }
        });
    };
}
/**
 * A function to be retried by {@link backoffRetry}. It must return a promise;
 * a rejection triggers another attempt as long as the retry duration has not
 * been exceeded.
 * @callback backoffFunc
 * @param {number} nRetries Current ordinal number of this function call. The
 * number for the initial call will be 0.
 */
/**
 * Calls `func` repeatedly until it succeeds or until `option.duration` msec
 * have passed since the first call. The interval bookkeeping doubles after
 * every failure, capped at `option.maxInterval`. When the duration is
 * exceeded, the returned promise is rejected with the last error `func`
 * produced.
 * @private
 * @param {object} option
 * @param {number} option.initialInterval Initial interval time (msec) for retries.
 * @param {number} option.maxInterval Max interval time (msec) for retries.
 * @param {number} option.duration Max duration (msec) of the retries.
 * @param {backoffFunc} func User-defined function that will be retried.
 * @returns {Promise} Resolves with the value `func` resolved with.
 * @example
 * var option = {
 *     initialInterval: 20,
 *     maxInterval: 1000,
 *     duration: 5000
 * };
 */
function backoffRetry(option, func) {
    debug('backoffRetry: option:', option);
    var attempt = 0;
    var nextInterval = option.initialInterval;
    var startedAt = Date.now();

    function onFailure(lastError) {
        var elapsed = Date.now() - startedAt;
        if (elapsed >= option.duration) {
            throw lastError; // give up with the most recent error
        }
        attempt += 1;
        // NOTE(review): the wait is capped by `elapsed + 10`, not by the
        // remaining time budget - confirm that this is intended.
        var waitMsec = Math.min(nextInterval, elapsed + 10);
        nextInterval = Math.min(nextInterval * 2, option.maxInterval);
        debug('since : ' + startedAt);
        debug('elapsed : ' + elapsed);
        debug('interval : ' + nextInterval);
        debug('retry wait time: ' + waitMsec);
        return Promise.delay(waitMsec).then(function () {
            return tryOnce();
        });
    }

    function tryOnce() {
        return func(attempt).catch(onFailure);
    }

    return tryOnce();
}
////////////////////////////////////////////////////////////////////////////////
// Public section
/**
 * Broker class.
 * The given `option` object is not modified; options are resolved into a
 * private copy, and the nested `redis`, `brokerCache` and `retries` groups are
 * completed field by field from the defaults.
 * @constructor
 * @param {string} id A unique broker ID.
 * @param {object} [option] Options.
 * @param {string} [option.ns] Namespace used for redis keys. Default is 'dw'.
 * @param {string} [option.clustername] Name of cluster this broker will belong to. Defaults to 'main'.
 * @param {object} [option.redis] Redis configuration.
 * @param {object} [option.redis.pub] Redis client instance for standard operation.
 * @param {object} [option.redis.sub] Redis client instance for subscription.
 * @param {number} [option.redis.port] Redis server's port number (defaults to 6379).
 * @param {string} [option.redis.host] Redis server's IP address (defaults to '127.0.0.1').
 * @param {number} [option.rpcTimeout] RPC transaction timeout. Defaults to 3000 msec.
 * @param {number} [option.ttl] Recovery timeout. Defaults to 0 (unlimited). This value
 * does not affect active workers. It is only referenced during the recovery process.
 * A worker, even with recoverable flag set to true, won't be recovered if it's beyond its TTL.
 * @param {number} [option.batchReadSize] The number of messages to read from message queue at a time. Defaults to 1000.
 * @param {object} [option.brokerCache] Broker cache (LRU) configuration.
 * @param {number} [option.brokerCache.max] Max number of workers LRU cache can remember
 * to resolve broker ID (by worker ID).
 * @param {number} [option.brokerCache.maxAge] Max number of milliseconds in which an entry
 * in the cache will be invalidated.
 * @param {object} [option.retries] Retry behavior configuration.
 * @param {number} [option.retries.initialInterval] Initial interval in msec. Defaults to 40.
 * @param {number} [option.retries.maxInterval] Max interval in msec. Defaults to 800.
 * @param {number} [option.retries.duration] Max duration in msec, in which the retries will stop.
 * @param {number} [option.socTimeout] Value handed to the Router as `socTimeout`. Defaults to 15000.
 * @param {number} [option.healthCheckInterval] Health-check interval in seconds. Defaults to 1.
 */
function Broker(id, option) {
    var self = this;
    var given = option || {};
    this._id = id;
    // Resolve the options into a private object so that neither the caller's
    // object nor the module-level defaults are modified.
    this._option = _.defaults({}, given, defaults);
    // Complete the nested groups field by field: a partially specified group
    // (e.g. `retries: { duration: 5000 }`) must still get its other defaults.
    ['redis', 'brokerCache', 'retries'].forEach(function (group) {
        self._option[group] = _.defaults({}, given[group], defaults[group]);
    });
    this._state = State.INACTIVE;
    this._workerReg = defaultWorkerRegistry();
    this._workers = {};
    this._load = 0;
    this._loadUpdated = false;
    // RPC sequence number originated from this broker
    this._seqCounter = new SafeCyclicCounter(Math.floor(Math.random() * (Const.MAX_SAFE_INTEGER+1)));
    this._cbs = {}; // RPC callback list (key: sequence number)
    this._cbList = []; // RPC callback list (ordered by time, each item: [ msec, seq ]
    this._timer = null;
    this._timeOffset = 0;
    this._lastTimeSync = 0;
    this._chPrefix = this._option.ns + ':ch'; // subscriber channel prefix
    this._healthCheckIn = this._option.healthCheckInterval;
    // Use the redis clients supplied by the caller, or create our own.
    if (this._option.redis.pub) {
        this._pub = this._option.redis.pub;
    } else {
        this._pub = redis.createClient(this._option.redis.port, this._option.redis.host, {});
    }
    if (this._option.redis.sub) {
        this._sub = this._option.redis.sub;
    } else {
        this._sub = redis.createClient(this._option.redis.port, this._option.redis.host, {});
    }
    // Promisify redis client
    Promise.promisifyAll(this._pub);
    this._sub.on("message", this._onSubMessage.bind(this));
    this._keys = {
        gh: this._option.ns + ':gh', // global hash
        wh: this._option.ns + ':wh', // worker hash
        bh: this._option.ns + ':bh', // broker hash
        cz: this._option.ns + ':cz', // followed by ':'+clustername
        bz: this._option.ns + ':bz', // followed by ':'+clustername
        wz: this._option.ns + ':wz', // followed by ':'+brId
        rz: this._option.ns + ':rz' // recovery list
    };
    this._chBc = self._chPrefix + ':*'; // channel shared by all brokers
    this._chUc = self._chPrefix + ':' + id; // channel dedicated to this broker
    this._subscribed = [];
    // Broker info cache ('lru-cache')
    this._brokerCache = require('lru-cache')(this._option.brokerCache);
    this._brokerCacheHits = 0;
    this._brokerCacheMisses = 0;
    // Instantiate Router
    this._router = new Router({ socTimeout: this._option.socTimeout });
    this._router.on('request', this._onRequest.bind(this));
    this._router.on('response', this._onResponse.bind(this));
    this._router.on('disconnect', this._onRemoteDisconnect.bind(this));

    // Read-only accessors. Object.defineProperty with these flags creates the
    // same kind of property as the deprecated __defineGetter__ did.
    function defineGetter(name, getter) {
        Object.defineProperty(self, name, {
            get: getter,
            enumerable: true,
            configurable: true
        });
    }
    defineGetter('id', function () {
        return this._id;
    });
    defineGetter('state', function () {
        return this._state;
    });
    defineGetter('rpcTimeout', function () {
        return this._option.rpcTimeout;
    });
    defineGetter('clustername', function () {
        return this._option.clustername;
    });
    defineGetter('pub', function () {
        return this._pub;
    });
    defineGetter('sub', function () {
        return this._sub;
    });
    defineGetter('load', function () {
        return this._load;
    });
    // Local clock adjusted by the offset kept in `_timeOffset`.
    defineGetter('redisTime', function () {
        return Date.now() + this._timeOffset;
    });

    // Logger: every call is turned into a 'log' event emitted by this broker.
    this.logger = {};
    ['debug', 'warn', 'info', 'error'].forEach(function (level) {
        self.logger[level] = function () {
            self.emit('log', { id: self.id, level: level, message: util.format.apply(self, arguments) });
        };
    });
    // Re-emit the router's log events under this broker's ID.
    this._router.on('log', function (logMsg) {
        logMsg.id = self.id;
        self.emit('log', logMsg );
    });
}
util.inherits(Broker, events.EventEmitter);
/**
 * Callback function for {@link Broker.prototype.start}.
 * @callback Broker~startCallback
 * @param {Error} err Error object. Not set when the broker started successfully.
 */
/**
 * Start this broker.
 * Allowed only while the broker is INACTIVE or DESTROYED. The steps are:
 * load the lua scripts, open the router's listener on the local address of
 * the redis connection, subscribe to the broadcast and unicast channels,
 * register this broker via the addBroker script, then arm the periodic timer.
 * If any step fails, the state goes back to INACTIVE, the listener opened by
 * this call is closed, channels subscribed by this call are unsubscribed, and
 * the original error is reported.
 * @param {Broker~startCallback} [cb] Callback function.
 * @returns {Promise} Returns a promise if `cb` is not provided.
 * See {@link Broker~startCallback}.
 */
Broker.prototype.start = function (cb) {
    if (this._state !== State.INACTIVE && this._state !== State.DESTROYED) {
        return Promise.reject(new Error('start() not allowed: state=' + this._state))
        .nodeify(cb);
    }
    var self = this;
    // Becomes true once the router is listening, so a later failure can close it.
    var routerOpened = false;
    this.logger.info('Broker (' + this.id + ') is starting');
    if (!this._pub) {
        return Promise.reject(new Error('Can not start without redis client (pub)'))
        .nodeify(cb);
    }
    if (!this._sub) {
        return Promise.reject(new Error('Can not start without redis client (sub)'))
        .nodeify(cb);
    }
    // Begin with a clean list; entries left over from a previous run are stale.
    this._subscribed = [];
    this._syncRedisTime(Date.now());
    this._setState(State.ACTIVATING);
    return this._loadScripts()
    .then(function () {
        // Obtain local IP address from redis connection.
        var localAddr = self._pub.stream.address().address;
        // Pass the address to router to listen for it.
        return self._router.listen(localAddr)
        .then(function (addr) {
            if (addr.address !== localAddr) {
                self._router.close();
                throw new Error('Failed to open TCP listener');
            }
            routerOpened = true;
            // Construct <ip-address>:<port> for later use.
            self._addr = localAddr + ':' + addr.port;
            self.logger.info('Broker (' + self.id + ') is listening on ' + self._addr);
        });
    })
    .then(function () {
        return new Promise(function (resolve) {
            // Wait until both of OUR channels are confirmed. Counting our own
            // channels (instead of relying on the client-wide `count`) also
            // works when the sub client has other subscriptions.
            var pending = [self._chBc, self._chUc];
            function onSubscribe(ch) {
                var idx = pending.indexOf(String(ch));
                if (idx < 0) {
                    return; // not one of ours
                }
                pending.splice(idx, 1);
                self._subscribed.push(ch);
                if (pending.length === 0) {
                    // Remove only our own listener; the client may be shared.
                    self._sub.removeListener('subscribe', onSubscribe);
                    resolve();
                }
            }
            self._sub.on('subscribe', onSubscribe);
            self._sub.subscribe(self._chBc, self._chUc, NOOP);
        });
    })
    .then(function () {
        return self._pub.evalshaAsync(
            scripts.addBroker.sha,
            7,
            self._keys.gh,
            self._keys.wh,
            self._keys.bh,
            self._keys.cz + ':' + self.clustername,
            self._keys.bz + ':' + self.clustername,
            self._keys.wz + ':' + self.id,
            self._keys.rz,
            self.id,
            self._chPrefix,
            self._load,
            self.clustername,
            self._addr,
            generateHashKey(self.id)
        );
    })
    .then(function (res) {
        if (!res || !Array.isArray(res)) {
            debug('addBroker result must be an array');
            throw new Error('Internal error');
        }
        if (res[0] !== 0) {
            debug('addBroker: unsupported result code ' + res[0]);
            throw new Error('Internal error');
        }
        self._timer = setTimeout(self._onTimer.bind(self), TIMER_INTERVAL);
    })
    .then(function () {
        self._setState(State.ACTIVE);
        self.logger.info('Broker (' + self.id + ') started');
    }, function (err) {
        self.logger.error('Failed to start: ' + err.message);
        // Do not leave the listener open for a broker that is not active.
        if (routerOpened) {
            routerOpened = false;
            self._router.close();
        }
        self._setState(State.INACTIVE);
        var channels = self._subscribed;
        self._subscribed = [];
        if (channels.length === 0) {
            throw err;
        }
        // Undo the subscriptions made by this call, then report the original error.
        return new Promise(function (resolve, reject) {
            var pending = channels.map(String);
            function onUnsubscribe(ch, count) {
                debug('on unsubscribe: ch=' + ch + ' count=' + count);
                var idx = pending.indexOf(String(ch));
                if (idx >= 0) {
                    pending.splice(idx, 1);
                }
                if (pending.length === 0 || count === 0) {
                    self._sub.removeListener('unsubscribe', onUnsubscribe);
                    reject(err);
                }
            }
            self._sub.on('unsubscribe', onUnsubscribe);
            self._sub.unsubscribe(channels, NOOP);
        });
    })
    .nodeify(cb);
};
/**
 * Set a custom worker class registry.
 * @param {object} registry Custom registry to be used. The object
 * must support the following methods:
 * - add(name, constructor)
 * - get(name)
 * @returns {void}
 * @throws {Error} If `registry` is not an object or lacks `add` / `get`.
 */
Broker.prototype.setWorkerRegistry = function (registry) {
    // `typeof null` is 'object', so null has to be rejected explicitly.
    if (!registry || typeof registry !== 'object') {
        throw new Error('Invalid argument');
    }
    if (typeof registry.add !== 'function') {
        throw new Error('Custom registry must support `add` method');
    }
    if (typeof registry.get !== 'function') {
        throw new Error('Custom registry must support `get` method');
    }
    this._workerReg = registry;
};
/**
 * Register a worker class with this broker's worker registry.
 * Two call forms are accepted: `(name, ctor, option)` and `(ctor, option)`.
 * In the second form the name is taken from `ctor.classname`, or else from
 * `ctor.name`.
 * The cluster name is stored on the constructor itself (`ctor.clustername`).
 * @param {string} [name] Name of the worker class.
 * @param {function} ctor Constructor of the worker to be registered. Must be
 * a subclass of Worker. Its optional `agent` property must be Agent or a
 * subclass of Agent.
 * @param {object} [option] Option.
 * @param {string} [option.clustername] Name of cluster the worker belongs to.
 * When omitted, an existing `ctor.clustername` is kept; otherwise 'main' is used.
 * @returns {void}
 * @throws {Error} On invalid arguments.
 */
Broker.prototype.registerWorker = function (name, ctor, option) {
    var workerName = name;
    var WorkerCtor = ctor;
    var opts = option;

    if (typeof name === 'string') {
        if (!workerName) {
            throw new Error('Worker name cannot be empty');
        }
        if (typeof WorkerCtor !== 'function') {
            throw new Error('Invalid worker class');
        }
    } else if (typeof name === 'function') {
        // (ctor, option) form: shift the arguments and derive the name.
        opts = ctor;
        WorkerCtor = name;
        workerName = WorkerCtor.classname || WorkerCtor.name;
        if (!workerName) {
            throw new Error('Invalid worker name');
        }
    } else {
        throw new Error('Invalid argument');
    }

    // The class must derive from Worker.
    if (!(WorkerCtor.prototype instanceof Worker)) {
        throw new Error('The given constructor is not a subclass of Worker');
    }

    // A custom agent, when present, must be Agent itself or derive from it.
    var customAgent = WorkerCtor.agent;
    if (customAgent && customAgent !== Agent && !(customAgent.prototype instanceof Agent)) {
        throw new Error('The given `agent` is not a subclass of Agent');
    }

    var requestedCluster = opts && opts.clustername;
    if (requestedCluster) {
        WorkerCtor.clustername = requestedCluster;
    } else if (!WorkerCtor.clustername) {
        WorkerCtor.clustername = 'main';
    }

    // Notice: an existing entry with the same name is overwritten.
    this._workerReg.add(workerName, WorkerCtor);
};
/**
 * Callback function for Broker#createWorker.
 * @callback Broker~createWorkerCallback
 * @param {Error} err Error object.
 * @param {Agent} agent An instance for remote Worker (called 'Agent') via which you can
 * access remote worker that has just been instantiated.
 */
/**
 * Create a worker.
 * The least loaded broker of the worker's cluster is looked up via redis, and
 * that broker is asked (RPC `_onCreateWorker`) to instantiate the worker.
 * Note: the given `option` object is augmented in place (`cause`,
 * `attributes`, `id` and `name` are filled in) before it is sent.
 * @param {string} workerName Name of the worker to be created.
 * @param {object} [option] Options. May be omitted, in which case the second
 * argument may be the callback.
 * @param {string} [option.id] Worker ID to be assigned. Any non-string value
 * is replaced by an empty string.
 * @param {boolean} [option.static] Truthy to create a static worker. The value is
 * copied, as a boolean, to `option.attributes.static`.
 * @param {object} [option.attributes] A bag of parameters passed to the new worker
 * created on another process (broker),
 * which can be referenced by Worker#attributes.<param-name>.
 * @param {boolean} [option.attributes.recoverable] Mark the new worker as
 * recoverable. When a process (broker) dies, workers marked as recoverable
 * may be reconstructed if condition allows.
 * @param {Broker~createWorkerCallback} [cb] Callback.
 * @returns {Promise} Returns a promise if `cb` is not provided.
 * See {@link Broker~createWorkerCallback}.
 */
Broker.prototype.createWorker = function (workerName, option, cb) {
    var self = this;
    if (typeof option === 'function') {
        // (workerName, cb) form.
        cb = option;
        option = {};
    } else if (!option || typeof option !== 'object') {
        // Covers undefined and null (`typeof null` is 'object'); a callback
        // passed as the third argument is kept.
        option = {};
    }
    // The option.cause is set to Worker.CreateCause.NEW by default.
    if (typeof option.cause !== 'number') {
        option.cause = Worker.CreateCause.NEW;
    }
    // Make sure attributes object is always present.
    if (!option.attributes) {
        option.attributes = {};
    }
    // Make sure `id` of type string is always present.
    if (typeof option.id !== 'string') {
        option.id = '';
    }
    // Make sure attributes.static property is always present.
    option.attributes.static = !!option.static;
    // Check if the worker class is registered.
    var kWorker = self._workerReg.get(workerName);
    if (!kWorker) {
        return Promise.reject(new Error('Worker, ' + workerName + ' is not registered'))
        .nodeify(cb);
    }
    return self._pub.evalshaAsync(
        scripts.getLeastLoadedBroker.sha,
        5,
        self._keys.gh,
        self._keys.wh,
        self._keys.bh,
        self._keys.cz+':'+kWorker.clustername,
        self._keys.bz+':'+kWorker.clustername
    )
    .then(function (res) {
        if (!res) {
            throw new Error('No broker is available in the cluster ' + kWorker.clustername);
        }
        if (!Array.isArray(res)) {
            throw new Error('Internal error');
        }
        if (typeof res[0] !== 'string' ||
            typeof res[1] !== 'string' ||
            typeof res[2] !== 'string') {
            throw new Error('Internal error');
        }
        option.name = workerName;
        // createWorker won't use res[1] (clustername)
        // res[2] is "<host>:<port>". Split at the LAST colon so that a host
        // that itself contains colons (IPv6) stays intact.
        var sep = res[2].lastIndexOf(':');
        if (sep < 0) {
            throw new Error('Internal error');
        }
        return self._requestToBroker(
            '_onCreateWorker',
            option,
            { host: res[2].slice(0, sep), port: parseInt(res[2].slice(sep + 1), 10) }
        );
    })
    .then(function (winfo) {
        // Use the worker's custom agent class when it declares one.
        var Ctor = kWorker.agent? kWorker.agent:Agent;
        return new Ctor(winfo.workerId, self, winfo.brokerId);
    })
    .nodeify(cb);
};
/**
 * Callback function for Broker#findWorker.
 * @callback Broker~findWorkerCallback
 * @param {Error} err Error object.
 * @param {Agent} agent An Agent via which the worker that was found can be
 * accessed. If the worker does not exist, the agent will be null.
 */
/**
 * Find worker.
 * Runs the findOrCreateWorker script in its find-only mode (empty broker ID)
 * under {@link backoffRetry}, using the `retries` option of this broker.
 * Any rejection of an attempt - including the 'Try again' result of the
 * script - leads to another attempt until the retry duration is exceeded.
 * @param {string} workerId ID of a worker to find. If the worker is a dynamic worker,
 * then the ID should look like: "MyWorker#12" as in the form of <worker-name>#<instance-num>.
 * If the worker is static, workerId is identical to the worker name (no "#<instance-num>").
 * @param {Broker~findWorkerCallback} [cb] Callback.
 * @returns {Promise} Returns a promise if `cb` is not provided.
 * See {@link Broker~findWorkerCallback}.
 */
Broker.prototype.findWorker = function (workerId, cb) {
    var self = this;

    // Turns the script result into an Agent, null, or an error.
    function toAgent(res) {
        debug('findWorker: findOrCreateWorker result:', res);
        if (!res || !Array.isArray(res)) {
            // This should never happen.
            self.logger.error('Unexpected result from findOrCreateWorker.lua:', res);
            throw new Error('Internal error (invalid result from lua)');
        }
        var code = res[0];
        if (code === 1) {
            throw new Error('Try again');
        }
        if (code !== 0) {
            self.logger.error("Unexpected result code: " + res[0]);
            return null;
        }
        // Successful case.
        var found = res[1];
        if (!found) {
            return null; // The worker was not found
        }
        if (!Array.isArray(found) || found.length < 2) {
            self.logger.error('Unexpected result from findOrCreateWorker.lua:', res);
            throw new Error('Internal error (invalid result value from lua)');
        }
        var brokerId = found[0];
        var workerName = found[1];
        // The worker class must be registered locally to pick its agent class.
        var klass = self._workerReg.get(workerName);
        if (!klass) {
            throw new Error('Worker, ' + workerName + ' is not registered');
        }
        var AgentCtor = klass.agent ? klass.agent : Agent;
        return new AgentCtor(workerId, self, brokerId);
    }

    function attempt(nRetries) {
        debug('findWorker(): retries=' + nRetries);
        return self._pub.evalshaAsync(
            scripts.findOrCreateWorker.sha,
            7,
            self._keys.gh,
            self._keys.wh,
            self._keys.bh,
            self._keys.cz + ':' + self.clustername,
            self._keys.bz + ':' + self.clustername,
            self._keys.wz + ':' + self.id,
            self._keys.rz,
            '', // empty broker ID => only performs 'find'.
            '', // empty worker name => won't be used.
            workerId,
            '', // empty attributes => won't be used.
            0, // redis time => won't be used.
            0, // `ttl` => won't be used.
            0 // `forRecovery` => won't be used.
        )
        .then(function (res) {
            return toAgent(res);
        });
    }

    return backoffRetry(this._option.retries, attempt)
    .nodeify(cb);
};
/**
 * Completion callback used by Broker#destroy.
 * @callback Broker~destroyWorkerCallback
 * @param {Error} err Error object.
 */
/**
 * Destroy the broker.
 * Does nothing (and succeeds) while the broker is INACTIVE; fails while it is
 * ACTIVATING, DESTROYING or DESTROYED. Otherwise: onDestroy() is called on
 * every local worker (failures are ignored), the router is closed, this
 * broker is removed from the cluster tables and unsubscribed from its
 * channels, and finally the salvageWorkers script is run for this broker.
 * The state becomes DESTROYED even if one of the redis steps fails.
 * @param {object} [option] Options. May be omitted, in which case the first
 * argument may be the callback.
 * @param {boolean} [option.noRecover] Tell the broker not to recover any
 * workers (regardless of 'recoverable' option) this broker currently has.
 * @param {Broker~destroyWorkerCallback} [cb] Callback.
 * @returns {Promise} Returns a promise if `cb` is not provided.
 * See {@link Broker~destroyWorkerCallback}.
 */
Broker.prototype.destroy = function (option, cb) {
    this.logger.info('Broker (' + this.id + ') is being destroyed: state=' + this._state);
    if (typeof option === 'function') {
        cb = option; // (cb) form
        option = {};
    }
    if (this._state === State.INACTIVE) {
        return Promise.resolve()
        .nodeify(cb);
    }
    if (this._state === State.ACTIVATING) {
        return Promise.reject(new Error('Cannot destroy while broker is starting'))
        .nodeify(cb);
    }
    if (this._state === State.DESTROYING) {
        return Promise.reject(new Error('Already destroying'))
        .nodeify(cb);
    }
    if (this._state === State.DESTROYED) {
        return Promise.reject(new Error('Already destroyed'))
        .nodeify(cb);
    }
    // Read the flag without touching the caller's object.
    var noRecover = !!(option && typeof option === 'object' && option.noRecover);
    this._setState(State.DESTROYING);
    // Call onDestroy() on all the workers first.
    var self = this;
    var pendingDestroys = Object.keys(this._workers).map(function (id) {
        var worker = self._workers[id];
        return promisifyCb(worker.onDestroy, worker)({
            cause: Worker.DestroyCause.SYSTEM
        })
        .catch(function (err) {
            // Errors from onDestroy are ignored on purpose. Handling them per
            // worker makes sure we still wait for all the other workers.
            debug('onDestroy failed: workerId=' + id + ' err=' + (err && err.message));
        });
    });
    // Close router.
    this._router.close();
    return Promise.all(pendingDestroys)
    .then(function () {
        // Lay off all workers and get them garbage-collected
        // (as the broker instance may be hanging around until process exits)
        self._workers = {};
        // Remove self from cz/bz tables so that others won't find this broker.
        // Also, unsubscribe pubsub channels at the same time.
        var promises = [];
        /* istanbul ignore else */
        if (self._pub && self._pub.connected) {
            promises.push(self._pub.zremAsync(self._keys.cz+':'+self.clustername, self.id));
            promises.push(self._pub.zremAsync(self._keys.bz+':'+self.clustername, self.id));
        }
        /* istanbul ignore else */
        if (self._sub && self._sub.connected) {
            promises.push(new Promise(function (resolve) {
                var pending = [self._chBc, self._chUc];
                function onUnsubscribe(ch, count) {
                    debug('on unsubscribe: ch=' + ch + ' count=' + count);
                    var idx = pending.indexOf(String(ch));
                    if (idx >= 0) {
                        pending.splice(idx, 1);
                    }
                    // Done when both of our channels are gone (or when the
                    // client reports that nothing is subscribed any more).
                    if (pending.length === 0 || count === 0) {
                        // Remove only our own listener; the client may be shared.
                        self._sub.removeListener('unsubscribe', onUnsubscribe);
                        resolve();
                    }
                }
                self._sub.on('unsubscribe', onUnsubscribe);
                self._sub.unsubscribe(self._chBc, self._chUc);
            }));
        }
        return Promise.all(promises);
    })
    .then(function () {
        /* istanbul ignore if */
        if (!self._pub || !self._pub.connected) {
            return;
        }
        // Kick salvage process so that others can find the worker info
        // in rz table right away during their findWorker operation.
        return self._pub.evalshaAsync(
            scripts.salvageWorkers.sha,
            7,
            self._keys.gh,
            self._keys.wh,
            self._keys.bh,
            self._keys.cz + ':' + self.clustername,
            self._keys.bz + ':' + self.clustername,
            self._keys.wz + ':' + self.id,
            self._keys.rz,
            self.id,
            noRecover? 2:1
        );
    })
    .finally(function () {
        if (self._timer) {
            clearTimeout(self._timer);
            self._timer = null;
        }
        self._brokerCache.reset();
        // Whatever was recorded as subscribed is no longer valid.
        self._subscribed = [];
        self._setState(State.DESTROYED);
        self.logger.info('Broker (' + self.id + ') has been destroyed');
    })
    .nodeify(cb);
};
/**
 * Callback function for {@link Broker.prototype.restart}.
 * @callback Broker~restartCallback
 * @param {Error} err Error object.
 */
/**
 * Restart this broker. (Administrative purpose only)
 * The broker is destroyed with `noRecover: true` - all active workers are
 * removed with no recovery - and then started again.
 * @param {Broker~restartCallback} [cb] Callback function.
 * @returns {Promise} Returns a promise if `cb` is not provided.
 * See {@link Broker~restartCallback}.
 */
Broker.prototype.restart = function (cb) {
    var broker = this;

    function startAgain() {
        return broker.start();
    }

    var destroyed = broker.destroy({ noRecover: true });
    return destroyed.then(startAgain).nodeify(cb);
};
/**
 * Ask every active broker under the same namespace to restart.
 * A `restart` signal is published on the broadcast channel; the method does
 * not wait for any broker to actually restart. When the redis client is not
 * connected, an error is logged and nothing is published.
 */
Broker.prototype.restartAll = function () {
    var client = this._pub;
    if (client && client.connected) {
        var signal = JSON.stringify({ sig: SIG_RESTART });
        client.publish(this._chBc, signal);
        return;
    }
    this.logger.error('No connection to redis-server');
};
/**
 * Quit the redis clients (pub first, then sub) that are currently connected.
 * Provided so that redis connections can be shut down explicitly, e.g. in
 * tests. The clients' lifetime may be managed elsewhere, so use this method
 * with care.
 */
Broker.prototype.quit = function () {
    [this._pub, this._sub].forEach(function (client) {
        if (client && client.connected) {
            client.quit();
        }
    });
};
////////////////////////////////////////////////////////////////////////////////
// Private section
/**
 * Load the lua scripts into redis (once per broker instance).
 * The loader is created on first use and kept in `_lur`; later calls resolve
 * immediately. If loading fails the loader is discarded, so that the next
 * call tries again instead of reporting success without loaded scripts.
 * @private
 * @returns {Promise} Resolved when the scripts are available; rejected with
 * the loader's error otherwise.
 */
Broker.prototype._loadScripts = function () {
    var self = this;
    if (self._lur) {
        return Promise.resolve();
    }
    return new Promise(function (resolve, reject) {
        var loader = require('lured').create(self._pub, scripts);
        self._lur = loader;
        loader.load(function (err) {
            if (err) {
                self.logger.error('Lua loading failed: ' + err.message);
                // Forget the failed loader; otherwise every later start()
                // would skip loading and run with missing script hashes.
                if (self._lur === loader) {
                    self._lur = null;
                }
                return void(reject(err));
            }
            debug(self.id +': lua loading successful');
            debug(self.id +': addBroker.lua: ', scripts.addBroker.sha);
            debug(self.id +': getLeastLoadedBroker.lua: ', scripts.getLeastLoadedBroker.sha);
            debug(self.id +': findOrCreateWorker.lua: ', scripts.findOrCreateWorker.sha);
            debug(self.id +': findBroker.lua: ', scripts.findBroker.sha);
            debug(self.id +': salvageWorkers.lua: ', scripts.salvageWorkers.sha);
            debug(self.id +': fetchWorkersToRecover.lua: ', scripts.fetchWorkersToRecover.sha);
            debug(self.id +': destroyWorker.lua: ', scripts.destroyWorker.sha);
            resolve();
        });
    });
};
/**
 * Find broker by worker ID.
 * The LRU broker cache is consulted first; on a miss the findBroker script is
 * run. Only results whose status is 'active' are cached. Any negative or
 * malformed result removes the cache entry of the worker.
 * @private
 * @param {string} workerId Worker ID.
 * @returns {Promise} Returns a promise. Resolved value will be an object that has
 * the following properties:
 * - {string} brokerId Broker ID that has the worker with the worker ID.
 * - {string} clustername A name of cluster that the broker belongs to.
 * - {string} status Status of the broker: 'active', 'invalid' or 'migrating'
 * - {object} address `{ host, port }` parsed from the broker's "<host>:<port>" string.
 * If the broker is not found, the resolved value will be null.
 * The promise will be rejected if failed with communicating with redis.
 */
Broker.prototype._findBroker = function (workerId) {
    var brInfo = this._brokerCache.get(workerId);
    if (brInfo) {
        this._brokerCacheHits++;
        return Promise.resolve(brInfo);
    }
    this._brokerCacheMisses++;
    var self = this;

    // Common exit for "no usable broker": invalidate the cache entry.
    function notFound(reason) {
        if (reason) {
            debug(reason);
        }
        self._brokerCache.del(workerId);
        return null;
    }

    return self._pub.evalshaAsync(
        scripts.findBroker.sha,
        3,
        self._keys.gh,
        self._keys.wh,
        self._keys.bh,
        self.id,
        workerId
    )
    .then(function (res) {
        if (!res) {
            return notFound();
        }
        if (!Array.isArray(res)) {
            return notFound('Error: non-null result must be an array');
        }
        if (typeof res[0] !== 'number') {
            return notFound('Error: result code must be a number');
        }
        if (res[0] !== 0) {
            // 1: no broker
            // 2: retry
            return notFound();
        }
        var info = res[1];
        if (!Array.isArray(info)) {
            return notFound('Error: broker info must be an array');
        }
        // info[3] is "<host>:<port>". Split at the LAST colon so that a host
        // that itself contains colons (IPv6) stays intact.
        var address = info[3];
        var sep = (typeof address === 'string') ? address.lastIndexOf(':') : -1;
        if (sep < 0) {
            return notFound('Error: broker address must be in the form of <host>:<port>');
        }
        brInfo = {
            brokerId: info[0],
            clustername: info[1],
            status: info[2],
            address: {
                host: address.slice(0, sep),
                port: parseInt(address.slice(sep + 1), 10)
            }
        };
        // Update broker cache
        if (info[2] === 'active') {
            self._brokerCache.set(workerId, brInfo);
        }
        return brInfo;
    });
};
/**
 * Recover a worker.
 * This function is called internally when a worker needs a recovery. It
 * delegates to createWorker() without a callback, after forcing option.cause
 * to Worker.CreateCause.RECOVERY.
 * @private
 * @param {string} workerName Name of the worker to be recovered. (A class name, not worker ID)
 * @param {object} [option] Options, as accepted by createWorker(). The object
 * is modified in place.
 * @param {string} [option.id] Worker ID to be assigned.
 * @param {boolean} [option.static] Truthy to create a static worker.
 * @param {object} [option.attributes] A bag of parameters passed to the new worker
 * created on another process (broker),
 * which can be referenced by Worker#attributes.<param-name>.
 * @param {boolean} [option.attributes.recoverable] Mark the new worker as
 * recoverable. When a process (broker) dies, workers marked as recoverable
 * may be reconstructed if condition allows.
 * @returns {Promise} Always returns a promise.
 * See {@link Broker~createWorkerCallback}.
 */
Broker.prototype._recoverWorker = function (workerName, option) {
    // `option` is documented as optional: do not dereference undefined/null.
    option = option || {};
    // Make sure attributes object is always present.
    if (!option.attributes) {
        option.attributes = {};
    }
    option.cause = Worker.CreateCause.RECOVERY;
    return this.createWorker(workerName, option);
};
/**
 * Handler of the 'message' event of the subscriber redis client.
 * An empty payload is treated as a ping. Otherwise the payload must be JSON
 * whose `sig` field selects the action: recover, salvage or restart.
 * Payloads that are not valid JSON are logged as errors; anything else that
 * carries no known signal is only reported through `debug`.
 * @private
 * @param {string} chId Channel the message arrived on (not used).
 * @param {string} data Message payload.
 * @returns {void}
 */
Broker.prototype._onSubMessage = function (chId, data) {
    void(chId);
    debug('_onSubMessage: data=' + data + ' type=' + typeof(data));
    if (!data || data.length === 0) {
        debug('ping received by broker ' + this.id);
        return;
    }
    var msg;
    try {
        msg = JSON.parse(data);
    } catch (e) {
        this.logger.error('Malformed pubsub signal: ' + data);
        return;
    }
    // Valid JSON may still be null or a primitive; never dereference those
    // (an exception thrown here would escape from the event listener).
    var sig = (msg && typeof msg === 'object') ? msg.sig : undefined;
    if (sig === SIG_RECOVER) {
        this._onSigRecover();
        return;
    }
    if (sig === SIG_SALVAGE) {
        var self = this;
        var salvaging = this._onSigSalvage(msg.clustername, msg.brokerId);
        // Nobody else observes this promise: report a failure instead of
        // leaving the rejection unhandled.
        if (salvaging && typeof salvaging.catch === 'function') {
            salvaging.catch(function (err) {
                self.logger.error('Salvage failed: ' + (err && err.message));
            });
        }
        return;
    }
    if (sig === SIG_RESTART) {
        this._onSigRestart();
        return;
    }
    debug('Error: unknown pubsub signal: ' + data);
};
/**
 * Handler of the router's 'request' event.
 * A request carrying a truthy `wid` is addressed to a worker; any other
 * request is addressed to this broker.
 * @private
 * @param {object} data Request message.
 * @param {*} requesterId Passed through to the request handler.
 * @returns {void}
 */
Broker.prototype._onRequest = function (data, requesterId) {
    debug('_onRequest:', data);
    if (data.wid) {
        // Message to a worker
        this._onWorkerRequest(data, requesterId);
        return;
    }
    // Message to a broker
    this._onBrokerRequest(data, requesterId);
};
/**
 * Handler of the router's 'response' event.
 * A response carrying a truthy `wid` is addressed to a worker; any other
 * response is addressed to this broker.
 * @private
 * @param {object} data Response message.
 * @returns {void}
 */
Broker.prototype._onResponse = function (data) {
    debug('_onResponse:', data);
    if (data.wid) {
        // Message to a worker
        this._onWorkerResponse(data);
        return;
    }
    // Message to a broker
    this._onBrokerResponse(data);
};
/**
 * Handler of the router's 'disconnect' event.
 * Drops every broker-cache entry whose address equals the address of the
 * remote that got disconnected.
 * @private
 * @param {object} remote Remote endpoint; its `host` and `port` are compared
 * with the cached addresses.
 * @returns {void}
 */
Broker.prototype._onRemoteDisconnect = function (remote) {
    debug('[br:%s] _onRemoteDisconnect:', this.id, remote);
    var cache = this._brokerCache;
    // Collect first, delete afterwards: the cache is not modified while it is
    // being iterated.
    var stale = [];
    cache.forEach(function (entry, workerId) {
        var sameHost = remote.host === entry.address.host;
        /* istanbul ignore else */
        if (sameHost && remote.port === entry.address.port) {
            stale.push(workerId);
        }
    });
    for (var i = 0; i < stale.length; i++) {
        debug('invalidating cached address for worker ' + stale[i]);
        cache.del(stale[i]);
    }
};
/**
 * Handler of the 'recover' signal.
 * Repeatedly fetches records of workers to recover (one at a time for now)
 * via the fetchWorkersToRecover script and recreates each of them with
 * _recoverWorker(). The loop goes on while the script reports remaining
 * records (`res[1] > 0`) or while another 'recover' signal arrived in the
 * meantime. A signal received while a recovery is running only sets the
 * `_needsRecovery` flag. A failure to recover a single worker is logged and
 * does not stop the loop.
 * @private
 * @returns {void}
 */
Broker.prototype._onSigRecover = function () {
    debug('_onSigRecover() called: isRecovering=' + this._isRecovering + ' needsRecovery=' + this._needsRecovery);
    if (this._isRecovering) {
        this._needsRecovery = true;
        return;
    }
    var self = this;
    this._isRecovering = true;

    // Recreates one worker from its serialized record. Never rejects.
    function recoverOne(item) {
        var info;
        try {
            info = JSON.parse(item);
        } catch (e) {
            info = null;
        }
        if (!info || typeof info !== 'object') {
            // One bad record must not abort the recovery of the others.
            self.logger.error('Worker recovery failed: malformed record: ' + item);
            return Promise.resolve();
        }
        var attributes = info.attributes || {};
        var option = {
            id: info.id,
            name: info.name,
            static: attributes.static,
            attributes: attributes
        };
        return self._recoverWorker(info.name, option)
        .catch(function (err) {
            // JSON.stringify() of an Error yields '{}', so log its message.
            var reason = (err && err.message) ? err.message : String(err);
            self.logger.error('Worker recovery failed: workerId=' + info.id + ' err=' + reason);
            // Swallow the error, then continue the recovery loop.
        });
    }

    function recoveryLoop() {
        self._needsRecovery = false;
        // Fetch up to N worker to recover at a time.
        // N = 1 for now..
        return self._pub.evalshaAsync(
            scripts.fetchWorkersToRecover.sha,
            4,
            self._keys.gh,
            self._keys.wh,
            self._keys.bh,
            self._keys.rz,
            self.redisTime,
            self._option.ttl,
            0,
            1
        )
        .then(function(res) {
            var items = Array.isArray(res) ? res[0] : null;
            if (!Array.isArray(items) || !items.length) {
                // Nothing fetched. A signal that arrived while we were
                // fetching may refer to newer records: look again.
                if (self._needsRecovery) {
                    return recoveryLoop();
                }
                return;
            }
            return Promise.all(items.map(recoverOne))
            .then(function () {
                if (res[1] > 0 || self._needsRecovery) {
                    return recoveryLoop();
                }
            });
        })
        .finally(function () {
            self._isRecovering = false;
        });
    }
    recoveryLoop().done();
};
/**
 * Handler of the 'salvage' signal.
 * Runs the salvageWorkers script for the given broker of the given cluster,
 * then kicks the recovery loop of this broker.
 * @private
 * @param {string} clustername Cluster name used to build the cz/bz keys.
 * @param {string} targetBrokerId ID of the broker whose workers are salvaged.
 * @returns {Promise} Settles once the script has run and the recovery loop
 * has been kicked.
 */
Broker.prototype._onSigSalvage = function (clustername, targetBrokerId) {
    debug(this.id + ':_onSigSalvage() called: cn=' + clustername + ' target=' + targetBrokerId);
    var broker = this;
    var keys = broker._keys;
    var scriptArgs = [
        scripts.salvageWorkers.sha,
        7,
        keys.gh,
        keys.wh,
        keys.bh,
        keys.cz + ':' + clustername,
        keys.bz + ':' + clustername,
        keys.wz + ':' + targetBrokerId,
        keys.rz,
        targetBrokerId,
        0
    ];
    var salvaged = broker._pub.evalshaAsync.apply(broker._pub, scriptArgs);
    return salvaged.then(function () {
        // Now, kick the recovery loop
        debug(broker.id + ':_onSigSalvage() complete');
        broker._onSigRecover();
    });
};
/**
 * Handles the `restart` signal by restarting this broker.
 * The promise returned by `restart()` is terminated with `.done()`, so a
 * failed restart is not silently dropped.
 * @private
 */
Broker.prototype._onSigRestart = function () {
    var trace = this.id + ':_onSigRestart() called';
    debug(trace);
    var restarting = this.restart();
    restarting.done();
};
/**
 * Serves an RPC request addressed to this broker: calls the broker method
 * named by `data.m` with `data.pl` and sends the outcome back to the
 * requester via `_replyToBroker()`.
 *
 * A reply is always sent: when `data.m` does not name a function, when the
 * method throws synchronously, when it rejects, and when it returns a plain
 * (non-promise) value.
 * @param {object} data Request with `m` (method name), `seq` (sequence
 * number) and `pl` (payload).
 * @param {*} requesterId Opaque requester handle passed through to the reply.
 * @private
 */
Broker.prototype._onBrokerRequest = function (data, requesterId) {
    debug('RPC broker request');
    // RPC request
    // Only functions are callable; a truthy non-function property must be
    // rejected here rather than blowing up on the call below.
    if (typeof this[data.m] !== 'function') {
        debug('Error: Broker does not have such method: ' + data.m);
        var pl = {
            err: { name: "Error", message: "Broker does not have such method: " + data.m }
        };
        this._replyToBroker(data.seq, pl, requesterId);
        return;
    }
    var self = this;
    var pending;
    try {
        // Promise.resolve() also accepts methods that return a plain value.
        pending = Promise.resolve(this[data.m](data.pl));
    } catch (e) {
        // Turn a synchronous throw into a rejection so the requester still
        // gets an error reply instead of waiting for the RPC timeout.
        pending = Promise.reject(e);
    }
    pending
    .then(function (res) {
        return { res: res };
    }, function (err) {
        /* istanbul ignore else */
        if (err instanceof Error) {
            // Error instances do not survive serialization; send name and message.
            err = { name: err.name, message: err.message };
        } else {
            err = { name: 'Error', message: 'unknown' };
        }
        return { err: err };
    }).then(function (pl) {
        self._replyToBroker(data.seq, pl, requesterId);
    });
};
/**
 * Handles an RPC response from another broker: looks up the callback
 * registered under `data.seq`, removes it, and invokes it with the error or
 * the result carried in the payload.
 *
 * A serialized error (`{name, message}`) is rebuilt as an `Error` whose
 * `name` is restored from the payload, so the caller can still tell what kind
 * of error the remote side raised.
 * @param {object} data Response with `seq` (number) and `pl` (object holding
 * either `err` or `res`).
 * @returns {boolean} `false` when the response is malformed, `true`
 * otherwise (also when no callback is waiting for this sequence number).
 * @private
 */
Broker.prototype._onBrokerResponse = function (data) {
    debug('RPC broker response:', data);
    // RPC response
    if (typeof data.seq !== 'number') {
        debug('Internal error: the data must have a sequence number');
        return false;
    }
    if (!data.pl || typeof data.pl !== 'object') {
        debug('Internal error: data must have a payload of type object');
        return false;
    }
    var cb = this._cbs[data.seq];
    delete this._cbs[data.seq];
    if (cb) {
        if (data.pl.err) {
            var remote = data.pl.err;
            var err = new Error(remote.message);
            // Keep the remote error's name instead of flattening it to "Error".
            if (typeof remote.name === 'string' && remote.name.length > 0) {
                err.name = remote.name;
            }
            data.pl.err = err;
        }
        cb(data.pl.err, data.pl.res);
    }
    return true;
};
/**
 * Serves an RPC request addressed to one of this broker's workers.
 *
 * A request carrying a numeric `seq` is an "ask": the worker's `onAsk()` is
 * called and its outcome is sent back through `_replyToWorker()`. Any other
 * request is a "tell": the worker's `onTell()` is called and nothing is sent
 * back.
 * @param {object} data Request with `m` (method), `wid` (worker ID), `pl`
 * (payload) and, for an ask, `seq` (sequence number).
 * @param {*} requesterId Opaque requester handle passed through to the reply.
 * @returns {boolean} `true` when the request was handed to the worker,
 * `false` when it was refused.
 * @private
 */
Broker.prototype._onWorkerRequest = function (data, requesterId) {
    var broker = this;

    // Logs the reason and reports it back to the requester.
    function refuse(reason) {
        debug('Error: ' + reason);
        broker._replyToWorker(data.seq, {
            err: { name: "Error", message: "Error: " + reason }
        }, data.wid, requesterId);
        return false;
    }

    // Error instances do not survive serialization; send name and message.
    function toErrorReply(err) {
        /* istanbul ignore else */
        if (err instanceof Error) {
            return { err: { name: err.name, message: err.message } };
        }
        return { err: { name: 'Error', message: 'unknown' } };
    }

    debug('RPC worker request');
    if (!data.m) {
        // No reply is sent for this case.
        debug('Error: Worker request must have a method property, m');
        return false;
    }

    // Find worker instance
    var target = this._workers[data.wid];
    if (!target) {
        return refuse('Worker instance not found');
    }
    if (target.state !== Worker.State.ACTIVE) {
        return refuse('Worker is not active');
    }

    if (typeof data.seq !== 'number') {
        // Tell: fire and forget.
        target.onTell(data.m, data.pl);
        return true;
    }

    // Ask: reply with either the result or the serialized error.
    promisifyCb(target.onAsk, target)(data.m, data.pl)
    .then(function (res) {
        return { res: res };
    }, toErrorReply)
    .then(function (reply) {
        broker._replyToWorker(data.seq, reply, data.wid, requesterId);
    });
    return true;
};
/**
 * Handles an RPC response to a worker request: looks up the callback
 * registered under `data.seq`, removes it, and invokes it with the error or
 * the result carried in the payload.
 *
 * A serialized error (`{name, message}`) is rebuilt as an `Error` whose
 * `name` is restored from the payload, so the caller can still tell what kind
 * of error the remote worker raised.
 * @param {object} data Response with `seq` (number) and `pl` (object holding
 * either `err` or `res`).
 * @returns {boolean} `false` when the response is malformed, `true`
 * otherwise (also when no callback is waiting for this sequence number).
 * @private
 */
Broker.prototype._onWorkerResponse = function (data) {
    debug('RPC worker response');
    // RPC response
    if (typeof data.seq !== 'number') {
        debug('Internal error: the data must have a sequence number');
        return false;
    }
    if (!data.pl || typeof data.pl !== 'object') {
        debug('Internal error: data must have a payload of type object');
        return false;
    }
    var cb = this._cbs[data.seq];
    delete this._cbs[data.seq];
    if (cb) {
        if (data.pl.err) {
            var remote = data.pl.err;
            var err = new Error(remote.message);
            // Keep the remote error's name instead of flattening it to "Error".
            if (typeof remote.name === 'string' && remote.name.length > 0) {
                err.name = remote.name;
            }
            data.pl.err = err;
        }
        cb(data.pl.err, data.pl.res);
    }
    return true;
};
/**
 * Sends an RPC request to another broker.
 * @param {string} method Name of the method to invoke on the remote broker.
 * @param {*} data Payload handed to that method.
 * @param {object} dstAddress Address of the destination broker.
 * @returns {Promise} Whatever `_request()` returns for this message.
 * @private
 */
Broker.prototype._requestToBroker = function (method, data, dstAddress) {
    var message = {};
    message.m = method;
    // A fresh sequence number is what pairs the response with this request.
    message.seq = this._seqCounter.get();
    message.pl = data;
    return this._request(message, dstAddress);
};
/**
 * Sends the response of a broker-level RPC back to its requester.
 * @param {number} seq Sequence number copied from the request.
 * @param {object} data Response payload (`{res: ...}` or `{err: ...}`).
 * @param {*} requesterId Requester handle understood by the router.
 * @private
 */
Broker.prototype._replyToBroker = function (seq, data, requesterId) {
    var reply = {};
    reply.seq = seq;
    reply.pl = data;
    this._router.respond(requesterId, reply);
};
/**
 * Sends an "ask" (a request that expects a response) to a worker, retrying
 * according to the `retries` option.
 *
 * Each attempt looks up the broker hosting the worker and sends the request
 * there. When the request fails, the cached broker address of the worker is
 * dropped before the error is passed on.
 * @param {string} method Name of the method to invoke on the worker.
 * @param {*} data Payload for the method.
 * @param {string} workerId ID of the destination worker.
 * @returns {Promise} The promise returned by `backoffRetry()`.
 * @private
 */
Broker.prototype._askWorker = function (method, data, workerId) {
    var broker = this;

    // Drop the cached address so that the next attempt looks it up again.
    function evictAndRethrow(err) {
        broker._brokerCache.del(workerId);
        throw err;
    }

    function send(brInfo) {
        if (!brInfo) {
            throw new Error('Broker not found or unreachable (workerId =' + workerId + ')');
        }
        var message = {
            m: method,
            wid: workerId,
            seq: broker._seqCounter.get(),
            pl: data
        };
        return broker._request(message, brInfo.address).catch(evictAndRethrow);
    }

    function attempt(nRetries) {
        debug('_askWorker(): retries=' + nRetries);
        return broker._findBroker(workerId).then(send);
    }

    return backoffRetry(this._option.retries, attempt);
};
/**
 * Sends a "tell" (a one-way message carrying no sequence number) to a worker,
 * retrying according to the `retries` option.
 *
 * Each attempt looks up the broker hosting the worker and sends the message
 * there. When sending fails, the cached broker address of the worker is
 * dropped before the error is passed on.
 * @param {string} method Name of the method to invoke on the worker.
 * @param {*} data Payload for the method.
 * @param {string} workerId ID of the destination worker.
 * @returns {Promise} The promise returned by `backoffRetry()`.
 * @private
 */
Broker.prototype._tellWorker = function (method, data, workerId) {
    var broker = this;

    // Drop the cached address so that the next attempt looks it up again.
    function evictAndRethrow(err) {
        broker._brokerCache.del(workerId);
        throw err;
    }

    function send(brInfo) {
        if (!brInfo) {
            throw new Error('Broker not found or unreachable (workerId =' + workerId + ')');
        }
        // No `seq` property: that is what marks the message as a tell.
        var message = {
            m: method,
            wid: workerId,
            pl: data
        };
        return broker._request(message, brInfo.address).catch(evictAndRethrow);
    }

    function attempt(nRetries) {
        debug('_tellWorker(): retries=' + nRetries);
        return broker._findBroker(workerId).then(send);
    }

    return backoffRetry(this._option.retries, attempt);
};
/**
 * Sends the response of a worker-level RPC back to its requester.
 * @param {number} seq Sequence number copied from the request.
 * @param {object} data Response payload (`{res: ...}` or `{err: ...}`).
 * @param {string} workerId ID of the worker the request was addressed to.
 * @param {*} requesterId Requester handle understood by the router.
 * @private
 */
Broker.prototype._replyToWorker = function (seq, data, workerId, requesterId) {
    var reply = {};
    reply.wid = workerId;
    reply.seq = seq;
    reply.pl = data;
    this._router.respond(requesterId, reply);
};
/**
 * Sends a message through the router and, when the message is an "ask",
 * waits for the matching response.
 *
 * A message is an ask when `data.seq` is a number. This is the same test the
 * receiving side applies, so sequence number 0 is a valid ask too. For an
 * ask, a callback is registered in `_cbs` under the sequence number, and a
 * `[timestamp, seq]` entry is queued in `_cbList` so that `_onTimer()` can
 * fail the call with an RPC timeout if no response arrives.
 * @param {object} data Message to send; `seq` is present only for an ask.
 * @param {object} dstAddress Address of the destination broker.
 * @returns {Promise} For a tell: resolved with no value once the router has
 * accepted the message. For an ask: resolved with the remote result, or
 * rejected with the remote error or a timeout error.
 * @private
 */
Broker.prototype._request = function (data, dstAddress) {
    var seq = data.seq;
    var self = this;
    return this._router.request(dstAddress, data)
    .then(function () {
        if (typeof seq !== 'number') {
            // This is `tell` operation. No confirmation from remote is necessary.
            return;
        }
        // Note: The remote broker may crash before returning responses for
        // whatever reasons, we need to set a timeout for this callback.
        return new Promise(function (resolve, reject) {
            var ts = Date.now();
            self._cbs[seq] = function (err, res) {
                if (err) {
                    return void(reject(err));
                }
                return void(resolve(res));
            };
            self._cbList.push([ts, seq]);
        });
    });
};
/**
 * Finds or creates a worker.
 *
 * The findOrCreateWorker script decides which broker hosts the worker. When
 * that broker is this one and no instance exists locally yet, the worker is
 * instantiated from the registered constructor, its `onCreate()` handler is
 * run, and it is made ACTIVE. Otherwise only the location is reported.
 * @param {object} data Request with `name`, `id`, `attributes` and `cause`.
 * @returns {Promise<{brokerId: *, workerId: *}>} Location of the worker.
 * Rejected when the script returns an unexpected result.
 * @private
 */
Broker.prototype._onCreateWorker = function (data) {
    debug('_onCreateWorker() called');
    var broker = this;

    // Validates the script reply and extracts the worker location from it.
    function parseReply(res) {
        if (!Array.isArray(res)) {
            broker.logger.error('The result from findOrCreateWorker must be an array');
            throw new Error('Internal error');
        }
        if (res[0] !== 0) {
            throw new Error('Unexpected result code ' + res[0]);
        }
        if (!Array.isArray(res[1]) || res[1].length < 3) {
            broker.logger.error('Unexpected result from findOrCreateWorker.lua:', res);
            throw new Error('Internal error (invalid result value from lua)');
        }
        return { brokerId: res[1][0], workerId: res[1][2] };
    }

    // Instantiates the worker on this broker and brings it to ACTIVE.
    function activate(where) {
        var Ctor = broker._workerReg.get(data.name);
        var worker = new Ctor(where.workerId, broker, data.attributes);
        broker._workers[where.workerId] = worker;
        worker.__priv.state = Worker.State.ACTIVATING;
        broker.logger.debug('Broker(' + broker.id + '): calling onCreate on worker ' + worker.id);

        return promisifyCb(worker.onCreate, worker)({ cause: data.cause })
        .catch(function (err) {
            // The rejection from Worker#onCreate() is ignored by design.
            broker.logger.warn('Worker#onCreate() threw an error but will be ignored:', err);
        })
        .then(function () {
            broker.logger.debug('processing further for Broker(' + broker.id + '): and worker ', worker.id);
            if (worker.load === 0) {
                worker.load = Worker.DEFAULT_LOAD;
            }
            worker.__priv.state = Worker.State.ACTIVE;
            // A destroy requested while the worker was still activating was
            // only recorded as a mark; carry it out now.
            if (worker.__priv.mark === 'destroy') {
                debug('_onCreateWorker: perform marked destroy');
                broker._destroyWorker(worker);
            }
            return { brokerId: where.brokerId, workerId: where.workerId };
        });
    }

    return broker._pub.evalshaAsync(
        scripts.findOrCreateWorker.sha,
        7,
        broker._keys.gh,
        broker._keys.wh,
        broker._keys.bh,
        broker._keys.cz + ':' + broker.clustername,
        broker._keys.bz + ':' + broker.clustername,
        broker._keys.wz + ':' + broker.id,
        broker._keys.rz,
        broker.id,
        data.name,
        data.id,
        JSON.stringify(data.attributes),
        broker.redisTime,
        0, // TODO: get this from config. 0 means forever.
        (data.cause === Worker.CreateCause.RECOVERY) ? 1 : 0
    )
    .then(function (res) {
        var where = parseReply(res);
        var hostedHere = (where.brokerId === broker.id);
        if (hostedHere && !broker._workers[where.workerId]) {
            return activate(where);
        }
        return where;
    });
};
/**
 * Destroys a worker hosted by this broker.
 *
 * A worker that is not ACTIVE is only marked for destruction and the call
 * returns at once. An ACTIVE worker is moved to DESTROYING, unregistered
 * from redis through the destroyWorker script, given a chance to clean up in
 * `onDestroy()`, and then removed from this broker.
 *
 * Local cleanup is unconditional: a failure of the redis script or of
 * `onDestroy()` is logged and otherwise ignored, so the worker never stays
 * registered in the DESTROYING state.
 * @param {Worker} worker The worker to destroy.
 * @param {object} [option] Options. The object is not modified.
 * @param {boolean} [option.noRecover=true] When true the script is told not
 * to recover the worker (script argument 0); when false, argument 1.
 * @private
 */
Broker.prototype._destroyWorker = function (worker, option) {
    var self = this;
    // Merge into a fresh object so that the caller's option is left untouched.
    option = _.defaults({}, option || {}, {
        // Self-destroyed worker won't be recovered by default.
        noRecover: true
    });
    if (worker.state !== Worker.State.ACTIVE) {
        // Leave a reminder then resolve
        debug('_destroyWorker: mark destroy');
        worker.__priv.mark = 'destroy';
        return;
    }
    worker.__priv.mark = '';
    worker.__priv.state = Worker.State.DESTROYING;
    this._pub.evalshaAsync(
        scripts.destroyWorker.sha,
        4,
        self._keys.gh,
        self._keys.wh,
        self._keys.wz + ':' + self.id,
        self._keys.rz,
        worker.id,
        option.noRecover? 0:1
    )
    .catch(function (err) {
        // we need to move on regardless...
        self.logger.warn('Failed to unregister worker ' + worker.id + ' on redis (ignored):', err);
    })
    .then(function () {
        return promisifyCb(worker.onDestroy, worker)({
            cause: Worker.DestroyCause.SELF
        });
    })
    .catch(function (err) {
        // Same policy as Worker#onCreate(): a failing handler must not stop
        // the broker from releasing the worker.
        self.logger.warn('Worker#onDestroy() threw an error but will be ignored:', err);
    })
    .then(function () {
        worker.load = 0; // just in case
        delete self._workers[worker.id];
        worker.__priv.state = Worker.State.DESTROYED;
        worker.__priv.br = null;
        debug('total load= ' + self.load);
    }).done();
};
/**
 * Applies a change of a worker's load to this broker's total load and
 * publishes the new total to redis as this broker's score in the cluster's
 * sorted set.
 * @param {Worker} worker The worker whose load changed (currently not used).
 * @param {number} delta Amount of change. A falsy value is a no-op.
 * @private
 */
Broker.prototype._updateLoad = function (worker, delta) {
    void(worker); // currently not used
    if (!delta) {
        return;
    }
    var broker = this;
    var clusterKey = this._keys.cz + ':' + this.clustername;

    function onUpdated(err) {
        if (err) {
            broker.logger.error('Failed to update load value on redis');
        }
    }

    this._load += delta;
    // Makes the next _onTimer() tick recompute the total from the workers.
    this._loadUpdated = true;
    // FIXME: This adds the element if the broker does not exist. Ideally,
    // it should check the existence first. (If redis version >= 3.0.2,
    // we would use XX option.)
    // Consider converting this section to lua.
    this._pub.zadd(clusterKey, this._load, this.id, onUpdated);
};
/**
 * Asks redis for its current time and records how far it is from the local
 * clock.
 *
 * On success `_timeOffset` becomes (redis time - `now`) in milliseconds and
 * `_lastTimeSync` becomes `now`. On failure both are left as they were, and
 * the error is only logged.
 * @param {number} now Local time in milliseconds, taken by the caller.
 * @private
 */
Broker.prototype._syncRedisTime = function (now) {
    var broker = this;

    function onRedisTime(err, result) {
        if (err) {
            broker.logger.error('Failed to get time on redis');
            // Ignore this for now because what affected is growing
            // discrepancy in between redis time and local time which
            // should be very slow.
            return;
        }
        // convert from [ seconds, micro-seconds ] to milliseconds
        var seconds = result[0];
        var microSeconds = result[1];
        var redisTime = seconds * 1000 + Math.floor(microSeconds / 1000);
        broker._timeOffset = redisTime - now;
        broker._lastTimeSync = now;
        debug('Redis time offset: ' + broker._timeOffset + ' [msec]');
    }

    this._pub.time(onRedisTime);
};
/**
 * Moves the broker to `newState` and emits a `state` event carrying the old
 * and the new state.
 *
 * The transition is checked against the expected life cycle
 * (INACTIVE/DESTROYED -> ACTIVATING -> ACTIVE -> DESTROYING -> DESTROYED,
 * and anything -> INACTIVE). The check is advisory only: the state is
 * updated and the event is emitted even when the transition is unexpected.
 * @param {number} newState One of the {@link Broker.State} values.
 * @returns {boolean} `true` when the transition is an expected one.
 * @private
 */
Broker.prototype._setState = function (newState) {
    var previous = this._state;
    var knownTarget = true;
    var expected;

    if (newState === State.INACTIVE) {
        expected = true;
    } else if (newState === State.ACTIVATING) {
        expected = (previous === State.INACTIVE || previous === State.DESTROYED);
    } else if (newState === State.ACTIVE) {
        expected = (previous === State.ACTIVATING);
    } else if (newState === State.DESTROYING) {
        expected = (previous === State.ACTIVE);
    } else if (newState === State.DESTROYED) {
        expected = (previous === State.DESTROYING);
    } else {
        knownTarget = false;
        expected = false;
    }

    if (!expected) {
        if (knownTarget) {
            debug('State transition error: %d -> %d', previous, newState);
        } else {
            debug('State transition error: %d -> %d (invalid)', previous, newState);
        }
    }

    this._state = newState;
    this.emit('state', previous, newState);
    return expected;
};
/**
 * Periodic housekeeping, re-armed with `setTimeout()` every TIMER_INTERVAL
 * milliseconds. Each tick, in this order:
 *  1. re-syncs the redis time offset when the last sync is at least
 *     TIME_SYNC_INTERVAL old;
 *  2. fails every pending RPC callback older than `rpcTimeout`;
 *  3. recomputes the total load from the workers when it was flagged dirty;
 *  4. runs the healthCheck script every `healthCheckInterval` ticks
 *     (disabled when the option is not greater than 0);
 *  5. schedules the next tick.
 * @private
 */
Broker.prototype._onTimer = function () {
    var self = this;
    var now = Date.now();

    // 1. Sync redis time
    var sinceLastSync = now - this._lastTimeSync;
    if (sinceLastSync >= TIME_SYNC_INTERVAL) {
        this._syncRedisTime(now);
    }

    // 2. Handle expired callbacks (RPC timeout). Entries are inspected from
    // the head of the list and the scan stops at the first one that has not
    // expired yet.
    while (this._cbList.length > 0) {
        var registeredAt = this._cbList[0][0];
        var seq = this._cbList[0][1];
        if (registeredAt + this.rpcTimeout > now) {
            break;
        }
        this._cbList.shift();
        var expired = this._cbs[seq];
        if (!expired) {
            // Already answered; only the list entry was left behind.
            continue;
        }
        delete this._cbs[seq];
        debug('cb=', expired);
        expired(new Error("RPC timeout. (elapsed=" + (now - registeredAt) + ' [msec])'));
    }

    // 3. Recalculate the total load.
    if (this._loadUpdated) {
        this._loadUpdated = false;
        var workers = this._workers;
        this._load = Object.keys(workers).reduce(function (total, id) {
            return total + workers[id].load;
        }, 0);
    }

    // 4. Health check, once every `healthCheckInterval` ticks.
    if (this._option.healthCheckInterval > 0 && --this._healthCheckIn <= 0) {
        this._healthCheckIn = this._option.healthCheckInterval;
        this._pub.evalshaAsync(
            scripts.healthCheck.sha,
            5,
            self._keys.gh,
            self._keys.wh,
            self._keys.bh,
            self._keys.cz + ':' + self.clustername,
            self._keys.bz + ':' + self.clustername,
            self.id
        )
        .then(function (res) {
            var code = res[0];
            // Codes 0 and 1 are only logged at debug level; anything else
            // is reported as a warning along with the script's message.
            if (code == 0 || code == 1) {
                self.logger.debug('Broker (%s): health check %d', self.id, code);
                return;
            }
            self.logger.warn('Broker (%s): health check warning: %s', self.id, res[1]);
        }).done();
    }

    // 5. Re-arm the timer.
    this._timer = setTimeout(this._onTimer.bind(this), TIMER_INTERVAL);
};
// Expose the State enumeration as a static property of the Broker constructor.
Broker.State = State;
// Export the Broker constructor as a named property of this module.
module.exports.Broker = Broker;