Skip to content
This repository was archived by the owner on Nov 26, 2024. It is now read-only.

Commit 1c96b3a

Browse files
committed
Initial commit
0 parents  commit 1c96b3a

File tree

6 files changed

+267
-0
lines changed

6 files changed

+267
-0
lines changed

LICENSE

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
Copyright (c) 2012 Chorus Technologies, Inc. (Hubify)
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
4+
5+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
6+
7+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

README.md

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
Mule.js
2+
=======
3+
4+
Mule is a work queue for CPU intensive tasks. You can use it to offload tasks
5+
that would otherwise kill your fast, responsive event loop. It's a bit like
6+
threads a gogo, except using processes not threads.
7+
8+
Mule works by using node's child_process.fork() method to pre-fork a bunch of
9+
processes using a script you define. It sets up a task queue to which you can
10+
push blocking tasks onto and listen for the result. As worker processes become
11+
available they alert the work queue that they're ready to accept more work.
12+
Tasks are sent and results received using node's inbuilt IPC for forked node
13+
processes.
14+
15+
Contrived Example
16+
-----------------
17+
18+
Imagine you have a node process which needs to stay responsive to web requests,
19+
user input or whatever. However it has some heavy CPU intensive work to do
20+
calculating fibonacci numbers. Here's how mule can help unburden your poor
21+
server:
22+
23+
**parent.js**
24+
```javascript
25+
var WorkQueue = require('mule').WorkQueue;
26+
27+
var workQueue = new WorkQueue(__dirname + '/worker.js');
28+
29+
// Generate a series of fibonacci numbers using the work queue to avoid blocking.
30+
var waiting = 100;
31+
for (var i = 1; i <= 100; i++) {
32+
// Generate random number to calculate a fibonacci sequence on
33+
var n = Math.floor(Math.random() * 40) + 1;
34+
35+
// Wrap in anonymous function so we still have access to i & n
36+
(function (i, n) {
37+
workQueue.enqueue(n, function (result) {
38+
console.log(i + ': fibo(' + n + ') = ' + result);
39+
40+
if (--waiting === 0) {
41+
// All jobs are complete so we can safely exit
42+
console.log('\nDone.')
43+
process.exit(0);
44+
}
45+
});
46+
})(i, n);
47+
}
48+
49+
console.log('See, no blocking!');
50+
```
51+
52+
And here's the worker:
53+
54+
**worker.js**
55+
```javascript
56+
/**
57+
* Calculate a Fibonacci number. Note that if you ran this in the main event
58+
* loop it would block.
59+
*/
60+
function fibo (n) {
61+
return n > 1 ? fibo(n - 1) + fibo(n - 2) : 1;
62+
}
63+
64+
// This is where we accept tasks given to us from the parent process.
65+
process.on('message', function (message) {
66+
// Do some CPU intensive calculations with the number passed.
67+
var result = fibo(message);
68+
69+
// Send the result back to the parent process when done.
70+
process.send(result);
71+
});
72+
73+
/* Send ready signal so parent knows we're ready to accept tasks. This should
74+
* always be the last line of your worker process script. */
75+
process.send('READY');
76+
```
77+

examples/parent.js

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
var WorkQueue = require(__dirname + '/../lib/index').WorkQueue;
2+
3+
var workQueue = new WorkQueue(__dirname + '/worker.js');
4+
5+
// Generate a series of fibonacci numbers using the work queue to avoid blocking.
6+
var waiting = 100;
7+
for (var i = 1; i <= 100; i++) {
8+
// Generate random number to calculate a fibonacci sequence on
9+
var n = Math.floor(Math.random() * 40) + 1;
10+
11+
// Wrap in anonymous function so we still have access to i & n
12+
(function (i, n) {
13+
workQueue.enqueue(n, function (result) {
14+
console.log(i + ': fibo(' + n + ') = ' + result);
15+
16+
if (--waiting === 0) {
17+
// All jobs are complete so we can safely exit
18+
console.log('\nDone.')
19+
process.exit(0);
20+
}
21+
});
22+
})(i, n);
23+
}
24+
25+
console.log('See, no blocking!');

examples/worker.js

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/**
2+
* Calculate a Fibonacci number. Note that if you ran this in the main event
3+
* loop it would block.
4+
*/
5+
function fibo (n) {
6+
return n > 1 ? fibo(n - 1) + fibo(n - 2) : 1;
7+
}
8+
9+
// This is where we accept tasks given to us from the parent process.
10+
process.on('message', function (message) {
11+
// Do some CPU intensive calculations with the number passed.
12+
var result = fibo(message);
13+
14+
// Send the result back to the parent process when done.
15+
process.send(result);
16+
});
17+
18+
/* Send ready signal so parent knows we're ready to accept tasks. This should
19+
* always be the last line of your worker process script. */
20+
process.send('READY');

