var util = require('util'),
child_process = require('child_process'),
events = require('events'),
_ = require('underscore');
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
var util = require('util'),
child_process = require('child_process'),
events = require('events'),
_ = require('underscore');
// An instance of Pool manages a pool of processes. Jobs can be enqueued and tasked to processes. When all the processes in the pool are busy, Jobs are kept in a FIFO backlog.
/**
 * Pool constructor.
 *
 * Sets up the bookkeeping for a pool of worker processes: a unique id, the
 * effective options, the enqueue/completed counters, the job backlog and the
 * table of live worker processes (keyed by PID).
 *
 * @param {Object} [options] - Overrides for Pool.DEFAULT_OPTIONS. Note that
 *     the defaults are filled into this very object (it is not copied).
 */
function Pool (options) {
    events.EventEmitter.call(this);
    this.id = _.uniqueId('pool-');
    this.options = _.defaults(options || {}, Pool.DEFAULT_OPTIONS);
    // Number of jobs handed to enqueue() so far.
    this.enqueue_ct = 0;
    // Number of jobs that have finished (success or failure).
    this.jobs_ct = 0;
    // Jobs waiting for a free worker process.
    this.backlog = [];
    // Live WorkerProcess instances, keyed by PID.
    this.worker_processes = {};
    // When true, a replacement is spawned whenever a worker process exits.
    this.respawn_workers = true;
}

Pool.DEFAULT_OPTIONS = {
    // max_jobs_per_process - Max number of jobs allowed per worker
    max_jobs_per_process: 64,
    // max_processes - Max number of worker processes
    max_processes: Math.ceil(require('os').cpus().length * 1.25),
    // retries - Number of times a failed job will be retried before
    // reporting failure
    retries: 0,
    // Worker processes are allowed to spend a limited amount of time in
    // various states before moving on. If this time is exceeded, the Pool
    // will kill the process, assuming that the worker has become
    // unresponsive.
    // NOTE(review): nothing in the visible part of this file reads the
    // timeout_* options yet - confirm where (or whether) they are enforced.
    //
    // timeout_initializing - Timeout while the process initializes
    timeout_initializing: 5000,
    // timeout_working - Timeout while process works on a job
    timeout_working: 10000,
    // timeout_recovering - Timeout while process recovers after a job
    timeout_recovering: 5000
};

