> background-task@0.2.3 test /private/tmp/nbg-coverage.z5hJV4i > mocha Coverage

Coverage

91%
447
410
37

lib/background_task.js

91%
149
137
12
LineHitsSource
1// Copyright 2012 Kinvey, Inc
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
151"use strict";
16
171var redis = require("redis")
18 , EventEmitter = require('events').EventEmitter
19 , util = require('util')
20 , message = require('./messaging')
21 , task_limit = require('./task_limit')
22 , blacklist = require('./blacklist')
23 , wrapError;
24
25
26// We should handle this...
271wrapError = function(emitter){
28127 return function(error){
291 emitter.emit('error', error);
30 };
31};
32
331exports.connect = (function(){
341 var callbacks = []
35 , makeTimeoutError
36 , BackgroundTask
37 , extractResponse;
38
391 BackgroundTask = function(options){
4063 EventEmitter.call(this);
41
4263 if (!options){
431 options = {};
44 }
45
4663 if (options.isWorker){
4731 options.isResponder = true;
48 }
49
5063 if (options.taskKey){
5132 this.taskKey = options.taskKey;
5232 if (!options.maxTasksPerKey){
534 this.maxTasksPerKey = options.maxTasksPerKey = 5;
54 }
55
5632 this.taskLimit = new task_limit.TaskLimit(options);
5732 this.taskLimit.on('error', wrapError(this));
58
5932 this.blacklist = new blacklist.Blacklist(options);
6032 this.blacklist.on('error', wrapError(this));
61
62 }
63
6463 this.msgBus = new message.connect(options);
6563 this.timeout = 5000; // 5 second defualt timeout
66
6763 this.msgBus.on('error', wrapError(this));
68
6963 if (options.task){
701 options.broadcast = options.task + "Broadcast";
711 options.dataHash = options.task + "Table";
721 options.outputHash = options.task + "Hash";
73 }
74
7563 if (options && options.timeout){
762 this.timeout = options.timeout;
77 }
78
79
8063 if (options.isWorker){
8131 var that = this;
8231 this.msgBus.on('data_available', function(id){
8316 that.emit('TASK_AVAILABLE', id);
84 });
85 }
86
87
88 // Simple way to ensure we're not shut down
8963 this.isAvailable = true;
90 };
91
92 // Inherit EventEmitter's methods
931 util.inherits(BackgroundTask, EventEmitter);
94
951 BackgroundTask.prototype.end = function(){
9663 if (!this.isAvailable) return; // Nothing to do
97
9863 this.isAvailable = false;
99 // Hard end, don't worry about shutting down
100 // gracefully here...
10195 if (this.blacklist) this.blacklist.shutdown();
10295 if (this.taskLimit) this.taskLimit.shutdown();
10363 this.msgBus.shutdown();
10463 this.removeAllListeners();
105 };
106
1071 BackgroundTask.prototype.acceptTask = function(id, callback){
10818 var newCallback;
109
11018 if (!this.isAvailable){
1110 callback(new Error("Attempt to use invalid BackgroundTask"));
1120 return;
113 }
114
11518 if (!id || id.length === 0){
1163 throw new Error('Missing Task ID.');
117 }
118
11915 if (!callback || callback.length < 1){
1202 throw new Error('Invalid callback specified');
121 }
122
12313 newCallback = function(reply){
12413 if (reply instanceof Error){
1251 if (reply.message === "DB doesn't recognize message"){
1261 reply = new Error('Task not in database, do not accept');
127 }
128 }
129
13013 if (reply.taskDetails){
13112 callback(reply.taskDetails);
132 } else {
1331 callback(reply);
134 }
135 };
13613 this.msgBus.acceptMessage(id, newCallback);
137 };
138
1391 extractResponse = function(r){
14015 var response, id, respondedErr, o;
141
14215 if (!r.taskId && !r.taskDetails){
1430 throw new Error("Incomplete task response.");
144 }
145
14615 id = r.taskId;
14715 response = r.taskDetails;
148
14915 if (response.isError){
1501 respondedErr = new Error(response.message);
1511 for (o in response){
1523 if (response.hasOwnProperty(o) &&
153 o !== 'isError' &&
154 o !== 'message'){
1551 respondedErr[o] = response[o];
156 }
157 }
1581 response = respondedErr;
159 }
160
16115 return {id: id, details: response};
162 };
163
1641 BackgroundTask.prototype.addTask = function(msg, callback){
16520 var that = this
166 , id = message.makeId()
167 , cb, timeoutId, timedoutCb, msgToSend, tmpErr, startTheTask;
168
16920 if (!this.isAvailable){
1701 callback(id, new Error("Attempt to use invalid BackgroundTask"));
1711 return;
172 }
173
174
17519 startTheTask = function(){
17616 that.taskLimit.startTask(msg, function(tasks){
17716 var err, responseCb;
17816 if (tasks instanceof Error){
1791 err = new Error('Too many tasks');
1801 that.emit('TASK_ERROR', err);
1811 callback(id, err);
182 }
183
18416 callbacks[id] = callback;
185
18616 responseCb = function(resp){
1877 var uniqueIndex = id // Make this callback unique
188 , rply = extractResponse(resp);
189
1907 that.emit('TASK_DONE', rply.id, rply.details);
191 };
192
19316 cb = function(reply){
1948 var origCallback
195 , tid = id
196 , details = reply
197 , rply, fallback = false;
198
1998 try {
2008 rply = extractResponse(reply);
2018 details = rply.details;
2028 tid = rply.id;
203 } catch (e) {
204 // The system had an error
2050 that.emit('TASK_ERROR', e);
206 }
2078 origCallback = callbacks[tid];
208
2098 that.taskLimit.stopTask(msg);
2108 clearTimeout(timeoutId);
2118 origCallback(tid, details);
2128 delete callbacks[tid];
213 };
214
21516 timedoutCb = function(){
2165 var origCallback = callbacks[id];
217 // replace the "orig" callback with an empty function
218 // in case the request still completes in the future and
219 // tries to call our callback.
2205 callbacks[id] = function(reply){};
221
222 // Return an error
2235 that.taskLimit.stopTask(msg);
2245 origCallback(id, makeTimeoutError());
2255 that.msgBus.removeListener('responseReady', responseCb);
226 };
227
22816 msgToSend = {
229 taskId: id,
230 taskDetails: msg
231 };
232
23316 timeoutId = setTimeout(timedoutCb, that.timeout);
23416 that.msgBus.sendMessage(id, msgToSend, cb);
23516 that.msgBus.once('responseReady',responseCb);
236 });
237 };
238
23919 if (that.taskKey && msg[that.taskKey]){
24018 that.blacklist.blacklistStatus(msg, function(isBlacklisted, timeLeft, reason){
24118 var tmpErr;
24218 if (isBlacklisted){
2432 tmpErr = new Error('Blacklisted');
2442 tmpErr.debugMessage = "Blocked, reason: " + reason + ", remaining time: " + timeLeft;
2452 that.emit('TASK_ERROR', tmpErr);
2462 callback(id, tmpErr);
247 } else {
24816 startTheTask();
249 }
250 });
251 } else {
2521 tmpErr = new Error('Missing taskKey');
2531 that.emit('TASK_ERROR', tmpErr);
2541 callback(id, tmpErr);
255 }
256 };
257
2581 BackgroundTask.prototype.completeTask = function(taskId, status, msg){
25923 var that = this
260 , msgToSend, serializableError, o;
261
262
26323 if (!this.isAvailable){
2640 throw new Error("Attempt to use invalid BackgroundTask");
265 }
266
267 // We can't send Error's via JSON...
26823 if (msg instanceof Error){
2691 serializableError = {
270 isError: true,
271 message: msg.message
272 };
273
2741 for (o in msg){
2751 if (msg.hasOwnProperty(o)){
2761 serializableError[o] = msg[o];
277 }
278 }
279
2801 msg = serializableError;
281 }
282
283
28423 msgToSend = {
285 taskId: taskId,
286 taskDetails: msg
287 };
288
28923 if (!msg){
2901 throw new Error('Missing msgId, status or msg.');
291 }
292
29322 that.msgBus.sendResponse(taskId, status, msgToSend);
294 };
295
2961 BackgroundTask.prototype.reportBadTask = function(taskKey, reason, callback){
2970 var reasonToSend;
298
2990 if (!this.isAvailable){
3000 callback("ERR", "Attempt to use invalid BackgroundTask");
3010 return;
302 }
303
304
3050 if (reason instanceof Error){
3060 reasonToSend = reason.message;
307 }
308
3090 this.blacklist.addFailure(taskKey, reason, callback);
310
311 };
312
313
3141 makeTimeoutError = function(){
3155 return new Error('Task timed out');
316 };
317
318
3191 return function(options){
32063 return new BackgroundTask(options);
321 };
322
323}());

