Coverage

90%
161
145
16

lib/common.js

100%
14
14
0
LineHitsSource
/**
 * Queue status constants.
 *
 * Numeric values used for the `state` field of queue/result documents and
 * of per-task log entries.
 */
var STATES = {
  STATE_ERROR: -1,
  STATE_NEW: 0,
  STATE_SHIFTED: 1,
  STATE_PARTIAL: 2,
  STATE_FINISHED: 3
};

// Publish every constant under its own name on the module's exports.
Object.keys(STATES).forEach(function (stateName) {
  exports[stateName] = STATES[stateName];
});
10
11
12/**
13 * Private functions
14 */
15
161exports.setDefaultOptions = function(options) {
173 options = options || {};
183 options.dbHost = options.dbHost || '127.0.0.1';
193 options.dbPort = options.dbPort || 27017;
203 options.dbName = options.dbName || 'nomoque_default_queue';
213 return options;
22};
23
241exports.ensureIndexes = function(db) {
253 db.collection('queues').ensureIndex([
26 ['date', 1],
27 ['topic', 1],
28 ['state', 1]
29 ]);
30 // index for `results`
313 db.collection('results').ensureIndex([
32 ['date', 1],
33 ['topic', 1],
34 ['state', 1],
35 ['created', -1],
36 ['shifted', -1],
37 ['finished', -1]
38 ]);
39};

lib/index.js

100%
2
2
0
LineHitsSource
11exports.common = require('./common');
21exports.Queue = require('./queue');

lib/queue.js

88%
145
129
16
LineHitsSource
1/**
2 * Module dependencies
3 */
41var EventEmitter = require('events').EventEmitter;
51var common = require('./common');
61var dateformat = require('dateformat');
71var debug = require('debug')('nmq:queue');
8
9
/**
 * Queue
 *
 * Sets up the in-memory worker state and binds the three collections the
 * queue works with. When `options.collectionPrefix` is set, the collection
 * names become `<prefix>_queues`, `<prefix>_results` and `<prefix>_configs`.
 *
 * @param {Object} db Database handle exposing `collection(name)`
 * @param {Object} [options]
 * @param {String} [options.collectionPrefix]
 */
var Queue = module.exports = function Queue(db, options) {
  EventEmitter.call(this);
  common.ensureIndexes(db);

  // worker / scheduling state
  this._tasks = {};
  this._topics = [];
  this.delay = 1;
  this._state = 'stopped';
  this._maxWorkers = 2;
  this._currentWorkers = 0;
  this.db = db;

  // resolve collection names, honouring the optional prefix
  var settings = options || {};
  var collectionPrefix = settings.collectionPrefix;
  var namePrefix = collectionPrefix ? collectionPrefix + '_' : '';

  this.queColl = db.collection(namePrefix + 'queues');
  this.resultColl = db.collection(namePrefix + 'results');
  this.configColl = db.collection(namePrefix + 'configs');
};

/**
 * Inherits `EventEmitter`
 */
Queue.prototype.__proto__ = EventEmitter.prototype;
42
/**
 * Push a message to a topic queue.
 *
 * Obtains the next queue id, then inserts the message in state `STATE_NEW`.
 * Emits `queued` with the inserted document, or `fault` with the error when
 * the chain fails.
 *
 * @param {String} topic
 * @param {Object} message
 */
Queue.prototype.push = function (topic, message) {
  var queue = this;

  var reportFault = function (err) {
    queue.emit('fault', err);
  };

  var storeMessage = function (defer, idDoc) {
    var id = idDoc.currentId;
    var now = new Date;
    var record = {
      _id: id,
      topic: topic,
      message: message,
      state: common.STATE_NEW,
      date: dateformat(now, 'yyyymmdd'),
      created: now
    };
    queue.queColl.insert(record, {safe: true}).then(function (inserted) {
      if (!inserted) {
        return;
      }
      // the insert may hand back either one document or an array of them
      queue.emit('queued', Array.isArray(inserted) ? inserted[0] : inserted);
    }).fail(defer.error);
  };

  nextQueueId(this.configColl).and(storeMessage).fail(reportFault);
};
71
72
/**
 * Register one or more named tasks for a topic.
 *
 * `name` may also be an object mapping task names to functions, in which
 * case `fn` is ignored.
 *
 * @param {String} topic Topic of the queue
 * @param {String|Object} name Name of the task, or a `{name: fn}` hash
 * @param {Function} [fn] Task function (used when `name` is a string)
 */
