API Docs for: 0.0.1
Show:

File: core/registry.js

var assert = require('assert');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var zmq = require('zmq');
var uuid = require('node-uuid');
var utils = require('./utils');
var protocol = require('./protocol');
var errors = protocol.errors;

var HEARTBEAT = 5000;
var keys = Object.keys;

inherits(Registry, EventEmitter);
function Registry(endpoint, options) {
  assert(endpoint, 'Registry requires endpoint parameter');
  assert(endpoint.indexOf('ipc://') === 0, 'endpoint must be ipc: ipc://...');

  EventEmitter.call(this);

  this._id = uuid.v4();
  this._name = (options && options.name) || 'registry-' + this._id;

  // zmq endpoints
  this._routerEndpoint = endpoint;
  this._pubEndpoint = 'ipc:///tmp/centaur-registry-pub-' + this._id;

  // zmq sockets
  this._routerSocket = zmq.socket('router');
  this._dealerSocket = zmq.socket('dealer');
  this._pubSocket = zmq.socket('pub');
  this._subSocket = zmq.socket('sub');

  // stores centaur state
  this._centaurs = {};

  // stores registry state
  this._registries = {};

  // interval that calls `_prune()`
  // see `_startPruner()` and `_stopPruner()`
  this._pruner = null;
}

Registry.prototype._initSockets = function () {
  var self = this;

  // only the router and pub need to bind
  // dealer and sub should connect to other registries
  this._routerSocket.bindSync(this._dealerEndpoint);
  this._pubSocket.bindSync(this._pubEndpoint);

  // the main router handler
  this._routerSocket.on('message', function (frames) {
    var id = frames[0];
    var msg = frames[1];

    try {
      msg = JSON.parse(msg);
    } catch (e) {
      return self._send([id, new errors.BadPayload('bad json')]);
    }

    if (!msg || typeof msg != 'object') {
      return self._send([id, new errors.BadPayload('bad message')]);
    }

    if (!msg.id) {
      return self._send([id, new Error('missing node id')]);
    }

    if (msg.type === 'heartbeat') {
      self._onHeartbeat(msg);
      self.emit('heartbeat', msg);
    } else if (msg.type === 'register') {
      self._onRegister(msg, function (err, peers) {
        return self._send([id, err ? err : peers]);
      });
      self.emit('register', msg.node);
    } else {
      self._send([id, new Error('invalid message type')]);
    }
  });
};

Registry.prototype._startPruner = function () {
  if (!this._pruner) {
    this._pruner = setInterval(this._prune.bind(this), 100);
    this.emit('prune:start');
  }
};

Registry.prototype._stopPruner = function () {
  if (this._pruner) {
    clearInterval(this.pruner);
    this._pruner = null;
    this.emit('prune:stop');
  }
};

/**
 * Called when centaurs send heartbeats.
 * @param  {[type]}   id [description]
 * @param  {Function} cb [description]
 * @return {[type]}      [description]
 */
Registry.prototype._onHeartbeat = function (id) {
  this._map[id].heartbeatTime = utils.currentTime() + HEARTBEAT;
};

Registry.prototype._onRegister = function (data, cb) {

  if (!data.id) {
    return cb(new Error('missing node id'));
  }

  // save node
  this._map[data.id] = {
    id: data.id,
    endpoint: data.endpoint,
    names: data.names
  };

  // respond with other nodes
  var reply = {
    nodes: this._map,
    subscribe: this._pubEndpoint
  };

  cb(null, reply);
};

/**
 * Removes dead centaurs from the registry and updates everyone else.
 */
Registry.prototype._prune = function () {
  var self = this;
  var current = utils.currentTime();

  keys(self._map).forEach(function (key) {
    var node = self._map[key];
    if (node.heartbeatTime > current) {
      var msg = {
        type: 'prune',
        node: { id: node.id }
      };
      self._publish(msg);
      self.emit('prune', msg);
      delete self._map[key];
    }
  });
};

/**
 * Publish message to all the centaurs in the cluster.
 *
 * @param  {Object}   data
 * @param  {Function} cb
 */
Registry.prototype._publish = function (data, cb) {
  this._pubSocket.send(JSON.stringify(data));
};

/**
 * Reply to individual centaur requests.
 *
 * @param  {Array} frames The first frame must be the socket id.
 * @return Registry       The registry itself.
 */
Registry.prototype._send = function (frames) {
  var id = frames[0];
  var data = frames[1];

  if (data instanceof Error) {
    // send error message
    return this._routerSocket.send({
      type: 'error',
      message: data.message,
      stackframe: null // TODO
    });
  }

  return this._routerSocket.send(data);
};

Registry.prototype.start = function (cb) {
  var self = this;
  self._initSockets();
  self._startPruner();
  process.nextTick(function () {
    self.emit('start');
    if (cb) cb(null);
  });
};

module.exports = Registry;