lib/blacklist.js

87%
73
64
9
LineHitsSource
1// Copyright 2012 Kinvey, Inc
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
141"use strict";
15
161var redis = require("redis")
17 , util = require('util')
18 , EventEmitter = require('events').EventEmitter
19 , wrapError;
20
21
221wrapError = function(emitter){
2351 return function(error){
241 emitter.emit('error', error);
25 };
26};
27
28
291var Blacklist = exports.Blacklist = function(options){
3051 EventEmitter.call(this);
31
3251 if (!options){
330 throw new Error("I need a task key!");
34 }
35
3652 if (options.host){ this.redisHost = options.host; }
3752 if (options.port){ this.redisPort = options.port; }
3852 if (options.password){ this.redisPassword = options.password; }
39
4051 this.taskKey = options.taskKey;
41
4251 this.failureInterval = options.failureInterval || 1;
4351 this.blacklistThreshold = options.blacklistThreshold || 10;
4451 this.globalBlacklistTimeout = options.globalBlacklistTimeout || 3600;
45
4651 this.logBlacklist = options.logBlacklist || false; // NB: This won't work if you want the default to be true
47
4851 this.redisKeyPrefix = "blacklist:";
4951 this.globalBlacklistKeyPrefix = this.redisKeyPrefix + "globalBlacklist:";
5051 this.blacklistLogKeyPrefix = this.redisKeyPrefix + "logs:"
51
5251 this.blacklistClient = redis.createClient(this.redisPort, this.redisHost);
53
5451 this.blacklistClient.on('error', wrapError(this));
55
5651 if (options.password){
571 this.blacklistClient.auth(options.password);
58 }
59
60};
61
62// Inherit EventEmitter's methods
631util.inherits(Blacklist, EventEmitter);
64
651Blacklist.prototype.blacklistStatus = function(task, callback){
6625 var taskKey = task && task[this.taskKey]
67 , redisKey
68 , that = this;
69
7025 if (!callback){
710 callback = function(){};
72 }
73
7425 if (!taskKey){
750 callback(false, "No task key, can't check blacklist.");
76 } else {
7725 redisKey = that.globalBlacklistKeyPrefix + taskKey;
7825 that.blacklistClient.get(redisKey, function(error, reply){
7925 if (reply){
80 // We're blacklisted
817 that.blacklistClient.ttl(redisKey, function(error, timeRemaining){
827 if (timeRemaining){
837 callback(true, timeRemaining, reply);
84 } else {
850 callback(true, -1, reply);
86 }
87 });
88 } else {
8918 callback(false, -1, "");
90 }
91 });
92 }
93};
94
951Blacklist.prototype.addFailure = function(taskKey, reason, callback){
9663 var errKey, countKey, that = this;
97
9863 if (!callback){
990 callback = function(){};
100 }
101
10263 if (!reason){
1031 callback(new Error("Must supply a reason for the failure"));
1041 return;
105 }
106
10762 if (!taskKey){
1080 callback(new Error("Invalid task, not running."));
109 } else {
11062 countKey = that.redisKeyPrefix + taskKey + ":count";
111
11262 that.blacklistClient.get(countKey, function(error, reply){
11362 var blacklistKey;
114
115 // Count not in redis
11662 if (!reply){
11714 that.blacklistClient.setex(countKey, that.failureInterval, "1", function(e, r){
11814 if (!error){
11914 callback("OK");
120 } else {
1210 callback(error);
122 }
123 });
124 } else {
12548 if (reply >= that.blacklistThreshold){
126 // Blacklist
12710 blacklistKey = that.globalBlacklistKeyPrefix + taskKey;
12810 that.blacklistClient.setex(blacklistKey, that.globalBlacklistTimeout, reason, function(e, r){
12910 var logKey, d;
130
13110 if (!error){
13210 if (that.logBlacklist){
1335 d = new Date();
1345 logKey = that.blacklistLogKeyPrefix + taskKey;
135 // Fire and forget
1365 that.blacklistClient.rpush(logKey, d + "|" + reason);
137 }
13810 callback("Blacklisted");
139 } else {
1400 callback(error);
141 }
142
143 });
144 } else {
14538 that.blacklistClient.incr(countKey, function(error, reply){
14638 if (!error){
14738 callback("OK");
148 } else {
1490 callback(error);
150 }
151 });
152 }
153 }
154 });
155 }
156};
157
1581Blacklist.prototype.shutdown = function(){
15932 this.blacklistClient.end();
160};
161
162

