> background-task@0.2.3 test /private/tmp/nbg-coverage.z5hJV4i > mocha
Line | Hits | Source |
---|---|---|
1 | // Copyright 2012 Kinvey, Inc | |
2 | // | |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | // you may not use this file except in compliance with the License. | |
5 | // You may obtain a copy of the License at | |
6 | // | |
7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
8 | // | |
9 | // Unless required by applicable law or agreed to in writing, software | |
10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | // See the License for the specific language governing permissions and | |
13 | // limitations under the License. | |
14 | ||
15 | 1 | "use strict"; |
16 | ||
17 | 1 | var redis = require("redis") |
18 | , EventEmitter = require('events').EventEmitter | |
19 | , util = require('util') | |
20 | , message = require('./messaging') | |
21 | , task_limit = require('./task_limit') | |
22 | , blacklist = require('./blacklist') | |
23 | , wrapError; | |
24 | ||
25 | ||
26 | // We should handle this... | |
27 | 1 | wrapError = function(emitter){ |
28 | 127 | return function(error){ |
29 | 1 | emitter.emit('error', error); |
30 | }; | |
31 | }; | |
32 | ||
33 | 1 | exports.connect = (function(){ |
34 | 1 | var callbacks = [] |
35 | , makeTimeoutError | |
36 | , BackgroundTask | |
37 | , extractResponse; | |
38 | ||
39 | 1 | BackgroundTask = function(options){ |
40 | 63 | EventEmitter.call(this); |
41 | ||
42 | 63 | if (!options){ |
43 | 1 | options = {}; |
44 | } | |
45 | ||
46 | 63 | if (options.isWorker){ |
47 | 31 | options.isResponder = true; |
48 | } | |
49 | ||
50 | 63 | if (options.taskKey){ |
51 | 32 | this.taskKey = options.taskKey; |
52 | 32 | if (!options.maxTasksPerKey){ |
53 | 4 | this.maxTasksPerKey = options.maxTasksPerKey = 5; |
54 | } | |
55 | ||
56 | 32 | this.taskLimit = new task_limit.TaskLimit(options); |
57 | 32 | this.taskLimit.on('error', wrapError(this)); |
58 | ||
59 | 32 | this.blacklist = new blacklist.Blacklist(options); |
60 | 32 | this.blacklist.on('error', wrapError(this)); |
61 | ||
62 | } | |
63 | ||
64 | 63 | this.msgBus = new message.connect(options); |
65 | 63 | this.timeout = 5000; // 5 second defualt timeout |
66 | ||
67 | 63 | this.msgBus.on('error', wrapError(this)); |
68 | ||
69 | 63 | if (options.task){ |
70 | 1 | options.broadcast = options.task + "Broadcast"; |
71 | 1 | options.dataHash = options.task + "Table"; |
72 | 1 | options.outputHash = options.task + "Hash"; |
73 | } | |
74 | ||
75 | 63 | if (options && options.timeout){ |
76 | 2 | this.timeout = options.timeout; |
77 | } | |
78 | ||
79 | ||
80 | 63 | if (options.isWorker){ |
81 | 31 | var that = this; |
82 | 31 | this.msgBus.on('data_available', function(id){ |
83 | 16 | that.emit('TASK_AVAILABLE', id); |
84 | }); | |
85 | } | |
86 | ||
87 | ||
88 | // Simple way to ensure we're not shut down | |
89 | 63 | this.isAvailable = true; |
90 | }; | |
91 | ||
92 | // Inherit EventEmitter's methods | |
93 | 1 | util.inherits(BackgroundTask, EventEmitter); |
94 | ||
95 | 1 | BackgroundTask.prototype.end = function(){ |
96 | 63 | if (!this.isAvailable) return; // Nothing to do |
97 | ||
98 | 63 | this.isAvailable = false; |
99 | // Hard end, don't worry about shutting down | |
100 | // gracefully here... | |
101 | 95 | if (this.blacklist) this.blacklist.shutdown(); |
102 | 95 | if (this.taskLimit) this.taskLimit.shutdown(); |
103 | 63 | this.msgBus.shutdown(); |
104 | 63 | this.removeAllListeners(); |
105 | }; | |
106 | ||
107 | 1 | BackgroundTask.prototype.acceptTask = function(id, callback){ |
108 | 18 | var newCallback; |
109 | ||
110 | 18 | if (!this.isAvailable){ |
111 | 0 | callback(new Error("Attempt to use invalid BackgroundTask")); |
112 | 0 | return; |
113 | } | |
114 | ||
115 | 18 | if (!id || id.length === 0){ |
116 | 3 | throw new Error('Missing Task ID.'); |
117 | } | |
118 | ||
119 | 15 | if (!callback || callback.length < 1){ |
120 | 2 | throw new Error('Invalid callback specified'); |
121 | } | |
122 | ||
123 | 13 | newCallback = function(reply){ |
124 | 13 | if (reply instanceof Error){ |
125 | 1 | if (reply.message === "DB doesn't recognize message"){ |
126 | 1 | reply = new Error('Task not in database, do not accept'); |
127 | } | |
128 | } | |
129 | ||
130 | 13 | if (reply.taskDetails){ |
131 | 12 | callback(reply.taskDetails); |
132 | } else { | |
133 | 1 | callback(reply); |
134 | } | |
135 | }; | |
136 | 13 | this.msgBus.acceptMessage(id, newCallback); |
137 | }; | |
138 | ||
139 | 1 | extractResponse = function(r){ |
140 | 15 | var response, id, respondedErr, o; |
141 | ||
142 | 15 | if (!r.taskId && !r.taskDetails){ |
143 | 0 | throw new Error("Incomplete task response."); |
144 | } | |
145 | ||
146 | 15 | id = r.taskId; |
147 | 15 | response = r.taskDetails; |
148 | ||
149 | 15 | if (response.isError){ |
150 | 1 | respondedErr = new Error(response.message); |
151 | 1 | for (o in response){ |
152 | 3 | if (response.hasOwnProperty(o) && |
153 | o !== 'isError' && | |
154 | o !== 'message'){ | |
155 | 1 | respondedErr[o] = response[o]; |
156 | } | |
157 | } | |
158 | 1 | response = respondedErr; |
159 | } | |
160 | ||
161 | 15 | return {id: id, details: response}; |
162 | }; | |
163 | ||
164 | 1 | BackgroundTask.prototype.addTask = function(msg, callback){ |
165 | 20 | var that = this |
166 | , id = message.makeId() | |
167 | , cb, timeoutId, timedoutCb, msgToSend, tmpErr, startTheTask; | |
168 | ||
169 | 20 | if (!this.isAvailable){ |
170 | 1 | callback(id, new Error("Attempt to use invalid BackgroundTask")); |
171 | 1 | return; |
172 | } | |
173 | ||
174 | ||
175 | 19 | startTheTask = function(){ |
176 | 16 | that.taskLimit.startTask(msg, function(tasks){ |
177 | 16 | var err, responseCb; |
178 | 16 | if (tasks instanceof Error){ |
179 | 1 | err = new Error('Too many tasks'); |
180 | 1 | that.emit('TASK_ERROR', err); |
181 | 1 | callback(id, err); |
182 | } | |
183 | ||
184 | 16 | callbacks[id] = callback; |
185 | ||
186 | 16 | responseCb = function(resp){ |
187 | 7 | var uniqueIndex = id // Make this callback unique |
188 | , rply = extractResponse(resp); | |
189 | ||
190 | 7 | that.emit('TASK_DONE', rply.id, rply.details); |
191 | }; | |
192 | ||
193 | 16 | cb = function(reply){ |
194 | 8 | var origCallback |
195 | , tid = id | |
196 | , details = reply | |
197 | , rply, fallback = false; | |
198 | ||
199 | 8 | try { |
200 | 8 | rply = extractResponse(reply); |
201 | 8 | details = rply.details; |
202 | 8 | tid = rply.id; |
203 | } catch (e) { | |
204 | // The system had an error | |
205 | 0 | that.emit('TASK_ERROR', e); |
206 | } | |
207 | 8 | origCallback = callbacks[tid]; |
208 | ||
209 | 8 | that.taskLimit.stopTask(msg); |
210 | 8 | clearTimeout(timeoutId); |
211 | 8 | origCallback(tid, details); |
212 | 8 | delete callbacks[tid]; |
213 | }; | |
214 | ||
215 | 16 | timedoutCb = function(){ |
216 | 5 | var origCallback = callbacks[id]; |
217 | // replace the "orig" callback with an empty function | |
218 | // in case the request still completes in the future and | |
219 | // tries to call our callback. | |
220 | 5 | callbacks[id] = function(reply){}; |
221 | ||
222 | // Return an error | |
223 | 5 | that.taskLimit.stopTask(msg); |
224 | 5 | origCallback(id, makeTimeoutError()); |
225 | 5 | that.msgBus.removeListener('responseReady', responseCb); |
226 | }; | |
227 | ||
228 | 16 | msgToSend = { |
229 | taskId: id, | |
230 | taskDetails: msg | |
231 | }; | |
232 | ||
233 | 16 | timeoutId = setTimeout(timedoutCb, that.timeout); |
234 | 16 | that.msgBus.sendMessage(id, msgToSend, cb); |
235 | 16 | that.msgBus.once('responseReady',responseCb); |
236 | }); | |
237 | }; | |
238 | ||
239 | 19 | if (that.taskKey && msg[that.taskKey]){ |
240 | 18 | that.blacklist.blacklistStatus(msg, function(isBlacklisted, timeLeft, reason){ |
241 | 18 | var tmpErr; |
242 | 18 | if (isBlacklisted){ |
243 | 2 | tmpErr = new Error('Blacklisted'); |
244 | 2 | tmpErr.debugMessage = "Blocked, reason: " + reason + ", remaining time: " + timeLeft; |
245 | 2 | that.emit('TASK_ERROR', tmpErr); |
246 | 2 | callback(id, tmpErr); |
247 | } else { | |
248 | 16 | startTheTask(); |
249 | } | |
250 | }); | |
251 | } else { | |
252 | 1 | tmpErr = new Error('Missing taskKey'); |
253 | 1 | that.emit('TASK_ERROR', tmpErr); |
254 | 1 | callback(id, tmpErr); |
255 | } | |
256 | }; | |
257 | ||
258 | 1 | BackgroundTask.prototype.completeTask = function(taskId, status, msg){ |
259 | 23 | var that = this |
260 | , msgToSend, serializableError, o; | |
261 | ||
262 | ||
263 | 23 | if (!this.isAvailable){ |
264 | 0 | throw new Error("Attempt to use invalid BackgroundTask"); |
265 | } | |
266 | ||
267 | // We can't send Error's via JSON... | |
268 | 23 | if (msg instanceof Error){ |
269 | 1 | serializableError = { |
270 | isError: true, | |
271 | message: msg.message | |
272 | }; | |
273 | ||
274 | 1 | for (o in msg){ |
275 | 1 | if (msg.hasOwnProperty(o)){ |
276 | 1 | serializableError[o] = msg[o]; |
277 | } | |
278 | } | |
279 | ||
280 | 1 | msg = serializableError; |
281 | } | |
282 | ||
283 | ||
284 | 23 | msgToSend = { |
285 | taskId: taskId, | |
286 | taskDetails: msg | |
287 | }; | |
288 | ||
289 | 23 | if (!msg){ |
290 | 1 | throw new Error('Missing msgId, status or msg.'); |
291 | } | |
292 | ||
293 | 22 | that.msgBus.sendResponse(taskId, status, msgToSend); |
294 | }; | |
295 | ||
296 | 1 | BackgroundTask.prototype.reportBadTask = function(taskKey, reason, callback){ |
297 | 0 | var reasonToSend; |
298 | ||
299 | 0 | if (!this.isAvailable){ |
300 | 0 | callback("ERR", "Attempt to use invalid BackgroundTask"); |
301 | 0 | return; |
302 | } | |
303 | ||
304 | ||
305 | 0 | if (reason instanceof Error){ |
306 | 0 | reasonToSend = reason.message; |
307 | } | |
308 | ||
309 | 0 | this.blacklist.addFailure(taskKey, reason, callback); |
310 | ||
311 | }; | |
312 | ||
313 | ||
314 | 1 | makeTimeoutError = function(){ |
315 | 5 | return new Error('Task timed out'); |
316 | }; | |
317 | ||
318 | ||
319 | 1 | return function(options){ |
320 | 63 | return new BackgroundTask(options); |
321 | }; | |
322 | ||
323 | }()); |
Line | Hits | Source |
---|---|---|
1 | // Copyright 2012 Kinvey, Inc | |
2 | // | |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | // you may not use this file except in compliance with the License. | |
5 | // You may obtain a copy of the License at | |
6 | // | |
7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
8 | // | |
9 | // Unless required by applicable law or agreed to in writing, software | |
10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | // See the License for the specific language governing permissions and | |
13 | // limitations under the License. | |
14 | 1 | "use strict"; |
15 | ||
16 | 1 | var redis = require("redis") |
17 | , util = require('util') | |
18 | , EventEmitter = require('events').EventEmitter | |
19 | , wrapError; | |
20 | ||
21 | ||
22 | 1 | wrapError = function(emitter){ |
23 | 51 | return function(error){ |
24 | 1 | emitter.emit('error', error); |
25 | }; | |
26 | }; | |
27 | ||
28 | ||
29 | 1 | var Blacklist = exports.Blacklist = function(options){ |
30 | 51 | EventEmitter.call(this); |
31 | ||
32 | 51 | if (!options){ |
33 | 0 | throw new Error("I need a task key!"); |
34 | } | |
35 | ||
36 | 52 | if (options.host){ this.redisHost = options.host; } |
37 | 52 | if (options.port){ this.redisPort = options.port; } |
38 | 52 | if (options.password){ this.redisPassword = options.password; } |
39 | ||
40 | 51 | this.taskKey = options.taskKey; |
41 | ||
42 | 51 | this.failureInterval = options.failureInterval || 1; |
43 | 51 | this.blacklistThreshold = options.blacklistThreshold || 10; |
44 | 51 | this.globalBlacklistTimeout = options.globalBlacklistTimeout || 3600; |
45 | ||
46 | 51 | this.logBlacklist = options.logBlacklist || false; // NB: This won't work if you want the default to be true |
47 | ||
48 | 51 | this.redisKeyPrefix = "blacklist:"; |
49 | 51 | this.globalBlacklistKeyPrefix = this.redisKeyPrefix + "globalBlacklist:"; |
50 | 51 | this.blacklistLogKeyPrefix = this.redisKeyPrefix + "logs:" |
51 | ||
52 | 51 | this.blacklistClient = redis.createClient(this.redisPort, this.redisHost); |
53 | ||
54 | 51 | this.blacklistClient.on('error', wrapError(this)); |
55 | ||
56 | 51 | if (options.password){ |
57 | 1 | this.blacklistClient.auth(options.password); |
58 | } | |
59 | ||
60 | }; | |
61 | ||
62 | // Inherit EventEmitter's methods | |
63 | 1 | util.inherits(Blacklist, EventEmitter); |
64 | ||
65 | 1 | Blacklist.prototype.blacklistStatus = function(task, callback){ |
66 | 25 | var taskKey = task && task[this.taskKey] |
67 | , redisKey | |
68 | , that = this; | |
69 | ||
70 | 25 | if (!callback){ |
71 | 0 | callback = function(){}; |
72 | } | |
73 | ||
74 | 25 | if (!taskKey){ |
75 | 0 | callback(false, "No task key, can't check blacklist."); |
76 | } else { | |
77 | 25 | redisKey = that.globalBlacklistKeyPrefix + taskKey; |
78 | 25 | that.blacklistClient.get(redisKey, function(error, reply){ |
79 | 25 | if (reply){ |
80 | // We're blacklisted | |
81 | 7 | that.blacklistClient.ttl(redisKey, function(error, timeRemaining){ |
82 | 7 | if (timeRemaining){ |
83 | 7 | callback(true, timeRemaining, reply); |
84 | } else { | |
85 | 0 | callback(true, -1, reply); |
86 | } | |
87 | }); | |
88 | } else { | |
89 | 18 | callback(false, -1, ""); |
90 | } | |
91 | }); | |
92 | } | |
93 | }; | |
94 | ||
95 | 1 | Blacklist.prototype.addFailure = function(taskKey, reason, callback){ |
96 | 63 | var errKey, countKey, that = this; |
97 | ||
98 | 63 | if (!callback){ |
99 | 0 | callback = function(){}; |
100 | } | |
101 | ||
102 | 63 | if (!reason){ |
103 | 1 | callback(new Error("Must supply a reason for the failure")); |
104 | 1 | return; |
105 | } | |
106 | ||
107 | 62 | if (!taskKey){ |
108 | 0 | callback(new Error("Invalid task, not running.")); |
109 | } else { | |
110 | 62 | countKey = that.redisKeyPrefix + taskKey + ":count"; |
111 | ||
112 | 62 | that.blacklistClient.get(countKey, function(error, reply){ |
113 | 62 | var blacklistKey; |
114 | ||
115 | // Count not in redis | |
116 | 62 | if (!reply){ |
117 | 14 | that.blacklistClient.setex(countKey, that.failureInterval, "1", function(e, r){ |
118 | 14 | if (!error){ |
119 | 14 | callback("OK"); |
120 | } else { | |
121 | 0 | callback(error); |
122 | } | |
123 | }); | |
124 | } else { | |
125 | 48 | if (reply >= that.blacklistThreshold){ |
126 | // Blacklist | |
127 | 10 | blacklistKey = that.globalBlacklistKeyPrefix + taskKey; |
128 | 10 | that.blacklistClient.setex(blacklistKey, that.globalBlacklistTimeout, reason, function(e, r){ |
129 | 10 | var logKey, d; |
130 | ||
131 | 10 | if (!error){ |
132 | 10 | if (that.logBlacklist){ |
133 | 5 | d = new Date(); |
134 | 5 | logKey = that.blacklistLogKeyPrefix + taskKey; |
135 | // Fire and forget | |
136 | 5 | that.blacklistClient.rpush(logKey, d + "|" + reason); |
137 | } | |
138 | 10 | callback("Blacklisted"); |
139 | } else { | |
140 | 0 | callback(error); |
141 | } | |
142 | ||
143 | }); | |
144 | } else { | |
145 | 38 | that.blacklistClient.incr(countKey, function(error, reply){ |
146 | 38 | if (!error){ |
147 | 38 | callback("OK"); |
148 | } else { | |
149 | 0 | callback(error); |
150 | } | |
151 | }); | |
152 | } | |
153 | } | |
154 | }); | |
155 | } | |
156 | }; | |
157 | ||
158 | 1 | Blacklist.prototype.shutdown = function(){ |
159 | 32 | this.blacklistClient.end(); |
160 | }; | |
161 | ||
162 |
Line | Hits | Source |
---|---|---|
1 | // Copyright 2012 Kinvey, Inc | |
2 | // | |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | // you may not use this file except in compliance with the License. | |
5 | // You may obtain a copy of the License at | |
6 | // | |
7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
8 | // | |
9 | // Unless required by applicable law or agreed to in writing, software | |
10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | // See the License for the specific language governing permissions and | |
13 | // limitations under the License. | |
14 | ||
15 | ||
16 | 1 | "use strict"; |
17 | ||
18 | 1 | var uuid = require('node-uuid') |
19 | , util = require('util') | |
20 | , redis = require('redis') | |
21 | , EventEmitter = require('events').EventEmitter | |
22 | , connect | |
23 | , MessageBus | |
24 | , makeId | |
25 | , wrapError | |
26 | , initResponder | |
27 | , initCreator | |
28 | , authClient | |
29 | , completeTransaction | |
30 | , KILOBYTES = 1024 | |
31 | , MEGABYTES = 1024 * 1024 | |
32 | , log; | |
33 | ||
34 | ||
35 | 1 | log = function(msg){ |
36 | 0 | var d = new Date() |
37 | , t = d.toISOString(); | |
38 | 0 | util.debug(t + ": " + msg); |
39 | }; | |
40 | ||
41 | 1 | wrapError = function(that, evt){ |
42 | 1 | that.emit('error', evt); |
43 | }; | |
44 | ||
45 | 1 | exports.makeId = makeId = function(){ |
46 | 10258 | return uuid().replace(/-/g, ""); |
47 | }; | |
48 | ||
49 | 1 | initCreator = function(that){ |
50 | 57 | that.subClient.subscribe(that.listenChannel); |
51 | 57 | that.subClient.on('message', function(channel, message){ |
52 | 14 | completeTransaction(that, channel, message); |
53 | }); | |
54 | }; | |
55 | ||
56 | ||
57 | 1 | initResponder = function(that){ |
58 | 56 | var util = require('util'); |
59 | 56 | that.subClient.subscribe(that.broadcastChannel); |
60 | 56 | that.subClient.on('message', function(channel, message){ |
61 | 28 | var msgBody, id; |
62 | 28 | if (channel !== that.broadcastChannel){ |
63 | // This shouldn't happen, it would mean | |
64 | // that we've accidentally subscribed to | |
65 | // an extra redis channel... | |
66 | // If for some crazy reason we get an incorrect | |
67 | // channel we should ignore the message | |
68 | 0 | return; |
69 | } | |
70 | 28 | id = message; |
71 | 28 | that.emit('data_available', id); |
72 | }); | |
73 | }; | |
74 | ||
75 | 1 | authClient = function(password, client, debug){ |
76 | 339 | if (password){ |
77 | 3 | client.auth(password, function(res){ |
78 | 0 | if (debug){ |
79 | 0 | console.log("Auth'd redis messaging started with " + res); |
80 | } | |
81 | }); | |
82 | } | |
83 | }; | |
84 | ||
85 | 1 | exports.MessageBus = MessageBus = function(options){ |
86 | 113 | var that; |
87 | ||
88 | 113 | if (!options){ |
89 | 24 | options = {}; |
90 | } | |
91 | ||
92 | 113 | EventEmitter.call(this); |
93 | 113 | this.redisHost = null; |
94 | 113 | this.redisPort = null; |
95 | 113 | this.redisPassword = null; |
96 | 113 | this.debug_is_enabled = false; |
97 | 113 | this.idToChannelMap = {}; |
98 | 113 | this.callbackMap = {}; |
99 | 113 | this.listenChannel = "msgChannels:" + makeId(); |
100 | 113 | this.broadcastChannel = (options && options.broadcast) || "msgChannels:broadcast"; |
101 | 113 | this.dataHash = (options && options.dataHash) || "msgTable:normal"; |
102 | 113 | this.responseHash = (options && options.outputHash) || "responseTable:normal"; |
103 | 113 | this.shutdownFlag = false; |
104 | 113 | this.id = makeId(); // For debugging / logging |
105 | ||
106 | 117 | if (options && options.host){ this.redisHost = options.host; } |
107 | 115 | if (options && options.port){ this.redisPort = options.port; } |
108 | 114 | if (options && options.password){ this.redisPassword = options.password; } |
109 | ||
110 | 113 | this.pubClient = redis.createClient(this.redisPort, this.redisHost); |
111 | 113 | this.subClient = redis.createClient(this.redisPort, this.redisHost); |
112 | 113 | this.dataClient = redis.createClient(this.redisPort, this.redisHost); |
113 | ||
114 | 113 | authClient(this.redisPassword, this.dataClient, this.debug_is_enabled); |
115 | 113 | authClient(this.redisPassword, this.pubClient, this.debug_is_enabled); |
116 | 113 | authClient(this.redisPassword, this.subClient, this.debug_is_enabled); |
117 | ||
118 | 113 | if (options && options.isResponder){ |
119 | 56 | initResponder(this); |
120 | } else { | |
121 | 57 | initCreator(this); |
122 | } | |
123 | ||
124 | 113 | that = this; |
125 | 113 | this.dataClient.on('error', function(evt){wrapError(that, evt);}); |
126 | 114 | this.pubClient.on('error', function(evt){wrapError(that, evt);}); |
127 | 113 | this.subClient.on('error', function(evt){wrapError(that, evt);}); |
128 | }; | |
129 | ||
130 | 1 | util.inherits(MessageBus, EventEmitter); |
131 | ||
132 | 1 | completeTransaction = function(that, ch, msg){ |
133 | 14 | var response, id, status, parsedMsg, daFaq, callback, multi; |
134 | ||
135 | 14 | if (ch !== that.listenChannel){ |
136 | // This can NEVER happen (except for if there's an error) | |
137 | // in our code. So the check is left in, but | |
138 | // should never be hit. | |
139 | 0 | that.emit('error', new Error("Got message for some other channel (Expected: " + |
140 | that.listenChannel + ", Actual: " + ch + ")")); | |
141 | // Bail out! | |
142 | 0 | return; |
143 | } | |
144 | ||
145 | 14 | parsedMsg = msg.split(' '); |
146 | 14 | if (parsedMsg.length < 2){ |
147 | 1 | that.emit('error', new Error("Invalid message received!")); |
148 | // Bail out | |
149 | 1 | return; |
150 | } | |
151 | ||
152 | 13 | id = parsedMsg[0]; |
153 | 13 | status = parsedMsg[1]; |
154 | ||
155 | 13 | callback = that.callbackMap[id]; |
156 | ||
157 | 13 | multi = that.dataClient.multi(); |
158 | ||
159 | 13 | multi.hget(that.responseHash, id) |
160 | .hdel(that.responseHash, id) | |
161 | .exec(function(err, reply){ | |
162 | 12 | var myErr = null, respondedErr, o; |
163 | 12 | if (err){ |
164 | 0 | myErr = new Error("REDIS Error: " + err); |
165 | } | |
166 | ||
167 | 12 | if (!util.isArray(reply) && !myErr){ |
168 | 0 | myErr = new Error("Internal REDIS error (" + err + ", " + reply + ")"); |
169 | 12 | } else if (util.isArray(reply)){ |
170 | // Reply[0] => hget, Reply[1] => hdel | |
171 | 12 | reply = reply[0]; |
172 | } | |
173 | ||
174 | 12 | if (reply === null && !myErr){ |
175 | 1 | myErr = new Error("No message for id " + id); |
176 | } | |
177 | ||
178 | ||
179 | 12 | if (!myErr){ |
180 | 11 | try { |
181 | 11 | response = JSON.parse(reply); |
182 | } catch(e){ | |
183 | 1 | myErr = new Error("JSON parsing failed! " + e); |
184 | } | |
185 | } | |
186 | ||
187 | 12 | if (myErr){ |
188 | 2 | response = myErr; |
189 | 2 | that.emit('error', myErr); |
190 | 2 | return; |
191 | } | |
192 | ||
193 | 10 | if (status !== 'SUCCESS'){ |
194 | // Build the error | |
195 | 1 | that.emit('error', response); |
196 | ||
197 | // If we're hard failed we stop working here, | |
198 | // no success | |
199 | 1 | if (status === "FAILED"){ |
200 | 1 | if (callback){ |
201 | 1 | callback(response); |
202 | 1 | delete that.callbackMap[id]; |
203 | } | |
204 | 1 | return; |
205 | } | |
206 | } | |
207 | ||
208 | ||
209 | 9 | that.emit('responseReady', response); |
210 | ||
211 | 9 | if (callback){ |
212 | 9 | callback(response); |
213 | 9 | delete that.callbackMap[id]; |
214 | } | |
215 | }); | |
216 | ||
217 | }; | |
218 | ||
219 | 1 | MessageBus.prototype.sendMessage = function(id, msg, callback){ |
220 | 21 | var that = this |
221 | , err | |
222 | , msgString; | |
223 | ||
224 | 21 | if (this.shutdownFlag){ |
225 | 1 | callback(new Error("Attempt to use shutdown MessageBus.")); |
226 | 1 | return; |
227 | } | |
228 | ||
229 | 20 | msg._listenChannel = this.listenChannel; |
230 | 20 | msg._messageId = id; // Store the message id in-band |
231 | ||
232 | 20 | msgString = JSON.stringify(msg); |
233 | ||
234 | 20 | if (!msgString){ |
235 | 1 | callback(new Error("Error converting message to JSON.")); |
236 | 1 | return; |
237 | } | |
238 | ||
239 | // TODO: This needs to be an option for the class | |
240 | 19 | if (msgString.length > MEGABYTES){ |
241 | 1 | callback(new Error("Payload too large!")); |
242 | 1 | return; |
243 | } | |
244 | ||
245 | 18 | this.dataClient.hset(this.dataHash, id, msgString, function(err, reply){ |
246 | 17 | if (err){ |
247 | 0 | callback(new Error("Error sending message: " + err)); |
248 | 0 | return; |
249 | } | |
250 | 17 | that.pubClient.publish(that.broadcastChannel, id); |
251 | }); | |
252 | 18 | this.callbackMap[id] = callback; |
253 | }; | |
254 | ||
255 | 1 | MessageBus.prototype.acceptMessage = function(mid, callback){ |
256 | 23 | var that = this |
257 | , multi | |
258 | , derivedId; | |
259 | ||
260 | 23 | if (!mid){ |
261 | 2 | throw new Error('Missing Message ID.'); |
262 | } | |
263 | ||
264 | 21 | if (!callback){ |
265 | 1 | throw new Error('Invalid callback.'); |
266 | } | |
267 | ||
268 | 20 | if (callback.length < 1){ |
269 | 1 | throw new Error('Missing parameters in callback.'); |
270 | } | |
271 | ||
272 | 19 | multi = that.dataClient.multi(); |
273 | ||
274 | // Pipeline! | |
275 | 19 | multi |
276 | .hget(that.dataHash, mid) | |
277 | .hdel(that.dataHash, mid) | |
278 | .exec(function(err, reply){ | |
279 | 19 | var listenChannel, id, dataErr, msgBody; |
280 | ||
281 | // First make sure that everything is probably ok | |
282 | 19 | if (!util.isArray(reply)){ |
283 | 0 | dataErr = new Error("Internal REDIS error (" + err + ", " + reply + ")"); |
284 | 0 | callback(dataErr); |
285 | 0 | return; |
286 | } else { | |
287 | // reply is good, let's just get the first element | |
288 | // as the indeices should be hget,hdel | |
289 | 19 | reply = reply[0]; |
290 | } | |
291 | ||
292 | 19 | try { |
293 | 19 | msgBody = JSON.parse(reply); |
294 | } catch(e) { | |
295 | 1 | dataErr = new Error("Bad data in sent message, " + e); |
296 | 1 | callback(dataErr); |
297 | 1 | return; |
298 | } | |
299 | ||
300 | 18 | if (!msgBody){ |
301 | 3 | dataErr = new Error("DB doesn't recognize message"); |
302 | 3 | callback(dataErr); |
303 | 3 | return; |
304 | } | |
305 | ||
306 | 15 | derivedId = id = msgBody._messageId; |
307 | 15 | listenChannel = msgBody._listenChannel; |
308 | ||
309 | 15 | if (id !== mid){ |
310 | 0 | console.log("ERROR: Mis-match on ids! (" + id + " does not equal " + mid + ")"); |
311 | } | |
312 | ||
313 | 15 | that.idToChannelMap[id] = listenChannel; |
314 | ||
315 | ||
316 | 15 | delete msgBody._listenChannel; |
317 | 15 | delete msgBody._messageId; |
318 | ||
319 | 15 | callback(msgBody); |
320 | ||
321 | }); | |
322 | }; | |
323 | ||
324 | 1 | MessageBus.prototype.sendResponse = function(msgId, status, msg){ |
325 | 37 | if (!msgId || !status || !msg){ |
326 | 5 | throw new Error("Missing msgId, status or msg."); |
327 | } | |
328 | ||
329 | 32 | if (status !== 'SUCCESS' && |
330 | status !== 'ERROR' && | |
331 | status !== 'FAILED') | |
332 | { | |
333 | 12 | throw new Error(status + ' is not a valid status.'); |
334 | } | |
335 | ||
336 | 20 | var listenChannel = this.idToChannelMap[msgId] |
337 | , that = this, serializableError, o, tmpErr, testMode; | |
338 | ||
339 | 20 | if (arguments.length === 4){ |
340 | 3 | testMode = arguments[3]; |
341 | } | |
342 | ||
343 | 20 | if (!listenChannel && !testMode){ |
344 | // We have no record of this request | |
345 | // probably a callback being called twice, but | |
346 | // still need to throw | |
347 | 1 | tmpErr = new Error('Attempt to respond to message that was never accepted'); |
348 | 1 | tmpErr.debugMessage = msgId + " is not registered as having been accepted"; |
349 | 1 | throw tmpErr; |
350 | } | |
351 | ||
352 | 19 | this.dataClient.hset(this.responseHash, msgId, JSON.stringify(msg), function(err, reply){ |
353 | 16 | delete that.idToChannelMap[msgId]; |
354 | ||
355 | 16 | if (err){ |
356 | 0 | throw new Error(err); |
357 | } | |
358 | 16 | that.pubClient.publish(listenChannel, msgId + " " + status); |
359 | }); | |
360 | }; | |
361 | ||
362 | ||
363 | 1 | MessageBus.prototype.shutdown = function(){ |
364 | 106 | this.subClient.removeAllListeners('message'); |
365 | 106 | this.dataClient.removeAllListeners('error'); |
366 | 106 | this.pubClient.removeAllListeners('error'); |
367 | 106 | this.subClient.removeAllListeners('error'); |
368 | 106 | this.dataClient.end(); |
369 | 106 | this.pubClient.end(); |
370 | 106 | this.subClient.end(); |
371 | 106 | this.removeAllListeners(); |
372 | 106 | this.shutdownFlag = true; |
373 | }; | |
374 | ||
375 | 1 | exports.connect = connect = function(options){ |
376 | 113 | return new MessageBus(options); |
377 | }; |
Line | Hits | Source |
---|---|---|
1 | // Copyright 2012 Kinvey, Inc | |
2 | // | |
3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | // you may not use this file except in compliance with the License. | |
5 | // You may obtain a copy of the License at | |
6 | // | |
7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
8 | // | |
9 | // Unless required by applicable law or agreed to in writing, software | |
10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | // See the License for the specific language governing permissions and | |
13 | // limitations under the License. | |
14 | 1 | "use strict"; |
15 | ||
16 | 1 | var redis = require("redis") |
17 | , util = require('util') | |
18 | , EventEmitter = require('events').EventEmitter | |
19 | , wrapError; | |
20 | ||
21 | ||
22 | 1 | wrapError = function(emitter){ |
23 | 36 | return function(error){ |
24 | 1 | emitter.emit('error', error); |
25 | }; | |
26 | }; | |
27 | ||
28 | ||
29 | 1 | var TaskLimit = exports.TaskLimit = function(options){ |
30 | 37 | EventEmitter.call(this); |
31 | ||
32 | 37 | if (!options){ |
33 | 1 | throw new Error("I need a task key!"); |
34 | } | |
35 | ||
36 | 37 | if (options.host){ this.redisHost = options.host; } |
37 | 37 | if (options.port){ this.redisPort = options.port; } |
38 | 37 | if (options.password){ this.redisPassword = options.password; } |
39 | ||
40 | 36 | this.taskKey = options.taskKey; |
41 | 36 | this.redisKeyPrefix = "taskKey:"; |
42 | 36 | this.maxTasksPerKey = options.maxTasksPerKey||10; |
43 | 36 | this.taskLimitClient = redis.createClient(this.redisPort, this.redisHost); |
44 | ||
45 | 36 | this.taskLimitClient.on('error', wrapError(this)); |
46 | ||
47 | 36 | if (options.password){ |
48 | 1 | this.taskLimitClient.auth(options.password); |
49 | } | |
50 | ||
51 | }; | |
52 | ||
53 | // Inherit EventEmitter's methods | |
54 | 1 | util.inherits(TaskLimit, EventEmitter); |
55 | ||
56 | 1 | TaskLimit.prototype.startTask = function(task, callback){ |
57 | 40 | var value = task && task[this.taskKey] |
58 | , redisKey, that = this; | |
59 | ||
60 | 40 | if (!callback){ |
61 | 8 | callback = function(){}; |
62 | } | |
63 | ||
64 | 40 | if (!value){ |
65 | 4 | callback(new Error("Invalid task, not running.")); |
66 | } else { | |
67 | 36 | redisKey = that.redisKeyPrefix + value; |
68 | 36 | that.taskLimitClient.llen(redisKey, function(err, len){ |
69 | 36 | if (len >= that.maxTasksPerKey){ |
70 | 2 | callback(new Error("Too many tasks")); |
71 | } else { | |
72 | 34 | that.taskLimitClient.lpush(redisKey, Date(), function(x){ |
73 | 34 | callback(len+1); |
74 | }); | |
75 | } | |
76 | }); | |
77 | } | |
78 | }; | |
79 | ||
80 | 1 | TaskLimit.prototype.stopTask = function(task){ |
81 | 33 | var value = task && task[this.taskKey] |
82 | , redisKey, that = this; | |
83 | ||
84 | ||
85 | 33 | if (!value){ |
86 | 4 | throw new Error("Invalid task, can't stop."); |
87 | } else { | |
88 | 29 | redisKey = that.redisKeyPrefix + value; |
89 | 29 | that.taskLimitClient.lpop(redisKey); |
90 | } | |
91 | }; | |
92 | ||
93 | 1 | TaskLimit.prototype.shutdown = function(){ |
94 | 32 | this.taskLimitClient.end(); |
95 | }; |