| 1 | | /** |
| 2 | | * Module dependencies |
| 3 | | */ |
| 4 | 1 | var EventEmitter = require('events').EventEmitter; |
| 5 | 1 | var common = require('./common'); |
| 6 | 1 | var dateformat = require('dateformat'); |
| 7 | 1 | var debug = require('debug')('nmq:queue'); |
| 8 | | |
| 9 | | |
/**
 * Queue
 *
 * Event-emitting message queue backed by three collections of `db`:
 * `queues`, `results` and `configs`. When `options.collectionPrefix` is
 * set, each collection name becomes `<prefix>_<name>`.
 *
 * @param {Object} db Database handle; must provide `collection(name)`
 * @param {Object} [options]
 * @param {String} [options.collectionPrefix] Prefix for the collection names
 */
var Queue = module.exports = function Queue(db, options) {
  EventEmitter.call(this);
  common.ensureIndexes(db);

  // worker bookkeeping and registered handlers
  this._tasks = {};
  this._topics = [];
  this.delay = 1;
  this._state = 'stopped';
  this._maxWorkers = 2;
  this._currentWorkers = 0;
  this.db = db;

  var prefix = (options || {}).collectionPrefix;

  // Resolve a base collection name, applying the optional prefix.
  function openCollection(baseName) {
    return db.collection(prefix ? prefix + '_' + baseName : baseName);
  }

  this.queColl = openCollection('queues');
  this.resultColl = openCollection('results');
  this.configColl = openCollection('configs');
};
| 37 | | |
/**
 * Inherits `EventEmitter`
 *
 * Uses the standard `Object.setPrototypeOf` rather than assigning through
 * the deprecated `__proto__` accessor; the resulting prototype chain is the
 * same.
 */
Object.setPrototypeOf(Queue.prototype, EventEmitter.prototype);
| 42 | | |
/**
 * Push a message to a topic queue
 *
 * Obtains the next queue id from the config collection, inserts the message
 * into the queue collection with state `common.STATE_NEW`, and emits
 * `queued` with the inserted document. Any failure along the chain is
 * emitted as a `fault` event.
 *
 * @param {String} topic
 * @param {Object} message
 */
Queue.prototype.push = function (topic, message) {
  var queue = this;

  // Second step of the chain: store the message under the freshly issued id.
  function persist(defer, counterDoc) {
    var id = counterDoc.currentId;
    var now = new Date();
    var record = {
      _id: id,
      topic: topic,
      message: message,
      state: common.STATE_NEW,
      date: dateformat(now, 'yyyymmdd'),
      created: now
    };

    queue.queColl.insert(record, {safe: true})
      .then(function (inserted) {
        if (!inserted) {
          return;
        }
        // the insert may hand back either a single document or an array
        queue.emit('queued', Array.isArray(inserted) ? inserted[0] : inserted);
      })
      .fail(defer.error);
  }

  nextQueueId(this.configColl)
    .and(persist)
    .fail(function (err) {
      queue.emit('fault', err);
    });
};
| 71 | | |
| 72 | | |
/**
 * Register a named task for specific topic
 *
 * `name` may also be an object mapping task names to functions, in which
 * case `fn` is ignored and every entry is registered.
 *
 * @param {String} topic Topic of the queue
 * @param {String|Object} name Name of the task, or a `{name: fn}` map
 * @param {Function} fn
 */