// A Pool instance offers several events to which handlers can listen. These
// are mostly useful for monitoring the activity of the Pool, and aren't
// necessary for getting work done.
//
// * ready - the process pool is full and ready for jobs
// * spawn (Process) - a Process has been spawned
// * exit (Process) - a Process has exited
// * task (job, process) - a Process has been tasked with a Job
// * backlog (job) - a Job has been queued into the backlog
// * done (job) - a Job has been completed (success or error)
// * idle - the backlog has been drained and all Processes are idle
util.inherits(Pool, events.EventEmitter);
_.extend(Pool.prototype, {

    /**
     * Create a Job for the given options and schedule it for execution.
     *
     * The job is handed to a worker process (or pushed onto the backlog) on
     * the next tick, so callers can attach listeners to the returned Job
     * first.
     *
     * @param {Object} options - Job options; sent to the worker as job data.
     * @param {Function} [cb] - Called as cb(null, result) on success or
     *     cb(err, null) on failure.
     * @returns {Job} The newly created job.
     */
    enqueue: function (options, cb) {
        var self = this;
        var job = new Job(self, options);

        var _done = function (err, result) {
            if (cb) {
                try {
                    cb(err, result);
                } catch (e) {
                    // TODO: What to do here?
                    // console.error is used because util.error no longer
                    // exists in current Node.js releases; calling it here
                    // would throw and skip the bookkeeping below.
                    console.error("Problem with callback: " + e);
                }
            }
            self.jobs_ct++;
            self.emit('done', job);
        };

        job.on('success', function (r) {
            _done(null, r);
        }).on('failure', function (err) {
            _done(err, null);
        }).on('abort', function () {
            self.backlog = _.without(self.backlog, job);
        }).on('retry', function () {
            self._taskWorkerProcess(job);
        });

        // Proxy all the known job events as 'Job:<name>' on the pool.
        Job.ALL_EVENTS.forEach(function (name) {
            job.on(name, function (data) {
                self.emit('Job:' + name, job, data);
            });
        });

        process.nextTick(function () {
            self.enqueue_ct++;
            self.emit('enqueue', job);
            self._taskWorkerProcess(job);
        });

        return job;
    },

    /**
     * Spawn enough workers to fill the pool. Once they're all ready, emit a
     * 'ready' event for the pool.
     *
     * @param {Function} [cb] - Called (without arguments) together with the
     *     pool's 'ready' event.
     */
    spawnWorkers: function (cb) {
        var self = this;
        // PID -> whether that worker has reported ready yet.
        var ready = {};
        _.times(this.options.max_processes, function () {
            var wp = self._spawnWorkerProcess();
            if (wp) {
                ready[wp.getPID()] = false;
                wp.once('ready', function (wp) {
                    ready[wp.getPID()] = true;
                    if (_.every(_.values(ready))) {
                        self.emit('ready', self);
                        if (cb) { cb(); }
                    }
                });
            }
        });
        // TODO: Need to handle failing spawns during startup
    },

    /**
     * Shut the pool down: stop respawning workers, ask every worker process
     * to exit and forget about all of them.
     */
    exit: function () {
        this.respawn_workers = false;
        // `var` keeps the loop variable from leaking into the global scope.
        for (var pid in this.worker_processes) {
            this.worker_processes[pid].exit();
        }
        this.worker_processes = {};
    },

    /**
     * Report a snapshot of pool activity.
     *
     * @returns {Object} { jobs, backlog, workers, busy } where `jobs` is the
     *     number of finished jobs, `backlog` the number of queued jobs,
     *     `workers` the number of tracked processes and `busy` the number of
     *     those that are not in the ready state.
     */
    getStats: function () {
        var worker_processes = this.worker_processes;
        var workers_ct = 0;
        var busy_ct = 0;
        for (var pid in worker_processes) {
            workers_ct++;
            if (!worker_processes[pid].isReady()) { busy_ct++; }
        }
        return {
            jobs: this.jobs_ct,
            backlog: this.backlog.length,
            workers: workers_ct,
            busy: busy_ct
        };
    },

    /**
     * Given a Job, task a Process with its execution. If no Process is free,
     * push it onto the backlog.
     *
     * @param {Job} job - The job to run.
     */
    _taskWorkerProcess: function (job) {
        var worker_process = this._findFreeWorkerProcess();
        if (!worker_process) {
            this.backlog.push(job);
            this.emit('backlog', job);
        } else {
            job._watchWorker(worker_process);
            worker_process.acceptJob(job);
            this.emit('task', job, worker_process);
        }
    },

    /**
     * Find an idle Process available for a new Job. Spawn a new process if
     * the pool is not yet full. Return null, if the pool is full and
     * completely busy.
     *
     * A freshly spawned process is not returned: it is not ready yet, so
     * null is returned in that case as well.
     *
     * @returns {WorkerProcess|null}
     */
    _findFreeWorkerProcess: function () {
        for (var pid in this.worker_processes) {
            var wp = this.worker_processes[pid];
            if (wp.isReady()) {
                return wp;
            }
        }
        this._spawnWorkerProcess();
        return null;
    },

    /**
     * Spawn and track a new worker process, unless the pool is already at
     * its process limit.
     *
     * @returns {WorkerProcess|null} The new process, or null when the pool
     *     is full.
     */
    _spawnWorkerProcess: function () {
        var self = this;

        // Mind the process limit!
        var pids = _.keys(this.worker_processes);
        if (pids.length >= this.options.max_processes) { return null; }

        // Spawn and track a new process...
        var wp = new WorkerProcess(this, this.options.options);
        this.worker_processes[wp.getPID()] = wp;
        wp.on('ready', _.bind(this._onWorkerProcessReady, this))
          .on('exit', _.bind(this._onWorkerProcessExit, this));

        // Proxy all the known worker process events. Only the first two
        // event arguments are forwarded.
        // NOTE(review): WorkerProcess emits (process, job, data), so `data`
        // is dropped here - confirm whether that is intended.
        WorkerProcess.ALL_EVENTS.forEach(function (name) {
            wp.on(name, function (first, second) {
                self.emit('WorkerProcess:' + name, first, second);
            });
        });

        this.emit('spawn', wp);
        return wp;
    },

    /**
     * React to a Process signalling its readiness to handle a job. If
     * there's work waiting in the backlog, pick a job off the top.
     * Otherwise, signal that the pool is idle.
     *
     * @param {WorkerProcess} worker_process - The process that became ready.
     */
    _onWorkerProcessReady: function (worker_process) {
        var self = this;

        var checkIdle = function () {
            self.idle = _.chain(self.worker_processes).values()
                .every(function (wp) { return wp.isReady(); })
                .value();
            // Refrain from emiting 'idle' event until at least some work
            // has been done.
            if (self.idle && self.enqueue_ct > 0) {
                self.emit('idle');
            }
        };

        if (!this.backlog.length) {
            checkIdle();
            return;
        }

        // HACK: Give other listeners a chance to do something with the ready
        // message before the next job gets tasked.
        process.nextTick(function () {
            // The backlog can be emptied between scheduling and running this
            // tick (another ready worker took the job, or it was aborted);
            // never hand an undefined job to _taskWorkerProcess.
            if (self.backlog.length) {
                self._taskWorkerProcess(self.backlog.shift());
            } else {
                checkIdle();
            }
        });
    },

    /**
     * React to the exit of a Process. Drop the Process from the pool, spawn
     * a replacement if necessary.
     *
     * @param {WorkerProcess} worker_process - The process that exited.
     */
    _onWorkerProcessExit: function (worker_process) {
        var pid = worker_process.getPID();
        delete this.worker_processes[pid];
        if (this.respawn_workers) {
            this._spawnWorkerProcess();
        }
        this.emit('exit', worker_process);
    }

});