lib/messaging.js

91%
185
169
16
LineHitsSource
1// Copyright 2012 Kinvey, Inc
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15
161"use strict";
17
181var uuid = require('node-uuid')
19 , util = require('util')
20 , redis = require('redis')
21 , EventEmitter = require('events').EventEmitter
22 , connect
23 , MessageBus
24 , makeId
25 , wrapError
26 , initResponder
27 , initCreator
28 , authClient
29 , completeTransaction
30 , KILOBYTES = 1024
31 , MEGABYTES = 1024 * 1024
32 , log;
33
34
351log = function(msg){
360 var d = new Date()
37 , t = d.toISOString();
380 util.debug(t + ": " + msg);
39};
40
411wrapError = function(that, evt){
421 that.emit('error', evt);
43};
44
451exports.makeId = makeId = function(){
4610258 return uuid().replace(/-/g, "");
47};
48
491initCreator = function(that){
5057 that.subClient.subscribe(that.listenChannel);
5157 that.subClient.on('message', function(channel, message){
5214 completeTransaction(that, channel, message);
53 });
54};
55
56
571initResponder = function(that){
5856 var util = require('util');
5956 that.subClient.subscribe(that.broadcastChannel);
6056 that.subClient.on('message', function(channel, message){
6128 var msgBody, id;
6228 if (channel !== that.broadcastChannel){
63 // This shouldn't happen, it would mean
64 // that we've accidentally subscribed to
65 // an extra redis channel...
66 // If for some crazy reason we get an incorrect
67 // channel we should ignore the message
680 return;
69 }
7028 id = message;
7128 that.emit('data_available', id);
72 });
73};
74
751authClient = function(password, client, debug){
76339 if (password){
773 client.auth(password, function(res){
780 if (debug){
790 console.log("Auth'd redis messaging started with " + res);
80 }
81 });
82 }
83};
84
851exports.MessageBus = MessageBus = function(options){
86113 var that;
87
88113 if (!options){
8924 options = {};
90 }
91
92113 EventEmitter.call(this);
93113 this.redisHost = null;
94113 this.redisPort = null;
95113 this.redisPassword = null;
96113 this.debug_is_enabled = false;
97113 this.idToChannelMap = {};
98113 this.callbackMap = {};
99113 this.listenChannel = "msgChannels:" + makeId();
100113 this.broadcastChannel = (options && options.broadcast) || "msgChannels:broadcast";
101113 this.dataHash = (options && options.dataHash) || "msgTable:normal";
102113 this.responseHash = (options && options.outputHash) || "responseTable:normal";
103113 this.shutdownFlag = false;
104113 this.id = makeId(); // For debugging / logging
105
106117 if (options && options.host){ this.redisHost = options.host; }
107115 if (options && options.port){ this.redisPort = options.port; }
108114 if (options && options.password){ this.redisPassword = options.password; }
109
110113 this.pubClient = redis.createClient(this.redisPort, this.redisHost);
111113 this.subClient = redis.createClient(this.redisPort, this.redisHost);
112113 this.dataClient = redis.createClient(this.redisPort, this.redisHost);
113
114113 authClient(this.redisPassword, this.dataClient, this.debug_is_enabled);
115113 authClient(this.redisPassword, this.pubClient, this.debug_is_enabled);
116113 authClient(this.redisPassword, this.subClient, this.debug_is_enabled);
117
118113 if (options && options.isResponder){
11956 initResponder(this);
120 } else {
12157 initCreator(this);
122 }
123
124113 that = this;
125113 this.dataClient.on('error', function(evt){wrapError(that, evt);});
126114 this.pubClient.on('error', function(evt){wrapError(that, evt);});
127113 this.subClient.on('error', function(evt){wrapError(that, evt);});
128};
129
1301util.inherits(MessageBus, EventEmitter);
131
1321completeTransaction = function(that, ch, msg){
13314 var response, id, status, parsedMsg, daFaq, callback, multi;
134
13514 if (ch !== that.listenChannel){
136 // This can NEVER happen (except for if there's an error)
137 // in our code. So the check is left in, but
138 // should never be hit.
1390 that.emit('error', new Error("Got message for some other channel (Expected: " +
140 that.listenChannel + ", Actual: " + ch + ")"));
141 // Bail out!
1420 return;
143 }
144
14514 parsedMsg = msg.split(' ');
14614 if (parsedMsg.length < 2){
1471 that.emit('error', new Error("Invalid message received!"));
148 // Bail out
1491 return;
150 }
151
15213 id = parsedMsg[0];
15313 status = parsedMsg[1];
154
15513 callback = that.callbackMap[id];
156
15713 multi = that.dataClient.multi();
158
15913 multi.hget(that.responseHash, id)
160 .hdel(that.responseHash, id)
161 .exec(function(err, reply){
16212 var myErr = null, respondedErr, o;
16312 if (err){
1640 myErr = new Error("REDIS Error: " + err);
165 }
166
16712 if (!util.isArray(reply) && !myErr){
1680 myErr = new Error("Internal REDIS error (" + err + ", " + reply + ")");
16912 } else if (util.isArray(reply)){
170 // Reply[0] => hget, Reply[1] => hdel
17112 reply = reply[0];
172 }
173
17412 if (reply === null && !myErr){
1751 myErr = new Error("No message for id " + id);
176 }
177
178
17912 if (!myErr){
18011 try {
18111 response = JSON.parse(reply);
182 } catch(e){
1831 myErr = new Error("JSON parsing failed! " + e);
184 }
185 }
186
18712 if (myErr){
1882 response = myErr;
1892 that.emit('error', myErr);
1902 return;
191 }
192
19310 if (status !== 'SUCCESS'){
194 // Build the error
1951 that.emit('error', response);
196
197 // If we're hard failed we stop working here,
198 // no success
1991 if (status === "FAILED"){
2001 if (callback){
2011 callback(response);
2021 delete that.callbackMap[id];
203 }
2041 return;
205 }
206 }
207
208
2099 that.emit('responseReady', response);
210
2119 if (callback){
2129 callback(response);
2139 delete that.callbackMap[id];
214 }
215 });
216
217};
218
2191MessageBus.prototype.sendMessage = function(id, msg, callback){
22021 var that = this
221 , err
222 , msgString;
223
22421 if (this.shutdownFlag){
2251 callback(new Error("Attempt to use shutdown MessageBus."));
2261 return;
227 }
228
22920 msg._listenChannel = this.listenChannel;
23020 msg._messageId = id; // Store the message id in-band
231
23220 msgString = JSON.stringify(msg);
233
23420 if (!msgString){
2351 callback(new Error("Error converting message to JSON."));
2361 return;
237 }
238
239 // TODO: This needs to be an option for the class
24019 if (msgString.length > MEGABYTES){
2411 callback(new Error("Payload too large!"));
2421 return;
243 }
244
24518 this.dataClient.hset(this.dataHash, id, msgString, function(err, reply){
24617 if (err){
2470 callback(new Error("Error sending message: " + err));
2480 return;
249 }
25017 that.pubClient.publish(that.broadcastChannel, id);
251 });
25218 this.callbackMap[id] = callback;
253};
254
2551MessageBus.prototype.acceptMessage = function(mid, callback){
25623 var that = this
257 , multi
258 , derivedId;
259
26023 if (!mid){
2612 throw new Error('Missing Message ID.');
262 }
263
26421 if (!callback){
2651 throw new Error('Invalid callback.');
266 }
267
26820 if (callback.length < 1){
2691 throw new Error('Missing parameters in callback.');
270 }
271
27219 multi = that.dataClient.multi();
273
274 // Pipeline!
27519 multi
276 .hget(that.dataHash, mid)
277 .hdel(that.dataHash, mid)
278 .exec(function(err, reply){
27919 var listenChannel, id, dataErr, msgBody;
280
281 // First make sure that everything is probably ok
28219 if (!util.isArray(reply)){
2830 dataErr = new Error("Internal REDIS error (" + err + ", " + reply + ")");
2840 callback(dataErr);
2850 return;
286 } else {
287 // reply is good, let's just get the first element
288 // as the indeices should be hget,hdel
28919 reply = reply[0];
290 }
291
29219 try {
29319 msgBody = JSON.parse(reply);
294 } catch(e) {
2951 dataErr = new Error("Bad data in sent message, " + e);
2961 callback(dataErr);
2971 return;
298 }
299
30018 if (!msgBody){
3013 dataErr = new Error("DB doesn't recognize message");
3023 callback(dataErr);
3033 return;
304 }
305
30615 derivedId = id = msgBody._messageId;
30715 listenChannel = msgBody._listenChannel;
308
30915 if (id !== mid){
3100 console.log("ERROR: Mis-match on ids! (" + id + " does not equal " + mid + ")");
311 }
312
31315 that.idToChannelMap[id] = listenChannel;
314
315
31615 delete msgBody._listenChannel;
31715 delete msgBody._messageId;
318
31915 callback(msgBody);
320
321 });
322};
323
3241MessageBus.prototype.sendResponse = function(msgId, status, msg){
32537 if (!msgId || !status || !msg){
3265 throw new Error("Missing msgId, status or msg.");
327 }
328
32932 if (status !== 'SUCCESS' &&
330 status !== 'ERROR' &&
331 status !== 'FAILED')
332 {
33312 throw new Error(status + ' is not a valid status.');
334 }
335
33620 var listenChannel = this.idToChannelMap[msgId]
337 , that = this, serializableError, o, tmpErr, testMode;
338
33920 if (arguments.length === 4){
3403 testMode = arguments[3];
341 }
342
34320 if (!listenChannel && !testMode){
344 // We have no record of this request
345 // probably a callback being called twice, but
346 // still need to throw
3471 tmpErr = new Error('Attempt to respond to message that was never accepted');
3481 tmpErr.debugMessage = msgId + " is not registered as having been accepted";
3491 throw tmpErr;
350 }
351
35219 this.dataClient.hset(this.responseHash, msgId, JSON.stringify(msg), function(err, reply){
35316 delete that.idToChannelMap[msgId];
354
35516 if (err){
3560 throw new Error(err);
357 }
35816 that.pubClient.publish(listenChannel, msgId + " " + status);
359 });
360};
361
362
3631MessageBus.prototype.shutdown = function(){
364106 this.subClient.removeAllListeners('message');
365106 this.dataClient.removeAllListeners('error');
366106 this.pubClient.removeAllListeners('error');
367106 this.subClient.removeAllListeners('error');
368106 this.dataClient.end();
369106 this.pubClient.end();
370106 this.subClient.end();
371106 this.removeAllListeners();
372106 this.shutdownFlag = true;
373};
374
3751exports.connect = connect = function(options){
376113 return new MessageBus(options);
377};