Queue.prototype.process = function (topic, name, fn) {
  var handlers;
  if (typeof name === 'object') {
    handlers = name;
  } else {
    handlers = {};
    handlers[name] = fn;
  }

  // remember the topic so that `_shift` polls for it
  if (this._topics.indexOf(topic) === -1) {
    this._topics.push(topic);
  }

  var registered = this._tasks[topic] || (this._tasks[topic] = []);
  var taskNames = Object.keys(handlers);
  for (var i = 0; i < taskNames.length; i++) {
    registered.push({name: taskNames[i], fn: handlers[taskNames[i]]});
  }
};
| 98 | | |
/**
 * Move message from one collection to another
 *
 * Upserts the message into `toColl` under its `_id`, then removes the
 * document with that `_id` from `fromColl`. The chain continues with the
 * original message. A failure is reported through `_onFault`, which also
 * releases the worker slot.
 *
 * The caller's `message` object is never modified: the `$set` payload is a
 * shallow copy without `_id`, so a failed move cannot leave the message
 * stripped of its id.
 *
 * @param message Message document; must carry `_id`
 * @param fromColl Collection the message is removed from
 * @param toColl Collection the message is upserted into
 * @returns {*} The deferred returned by the update/remove chain
 * @private
 */
Queue.prototype._mvMessages = function (message, fromColl, toColl) {
  var id = message._id;

  // update don't like `_id` inside `$set`, so copy every other field
  var fields = {};
  Object.keys(message).forEach(function (key) {
    if (key !== '_id') {
      fields[key] = message[key];
    }
  });

  // upsert the message into `toColl`
  return toColl.update({_id: id}, {$set: fields}, {safe: true, upsert: true})
    .and(function (defer) {
      // remove the old one from `fromColl`
      fromColl.remove({_id: id}, {safe: true})
        .then(function () {
          defer.next(message);
        }).fail(defer.error);
    }).fail(this._onFault(this, true));
};
| 123 | | |
/**
 * Claim the next new message of any registered topic and run its tasks.
 *
 * Does nothing while the queue is stopped or when every worker slot is in
 * use. Otherwise it occupies one slot, marks one `STATE_NEW` message as
 * `STATE_SHIFTED`, moves it into the result collection and passes it to
 * `_process`. When no message is available, or the lookup fails, the slot
 * is released and another attempt is scheduled after `delay` seconds.
 *
 * @private
 */
Queue.prototype._shift = function () {
  // Check the limit BEFORE taking a slot. Incrementing inside the condition
  // would also count calls that are rejected here, leaking a slot each time.
  if (this._state === 'stopped' || this._currentWorkers >= this._maxWorkers) {
    return;
  }
  this._currentWorkers++;
  var self = this;

  // if nothing, release the slot, wait for `delay` and try shift again
  function tryNext() {
    self._currentWorkers--;
    setTimeout(function () {
      self._shift();
    }, self.delay * 1000);
  }

  // get messages we are interested
  this.queColl.findAndModify(
    {topic: {$in: self._topics}, state: common.STATE_NEW},
    // use `nature` order
    [],
    // modify the message status
    {$set: {shifted: new Date(), state: common.STATE_SHIFTED}},
    // return updated message
    {remove: false, 'new': true, upsert: false}
  ).then(function (message) {
    if (!message) {
      tryNext();
      return;
    }
    // handle shifted message
    // move the message from `queues` to collection `results`
    // NOTE(review): if the move fails, `_mvMessages` releases the slot via
    // `_onFault` but nothing here schedules another shift — confirm intended.
    self._mvMessages(message, self.queColl, self.resultColl).then(function (moved) {
      // perform tasks
      self._process(moved);
    });
  }).fail(tryNext);
};
| 156 | | |
/**
 * Run one task against a message and log its outcome.
 *
 * The task function is called with the message as `this`, the message
 * payload as first argument and a node-style callback `(err, result)`.
 * An error (passed to the callback or thrown by the task) is stored in
 * `errors[task.name]`; a result other than `undefined` is stored in
 * `results[task.name]`. The outcome is written through `_log`, which
 * receives `done`.
 *
 * A task completes at most once: a repeated callback is ignored, and an
 * exception thrown after the callback already ran is reported as a `fault`
 * instead of completing the task a second time.
 *
 * @param {Object} task `{name, fn}` entry registered through `process`
 * @param {Object} message Message document being processed
 * @param {Object} errors Collector for errors, keyed by task name
 * @param {Object} results Collector for results, keyed by task name
 * @param {Function} done Passed to `_log` as its completion callback
 * @private
 */