Queue.prototype.process = function (topic, name, fn) {
  var handlers;
  if (typeof name === 'object') {
    handlers = name;
  } else {
    handlers = {};
    handlers[name] = fn;
  }

  // remember the topic once
  if (this._topics.indexOf(topic) < 0) {
    this._topics.push(topic);
  }

  var registered = this._tasks[topic];
  if (!registered) {
    registered = this._tasks[topic] = [];
  }

  var names = Object.keys(handlers);
  for (var i = 0; i < names.length; i++) {
    registered.push({name: names[i], fn: handlers[names[i]]});
  }
};
98
/**
 * Move a message from one collection to another.
 *
 * The message is upserted into `toColl` under its `_id`, then removed from
 * `fromColl`; on success the chain continues with the message itself.
 * Failures go to the fault handler, which also releases the worker slot.
 *
 * The caller's `message` object is never modified: the `$set` document is a
 * copy without `_id`, so the message keeps its id even when the move fails.
 *
 * @param {Object} message Document to move; must carry `_id`
 * @param fromColl Collection the message is removed from
 * @param toColl Collection the message is upserted into
 * @returns {*} the promise-like chain returned by the collection calls
 * @private
 */
Queue.prototype._mvMessages = function (message, fromColl, toColl) {
  var id = message._id;
  // `$set` must not contain `_id`, so copy every other field
  var fields = {};
  Object.keys(message).forEach(function (key) {
    if (key !== '_id') {
      fields[key] = message[key];
    }
  });
  // upsert the message into `toColl`
  return toColl.update({_id: id}, {$set: fields}, {safe: true, upsert: true})
    .and(function (defer) {
      // remove the old one from `fromColl`
      fromColl.remove({_id: id}, {safe: true})
        .then(function () {
          defer.next(message);
        }).fail(defer.error);
    }).fail(this._onFault(this, true));
};
123
/**
 * Try to claim one new message and hand it to the registered tasks.
 *
 * Does nothing while the queue is stopped or all worker slots are taken.
 * Otherwise one worker slot is occupied and a message in `STATE_NEW` for one
 * of the registered topics is atomically switched to `STATE_SHIFTED`, moved
 * to the results collection and processed. When no message is available (or
 * the lookup fails) the slot is released and another attempt is scheduled
 * after `delay` seconds.
 *
 * The slot counter is only incremented when a slot is actually taken; a call
 * rejected for being at capacity must not change it, otherwise slots are
 * lost for good.
 *
 * @private
 */