// A Worker instance provides the scaffolding for a worker module.
/**
 * Worker constructor, meant to be used inside a worker module.
 *
 * Routes messages received on `process` to `_handle_<op>` methods of this
 * instance, and reports uncaught exceptions through terminalFailure().
 *
 * @param {Object} [options] - Local options for the worker.
 */
function Worker (options) {
    var self = this;
    this.options = options || {};
    this.id = 'worker-' + process.pid;
    events.EventEmitter.call(this);

    process.on('message', function (msg) {
        // Ignore empty messages instead of crashing on `msg.op`.
        if (msg === null || msg === undefined) { return; }
        var fn_name = '_handle_' + msg.op;
        if (typeof self[fn_name] === 'function') {
            self[fn_name](msg.data);
        }
    });

    process.on('uncaughtException', function (err) {
        // Anything can be thrown, including strings and null; the `in`
        // operator would itself throw a TypeError on such values.
        var err_str = (err && err.stack) ? err.stack : String(err);
        self.terminalFailure(err_str);
    });
}

// A Worker instance emits several events in response to messages from the Pool. These are useful in getting things done in a worker module:
// * init (options) - received options for initialization & configuration
// * job (work_data) - received a job on which to work
util.inherits(Worker, events.EventEmitter);

// Convenience senders; each one forwards to this._send(name, data):
//
// * worker.ready()
// * worker.start()
// * worker.progress(data)
// * worker.success(data)
// * worker.failure(data)
['ready', 'start', 'progress', 'success', 'failure'].forEach(function (name) {
    Worker.prototype[name] = function (data) {
        return this._send(name, data);
    };
});

/**
 * worker.terminalFailure(data)
 *
 * Try to report a 'terminalFailure' message to the master process, then exit
 * this process whether or not the message could be sent.
 *
 * @param {*} data - Failure details to send along.
 */
