/**
*
* Copyright 2013 Joel Grenon
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
(function() {
var util = require('util'),
request = require('request'),
Q = require('q'),
_ = require('lodash'),
EventEmitter = require('eventemitter3'),
Layer = require('./transports'),
shortid = require('shortid'),
bunyan = require('bunyan');
require('underscore-query');
var Agent = (function() {
/**
* @class Agent
* @description Primary client-side abstraction used to establish and manage communication with a central dispatcher
* @param cfg
* @constructor
*/
function Agent(cfg) {
var id = cfg.id || shortid.generate();
// Create our transport layer
this.layer = new Layer(_.extend(_.pick(cfg, 'logLevel'), {logName: id+"-transport-layer"}));
this.id = id;
this.version = cfg.version || 100;
this.url = cfg.endpoint;
this.timeout = cfg.timeout || 30;
this.websocket = cfg.websocket || false;
this.log = cfg.log || bunyan.createLogger({name:id, level:cfg.logLevel || 'info'});
this.heartbeatSpec = cfg.heartbeat || { interval: 30};
this.pollingSpec = cfg.polling || { interval: 5 };
this.commandHandlers = _.defaults(cfg.commandHandlers || {}, {
configure: simpleCommandHandler(this, 'configure')
});
// Add basic event handlers
this.eventHandlers = [{
key:'command-progress',
scope:this,
handler:onCommandProgress
}];
EventEmitter.call(this);
}
util.inherits(Agent, EventEmitter);
/**
* Called to initiate the heartbeat process and install all transports
* @return {Promise} resolved when the agent is started
*/
Agent.prototype.start = function() {
var _this = this;
var defer = Q.defer();
Q.nextTick(function() {
// Hook to let host perform extra configuration before the agent is started
_this.emit('before-start');
// Register an internal handler to send heartbeat when requested
_this.on('send-heartbeat', _.bind(heartbeat, _this));
// Avoid starting automatic heartbeat if our host is requesting manual heartbeat control
if(!_this.heartbeatSpec.manual) {
// Send the first heartbeat
_this.emit('send-heartbeat');
}
// Launch the pending command polling
if(_this.pendingCommandHandle)
clearInterval(_this.pendingCommandHandle);
_this.pendingCommandHandle = setInterval(_.bind(retrievePendingCommands, _this), _this.pollingSpec.interval * 1000);
if(_this.pendingEventsHandle)
clearInterval(_this.pendingEventsHandle);
_this.pendingEventsHandle = setInterval(_.bind(retrievePendingEvents, _this), _this.pollingSpec.interval * 1000);
// Register our command execution handler
_this.on('command', _.bind(executeCommand, _this));
// Establish the web socket connection
if(_this.websocket) {
_this.layer.ws.init(_this.websocket);
}
defer.resolve(_this);
});
return defer.promise;
};
Agent.prototype.stop = function() {
this.log.debug("Stopping agent %s", this.id);
this.suspend();
};
Agent.prototype.suspend = function(duration) {
var _this = this;
this.suspended = true;
if(this.pendingCommandHandle) {
this.log.trace("Stopping pending command handler");
clearInterval(this.pendingCommandHandle);
}
if(this.pendingEventsHandle) {
this.log.trace("Stopping pending event handler");
clearInterval(this.pendingEventsHandle);
}
if(this.nextHeartbeat) {
this.log.trace("Preventing next heartbeat from firing");
clearTimeout(this.nextHeartbeat);
}
if(this.resumeTimer) {
this.log.trace("Cancel the resume timer");
clearTimeout(this.resumeTimer);
}
if(duration) {
this.resumeTimer = setTimeout(function() {
_this.resume();
}, duration * 1000);
}
};
Agent.prototype.resume = function() {
this.suspended = false;
// Launch the pending command polling
if(this.pendingCommandHandle)
clearInterval(this.pendingCommandHandle);
this.pendingCommandHandle = setInterval(_.bind(retrievePendingCommands, this), this.pollingSpec.interval * 1000);
if(this.pendingEventsHandle)
clearInterval(this.pendingEventsHandle);
this.pendingEventsHandle = setInterval(_.bind(retrievePendingEvents, this), this.pollingSpec.interval * 1000);
delete this.resumeTimer;
if(!this.heartbeatSpec.manual) {
this.emit('send-heartbeat');
}
};
/**
* @method connect
* @param options Supported options are :
* - waitForDispatcher: Indicate if the agent will exit of retry connection with the dispatcher indefinitely. Default to false.
* @obsolete Not used any more since version 0.9.0. Mapped to start.
* @returns {*}
*/
Agent.prototype.connect = function(options) {
return this.start(options);
};
/**
* @method subscribeTo
* @description Will connect this agent with remote events emitted by the dispatcher. These events may be triggered by the dispatcher or other agents connected
* on the network. For now, you can only subscribe to events that are sent to you, but in the future, we will add support for channels to receive 'room-like' messages.
* @param key {String} The event key we wish to subscribe to remotely
* @param fn {Function} The handler that will be called when the event is triggered remotely
* @param options Options modifying the behavior to the function.
*
* - **force** : Avoid reusing a subscription if already in place and
* send the request to the dispatcher anyway. This is used when the dispatcher just failed and we are instructed to reconnect our event handlers.
*/
Agent.prototype.subscribeTo = function(key, fn, options) {
var promise, _this = this;
options = options || {};
var handler = _.query.build(this.eventHandlers).and({ key: key }).first();
if(!handler || options.force) {
// Notify the dispatch of our subscription interest
promise = _this.layer.send(this, 'subscribe',{
event:key,
options:options
}, _.pick(options, "timeout"));
// Perform internal subscription
promise.then(function() {
if(!options.force) {
_this.log.debug("Registering a new event handler for event %s", key);
_this.eventHandlers.push({
key: key,
handler: fn,
scope: _this
});
}
}, function(err) {
_this.log.error(err, "Unable to subscribe to event %s", key);
});
}
else
_this.log.warn("Already subscribed to event %s. Existing subscription will be reused", key);
return promise;
};
/**
* @method unsubscribeFrom
* @param key
*/
Agent.prototype.unsubscribeFrom = function(key) {
var _this = this, promise;
var handler = _.query(this.eventHandlers, { key: key });
if(handler) {
promise = _this.layer.send(this, 'unsubscribe', {
channelId: handler.channelId,
event:key
});
promise.then(function() {
_this.log.debug("Removing event handler for event %s in channel %s", key, handler.channelId);
_this.eventHandlers.remove(handler);
}, function(err) {
_this.log.error(err, "Unable to subscribe to event %s", key);
});
}
else
_this.log.warn("No subscription found for event %s. Unable to unsubscribe", key);
return promise;
};
/**
* @method broadcast
* @description Send a notification to whomever is listening to. This is not directed to a specific agent, but to anyone
* on the network, including the dispatcher.
* @param event {String} The key identifying the event
* @param payload {Object} The event data
* @param options Options modifying the behavior of the broadcast.
* @returns {Promise} A promise resolved as soon as the event has been broadcasted. Nothing is returned (fire and forget)
*/
Agent.prototype.broadcast = function(event, payload, options) {
return this.layer.send(this, 'broadcast',{ event:event, payload: payload || {}, options:options }, _.pick(options || {}, "timeout"));
};
/**
* @param key
* @param options
*/
Agent.prototype.monitor = function(key, options) {
return this.layer.send(this, 'start-monitoring', {key:key, interval:options.interval || 5});
};
Agent.prototype.cancelMonitoring = function(key) {
return this.layer.send(this, 'stop-monitoring', {key:key});
};
/**
* @method execute
* @description Execute a remote command. Depending on the dispatcher context, the command might be executed by
* the dispatcher itself or by another agent, having registered a remote command handler with the dispacher.
* @param key The key of the command to execute. It must be registered by the dispatcher or an agent somewhere to receive a response.
* @param payload The data to send with the command.
* @param options Options that may change the way the command is executed. Supported options are:
*
* - **timeout** : The number of seconds to wait for a response. Default to 30 seconds.
*
* @returns {Promise} A promise for the command result.
*/
Agent.prototype.execute = function(key, payload, options) {
return this.layer.send(this, 'execute-command', { commandKey:key, payload:payload, options:options||{} }, _.pick(options || {}, "timeout"));
};
/**
* @method executeOn
* @description Execute a remote command on a specific agent. The command will be forward directly to this agent.
* @param key The key of the command to execute. It must be registered by the dispatcher or an agent somewhere to receive a response.
* @param payload The data to send with the command.
* @param options Options that may change the way the command is executed. Supported options are:
*
* - **timeout** : The number of seconds to wait for a response. Default to 30 seconds.
*
* @returns {Promise} A promise for the command result.
*/
Agent.prototype.executeOn = function(agentId, key, payload, options) {
return this.layer.send(this, 'execute-command-on', { agentId: agentId, commandKey:key, payload:payload || {}, options: options || {} }, _.pick(options || {}, "timeout"));
};
/**
*
* @param agentId {String|Array} One or more agentIds that should receive this event.
* @param event {String} The key of the event to send
* @param payload {Payload} Data that will be associated with this event.
* @param options {Object} Options that will affect the way the event is emitted
*/
Agent.prototype.emitTo = function(agentId, event, payload, options) {
return this.layer.send(this, 'emit-to', { agents: agentId, event:event, payload: payload, options:options }, _.pick(options, "timeout"));
};
/**
* @method registerCommandHandler
* @description Add or replace a command handler. This handler will be execute each time a command of type **key** will be received.
* @param key The command key to attach this handler
* @param fn The handler that will be executed when a command of the specified key is received
*/
Agent.prototype.registerCommandHandler = function(key, fn) {
this.log.debug("Register a new command handler %s", key);
this.commandHandlers = this.commandHandlers || {};
this.commandHandlers[key] = fn;
};
/**
* @method retrieveCommands
* @description Force the retrieval of any pending commands from the dispatcher.
* *
* @returns {*}
*/
Agent.prototype.retrieveCommands = function() {
var promise, _this = this;
promise = _this.layer.send(this, 'retrieve-pending-commands');
promise.then(function(result) {
if(_.isArray(result.data)) {
_this.log.debug("Received %d commands to execute", result.data.length);
_.each(result.data, function(command) {
_this.log.trace("Executing command %s", command.key);
_this.emit('command', command);
});
}
else {
_this.emit('transport-error', {
action:'retrieve-pending-commands',
error: "Invalid command list"
});
}
}, function(err) {
_this.log.error("Unable to retrieve pending commands", err);
/**
* @event network-error
* @description Emitted when there is a network problem and we were unable to communicate with the dispatcher.
* @param action {String} The name of the action that triggered the error
* @param error {Error|String} The actual error that was thrown
*/
_this.emit('transport-error', {
action:'retrieve-pending-commands',
error: err
});
}).catch(function(err) {
_this.log.error("Exception encountered while retrieving pending commands", err);
/**
* @event network-error
* @description Emitted when there is a network problem and we were unable to communicate with the dispatcher.
* @param action {String} The name of the action that triggered the error
* @param error {Error|String} The actual error that was thrown
*/
_this.emit('transport-error', {
action:'retrieve-pending-commands',
error: err
});
});
return promise;
};
/**
* @method retrieveEvents
* @description Retrieve all pending events for this agent.
* @param options
*
* - delay: The number of seconds we have to wait before retrieving the events. Default to 0.
* @returns {*}
*/
Agent.prototype.retrieveEvents = function(options) {
var promise, _this = this;
promise = _this.layer.send(this, 'retrieve-pending-events');
promise.then(function(result) {
_this.log.trace("Retrieve pending events successfully completed", result);
if(_.isArray(result.data)) {
_this.log.debug("Received %d events to handle", result.data.length);
_.each(result.data, function(e) {
_this.log.trace("Received event", e);
// Notify all registered event handlers
//FIXME: Why no use our emitter interface here?
var handlers = _.query(_this.eventHandlers, {key: e.key});
_.each(handlers, function(h) {
h.handler.call(h.scope || _this, e.data);
});
});
}
else {
_this.emit('transport-error', {
action:'retrieve-pending-events',
error: "Invalid event list"
});
}
}, function(err) {
_this.log.error("Unable to retrieve pending events", err);
/**
* @event transport-error
* @description Emitted when there is a network problem and we were unable to communicate with the dispatcher.
* @param action {String} The name of the action that triggered the error
* @param error {Error|String} The actual error that was thrown
*/
_this.emit('transport-error', {
action:'retrieve-pending-events',
error: err
});
}).catch(function(err){
_this.log.error("Exception encountered while retrieving pending events", err);
_this.emit('transport-error', {
action:'retrieve-pending-events',
error: err
});
});
return promise;
};
function executeCommand(command) {
var _this = this;
command.timeout = command.timeout || _this.timeout;
_this.log.debug("Executing command with timeout ", command.timeout);
// Make sure to parse the payload
if(_.isString(command.payload)) {
_this.log.trace("Automatically converting string payload to object using JSON parse");
try {
command.payload = JSON.parse(command.payload);
}
catch(err) {
_this.log.error("Unable to parse command %s(%s) payload. error=", command.id, command.key, err);
}
}
_this.log.debug("Looking for installed handle for command %s (%s)", command.id, command.key);
var handler = this.commandHandlers[command.key];
if(handler) {
var deferredResponse = Q.defer();
Q.nextTick(function() {
_this.log.trace("Executing command handler", handler);
_.bind(handler, _this)(command, deferredResponse);
});
_this.log.trace("Waiting for command %s(%s) results for %s seconds", command.id, command.key, _this.timeout);
Q.timeout(deferredResponse.promise, command.timeout * 1000).then(function(result) {
_this.log.debug("Received command %s(%s) result", command.id, command.key);
_this.log.trace("Command %s(%s) result = ", command.id, command.key, result);
var promise = _this.layer.send(_this, 'command-response', {
commandId: command.id,
result: result
});
promise.then(function(){
_this.log.trace("Response for command %s(%s) was successfully posted", command.key, command.id);
}, function(err) {
_this.log.error("Unable to post command-response for command %s(%s)", command.key, command.id, err);
});
return promise;
}, function(err) {
_this.log.error("Error while processing command %s(%s): ", command.key, command.id, err);
_this.layer.send(_this, 'command-error', {commandId: command.id, type: 'timeout', error: err});
}, function(progress) {
_this.log.trace("Progress notification received for command %s(%s)", command.key, command.id);
_this.layer.send(_this, "post-command-progress", {commandId: command.id, progress:progress}, {noResponse:true})
}).catch(function(err) {
_this.log.error("Exception! Error while processing command %s(%s): ", command.key, command.id, err);
_this.layer.send(_this, 'command-error', {commandId: command.id, type: 'timeout', error: err});
});
}
else {
_this.log.warn("No handler configured for command %s(%s). We assume that a custom event handler has been installed on the 'command' event", command.key, command.id);
}
}
function heartbeat() {
var _this = this, data, result;
_this.log.debug("Sending heartbeat to our dispatcher");
result = Q.fcall(function() {
var meta = {meta: {interval: _this.heartbeatSpec.interval}};
if(_this.heartbeatSpec.payload) {
if(_.isFunction(_this.heartbeatSpec.payload)) {
_this.log.trace("We have a payload generator function");
var payloadDefer = Q.defer();
if(_this.heartbeatSpec.payload.length === 1) {
var defer = Q.defer();
_this.log.debug("Producing deferred payload");
// Request the payload
_this.heartbeatSpec.payload.call(_this.heartbeatSpec.scope || _this, defer);
Q.timeout(defer.promise, (_this.heartbeatSpec.interval-5) * 1000 || 25000).then(function(data) {
_this.log.trace(data, "Heartbeat payload produced");
payloadDefer.resolve(_this.layer.send(_this, "heartbeat", {payload: data, meta: meta}));
}, function(err) {
// Send a late heartbeat without a payload
_this.log.warn(err, "Unable to produce heartbeat payload. Empty payload will be used", err);
payloadDefer.resolve(_this.layer.send(_this, "heartbeat", {meta: meta}));
}).catch(function(err) {
// Send a late heartbeat without a payload
_this.log.warn(err, "Exception! Unable to produce heartbeat payload. Empty payload will be used", err);
payloadDefer.resolve(_this.layer.send(_this, "heartbeat", {meta: meta}));
});
return payloadDefer.promise;
}
else {
_this.log.debug("Producing heartbeat payload synchronously");
data = _this.heartbeatSpec.payload.call(_this.heartbeatSpec.scope || _this);
_this.log.trace(data, "Heartbeat payload produced");
return _this.layer.send(_this, "heartbeat", {payload: data, meta: meta});
}
}
else {
_this.log.trace(data, "Heartbeat payload produced");
return _this.layer.send(_this, "heartbeat", {payload: _this.heartbeatSpec.payload, meta:meta});
}
}
else
return _this.layer.send(_this, "heartbeat", {meta:meta});
});
// Handle result and reprogram next heartbeat
result.then(function(resp) {
_this.log.debug("Heartbeat successfully sent", resp);
}, function(err) {
_this.log.error("Unable to send heartbeat", err);
}).catch(function(err) {
_this.log.error("Exception! Unable to send heartbeat", err);
}).finally(function() {
if(!_this.heartbeatSpec.manual) {
_this.log.debug("Programming next heartbeat for %d seconds", _this.heartbeatSpec.interval);
_this.nextHeartbeat = setTimeout(function(){ _this.emit('send-heartbeat')}, _this.heartbeatSpec.interval * 1000);
}
});
return result;
}
function retrievePendingCommands() {
this.log.trace("Retrieving pending commands");
this.retrieveCommands();
}
function retrievePendingEvents() {
this.log.trace("Retrieving pending events");
this.retrieveEvents();
}
function simpleCommandHandler(dispatcher, key) {
return _.bind(function(command, deferredResult) {
this.emit(key, command);
deferredResult.resolve();
}, dispatcher);
}
function onCommandProgress(e) {
this.log.debug("Received command %s(%s) progress", e.cmd.key, e.cmd.id, e.data);
var cmd = this.layer.getResult(e.cmd.id);
if(cmd) {
this.log.trace("command %s(%s) has been found and progress will be updated", e.cmd.key, e.cmd.id);
cmd.defer.notify(e.data);
}
else
this.log.warn("command %s(%s) wasn't found in waitingCommands", e.cmd.key, e.cmd.id);
}
return Agent;
})();
module.exports = Agent;
})();