Show:
/**
 *
 * Copyright 2013 Joel Grenon
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
var util = require('util'),
    http = require('http'),
    Q = require('q'),
    /* jshint ignore:start */
    JSON = require('json3'),
    /* jshint ignore:end */
    shortId = require('shortid'),
    _ = require('lodash'),
    moment = require('moment'),
    redis = require('redis'),
    async = require('async'),
    bunyan = require('bunyan'),
    WebSocketServer = require('ws').Server,
    WatchDog = require('./watchdog'),
    EventEmitter = require('eventemitter3');

require("underscore-query");

var Dispatcher = (function()    {

    var server;

    const protocolMap = require('./protocol');
    const VERSION = 100;
    const pendingCommands = [];
    const watchDogs = {};
    const monitors = {};

    /**
     * @class Dispatcher
     * @extend EventEmitter
     * @description The dispatcher is responsible for central coordination of all agents. It monitors their liveness and make sure they can communication between one and other.
     * The Dispatcher is an {EventEmitter} and will make sure events are remotely replicated between agents.
     * @param cfg
     * @constructor
     */
    function Dispatcher(cfg) {
        var _this = this;
        this.port = cfg.port || 8080;
        this.log = cfg.log || bunyan.createLogger({name:'dispatcher', level: cfg.logLevel || 'info'});
        this.pendingCleanupThreshold = cfg.command_cleanup_threshold || 600;
        this.commandHandlers = {};
        this.agentSockets = {};

        EventEmitter.call(this);

        if(!cfg.redis) {
            cfg.redis = {
                port: 6379,
                host: 'localhost'
            }
        }

        this.db = redis.createClient(cfg.redis.port, cfg.redis.host);

        this.db.on('error', _.bind(function(e) {

            /**
             * @event internal.db.error
             * @description Triggered if the internal Redis database cannot be accessed.
             * @param e {Error} The error that has been sent.
             */
            this.emit('internal.db.error', e);

        }, this));

        this.eventSubscriptions = [];

        server = http.createServer(function(req, res) {

            if(_this.suspended) {
                res.statusCode = 503;
                res.end("SUSPENDED");
            }
            else {
                if (req.method === 'POST') {
                    var body = '';
                    req.on('data', function (data) {
                        body += data;
                    });
                    req.on('error', function(err) {
                        _this.log.error(err);
                        res.send("error");
                    });
                    req.on('end', function () {
                        var content = {};
                        if(body.length > 0) {
                            try {
                                content = JSON.parse(body);

                                // Do something...
                                var handler = protocolMap[content.key];
                                if(handler) {
                                    Q.fcall(_.bind(handler, _this), content).then(function(result) {
                                        res.end(JSON.stringify({success:true, v:VERSION, data: result}));
                                    }, function(err) {
                                        res.end(JSON.stringify({success:false, v:VERSION, error:err}));
                                    }).catch(function(err) {
                                        res.end(JSON.stringify({success:false, v:VERSION, error:err}));
                                    });
                                }
                                else {
                                    res.end(JSON.stringify({success:false, error:'unknown-key:'+content.key, v:VERSION}));
                                }
                            }
                            catch(err) {
                                _this.log.error(err);
                            }

                        }

                    });
                }
                else
                    res.end(JSON.stringify({success:false, v:VERSION, error:"Unsupported method"}));
            }
        });

        server.on('connection', function(socket) {
            socket.setTimeout(300 * 1000);
        });

        // Initialize the WebSocket server
        if(cfg.wss) {
            var wss = new WebSocketServer({port: cfg.wss.port});

            // Handle agent connections. Register all message handlers
            wss.on('connection', function(ws) {

                ws.on('message', function(msg) {
                    try {
                        var content = JSON.parse(msg);
                        // Bind this socket to an agent id
                        _this.agentSockets[content.id] = ws;

                        _this.log.trace("Received message through websocket", content);

                        // Do something...
                        var handler = protocolMap[content.key];
                        if(handler) {

                            Q.fcall(_.bind(handler, _this), content).then(function(result) {
                                // Write response through the ws
                                ws.send(JSON.stringify({
                                    key: 'message-response',
                                    uuid: content.uuid,
                                    data: result
                                }));
                            }, function(err) {
                                // Write error through the ws
                                ws.send(JSON.stringify({
                                    key:'message-error',
                                    uuid: content.uuid,
                                    error: err
                                }));
                            }).catch(function(err) {
                                // Write error through the ws
                                ws.send(JSON.stringify({
                                    key:'message-error',
                                    uuid: content.uuid,
                                    error: err
                                }));
                            });
                        }
                        else
                            _this.log.warn("No handler was configured for command", content.key);
                    }
                    catch(err) {
                        _this.log.error(err);
                    }

                });

                ws.on('close', function(ws) {
                    _this.log.warn("TODO: Closing socket:",ws);
                });
            });
        }

        // Launch a command cleanup job to clear completed and lost commands
        this.commandMonitorInterval = setInterval(function() {
            _this.log.debug("Analyzing %d pending commands for cleanup", pendingCommands.length);

            var activeOrCompletedCommands = _.query(pendingCommands, {status:{ $ne : 'PENDING'}});

            _this.log.debug("Found %d candidate commands for cleanup (completed or dead)", activeOrCompletedCommands.length);

            async.forEach(activeOrCompletedCommands, function(cmd, callback) {
                var delta = moment().utc().unix() - cmd.ts;
                _this.log.trace(cmd, "Checking command for cleanup");

                // First process active commands with no results
                if(cmd.status === 'ACTIVE' && delta > _this.pendingCleanupThreshold) {
                    cmd.defer.reject(new Error("no-response"));
                    cmd.status = 'COMPLETE';
                    _this.log.warn("Command %s(%s) for agent %s is now dead and will be cleaned-up", cmd.id, cmd.key, cmd.agentId);
                }

                if(cmd.status === 'COMPLETE') {
                    _this.log.trace("Cleaning completed command %s", cmd.id);

                    // Remove from pendingCommands
                    _.remove(pendingCommands, function(c){ return c.id === cmd.id });

                    // Remove from Redis
                    _this.db.del(cmd.id);

                    _this.log.trace("Command %s:%s(%s) has been removed from system", cmd.agentId, cmd.id, cmd.key);
                }

                callback();

            }, function(err) {
                if(err)
                    _this.log.error(err, "There was a problem while we were cleaning up commands... error is", err);
                else
                    _this.log.debug("Cleanup report: %d remaining pending commands after cleanup", pendingCommands.length);
            });

        }, 30000);

        this.on('agent-connected', function(e) {

            e.meta = e.meta || {};
            e.agent = e.agent || {};

            // Create a new structure for this agent
            this.db.multi()
                .hset(e.id, "connectedTs", e.ts)
                .hset(e.id, "lastHeartbeatTs", e.ts)
                .hset(e.id, "version", e.agent.version || "1")
                .hset(e.id, "heartbeat-interval", e.meta.interval || 30)
                .exec(function(err) {
                    if(err)
                        _this.log.error(err);
                    else {
                        _this.ensureWatchDog({id: e.id, interval: e.meta.interval || 30}).then(function(watchdog) {

                            // Force the execution of a configure command on the agent
                            _this.executeOnAgent(e.id, 'configure', {restart: false});
                        }, function(err) {
                            _this.log.warn("Unable to install watchdog for agent %s. Error = ", e.id, err);
                        });
                    }
                });
        });

        this.on('agent-disconnected', function(e) {
            _this.log.warn("Agent %s was detected as disconnected", e.id);

            // Clear all pending commands and mark them as canceled

            // Close any open web socket connection
            if(_this.agentSockets[e.id]) {
                _this.agentSockets[e.id].close();
                delete _this.agentSockets[e.id];
            }

        });

    }

    util.inherits(Dispatcher, EventEmitter);

    /**
     * @method listen
     * @description Connect the HTTP server to the configured port.
     * @returns {Promise} Resolved when the dispatcher is ready to receive commands.
     */
    Dispatcher.prototype.listen = function() {
        var defer = Q.defer();
        server.listen(this.port, defer.makeNodeResolver());
        return defer.promise;
    };

    Dispatcher.prototype.suspend = function(duration) {
        var _this = this;
        if(!this.suspended) {
            this.suspended = true;
            this.log.warn("Dispatcher is now suspended");
            if(duration) {
                setTimeout(function() {
                    _this.resume();
                }, duration * 1000);
            }
        }
        else
            this.log.debug("Already suspended, new suspend request will be ignored");
    };

    Dispatcher.prototype.resume = function() {
        if(this.suspended) {
            this.suspended = false;
            this.log.warn("Dispatcher is now resuming normal operation");
        }
        else
            this.log.info("Dispatcher was already executing normally. Resume request will be ignored");
    };

    /**
     * {
            agent:request.id,
            key:request.payload.key,
            interval: request.payload.interval || 5,
            snapshots: request.payload.snapshots || "ALL"
        }
     *
     * @param cfg
     */
    Dispatcher.prototype.installMonitor = function(cfg) {

        if(!monitors[cfg.agent]) {
            monitors[cfg.agent] = setInterval(_.bind(function() {
                var snapshot = {
                    agents:this.listConnectedAgents(),
                    pendingCommands:this.listPendingCommands(cfg.agent).length
                };

                // Send a system-status event to the agent
                this.emitTo(cfg.agent, snapshot);

            }, this), cfg.interval);
        }
        else
            return Q(monitors[cfg.agent]);
    };

    Dispatcher.prototype.uninstallMonitor = function(agent, key) {
        if(monitors[agent]) {
            clearInterval(monitors[agent]);
            delete monitors[agent];
        }
    };

    Dispatcher.prototype.listConnectedAgents = function() {
        return _.map(_.values(watchDogs), function(w) {
            return w.agent;
        });
    };

    /**
     * @method listPendingCommands
     * @description List all pending commands for a specific agent
     * @param agentId {String} An agentId
     * @returns {Array} A list of pending commands. Each command has the following fields:
     *
     * - id
     * - agentId
     * - defer: The deferred result
     * - status: the status of the command : PENDING | ACTIVE | COMPLETE
     * - ts : The unix timestamp when this command was created
     */
    Dispatcher.prototype.listPendingCommands = function(agentId) {
        return _.q(pendingCommands, {agentId:agentId, status: 'PENDING'});
    };

    /**
     * Execute a command on a specific agent
     *
     * @method executeOnAgent
     * @param agentId {String} The agentId where the command will be executed
     * @param commandKey {String} The command key that identified this command
     * @param payload {Object} The data that will be passed with the command
     * @param options {Object} Contains options that affects the way the command is executed
     * @returns {Promise} A promise providing access to the command result when available.
     */
    Dispatcher.prototype.executeOnAgent = function(agentId, commandKey, payload, options) {
        var _this = this;
        var defer = Q.defer();

        options = options || {};
        payload = payload || {};

        Q.nextTick(_.bind(function() {
            var cmdId = options.uuid || shortId.generate();
            var ts = moment().utc().unix();

            this.log.debug("Executing command %s(%s) on agent %s", commandKey, cmdId, agentId);
            this.log.trace("Executing command %s(%s) with payload", commandKey, cmdId, payload);

            // We default to single command group
            var group = cmdId;

            // If a group is specified, we group commands based on their key by default or using a custom key provided by the caller
            if(options && options.group) {
                group = options.group.key || agentId+commandKey;
            }

            // Make sure we drop all pending requests (from other groups) if we're instructed to
            if(options && options.dropAllPending) {

                if(_.isString(options.dropAllPending)) {
                    _.each(_.query(pendingCommands, {agentId:agentId, key:options.dropAllPending, group: {$ne : group}, status:'PENDING'}), function(command) {
                        command.defer.resolve({success:true, dropped:true});
                        command.status = 'COMPLETE';
                    });
                }
                else {
                    _.each(_.query(pendingCommands, {agentId:agentId, group: {$ne : group}, status:'PENDING'}), function(command) {
                        command.defer.resolve({success:true, dropped:true});
                        command.status = 'COMPLETE';
                    });
                }

            }

            this.db.multi()
                .hset(cmdId, "id", cmdId)
                .hset(cmdId, "key", commandKey)
                .hset(cmdId, "agent", agentId)
                .hset(cmdId, "ts", ts)
                .hset(cmdId, "group", group)
                .hset(cmdId, "payload", JSON.stringify(payload || {}))
                .hset(cmdId, "options", JSON.stringify(options || {}))
                .exec(function(err) {
                    if(err) {
                        _this.log.error("Unable to register command in Redis", err);
                        defer.reject(err);
                    }
                    else {
                        _this.log.debug("Adding command %s(%s) to pending command list for agent %s", commandKey, cmdId, agentId);

                        // Add this command to our pending list
                        var cmd = {
                            id:cmdId,
                            agentId: agentId,
                            key: commandKey,
                            defer:defer,
                            group: group,
                            status: 'PENDING',
                            timeout: options.timeout || _this.timeout,
                            ts: ts
                        };

                        pendingCommands.push(cmd);
                    }
                });

        }, this));

        return defer.promise;
    };

    /**
     * @method emitTo
     * @description Emit an event to a specific agent only.
     * @param agentId {String|Array} The agent where the event is to be sent. This may be an array of agentIds.
     * @param key {String} The event key that is being sent
     * @param data The event payload
     * @param options
     * @returns {Promise} A promise resolved when the event has been retrieved by the agent (or agents)
     */
    Dispatcher.prototype.emitTo = function(agentId, key, data, options) {
        var _this = this;
        var defer = Q.defer();
        options = options || {};
        Q.nextTick(function() {
            if(_.isString(agentId)) agentId = [agentId];
            async.forEach(agentId, function(id, done){
                _this.db.rpush(id+"_events", JSON.stringify({key: key, data:data}));
                done();
            }, function(err) {
                if(err) defer.reject(err);
                else
                    defer.resolve();
            });
        });
        return defer.promise;
    };

    /**
     * @method applyCommandResponse
     * @description Called when an agent has sent back the response to an active command. This will apply the response and resolve the
     * associated promise.
     * @param cmdId The id of the command associated with the response
     * @param result The actual command response
     * @returns {defer.promise|*} The promise associated with the applyCommand operation and not the promise associated with the command itself.
     */
    Dispatcher.prototype.applyCommandResponse = function(cmdId, result) {
        var _this = this;
        var defer = Q.defer();
        Q.nextTick(function() {
            _this.log.debug("Applying command response for command %s", cmdId);
            _this.log.trace("Command %s response", result);

            var commands = _.query(pendingCommands, {id:cmdId});
            if(commands.length === 1) {
                _this.log.debug("Command %s was successfully found in our pending command list", cmdId);
                _this.log.trace("Command found", commands[0]);

                // We load all commands of the same group. They will all be resolved by the same result (including this one)
                var groupCommands = _.query(pendingCommands, {group: commands[0].group, status: 'PENDING'});
                groupCommands.push(commands[0]);

                _this.log.debug("Found %d commands that will be fulfilled by this response", groupCommands.length);
                async.forEach(groupCommands, function(command, done) {
                    _this.log.trace("Fulfilling command %s(%s)", command.key, command.id);
                    command.defer.resolve(result);
                    command.status = 'COMPLETE';
                    done();
                }, function() {
                    _this.log.debug("Command response was successfully applied");
                    defer.resolve({success:true});
                });
            }
            else {
                _this.log.warn("Unable to find command %s in our pending command list. Most probably a timeout and command was already cleaned up", cmdId);
                defer.reject({success:false, error:"Unknown command:"+cmdId});
            }
        });

        return defer.promise;
    };

    Dispatcher.prototype.applyCommandError = function(cmdId, error) {
        var defer = Q.defer();
        Q.nextTick(function() {
            var commands = _.query(pendingCommands, {id:cmdId});
            if(commands.length === 1) {

                var groupCommands = _.query(pendingCommands, {group: commands[0].group, status: 'PENDING'});
                groupCommands.push(commands[0]);

                async.forEach(groupCommands, function(command, done) {
                    command.defer.reject(error);
                    command.status = 'COMPLETE';
                    done()
                }, function() {
                    defer.resolve({success:true});
                });
            }
            else {
                defer.reject({success:false, error: "Unknown command:"+cmdId});
            }
        });

        return defer.promise;
    };

    Dispatcher.prototype.applyCommandProgress = function(cmdId, progress) {
        var defer = Q.defer();
        Q.nextTick(function() {
            var commands = _.query(pendingCommands, {id:cmdId});
            if(commands.length === 1) {

                var groupCommands = _.query(pendingCommands, {group: commands[0].group, status: 'PENDING'});
                groupCommands.push(commands[0]);

                async.forEach(groupCommands, function(command, done) {
                    command.defer.notify({cmd: _.pick(command, "id", "key", "group"), data:progress});
                    done();
                }, function() {
                    defer.resolve();
                });
            }
            else {
                defer.reject({success:false, error: "Unknown command:"+cmdId});
            }
        });

        return defer.promise;
    };

    Dispatcher.prototype.startAgentWatchDog = function(agent) {
        var _this = this;
        var defer = Q.defer();

        Q.nextTick(function() {
            watchDogs[agent.id] = new WatchDog(agent, _this);
            _this.log.info("Watchdog installed for agent %s", agent.id);
            defer.resolve(watchDogs[agent.id]);
        });

        return defer.promise;
    };

    Dispatcher.prototype.stopAgentWatchDog = function(agentId) {
        clearInterval(watchDogs[agentId]);
    };

    Dispatcher.prototype.ensureWatchDog = function(agent){
        if(!watchDogs[agent.id]) {
            return this.startAgentWatchDog(agent);
        }
        return Q(watchDogs[agent.id]);
    };

    Dispatcher.prototype.hasWatchDog = function(agentId) {
        return !_.isUndefined(watchDogs[agentId]);
    };

    return Dispatcher;
})();

module.exports = Dispatcher;