Queue.prototype._runTask = function (task, message, errors, results, done) {
  var self = this;
  var log = {
    _id: message._id,
    topic: message.topic,
    name: task.name,
    started: new Date(),
    state: common.STATE_INPROGRESS
  };
  var completed = false;

  // Record the outcome exactly once and hand it to `_log`.
  function complete(taskErr, result) {
    if (completed) {
      return;
    }
    completed = true;
    log.finished = new Date();
    if (taskErr) {
      // record error
      log.state = common.STATE_ERROR;
      log.error = taskErr;
      errors[task.name] = taskErr;
    } else {
      log.state = common.STATE_FINISHED;
      // record result if any
      if ('undefined' !== typeof result) {
        results[task.name] = result;
        log.result = result;
      }
    }
    self._log(log, message, done);
  }

  try {
    task.fn.call(message, message.message, complete);
  } catch (e) {
    if (completed) {
      // the task already reported its outcome; surface the late exception
      self._emitOrLog('fault', [e.stack || e]);
    } else {
      // a thrown exception counts as a task error, same as a callback error
      complete(e.stack || e);
    }
  }
};
| 192 | | |
/**
 * Run every task registered for the message's topic.
 *
 * Once all tasks have reported back, a `finished` event is emitted (or
 * logged) with `(topic, errors, results)`, the worker slot is released and
 * the next message is shifted. If any task failed, the stored message is
 * first marked `STATE_PARTIAL`.
 *
 * The slot is released exactly once, and also when the partial-state update
 * fails, when a `finished` listener throws, or when no task is registered
 * for the topic.
 *
 * @param {Object} message Message document; needs `_id` and `topic`
 * @private
 */
Queue.prototype._process = function (message) {
  var tasks = this._tasks[message.topic] || [];
  var self = this;
  var count = 0;
  var results = {};
  var errors = {};
  var finalized = false;

  // Announce completion, free the worker slot and shift next (only once).
  function finalize() {
    if (finalized) {
      return;
    }
    finalized = true;
    try {
      self._emitOrLog('finished', [message.topic, errors, results]);
    } finally {
      // release the slot even if a listener throws
      self._currentWorkers--;
      self._shift();
    }
  }

  function checkFinish() {
    if (++count !== tasks.length) {
      return;
    }
    if (Object.keys(errors).length === 0) {
      finalize();
      return;
    }
    // errors found
    self.resultColl
      .update({_id: message._id}, {$set: {state: common.STATE_PARTIAL}}, {safe: true})
      .then(function () {
        finalize();
      })
      .fail(function (err) {
        // without this handler a failed update would keep the slot forever
        if (err) {
          self._emitOrLog('fault', [err.stack || err]);
        }
        finalize();
      });
  }

  // nothing to run: finish right away instead of waiting for callbacks
  if (tasks.length === 0) {
    finalize();
    return;
  }

  // run each tasks
  tasks.forEach(function (task) {
    self._runTask(task, message, errors, results, checkFinish);
  });
};
| 226 | | |
/**
 * Append a task log entry to the message's document in the result collection.
 *
 * Sets the document's `topic` and `state` and pushes
 * `{started, finished, state[, error][, result]}` onto `logs.<task name>`.
 * A result is stored whenever it is not `undefined`, so falsy results such
 * as `0`, `''` or `false` are kept, matching what `_runTask` records.
 * When the entry carries an error, an `error` event is emitted (or logged)
 * with `(log, message)` after the write.
 *
 * @param {Object} log Entry built by `_runTask`
 * @param {Object} message Message the task ran against
 * @param {Function} [callback] Chained onto the write's deferred via `then`
 * @private
 */
