> background-task@0.2.3 test /private/tmp/nbg-coverage.z5hJV4i > mocha
| Line | Hits | Source |
|---|---|---|
| 1 | // Copyright 2012 Kinvey, Inc | |
| 2 | // | |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 | // you may not use this file except in compliance with the License. | |
| 5 | // You may obtain a copy of the License at | |
| 6 | // | |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 | // | |
| 9 | // Unless required by applicable law or agreed to in writing, software | |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 | // See the License for the specific language governing permissions and | |
| 13 | // limitations under the License. | |
| 14 | ||
| 15 | 1 | "use strict"; |
| 16 | ||
| 17 | 1 | var redis = require("redis") |
| 18 | , EventEmitter = require('events').EventEmitter | |
| 19 | , util = require('util') | |
| 20 | , message = require('./messaging') | |
| 21 | , task_limit = require('./task_limit') | |
| 22 | , blacklist = require('./blacklist') | |
| 23 | , wrapError; | |
| 24 | ||
| 25 | ||
| 26 | // We should handle this... | |
| 27 | 1 | wrapError = function(emitter){ |
| 28 | 127 | return function(error){ |
| 29 | 1 | emitter.emit('error', error); |
| 30 | }; | |
| 31 | }; | |
| 32 | ||
| 33 | 1 | exports.connect = (function(){ |
| 34 | 1 | var callbacks = [] |
| 35 | , makeTimeoutError | |
| 36 | , BackgroundTask | |
| 37 | , extractResponse; | |
| 38 | ||
| 39 | 1 | BackgroundTask = function(options){ |
| 40 | 63 | EventEmitter.call(this); |
| 41 | ||
| 42 | 63 | if (!options){ |
| 43 | 1 | options = {}; |
| 44 | } | |
| 45 | ||
| 46 | 63 | if (options.isWorker){ |
| 47 | 31 | options.isResponder = true; |
| 48 | } | |
| 49 | ||
| 50 | 63 | if (options.taskKey){ |
| 51 | 32 | this.taskKey = options.taskKey; |
| 52 | 32 | if (!options.maxTasksPerKey){ |
| 53 | 4 | this.maxTasksPerKey = options.maxTasksPerKey = 5; |
| 54 | } | |
| 55 | ||
| 56 | 32 | this.taskLimit = new task_limit.TaskLimit(options); |
| 57 | 32 | this.taskLimit.on('error', wrapError(this)); |
| 58 | ||
| 59 | 32 | this.blacklist = new blacklist.Blacklist(options); |
| 60 | 32 | this.blacklist.on('error', wrapError(this)); |
| 61 | ||
| 62 | } | |
| 63 | ||
| 64 | 63 | this.msgBus = new message.connect(options); |
| 65 | 63 | this.timeout = 5000; // 5 second defualt timeout |
| 66 | ||
| 67 | 63 | this.msgBus.on('error', wrapError(this)); |
| 68 | ||
| 69 | 63 | if (options.task){ |
| 70 | 1 | options.broadcast = options.task + "Broadcast"; |
| 71 | 1 | options.dataHash = options.task + "Table"; |
| 72 | 1 | options.outputHash = options.task + "Hash"; |
| 73 | } | |
| 74 | ||
| 75 | 63 | if (options && options.timeout){ |
| 76 | 2 | this.timeout = options.timeout; |
| 77 | } | |
| 78 | ||
| 79 | ||
| 80 | 63 | if (options.isWorker){ |
| 81 | 31 | var that = this; |
| 82 | 31 | this.msgBus.on('data_available', function(id){ |
| 83 | 16 | that.emit('TASK_AVAILABLE', id); |
| 84 | }); | |
| 85 | } | |
| 86 | ||
| 87 | ||
| 88 | // Simple way to ensure we're not shut down | |
| 89 | 63 | this.isAvailable = true; |
| 90 | }; | |
| 91 | ||
| 92 | // Inherit EventEmitter's methods | |
| 93 | 1 | util.inherits(BackgroundTask, EventEmitter); |
| 94 | ||
| 95 | 1 | BackgroundTask.prototype.end = function(){ |
| 96 | 63 | if (!this.isAvailable) return; // Nothing to do |
| 97 | ||
| 98 | 63 | this.isAvailable = false; |
| 99 | // Hard end, don't worry about shutting down | |
| 100 | // gracefully here... | |
| 101 | 95 | if (this.blacklist) this.blacklist.shutdown(); |
| 102 | 95 | if (this.taskLimit) this.taskLimit.shutdown(); |
| 103 | 63 | this.msgBus.shutdown(); |
| 104 | 63 | this.removeAllListeners(); |
| 105 | }; | |
| 106 | ||
| 107 | 1 | BackgroundTask.prototype.acceptTask = function(id, callback){ |
| 108 | 18 | var newCallback; |
| 109 | ||
| 110 | 18 | if (!this.isAvailable){ |
| 111 | 0 | callback(new Error("Attempt to use invalid BackgroundTask")); |
| 112 | 0 | return; |
| 113 | } | |
| 114 | ||
| 115 | 18 | if (!id || id.length === 0){ |
| 116 | 3 | throw new Error('Missing Task ID.'); |
| 117 | } | |
| 118 | ||
| 119 | 15 | if (!callback || callback.length < 1){ |
| 120 | 2 | throw new Error('Invalid callback specified'); |
| 121 | } | |
| 122 | ||
| 123 | 13 | newCallback = function(reply){ |
| 124 | 13 | if (reply instanceof Error){ |
| 125 | 1 | if (reply.message === "DB doesn't recognize message"){ |
| 126 | 1 | reply = new Error('Task not in database, do not accept'); |
| 127 | } | |
| 128 | } | |
| 129 | ||
| 130 | 13 | if (reply.taskDetails){ |
| 131 | 12 | callback(reply.taskDetails); |
| 132 | } else { | |
| 133 | 1 | callback(reply); |
| 134 | } | |
| 135 | }; | |
| 136 | 13 | this.msgBus.acceptMessage(id, newCallback); |
| 137 | }; | |
| 138 | ||
| 139 | 1 | extractResponse = function(r){ |
| 140 | 15 | var response, id, respondedErr, o; |
| 141 | ||
| 142 | 15 | if (!r.taskId && !r.taskDetails){ |
| 143 | 0 | throw new Error("Incomplete task response."); |
| 144 | } | |
| 145 | ||
| 146 | 15 | id = r.taskId; |
| 147 | 15 | response = r.taskDetails; |
| 148 | ||
| 149 | 15 | if (response.isError){ |
| 150 | 1 | respondedErr = new Error(response.message); |
| 151 | 1 | for (o in response){ |
| 152 | 3 | if (response.hasOwnProperty(o) && |
| 153 | o !== 'isError' && | |
| 154 | o !== 'message'){ | |
| 155 | 1 | respondedErr[o] = response[o]; |
| 156 | } | |
| 157 | } | |
| 158 | 1 | response = respondedErr; |
| 159 | } | |
| 160 | ||
| 161 | 15 | return {id: id, details: response}; |
| 162 | }; | |
| 163 | ||
| 164 | 1 | BackgroundTask.prototype.addTask = function(msg, callback){ |
| 165 | 20 | var that = this |
| 166 | , id = message.makeId() | |
| 167 | , cb, timeoutId, timedoutCb, msgToSend, tmpErr, startTheTask; | |
| 168 | ||
| 169 | 20 | if (!this.isAvailable){ |
| 170 | 1 | callback(id, new Error("Attempt to use invalid BackgroundTask")); |
| 171 | 1 | return; |
| 172 | } | |
| 173 | ||
| 174 | ||
| 175 | 19 | startTheTask = function(){ |
| 176 | 16 | that.taskLimit.startTask(msg, function(tasks){ |
| 177 | 16 | var err, responseCb; |
| 178 | 16 | if (tasks instanceof Error){ |
| 179 | 1 | err = new Error('Too many tasks'); |
| 180 | 1 | that.emit('TASK_ERROR', err); |
| 181 | 1 | callback(id, err); |
| 182 | } | |
| 183 | ||
| 184 | 16 | callbacks[id] = callback; |
| 185 | ||
| 186 | 16 | responseCb = function(resp){ |
| 187 | 7 | var uniqueIndex = id // Make this callback unique |
| 188 | , rply = extractResponse(resp); | |
| 189 | ||
| 190 | 7 | that.emit('TASK_DONE', rply.id, rply.details); |
| 191 | }; | |
| 192 | ||
| 193 | 16 | cb = function(reply){ |
| 194 | 8 | var origCallback |
| 195 | , tid = id | |
| 196 | , details = reply | |
| 197 | , rply, fallback = false; | |
| 198 | ||
| 199 | 8 | try { |
| 200 | 8 | rply = extractResponse(reply); |
| 201 | 8 | details = rply.details; |
| 202 | 8 | tid = rply.id; |
| 203 | } catch (e) { | |
| 204 | // The system had an error | |
| 205 | 0 | that.emit('TASK_ERROR', e); |
| 206 | } | |
| 207 | 8 | origCallback = callbacks[tid]; |
| 208 | ||
| 209 | 8 | that.taskLimit.stopTask(msg); |
| 210 | 8 | clearTimeout(timeoutId); |
| 211 | 8 | origCallback(tid, details); |
| 212 | 8 | delete callbacks[tid]; |
| 213 | }; | |
| 214 | ||
| 215 | 16 | timedoutCb = function(){ |
| 216 | 5 | var origCallback = callbacks[id]; |
| 217 | // replace the "orig" callback with an empty function | |
| 218 | // in case the request still completes in the future and | |
| 219 | // tries to call our callback. | |
| 220 | 5 | callbacks[id] = function(reply){}; |
| 221 | ||
| 222 | // Return an error | |
| 223 | 5 | that.taskLimit.stopTask(msg); |
| 224 | 5 | origCallback(id, makeTimeoutError()); |
| 225 | 5 | that.msgBus.removeListener('responseReady', responseCb); |
| 226 | }; | |
| 227 | ||
| 228 | 16 | msgToSend = { |
| 229 | taskId: id, | |
| 230 | taskDetails: msg | |
| 231 | }; | |
| 232 | ||
| 233 | 16 | timeoutId = setTimeout(timedoutCb, that.timeout); |
| 234 | 16 | that.msgBus.sendMessage(id, msgToSend, cb); |
| 235 | 16 | that.msgBus.once('responseReady',responseCb); |
| 236 | }); | |
| 237 | }; | |
| 238 | ||
| 239 | 19 | if (that.taskKey && msg[that.taskKey]){ |
| 240 | 18 | that.blacklist.blacklistStatus(msg, function(isBlacklisted, timeLeft, reason){ |
| 241 | 18 | var tmpErr; |
| 242 | 18 | if (isBlacklisted){ |
| 243 | 2 | tmpErr = new Error('Blacklisted'); |
| 244 | 2 | tmpErr.debugMessage = "Blocked, reason: " + reason + ", remaining time: " + timeLeft; |
| 245 | 2 | that.emit('TASK_ERROR', tmpErr); |
| 246 | 2 | callback(id, tmpErr); |
| 247 | } else { | |
| 248 | 16 | startTheTask(); |
| 249 | } | |
| 250 | }); | |
| 251 | } else { | |
| 252 | 1 | tmpErr = new Error('Missing taskKey'); |
| 253 | 1 | that.emit('TASK_ERROR', tmpErr); |
| 254 | 1 | callback(id, tmpErr); |
| 255 | } | |
| 256 | }; | |
| 257 | ||
| 258 | 1 | BackgroundTask.prototype.completeTask = function(taskId, status, msg){ |
| 259 | 23 | var that = this |
| 260 | , msgToSend, serializableError, o; | |
| 261 | ||
| 262 | ||
| 263 | 23 | if (!this.isAvailable){ |
| 264 | 0 | throw new Error("Attempt to use invalid BackgroundTask"); |
| 265 | } | |
| 266 | ||
| 267 | // We can't send Error's via JSON... | |
| 268 | 23 | if (msg instanceof Error){ |
| 269 | 1 | serializableError = { |
| 270 | isError: true, | |
| 271 | message: msg.message | |
| 272 | }; | |
| 273 | ||
| 274 | 1 | for (o in msg){ |
| 275 | 1 | if (msg.hasOwnProperty(o)){ |
| 276 | 1 | serializableError[o] = msg[o]; |
| 277 | } | |
| 278 | } | |
| 279 | ||
| 280 | 1 | msg = serializableError; |
| 281 | } | |
| 282 | ||
| 283 | ||
| 284 | 23 | msgToSend = { |
| 285 | taskId: taskId, | |
| 286 | taskDetails: msg | |
| 287 | }; | |
| 288 | ||
| 289 | 23 | if (!msg){ |
| 290 | 1 | throw new Error('Missing msgId, status or msg.'); |
| 291 | } | |
| 292 | ||
| 293 | 22 | that.msgBus.sendResponse(taskId, status, msgToSend); |
| 294 | }; | |
| 295 | ||
| 296 | 1 | BackgroundTask.prototype.reportBadTask = function(taskKey, reason, callback){ |
| 297 | 0 | var reasonToSend; |
| 298 | ||
| 299 | 0 | if (!this.isAvailable){ |
| 300 | 0 | callback("ERR", "Attempt to use invalid BackgroundTask"); |
| 301 | 0 | return; |
| 302 | } | |
| 303 | ||
| 304 | ||
| 305 | 0 | if (reason instanceof Error){ |
| 306 | 0 | reasonToSend = reason.message; |
| 307 | } | |
| 308 | ||
| 309 | 0 | this.blacklist.addFailure(taskKey, reason, callback); |
| 310 | ||
| 311 | }; | |
| 312 | ||
| 313 | ||
| 314 | 1 | makeTimeoutError = function(){ |
| 315 | 5 | return new Error('Task timed out'); |
| 316 | }; | |
| 317 | ||
| 318 | ||
| 319 | 1 | return function(options){ |
| 320 | 63 | return new BackgroundTask(options); |
| 321 | }; | |
| 322 | ||
| 323 | }()); |
| Line | Hits | Source |
|---|---|---|
| 1 | // Copyright 2012 Kinvey, Inc | |
| 2 | // | |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 | // you may not use this file except in compliance with the License. | |
| 5 | // You may obtain a copy of the License at | |
| 6 | // | |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 | // | |
| 9 | // Unless required by applicable law or agreed to in writing, software | |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 | // See the License for the specific language governing permissions and | |
| 13 | // limitations under the License. | |
| 14 | 1 | "use strict"; |
| 15 | ||
| 16 | 1 | var redis = require("redis") |
| 17 | , util = require('util') | |
| 18 | , EventEmitter = require('events').EventEmitter | |
| 19 | , wrapError; | |
| 20 | ||
| 21 | ||
| 22 | 1 | wrapError = function(emitter){ |
| 23 | 51 | return function(error){ |
| 24 | 1 | emitter.emit('error', error); |
| 25 | }; | |
| 26 | }; | |
| 27 | ||
| 28 | ||
| 29 | 1 | var Blacklist = exports.Blacklist = function(options){ |
| 30 | 51 | EventEmitter.call(this); |
| 31 | ||
| 32 | 51 | if (!options){ |
| 33 | 0 | throw new Error("I need a task key!"); |
| 34 | } | |
| 35 | ||
| 36 | 52 | if (options.host){ this.redisHost = options.host; } |
| 37 | 52 | if (options.port){ this.redisPort = options.port; } |
| 38 | 52 | if (options.password){ this.redisPassword = options.password; } |
| 39 | ||
| 40 | 51 | this.taskKey = options.taskKey; |
| 41 | ||
| 42 | 51 | this.failureInterval = options.failureInterval || 1; |
| 43 | 51 | this.blacklistThreshold = options.blacklistThreshold || 10; |
| 44 | 51 | this.globalBlacklistTimeout = options.globalBlacklistTimeout || 3600; |
| 45 | ||
| 46 | 51 | this.logBlacklist = options.logBlacklist || false; // NB: This won't work if you want the default to be true |
| 47 | ||
| 48 | 51 | this.redisKeyPrefix = "blacklist:"; |
| 49 | 51 | this.globalBlacklistKeyPrefix = this.redisKeyPrefix + "globalBlacklist:"; |
| 50 | 51 | this.blacklistLogKeyPrefix = this.redisKeyPrefix + "logs:" |
| 51 | ||
| 52 | 51 | this.blacklistClient = redis.createClient(this.redisPort, this.redisHost); |
| 53 | ||
| 54 | 51 | this.blacklistClient.on('error', wrapError(this)); |
| 55 | ||
| 56 | 51 | if (options.password){ |
| 57 | 1 | this.blacklistClient.auth(options.password); |
| 58 | } | |
| 59 | ||
| 60 | }; | |
| 61 | ||
| 62 | // Inherit EventEmitter's methods | |
| 63 | 1 | util.inherits(Blacklist, EventEmitter); |
| 64 | ||
| 65 | 1 | Blacklist.prototype.blacklistStatus = function(task, callback){ |
| 66 | 25 | var taskKey = task && task[this.taskKey] |
| 67 | , redisKey | |
| 68 | , that = this; | |
| 69 | ||
| 70 | 25 | if (!callback){ |
| 71 | 0 | callback = function(){}; |
| 72 | } | |
| 73 | ||
| 74 | 25 | if (!taskKey){ |
| 75 | 0 | callback(false, "No task key, can't check blacklist."); |
| 76 | } else { | |
| 77 | 25 | redisKey = that.globalBlacklistKeyPrefix + taskKey; |
| 78 | 25 | that.blacklistClient.get(redisKey, function(error, reply){ |
| 79 | 25 | if (reply){ |
| 80 | // We're blacklisted | |
| 81 | 7 | that.blacklistClient.ttl(redisKey, function(error, timeRemaining){ |
| 82 | 7 | if (timeRemaining){ |
| 83 | 7 | callback(true, timeRemaining, reply); |
| 84 | } else { | |
| 85 | 0 | callback(true, -1, reply); |
| 86 | } | |
| 87 | }); | |
| 88 | } else { | |
| 89 | 18 | callback(false, -1, ""); |
| 90 | } | |
| 91 | }); | |
| 92 | } | |
| 93 | }; | |
| 94 | ||
| 95 | 1 | Blacklist.prototype.addFailure = function(taskKey, reason, callback){ |
| 96 | 63 | var errKey, countKey, that = this; |
| 97 | ||
| 98 | 63 | if (!callback){ |
| 99 | 0 | callback = function(){}; |
| 100 | } | |
| 101 | ||
| 102 | 63 | if (!reason){ |
| 103 | 1 | callback(new Error("Must supply a reason for the failure")); |
| 104 | 1 | return; |
| 105 | } | |
| 106 | ||
| 107 | 62 | if (!taskKey){ |
| 108 | 0 | callback(new Error("Invalid task, not running.")); |
| 109 | } else { | |
| 110 | 62 | countKey = that.redisKeyPrefix + taskKey + ":count"; |
| 111 | ||
| 112 | 62 | that.blacklistClient.get(countKey, function(error, reply){ |
| 113 | 62 | var blacklistKey; |
| 114 | ||
| 115 | // Count not in redis | |
| 116 | 62 | if (!reply){ |
| 117 | 14 | that.blacklistClient.setex(countKey, that.failureInterval, "1", function(e, r){ |
| 118 | 14 | if (!error){ |
| 119 | 14 | callback("OK"); |
| 120 | } else { | |
| 121 | 0 | callback(error); |
| 122 | } | |
| 123 | }); | |
| 124 | } else { | |
| 125 | 48 | if (reply >= that.blacklistThreshold){ |
| 126 | // Blacklist | |
| 127 | 10 | blacklistKey = that.globalBlacklistKeyPrefix + taskKey; |
| 128 | 10 | that.blacklistClient.setex(blacklistKey, that.globalBlacklistTimeout, reason, function(e, r){ |
| 129 | 10 | var logKey, d; |
| 130 | ||
| 131 | 10 | if (!error){ |
| 132 | 10 | if (that.logBlacklist){ |
| 133 | 5 | d = new Date(); |
| 134 | 5 | logKey = that.blacklistLogKeyPrefix + taskKey; |
| 135 | // Fire and forget | |
| 136 | 5 | that.blacklistClient.rpush(logKey, d + "|" + reason); |
| 137 | } | |
| 138 | 10 | callback("Blacklisted"); |
| 139 | } else { | |
| 140 | 0 | callback(error); |
| 141 | } | |
| 142 | ||
| 143 | }); | |
| 144 | } else { | |
| 145 | 38 | that.blacklistClient.incr(countKey, function(error, reply){ |
| 146 | 38 | if (!error){ |
| 147 | 38 | callback("OK"); |
| 148 | } else { | |
| 149 | 0 | callback(error); |
| 150 | } | |
| 151 | }); | |
| 152 | } | |
| 153 | } | |
| 154 | }); | |
| 155 | } | |
| 156 | }; | |
| 157 | ||
| 158 | 1 | Blacklist.prototype.shutdown = function(){ |
| 159 | 32 | this.blacklistClient.end(); |
| 160 | }; | |
| 161 | ||
| 162 |
| Line | Hits | Source |
|---|---|---|
| 1 | // Copyright 2012 Kinvey, Inc | |
| 2 | // | |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 | // you may not use this file except in compliance with the License. | |
| 5 | // You may obtain a copy of the License at | |
| 6 | // | |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 | // | |
| 9 | // Unless required by applicable law or agreed to in writing, software | |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 | // See the License for the specific language governing permissions and | |
| 13 | // limitations under the License. | |
| 14 | ||
| 15 | ||
| 16 | 1 | "use strict"; |
| 17 | ||
| 18 | 1 | var uuid = require('node-uuid') |
| 19 | , util = require('util') | |
| 20 | , redis = require('redis') | |
| 21 | , EventEmitter = require('events').EventEmitter | |
| 22 | , connect | |
| 23 | , MessageBus | |
| 24 | , makeId | |
| 25 | , wrapError | |
| 26 | , initResponder | |
| 27 | , initCreator | |
| 28 | , authClient | |
| 29 | , completeTransaction | |
| 30 | , KILOBYTES = 1024 | |
| 31 | , MEGABYTES = 1024 * 1024 | |
| 32 | , log; | |
| 33 | ||
| 34 | ||
| 35 | 1 | log = function(msg){ |
| 36 | 0 | var d = new Date() |
| 37 | , t = d.toISOString(); | |
| 38 | 0 | util.debug(t + ": " + msg); |
| 39 | }; | |
| 40 | ||
| 41 | 1 | wrapError = function(that, evt){ |
| 42 | 1 | that.emit('error', evt); |
| 43 | }; | |
| 44 | ||
| 45 | 1 | exports.makeId = makeId = function(){ |
| 46 | 10258 | return uuid().replace(/-/g, ""); |
| 47 | }; | |
| 48 | ||
| 49 | 1 | initCreator = function(that){ |
| 50 | 57 | that.subClient.subscribe(that.listenChannel); |
| 51 | 57 | that.subClient.on('message', function(channel, message){ |
| 52 | 14 | completeTransaction(that, channel, message); |
| 53 | }); | |
| 54 | }; | |
| 55 | ||
| 56 | ||
| 57 | 1 | initResponder = function(that){ |
| 58 | 56 | var util = require('util'); |
| 59 | 56 | that.subClient.subscribe(that.broadcastChannel); |
| 60 | 56 | that.subClient.on('message', function(channel, message){ |
| 61 | 28 | var msgBody, id; |
| 62 | 28 | if (channel !== that.broadcastChannel){ |
| 63 | // This shouldn't happen, it would mean | |
| 64 | // that we've accidentally subscribed to | |
| 65 | // an extra redis channel... | |
| 66 | // If for some crazy reason we get an incorrect | |
| 67 | // channel we should ignore the message | |
| 68 | 0 | return; |
| 69 | } | |
| 70 | 28 | id = message; |
| 71 | 28 | that.emit('data_available', id); |
| 72 | }); | |
| 73 | }; | |
| 74 | ||
| 75 | 1 | authClient = function(password, client, debug){ |
| 76 | 339 | if (password){ |
| 77 | 3 | client.auth(password, function(res){ |
| 78 | 0 | if (debug){ |
| 79 | 0 | console.log("Auth'd redis messaging started with " + res); |
| 80 | } | |
| 81 | }); | |
| 82 | } | |
| 83 | }; | |
| 84 | ||
| 85 | 1 | exports.MessageBus = MessageBus = function(options){ |
| 86 | 113 | var that; |
| 87 | ||
| 88 | 113 | if (!options){ |
| 89 | 24 | options = {}; |
| 90 | } | |
| 91 | ||
| 92 | 113 | EventEmitter.call(this); |
| 93 | 113 | this.redisHost = null; |
| 94 | 113 | this.redisPort = null; |
| 95 | 113 | this.redisPassword = null; |
| 96 | 113 | this.debug_is_enabled = false; |
| 97 | 113 | this.idToChannelMap = {}; |
| 98 | 113 | this.callbackMap = {}; |
| 99 | 113 | this.listenChannel = "msgChannels:" + makeId(); |
| 100 | 113 | this.broadcastChannel = (options && options.broadcast) || "msgChannels:broadcast"; |
| 101 | 113 | this.dataHash = (options && options.dataHash) || "msgTable:normal"; |
| 102 | 113 | this.responseHash = (options && options.outputHash) || "responseTable:normal"; |
| 103 | 113 | this.shutdownFlag = false; |
| 104 | 113 | this.id = makeId(); // For debugging / logging |
| 105 | ||
| 106 | 117 | if (options && options.host){ this.redisHost = options.host; } |
| 107 | 115 | if (options && options.port){ this.redisPort = options.port; } |
| 108 | 114 | if (options && options.password){ this.redisPassword = options.password; } |
| 109 | ||
| 110 | 113 | this.pubClient = redis.createClient(this.redisPort, this.redisHost); |
| 111 | 113 | this.subClient = redis.createClient(this.redisPort, this.redisHost); |
| 112 | 113 | this.dataClient = redis.createClient(this.redisPort, this.redisHost); |
| 113 | ||
| 114 | 113 | authClient(this.redisPassword, this.dataClient, this.debug_is_enabled); |
| 115 | 113 | authClient(this.redisPassword, this.pubClient, this.debug_is_enabled); |
| 116 | 113 | authClient(this.redisPassword, this.subClient, this.debug_is_enabled); |
| 117 | ||
| 118 | 113 | if (options && options.isResponder){ |
| 119 | 56 | initResponder(this); |
| 120 | } else { | |
| 121 | 57 | initCreator(this); |
| 122 | } | |
| 123 | ||
| 124 | 113 | that = this; |
| 125 | 113 | this.dataClient.on('error', function(evt){wrapError(that, evt);}); |
| 126 | 114 | this.pubClient.on('error', function(evt){wrapError(that, evt);}); |
| 127 | 113 | this.subClient.on('error', function(evt){wrapError(that, evt);}); |
| 128 | }; | |
| 129 | ||
| 130 | 1 | util.inherits(MessageBus, EventEmitter); |
| 131 | ||
| 132 | 1 | completeTransaction = function(that, ch, msg){ |
| 133 | 14 | var response, id, status, parsedMsg, daFaq, callback, multi; |
| 134 | ||
| 135 | 14 | if (ch !== that.listenChannel){ |
| 136 | // This can NEVER happen (except for if there's an error) | |
| 137 | // in our code. So the check is left in, but | |
| 138 | // should never be hit. | |
| 139 | 0 | that.emit('error', new Error("Got message for some other channel (Expected: " + |
| 140 | that.listenChannel + ", Actual: " + ch + ")")); | |
| 141 | // Bail out! | |
| 142 | 0 | return; |
| 143 | } | |
| 144 | ||
| 145 | 14 | parsedMsg = msg.split(' '); |
| 146 | 14 | if (parsedMsg.length < 2){ |
| 147 | 1 | that.emit('error', new Error("Invalid message received!")); |
| 148 | // Bail out | |
| 149 | 1 | return; |
| 150 | } | |
| 151 | ||
| 152 | 13 | id = parsedMsg[0]; |
| 153 | 13 | status = parsedMsg[1]; |
| 154 | ||
| 155 | 13 | callback = that.callbackMap[id]; |
| 156 | ||
| 157 | 13 | multi = that.dataClient.multi(); |
| 158 | ||
| 159 | 13 | multi.hget(that.responseHash, id) |
| 160 | .hdel(that.responseHash, id) | |
| 161 | .exec(function(err, reply){ | |
| 162 | 12 | var myErr = null, respondedErr, o; |
| 163 | 12 | if (err){ |
| 164 | 0 | myErr = new Error("REDIS Error: " + err); |
| 165 | } | |
| 166 | ||
| 167 | 12 | if (!util.isArray(reply) && !myErr){ |
| 168 | 0 | myErr = new Error("Internal REDIS error (" + err + ", " + reply + ")"); |
| 169 | 12 | } else if (util.isArray(reply)){ |
| 170 | // Reply[0] => hget, Reply[1] => hdel | |
| 171 | 12 | reply = reply[0]; |
| 172 | } | |
| 173 | ||
| 174 | 12 | if (reply === null && !myErr){ |
| 175 | 1 | myErr = new Error("No message for id " + id); |
| 176 | } | |
| 177 | ||
| 178 | ||
| 179 | 12 | if (!myErr){ |
| 180 | 11 | try { |
| 181 | 11 | response = JSON.parse(reply); |
| 182 | } catch(e){ | |
| 183 | 1 | myErr = new Error("JSON parsing failed! " + e); |
| 184 | } | |
| 185 | } | |
| 186 | ||
| 187 | 12 | if (myErr){ |
| 188 | 2 | response = myErr; |
| 189 | 2 | that.emit('error', myErr); |
| 190 | 2 | return; |
| 191 | } | |
| 192 | ||
| 193 | 10 | if (status !== 'SUCCESS'){ |
| 194 | // Build the error | |
| 195 | 1 | that.emit('error', response); |
| 196 | ||
| 197 | // If we're hard failed we stop working here, | |
| 198 | // no success | |
| 199 | 1 | if (status === "FAILED"){ |
| 200 | 1 | if (callback){ |
| 201 | 1 | callback(response); |
| 202 | 1 | delete that.callbackMap[id]; |
| 203 | } | |
| 204 | 1 | return; |
| 205 | } | |
| 206 | } | |
| 207 | ||
| 208 | ||
| 209 | 9 | that.emit('responseReady', response); |
| 210 | ||
| 211 | 9 | if (callback){ |
| 212 | 9 | callback(response); |
| 213 | 9 | delete that.callbackMap[id]; |
| 214 | } | |
| 215 | }); | |
| 216 | ||
| 217 | }; | |
| 218 | ||
| 219 | 1 | MessageBus.prototype.sendMessage = function(id, msg, callback){ |
| 220 | 21 | var that = this |
| 221 | , err | |
| 222 | , msgString; | |
| 223 | ||
| 224 | 21 | if (this.shutdownFlag){ |
| 225 | 1 | callback(new Error("Attempt to use shutdown MessageBus.")); |
| 226 | 1 | return; |
| 227 | } | |
| 228 | ||
| 229 | 20 | msg._listenChannel = this.listenChannel; |
| 230 | 20 | msg._messageId = id; // Store the message id in-band |
| 231 | ||
| 232 | 20 | msgString = JSON.stringify(msg); |
| 233 | ||
| 234 | 20 | if (!msgString){ |
| 235 | 1 | callback(new Error("Error converting message to JSON.")); |
| 236 | 1 | return; |
| 237 | } | |
| 238 | ||
| 239 | // TODO: This needs to be an option for the class | |
| 240 | 19 | if (msgString.length > MEGABYTES){ |
| 241 | 1 | callback(new Error("Payload too large!")); |
| 242 | 1 | return; |
| 243 | } | |
| 244 | ||
| 245 | 18 | this.dataClient.hset(this.dataHash, id, msgString, function(err, reply){ |
| 246 | 17 | if (err){ |
| 247 | 0 | callback(new Error("Error sending message: " + err)); |
| 248 | 0 | return; |
| 249 | } | |
| 250 | 17 | that.pubClient.publish(that.broadcastChannel, id); |
| 251 | }); | |
| 252 | 18 | this.callbackMap[id] = callback; |
| 253 | }; | |
| 254 | ||
| 255 | 1 | MessageBus.prototype.acceptMessage = function(mid, callback){ |
| 256 | 23 | var that = this |
| 257 | , multi | |
| 258 | , derivedId; | |
| 259 | ||
| 260 | 23 | if (!mid){ |
| 261 | 2 | throw new Error('Missing Message ID.'); |
| 262 | } | |
| 263 | ||
| 264 | 21 | if (!callback){ |
| 265 | 1 | throw new Error('Invalid callback.'); |
| 266 | } | |
| 267 | ||
| 268 | 20 | if (callback.length < 1){ |
| 269 | 1 | throw new Error('Missing parameters in callback.'); |
| 270 | } | |
| 271 | ||
| 272 | 19 | multi = that.dataClient.multi(); |
| 273 | ||
| 274 | // Pipeline! | |
| 275 | 19 | multi |
| 276 | .hget(that.dataHash, mid) | |
| 277 | .hdel(that.dataHash, mid) | |
| 278 | .exec(function(err, reply){ | |
| 279 | 19 | var listenChannel, id, dataErr, msgBody; |
| 280 | ||
| 281 | // First make sure that everything is probably ok | |
| 282 | 19 | if (!util.isArray(reply)){ |
| 283 | 0 | dataErr = new Error("Internal REDIS error (" + err + ", " + reply + ")"); |
| 284 | 0 | callback(dataErr); |
| 285 | 0 | return; |
| 286 | } else { | |
| 287 | // reply is good, let's just get the first element | |
| 288 | // as the indeices should be hget,hdel | |
| 289 | 19 | reply = reply[0]; |
| 290 | } | |
| 291 | ||
| 292 | 19 | try { |
| 293 | 19 | msgBody = JSON.parse(reply); |
| 294 | } catch(e) { | |
| 295 | 1 | dataErr = new Error("Bad data in sent message, " + e); |
| 296 | 1 | callback(dataErr); |
| 297 | 1 | return; |
| 298 | } | |
| 299 | ||
| 300 | 18 | if (!msgBody){ |
| 301 | 3 | dataErr = new Error("DB doesn't recognize message"); |
| 302 | 3 | callback(dataErr); |
| 303 | 3 | return; |
| 304 | } | |
| 305 | ||
| 306 | 15 | derivedId = id = msgBody._messageId; |
| 307 | 15 | listenChannel = msgBody._listenChannel; |
| 308 | ||
| 309 | 15 | if (id !== mid){ |
| 310 | 0 | console.log("ERROR: Mis-match on ids! (" + id + " does not equal " + mid + ")"); |
| 311 | } | |
| 312 | ||
| 313 | 15 | that.idToChannelMap[id] = listenChannel; |
| 314 | ||
| 315 | ||
| 316 | 15 | delete msgBody._listenChannel; |
| 317 | 15 | delete msgBody._messageId; |
| 318 | ||
| 319 | 15 | callback(msgBody); |
| 320 | ||
| 321 | }); | |
| 322 | }; | |
| 323 | ||
| 324 | 1 | MessageBus.prototype.sendResponse = function(msgId, status, msg){ |
| 325 | 37 | if (!msgId || !status || !msg){ |
| 326 | 5 | throw new Error("Missing msgId, status or msg."); |
| 327 | } | |
| 328 | ||
| 329 | 32 | if (status !== 'SUCCESS' && |
| 330 | status !== 'ERROR' && | |
| 331 | status !== 'FAILED') | |
| 332 | { | |
| 333 | 12 | throw new Error(status + ' is not a valid status.'); |
| 334 | } | |
| 335 | ||
| 336 | 20 | var listenChannel = this.idToChannelMap[msgId] |
| 337 | , that = this, serializableError, o, tmpErr, testMode; | |
| 338 | ||
| 339 | 20 | if (arguments.length === 4){ |
| 340 | 3 | testMode = arguments[3]; |
| 341 | } | |
| 342 | ||
| 343 | 20 | if (!listenChannel && !testMode){ |
| 344 | // We have no record of this request | |
| 345 | // probably a callback being called twice, but | |
| 346 | // still need to throw | |
| 347 | 1 | tmpErr = new Error('Attempt to respond to message that was never accepted'); |
| 348 | 1 | tmpErr.debugMessage = msgId + " is not registered as having been accepted"; |
| 349 | 1 | throw tmpErr; |
| 350 | } | |
| 351 | ||
| 352 | 19 | this.dataClient.hset(this.responseHash, msgId, JSON.stringify(msg), function(err, reply){ |
| 353 | 16 | delete that.idToChannelMap[msgId]; |
| 354 | ||
| 355 | 16 | if (err){ |
| 356 | 0 | throw new Error(err); |
| 357 | } | |
| 358 | 16 | that.pubClient.publish(listenChannel, msgId + " " + status); |
| 359 | }); | |
| 360 | }; | |
| 361 | ||
| 362 | ||
| 363 | 1 | MessageBus.prototype.shutdown = function(){ |
| 364 | 106 | this.subClient.removeAllListeners('message'); |
| 365 | 106 | this.dataClient.removeAllListeners('error'); |
| 366 | 106 | this.pubClient.removeAllListeners('error'); |
| 367 | 106 | this.subClient.removeAllListeners('error'); |
| 368 | 106 | this.dataClient.end(); |
| 369 | 106 | this.pubClient.end(); |
| 370 | 106 | this.subClient.end(); |
| 371 | 106 | this.removeAllListeners(); |
| 372 | 106 | this.shutdownFlag = true; |
| 373 | }; | |
| 374 | ||
| 375 | 1 | exports.connect = connect = function(options){ |
| 376 | 113 | return new MessageBus(options); |
| 377 | }; |
| Line | Hits | Source |
|---|---|---|
| 1 | // Copyright 2012 Kinvey, Inc | |
| 2 | // | |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 | // you may not use this file except in compliance with the License. | |
| 5 | // You may obtain a copy of the License at | |
| 6 | // | |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 | // | |
| 9 | // Unless required by applicable law or agreed to in writing, software | |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 | // See the License for the specific language governing permissions and | |
| 13 | // limitations under the License. | |
| 14 | 1 | "use strict"; |
| 15 | ||
| 16 | 1 | var redis = require("redis") |
| 17 | , util = require('util') | |
| 18 | , EventEmitter = require('events').EventEmitter | |
| 19 | , wrapError; | |
| 20 | ||
| 21 | ||
| 22 | 1 | wrapError = function(emitter){ |
| 23 | 36 | return function(error){ |
| 24 | 1 | emitter.emit('error', error); |
| 25 | }; | |
| 26 | }; | |
| 27 | ||
| 28 | ||
| 29 | 1 | var TaskLimit = exports.TaskLimit = function(options){ |
| 30 | 37 | EventEmitter.call(this); |
| 31 | ||
| 32 | 37 | if (!options){ |
| 33 | 1 | throw new Error("I need a task key!"); |
| 34 | } | |
| 35 | ||
| 36 | 37 | if (options.host){ this.redisHost = options.host; } |
| 37 | 37 | if (options.port){ this.redisPort = options.port; } |
| 38 | 37 | if (options.password){ this.redisPassword = options.password; } |
| 39 | ||
| 40 | 36 | this.taskKey = options.taskKey; |
| 41 | 36 | this.redisKeyPrefix = "taskKey:"; |
| 42 | 36 | this.maxTasksPerKey = options.maxTasksPerKey||10; |
| 43 | 36 | this.taskLimitClient = redis.createClient(this.redisPort, this.redisHost); |
| 44 | ||
| 45 | 36 | this.taskLimitClient.on('error', wrapError(this)); |
| 46 | ||
| 47 | 36 | if (options.password){ |
| 48 | 1 | this.taskLimitClient.auth(options.password); |
| 49 | } | |
| 50 | ||
| 51 | }; | |
| 52 | ||
| 53 | // Inherit EventEmitter's methods | |
| 54 | 1 | util.inherits(TaskLimit, EventEmitter); |
| 55 | ||
| 56 | 1 | TaskLimit.prototype.startTask = function(task, callback){ |
| 57 | 40 | var value = task && task[this.taskKey] |
| 58 | , redisKey, that = this; | |
| 59 | ||
| 60 | 40 | if (!callback){ |
| 61 | 8 | callback = function(){}; |
| 62 | } | |
| 63 | ||
| 64 | 40 | if (!value){ |
| 65 | 4 | callback(new Error("Invalid task, not running.")); |
| 66 | } else { | |
| 67 | 36 | redisKey = that.redisKeyPrefix + value; |
| 68 | 36 | that.taskLimitClient.llen(redisKey, function(err, len){ |
| 69 | 36 | if (len >= that.maxTasksPerKey){ |
| 70 | 2 | callback(new Error("Too many tasks")); |
| 71 | } else { | |
| 72 | 34 | that.taskLimitClient.lpush(redisKey, Date(), function(x){ |
| 73 | 34 | callback(len+1); |
| 74 | }); | |
| 75 | } | |
| 76 | }); | |
| 77 | } | |
| 78 | }; | |
| 79 | ||
| 80 | 1 | TaskLimit.prototype.stopTask = function(task){ |
| 81 | 33 | var value = task && task[this.taskKey] |
| 82 | , redisKey, that = this; | |
| 83 | ||
| 84 | ||
| 85 | 33 | if (!value){ |
| 86 | 4 | throw new Error("Invalid task, can't stop."); |
| 87 | } else { | |
| 88 | 29 | redisKey = that.redisKeyPrefix + value; |
| 89 | 29 | that.taskLimitClient.lpop(redisKey); |
| 90 | } | |
| 91 | }; | |
| 92 | ||
| 93 | 1 | TaskLimit.prototype.shutdown = function(){ |
| 94 | 32 | this.taskLimitClient.end(); |
| 95 | }; |