Queue.prototype._shift = function () {
  if (this._state === 'stopped' || this._currentWorkers >= this._maxWorkers) {
    return;
  }
  this._currentWorkers++;
  var self = this;

  function tryNext() {
    // if nothing, wait for `delay` and try shift again
    self._currentWorkers--;
    setTimeout(function () {
      self._shift();
    }, self.delay * 1000);
  }

  // get messages we are interested
  this.queColl.findAndModify({topic: {$in: self._topics}, state: common.STATE_NEW},
    // use `natural` order
    [],
    // modify the message status
    {$set: {shifted: new Date, state: common.STATE_SHIFTED}},
    // return updated message
    {remove: false, 'new': true, upsert: false}).then(function (message) {
      if (!message) {
        tryNext();
        return;
      }
      // handle shifted message
      // move the message from `queues` to collection `results`
      self._mvMessages(message, self.queColl, self.resultColl).then(function (message) {
        // perform tasks
        self._process(message);
      });
    }).fail(tryNext);
};
156
/**
 * Run a single task against a message and log its outcome.
 *
 * The task function is called with the message document as `this`, the
 * message payload as first argument and a node-style `(err, result)`
 * callback. Whatever the outcome, a log entry is written through `_log`,
 * which calls `done` afterwards.
 *
 * Both failure modes - an error passed to the callback and an exception
 * thrown by the task - are recorded in `errors` under the task name, so the
 * caller sees every failed task.
 *
 * @param {Object} task `{name, fn}` as registered by `process`
 * @param {Object} message Message document (`_id`, `topic`, `message`)
 * @param {Object} errors Collector: task name -> error
 * @param {Object} results Collector: task name -> result
 * @param {Function} done Called once the log entry has been written
 * @private
 */
Queue.prototype._runTask = function (task, message, errors, results, done) {
  var self = this;
  var log = {
    _id: message._id,
    topic: message.topic,
    name: task.name,
    started: new Date(),
    // NOTE(review): STATE_INPROGRESS is not among the constants visible in
    // lib/common.js, so this is presumably `undefined` - confirm. The value
    // is always overwritten below before the entry is passed to `_log`.
    state: common.STATE_INPROGRESS
  };
  try {
    task.fn.call(message, message.message, function (taskErr, result) {
      log.finished = new Date();
      if (taskErr) {
        // record error
        log.state = common.STATE_ERROR;
        log.error = taskErr;
        errors[task.name] = taskErr;
      } else {
        log.state = common.STATE_FINISHED;
        // record result if any
        if ('undefined' !== typeof result) {
          results[task.name] = result;
          log.result = result;
        }
      }
      self._log(log, message, done);
    });
  } catch (e) {
    log.state = common.STATE_ERROR;
    log.error = e.stack || e;
    log.finished = new Date();
    // a thrown task failed just like one that reported an error
    errors[task.name] = e;
    self._log(log, message, done);
  }
};
192
/**
 * Run every task registered for the message's topic and, once all of them
 * have reported back, finish the message.
 *
 * Finishing means: if any task failed, mark the result document as
 * `STATE_PARTIAL`; then emit `finished` with `(topic, errors, results)`,
 * release the worker slot and try to shift the next message.
 *
 * A topic with no (or zero) registered tasks is finished immediately, so
 * the worker slot is not held forever.
 *
 * @param {Object} message Message document (`_id`, `topic`, `message`)
 * @private
 */
Queue.prototype._process = function (message) {
  var tasks = this._tasks[message.topic] || [];
  var self = this;
  var count = 0;
  var results = {};
  var errors = {};

  // announce completion, free the slot and shift next
  function release() {
    self._emitOrLog('finished', [message.topic, errors, results]);
    self._currentWorkers--;
    self._shift();
  }

  function finish() {
    if (Object.keys(errors).length > 0) {
      // errors found
      self.resultColl
        .update({_id: message._id}, {$set: {state: common.STATE_PARTIAL}}, {safe: true})
        .then(function () {
          release();
        });
    } else {
      release();
    }
  }

  function checkFinish() {
    if (++count === tasks.length) {
      finish();
    }
  }

  if (tasks.length === 0) {
    // nothing will ever call `checkFinish`
    finish();
    return;
  }

  // run each tasks
  tasks.forEach(function (task) {
    self._runTask(task, message, errors, results, checkFinish);
  });
};
226
/**
 * Append a task log entry to the message's result document.
 *
 * Sets the document's `topic` and `state` and pushes
 * `{started, finished, state[, error][, result]}` onto `logs.<task name>`.
 * After the write, `error` is emitted (or debug-logged) when the entry
 * carries an error, and `callback` is invoked.
 *
 * Any result other than `undefined` is stored, including falsy values such
 * as `0`, `''` or `false`.
 *
 * @param {Object} log Entry built by `_runTask`
 * @param {Object} message Message the task ran against
 * @param {Function} [callback] Called after the write succeeded
 * @private
 */