Queue.prototype._log = function (log, message, callback) {
  var logName = 'logs.' + log.name;
  var setDoc = {
    topic: log.topic,
    state: log.state
  };
  var pushDoc = {};
  pushDoc[logName] = {
    started: log.started,
    finished: log.finished,
    state: log.state
  };
  if (log.error) {
    pushDoc[logName].error = log.error;
  }
  // compare against `undefined`, not truthiness, so falsy results survive
  if ('undefined' !== typeof log.result) {
    pushDoc[logName].result = log.result;
  }
  // NOTE(review): whether `callback` still runs after a failed write depends
  // on how the deferred library continues a chain past `fail` — confirm.
  var deferred = this.resultColl
    .update({_id: log._id}, {$set: setDoc, $push: pushDoc}, {upsert: true, safe: true})
    .fail(this._onFault(this));
  if (log.error) {
    var self = this;
    deferred.then(function () {
      self._emitOrLog('error', [log, message]);
    });
  }
  if ('function' === typeof callback) {
    deferred.then(callback);
  }
};
| 258 | | |
/**
 * Build a failure handler for a deferred chain.
 *
 * The returned handler emits (or logs) a `fault` event carrying the error's
 * stack, or the error itself when it has no stack, and optionally gives
 * back one worker slot.
 *
 * @param {Queue} self Queue the handler reports to
 * @param {Boolean} [workerFinished] When truthy, decrement `_currentWorkers`
 * @returns {Function} Handler taking the error as its only argument
 * @private
 */
Queue.prototype._onFault = function (self, workerFinished) {
  return function onFault(err) {
    if (err) {
      var detail = err.stack || err;
      self._emitOrLog('fault', [detail]);
    }
    if (!workerFinished) {
      return;
    }
    self._currentWorkers--;
  };
};
| 269 | | |
/**
 * Emit `event` with `args` when it has listeners, otherwise write it to the
 * debug log.
 *
 * Skipping `emit` when nobody listens avoids the throw that an unhandled
 * `error` event causes on an `EventEmitter`. The caller's `args` array is
 * left unmodified.
 *
 * @param {String} event Event name
 * @param {Array} args Arguments passed to the listeners
 * @private
 */
Queue.prototype._emitOrLog = function (event, args) {
  if (this.listeners(event).length > 0) {
    // build a new argument list instead of unshifting into the caller's array
    this.emit.apply(this, [event].concat(args));
  } else {
    debug("Event %s: %s", event, JSON.stringify(args));
  }
};
| 278 | | |
/**
 * Start processing messages.
 *
 * Marks the queue as running and calls `_shift` once per worker slot.
 *
 * @param {Number} [workers] New maximum number of concurrent workers;
 *   the current maximum is kept when omitted or falsy
 */
Queue.prototype.start = function (workers) {
  this._state = 'running';
  if (workers) {
    this._maxWorkers = workers;
  }
  var launched = 0;
  while (launched < this._maxWorkers) {
    this._shift();
    launched += 1;
  }
};
| 288 | | |
/**
 * Stop shifting new messages.
 *
 * Only flips the state flag; `_shift` returns immediately while the state
 * is `stopped`.
 */
Queue.prototype.stop = function () {
  this._state = 'stopped';
};
| 292 | | |
/**
 * Stop the queue and close the underlying database handle.
 */
Queue.prototype.close = function () {
  this.stop();
  this.db.close();
};
| 297 | | |
/**
 * Atomically increment and fetch the queue id counter.
 *
 * Increments `currentId` on the `nextQueueId` document of `coll` and asks
 * for the updated document, creating it on first use.
 *
 * @param {Object} coll Config collection; must provide `findAndModify`
 * @returns {*} Whatever `coll.findAndModify` returns
 */
function nextQueueId(coll) {
  // `upsert` is a boolean option; the string 'true' only worked by truthiness
  return coll.findAndModify(
    {_id: 'nextQueueId'},
    [],
    {$inc: {currentId: 1}},
    {'new': true, upsert: true}
  );
}