lib/index.js

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
var childProcess = require('child_process'),
2+
events = require('events'),
3+
os = require('os'),
4+
util = require('util');
5+
6+
7+
WorkerState = {
8+
STARTING: 'STARTING',
9+
READY: 'READY',
10+
BUSY: 'BUSY'
11+
};
12+
module.exports.WorkerState = WorkerState;
13+
14+
/**
15+
* Encapsulates a worker process. Manages state and notifying listeners of
16+
* changes in state.
17+
*/
18+
function Worker(workerScript) {
19+
this.process = childProcess.fork(workerScript);
20+
this.pid = this.process.pid;
21+
this.status = WorkerState.STARTING;
22+
23+
this.process.once('message', this.onReady.bind(this));
24+
25+
events.EventEmitter.call(this);
26+
}
27+
util.inherits(Worker, events.EventEmitter);
28+
29+
Worker.prototype.onReady = function (message) {
30+
if (this.status === WorkerState.STARTING) {
31+
console.log('Worker ' + this.pid + ' ready.');
32+
this.status = WorkerState.READY;
33+
this.emit('ready', this);
34+
}
35+
};
36+
37+
Worker.prototype.onMessage = function (callback, message) {
38+
callback(message);
39+
40+
this.status = WorkerState.READY;
41+
this.emit('ready', this);
42+
};
43+
44+
Worker.prototype.send = function (message, callback) {
45+
this.status = WorkerState.BUSY;
46+
this.emit('busy');
47+
48+
this.process.once('message', this.onMessage.bind(this, callback));
49+
this.process.send(message);
50+
};
51+
module.exports.Worker = Worker;
52+
53+
54+
function WorkQueue(workerScript, nWorkers) {
55+
this.workers = [];
56+
this.queue = [];
57+
58+
var self = this;
59+
function fork() {
60+
var worker = new Worker(workerScript);
61+
62+
worker.on('ready', self._run.bind(self));
63+
64+
worker.process.on('exit', function (code, signal) {
65+
if (code !== 0) { // Code will be non-zero if process dies suddenly
66+
console.warn('Worker process ' + worker.pid + ' died. Respawning...');
67+
for (var i = 0; i < self.workers.length; i++) {
68+
if (self.workers[i].pid === worker.pid) {
69+
self.workers.splice(i, 1); // Remove dead worker from pool.
70+
}
71+
}
72+
fork(); // FTW!
73+
}
74+
});
75+
76+
self.workers.push(worker);
77+
}
78+
79+
nWorkers = nWorkers || os.cpus().length;
80+
console.log('Starting ' + nWorkers + ' workers..');
81+
for (var i = 0; i < nWorkers; i++) {
82+
fork();
83+
}
84+
}
85+
86+
/**
87+
* Enqueue a task for a worker process to handle. A task can be any type of var,
88+
* as long your worker script knows what to do with it.
89+
*/
90+
WorkQueue.prototype.enqueue = function (task, callback) {
91+
this.queue.push({ task: task, callback: callback });
92+
process.nextTick(this._run.bind(this));
93+
};
94+
95+
WorkQueue.prototype._run = function (worker) {
96+
if (this.queue.length === 0) {
97+
return; // nothing to do
98+
}
99+
100+
if (!worker) {
101+
// Find the first available worker.
102+
for (var i = 0; i < this.workers.length; i++) {
103+
if (this.workers[i].status === WorkerState.READY) {
104+
worker = this.workers[i];
105+
break;
106+
}
107+
}
108+
}
109+
110+
if (!worker) {
111+
return; // there are no workers available to handle requests. Leave queue as is.
112+
}
113+
114+
var queued = this.queue.shift();
115+
worker.send(queued.task, queued.callback);
116+
};
117+
module.exports.WorkQueue = WorkQueue;

package.json

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"name": "mule",
3+
"version": "0.1.0",
4+
"author": "Hubify <[email protected]>",
5+
"description": "A simple process pool for unburdening the main event loop for CPU intensive tasks.",
6+
"contributors": [
7+
{
8+
"name": "Dave Kuhn",
9+
"email": "[email protected]"
10+
}
11+
],
12+
"main": "./lib/index.js",
13+
"repository": {
14+
"type": "git",
15+
"url": "https://github.com/Hubify/node-mule.git"
16+
},
17+
"license": "MIT",
18+
"engine": {
19+
"node": ">=0.8"
20+
}
21+
}

0 commit comments

Comments
 (0)