Worker.prototype.terminalFailure = function (data) {
    try {
        process.send({op: 'terminalFailure', data: data});
    } catch (e) {
        // TODO: Anything better to do here? Master process died, usually.
        // console.error replaces util.error, which has been removed from
        // current Node.js releases.
        console.error("Cannot contact master process, exiting: " + e);
    }
    process.exit();
};
_.extend(Worker.prototype, {

    /**
     * Send a message of the form {op: name, data: data} to the master
     * process. If sending throws, log the problem and exit this process.
     *
     * @param {string} name - Operation name.
     * @param {*} data - Payload for the message.
     */
    _send: function (name, data) {
        try {
            process.send({op: name, data: data});
        } catch (e) {
            // TODO: Anything better to do here? Master process died, usually.
            // console.error replaces util.error, which has been removed from
            // current Node.js releases.
            console.error(this.id + " tried to send " + name + ". " +
                "Cannot contact master process, exiting: " + e);
            process.exit();
        }
    },

    /**
     * Handle an 'init' message: fill in any options this worker does not
     * already have from the received data, then emit 'init' with the result.
     *
     * @param {Object} [data] - Options received with the message.
     */
    _handle_init: function (data) {
        this.options = _.defaults(
            this.options || {},
            data || {}
        );
        this.emit('init', this.options);
    },

    /**
     * Handle a 'job' message by emitting 'job' with the received data.
     *
     * @param {*} data - Job data received with the message.
     */
    _handle_job: function (data) {
        this.emit('job', data);
    }

});

// A Job instance is obtained from Pool.enqueue(), and it represents a unit of work for the Process pool. It may eventually be picked up as a task by a Process. It emits events narrating the status of the task.
/**
 * Job constructor.
 *
 * @param {Pool} pool - Owning pool; its `options.retries` seeds the number
 *     of retries left for this job.
 * @param {Object} [options] - Job options (defaults to an empty object).
 */
function Job (pool, options) {
    events.EventEmitter.call(this);
    this.id = _.uniqueId('job-');
    this.options = options || {};
    this.retries_left = pool.options.retries;
    this.done = false;
    // The worker process currently watched by this job, if any.
    this.worker_process = null;
    // Event name -> [emitter, handler] pairs registered by _watchWorker().
    // NOTE(review): this own property shadows the EventEmitter `listeners()`
    // method on Job instances - confirm nothing relies on calling it.
    this.listeners = {};
}

// A Job instance emits several events that are useful for monitoring the status of work:
// * start - work has been started on the job
// * abort - the job has been aborted
// * progress (result) - progress reported on the work
// * success (result) - the work has been completed successfully
// * failure (error) - the process failed in performing the work
// * retry - work on the job failed, but another attempt will be made
Job.ALL_EVENTS = ['start', 'abort', 'progress', 'success', 'failure', 'retry'];

util.inherits(Job, events.EventEmitter);

_.extend(Job.prototype, {

    /**
     * abort() - abort this job, killing the Process if it's in progress.
     *
     * This method only stops watching the worker and emits 'abort'; the
     * process is killed by the 'abort' listener that
     * WorkerProcess.acceptJob() attaches to the job.
     */
    abort: function () {
        this._unwatchWorker();
        this.emit('abort');
    },

    /**
     * Start listening to a worker process. Every method of this job named
     * `_handleWorker_<event>` is bound to the job and registered for
     * `<event>` on the process; the registrations are remembered in
     * this.listeners so that _unwatchWorker() can undo them.
     *
     * @param {WorkerProcess} wp - The process to watch.
     */
    _watchWorker: function (wp) {
        var self = this;
        self.worker_process = wp;
        var prefix = '_handleWorker_';
        _.chain(this).functions().filter(function (name) {
            return name.indexOf(prefix) === 0;
        }).each(function (name) {
            var ev_name = name.slice(prefix.length);
            var handler = _.bind(self[name], self);
            wp.on(ev_name, handler);
            self.listeners[ev_name] = [wp, handler];
        });
    },

    /**
     * Remove every listener registered by _watchWorker() and forget the
     * worker process.
     */
    _unwatchWorker: function () {
        var self = this;
        _.each(this.listeners, function (pair, name) {
            pair[0].removeListener(name, pair[1]);
            delete self.listeners[name];
        });
        self.worker_process = null;
    },

    // The worker reported that it started on the job.
    _handleWorker_start: function () {
        this.emit('start');
    },

    // The worker reported progress; pass the data along.
    _handleWorker_progress: function (wp, job, data) {
        this.emit('progress', data);
    },

    // The worker finished successfully; stop watching it and report.
    _handleWorker_success: function (wp, job, data) {
        this._unwatchWorker();
        this.emit('success', data);
    },

    /**
     * The worker failed. Emit 'retry' while retries are left, otherwise emit
     * 'failure'.
     *
     * Retries are only attempted while the counter is a positive number, so
     * a negative or non-numeric `retries` option can no longer cause an
     * endless retry loop (the previous `0 == retries_left` test never
     * matched for such values).
     */
    _handleWorker_failure: function (wp, job, data) {
        this._unwatchWorker();
        if (!(this.retries_left > 0)) {
            this.emit('failure', data);
        } else {
            this.retries_left--;
            this.emit('retry', data);
        }
    },

    // The worker process exited while being watched: treat it as a failure
    // with 'exit' as the failure data.
    _handleWorker_exit: function (wp) {
        this._handleWorker_failure(wp, this, 'exit');
    }

});