Queue.prototype._log = function (log, message, callback) {
  var logName = 'logs.' + log.name;
  var setDoc = {
    topic: log.topic,
    state: log.state
  };
  var pushDoc = {};
  pushDoc[logName] = {
    started: log.started,
    finished: log.finished,
    state: log.state
  };
  if (log.error) {
    pushDoc[logName].error = log.error;
  }
  if ('undefined' !== typeof log.result) {
    pushDoc[logName].result = log.result;
  }
  var deferred = this.resultColl
    .update({_id: log._id}, {$set:setDoc, $push: pushDoc}, {upsert: true, safe: true})
    .fail(this._onFault(this));
  if (log.error) {
    var self = this;
    deferred.then(function () {
      self._emitOrLog('error', [log, message]);
    });
  }
  if ('function' === typeof callback) {
    deferred.then(callback);
  }
};
258
/**
 * Build a failure handler for a promise chain.
 *
 * The returned handler reports a truthy error as a `fault` event (using the
 * error's stack when it has one) and, when `workerFinished` is set, gives
 * the worker slot back.
 *
 * @param {Queue} self Queue the handler acts on
 * @param {Boolean} [workerFinished] Release one worker slot on failure
 * @returns {Function} handler taking the error
 * @private
 */
Queue.prototype._onFault = function (self, workerFinished) {
  function onFault(err) {
    if (err) {
      var detail = err.stack || err;
      self._emitOrLog('fault', [detail]);
    }
    if (!workerFinished) {
      return;
    }
    self._currentWorkers--;
  }
  return onFault;
};
269
/**
 * Emit `event` with `args` when somebody listens for it; otherwise write
 * the event to the debug log.
 *
 * Note that `args` is modified when the event is emitted: the event name is
 * prepended to it.
 *
 * @param {String} event Event name
 * @param {Array} args Arguments for the listeners
 * @private
 */
Queue.prototype._emitOrLog = function (event, args) {
  var listenerCount = this.listeners(event).length;
  if (listenerCount === 0) {
    debug("Event %s: %s", event, JSON.stringify(args));
    return;
  }
  args.unshift(event);
  this.emit.apply(this, args);
};
278
/**
 * Start consuming messages.
 *
 * Switches the queue to `running` and attempts one shift per worker slot.
 *
 * @param {Number} [workers] New maximum number of concurrent workers; the
 *   current maximum is kept when omitted (or falsy)
 */
Queue.prototype.start = function (workers) {
  this._state = 'running';
  this._maxWorkers = workers || this._maxWorkers;

  var launched = 0;
  while (launched < this._maxWorkers) {
    this._shift();
    launched += 1;
  }
};
288
/**
 * Stop consuming messages.
 *
 * Only flips the state flag; `_shift` checks it and declines to pick up
 * further messages.
 */
Queue.prototype.stop = function () {
  var stoppedState = 'stopped';
  this._state = stoppedState;
};
292
/**
 * Stop the queue and close the database handle it was created with.
 */
Queue.prototype.close = function () {
  // stop first so no new shift is started, then close the handle
  this.stop();
  var database = this.db;
  database.close();
};
297
/**
 * Atomically increment and fetch the queue id counter.
 *
 * Increments `currentId` on the document with `_id: 'nextQueueId'`,
 * creating that document when it does not exist yet, and asks for the
 * updated document back.
 *
 * @param coll Collection holding the counter document
 * @returns {*} whatever `coll.findAndModify` returns
 */
function nextQueueId(coll) {
  return coll.findAndModify(
    {_id: 'nextQueueId'},
    [],
    {$inc: {currentId: 1}},
    // `upsert` is a boolean flag, not the string 'true'
    {'new': true, upsert: true}
  );
}