lib/task_limit.js

100%
40
40
0
LineHitsSource
1// Copyright 2012 Kinvey, Inc
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
141"use strict";
15
161var redis = require("redis")
17 , util = require('util')
18 , EventEmitter = require('events').EventEmitter
19 , wrapError;
20
21
221wrapError = function(emitter){
2336 return function(error){
241 emitter.emit('error', error);
25 };
26};
27
28
291var TaskLimit = exports.TaskLimit = function(options){
3037 EventEmitter.call(this);
31
3237 if (!options){
331 throw new Error("I need a task key!");
34 }
35
3637 if (options.host){ this.redisHost = options.host; }
3737 if (options.port){ this.redisPort = options.port; }
3837 if (options.password){ this.redisPassword = options.password; }
39
4036 this.taskKey = options.taskKey;
4136 this.redisKeyPrefix = "taskKey:";
4236 this.maxTasksPerKey = options.maxTasksPerKey||10;
4336 this.taskLimitClient = redis.createClient(this.redisPort, this.redisHost);
44
4536 this.taskLimitClient.on('error', wrapError(this));
46
4736 if (options.password){
481 this.taskLimitClient.auth(options.password);
49 }
50
51};
52
53// Inherit EventEmitter's methods
541util.inherits(TaskLimit, EventEmitter);
55
561TaskLimit.prototype.startTask = function(task, callback){
5740 var value = task && task[this.taskKey]
58 , redisKey, that = this;
59
6040 if (!callback){
618 callback = function(){};
62 }
63
6440 if (!value){
654 callback(new Error("Invalid task, not running."));
66 } else {
6736 redisKey = that.redisKeyPrefix + value;
6836 that.taskLimitClient.llen(redisKey, function(err, len){
6936 if (len >= that.maxTasksPerKey){
702 callback(new Error("Too many tasks"));
71 } else {
7234 that.taskLimitClient.lpush(redisKey, Date(), function(x){
7334 callback(len+1);
74 });
75 }
76 });
77 }
78};
79
801TaskLimit.prototype.stopTask = function(task){
8133 var value = task && task[this.taskKey]
82 , redisKey, that = this;
83
84
8533 if (!value){
864 throw new Error("Invalid task, can't stop.");
87 } else {
8829 redisKey = that.redisKeyPrefix + value;
8929 that.taskLimitClient.lpop(redisKey);
90 }
91};
92
931TaskLimit.prototype.shutdown = function(){
9432 this.taskLimitClient.end();
95};