// A WorkerProcess manages a child process that can be associated with a Job. As a user of node-hirelings, you should never have to work with WorkerProcess instances. All the heavy lifting is done with Job instances.
/**
 * WorkerProcess constructor: forks the pool's worker module as a child
 * process and wires up its lifecycle.
 *
 * @param {Pool} pool - Owning pool; `pool.options.module` is the module to
 *     fork and `pool.options.max_jobs_per_process` seeds `jobs_left`.
 * @param {Object} [options] - Sent to the child in the 'init' message.
 */
function WorkerProcess (pool, options) {
    var self = this;
    events.EventEmitter.call(this);

    // Not one of WorkerProcess.STATES: the process has been forked but the
    // 'init' message has not been sent yet. (This used to read the
    // nonexistent `WorkerProcess.spawning` property, i.e. undefined.)
    this.state = 'spawning';
    this.options = options;
    this.job = null;
    this.jobs_left = pool.options.max_jobs_per_process;

    this._cprocess = child_process.fork(
        pool.options.module,
        [],
        { env: this._getEnvForWorker() }
    );
    this.id = 'workerprocess-' + this._cprocess.pid;

    this._cprocess.on('exit', function () {
        self.emit('exit', self);
    });

    // Deferred by one tick, so whoever created this instance can attach
    // listeners before the 'initializing' state event is emitted.
    process.nextTick(function () {
        self._cprocess.on('message', _.bind(self._onMessage, self));
        self.send({op: 'init', data: self.options});
        self.setState('initializing');
    });
}
// The states setState() accepts. Each state maps to its own name, which is
// also the name of the event emitted when the state is entered.
WorkerProcess.STATES = {};
['initializing', 'ready', 'working', 'recovering', 'exiting'].forEach(function (state_name) {
    WorkerProcess.STATES[state_name] = state_name;
});

// Every event a WorkerProcess emits: the state names first, followed by the
// job and lifecycle events.
WorkerProcess.ALL_EVENTS = Object.keys(WorkerProcess.STATES).concat([
    'start', 'progress', 'success', 'failure', 'done', 'exit'
]);

util.inherits(WorkerProcess, events.EventEmitter);
_.extend(WorkerProcess.prototype, {

    /**
     * Move to the given state and emit an event named after it, with this
     * process as the argument. Unknown state names are ignored.
     *
     * @param {string} state - A key of WorkerProcess.STATES.
     */
    setState: function (state) {
        if (!(state in WorkerProcess.STATES)) { return; }
        this.state = WorkerProcess.STATES[state];
        this.emit(this.state, this);
    },

    // Send a raw message to the child process.
    send: function (data) {
        this._cprocess.send(data);
    },

    // True when the process is in the 'ready' state.
    isReady: function () {
        return (this.state === WorkerProcess.STATES.ready);
    },

    /**
     * Hand a job to the child process and switch to the 'working' state.
     * While the job is attached, aborting it makes this process exit.
     *
     * @param {Job} job - The job to run; `job.options` is sent as job data.
     * @throws {string} "Process is not ready for work" when not ready.
     */
    acceptJob: function (job) {
        var self = this;
        if (this.state !== WorkerProcess.STATES.ready) {
            throw "Process is not ready for work";
        }
        this.job = job;
        job.on('abort', this.job_listener = function () {
            self.exit();
        });
        this._cprocess.send({op: 'job', data: job.options});
        this.setState('working');
    },

    // Detach from the current job, if any, removing the abort listener that
    // acceptJob() installed on it.
    flushJob: function () {
        if (!this.job) { return; }
        this.job.removeListener('abort', this.job_listener);
        this.job = null;
    },

    // PID of the child process.
    getPID: function () {
        return this._cprocess.pid;
    },

    // Switch to 'exiting', kill the child process and drop the current job.
    exit: function () {
        this.setState('exiting');
        this._cprocess.kill();
        this.flushJob();
    },

    /**
     * Dispatch a message from the child process to the matching
     * `_handleProcess_<op>` method.
     *
     * @param {Object} msg - Message of the form {op, data}.
     */
    _onMessage: function (msg) {
        var fn_name = '_handleProcess_' + msg.op;
        if (fn_name in this) {
            return this[fn_name](msg);
        }
        // No-op: messages with an unknown op are ignored.
    },

    /**
     * The child reported that it is ready for (more) work.
     *
     * The job allowance is only charged for a 'ready' that follows a job,
     * not for the first one after initialization. Charging the initial
     * 'ready' made every process run one job fewer than
     * max_jobs_per_process, and with a limit of 1 a process exited before
     * doing any work at all.
     */
    _handleProcess_ready: function (msg) {
        if (this.state !== WorkerProcess.STATES.initializing) {
            this.jobs_left--;
        }
        if (this.jobs_left <= 0) {
            return this.exit();
        }
        return this.setState('ready');
    },

    // Re-emit 'start' with (process, job).
    _handleProcess_start: function (msg) {
        this.emit(msg.op, this, this.job);
    },

    // Re-emit 'progress' with (process, job, data).
    _handleProcess_progress: function (msg) {
        this.emit(msg.op, this, this.job, msg.data);
    },

    // Re-emit 'success', drop the job and wait for the child to recover.
    _handleProcess_success: function (msg) {
        this.emit(msg.op, this, this.job, msg.data);
        this.flushJob();
        this.setState('recovering');
    },

    // Re-emit 'failure', drop the job and wait for the child to recover.
    _handleProcess_failure: function (msg) {
        this.emit(msg.op, this, this.job, msg.data);
        this.flushJob();
        this.setState('recovering');
    },

    // The child hit an unrecoverable error: report it as a 'failure' of the
    // current job and mark the process as exiting.
    _handleProcess_terminalFailure: function (msg) {
        this.emit('failure', this, this.job, msg.data);
        // Release the job so its 'abort' listener does not keep pointing at
        // this dying process.
        this.flushJob();
        this.setState('exiting');
    },

    /**
     * Build the environment for the child: a copy of process.env without
     * the Node.js cluster worker markers.
     *
     * @returns {Object} Environment variables for child_process.fork().
     */
    _getEnvForWorker: function() {
        var env = {};
        for (var i in process.env) {
            env[i] = process.env[i];
        }
        delete env.NODE_WORKER_ID; //Node.js cluster worker marker for v0.6
        delete env.NODE_UNIQUE_ID; //Node.js cluster worker marker for v0.7
        return env;
    }

});
// Event names published by a Pool: its own events, followed by the proxied
// Job events ('Job:<name>') and WorkerProcess events ('WorkerProcess:<name>').
// NOTE(review): the pool also emits 'exit', which is not listed here -
// confirm whether that omission is intended.
Pool.ALL_EVENTS = (function () {
    var withPrefix = function (prefix, names) {
        return _.map(names, function (event_name) {
            return prefix + event_name;
        });
    };
    var own_events = ['spawn', 'ready', 'enqueue', 'task', 'backlog', 'done', 'idle'];
    var job_events = withPrefix('Job:', Job.ALL_EVENTS);
    var process_events = withPrefix('WorkerProcess:', WorkerProcess.ALL_EVENTS);
    return _.union(own_events, job_events, process_events);
}());
// Public API of this module: the Pool (master side) and the Worker (child
// side) constructors.
var public_api = {};
public_api.Pool = Pool;
public_api.Worker = Worker;
module.exports = public_api;