Skip to content

delegate work pattern support of version 0.4.0 #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,17 @@ below.

To flip the monitor back into an enabled state, send a `POST` request to `http://localhost:3000/enable`.


NOTE for 0.4.0 version
The major change is to support a general work delegation pattern between workers & master. In a few scenarios, we've seen duplicate work
done by each worker, that could be delegated to master to address and avoid the duplication of effort. And to make it general enough, we
defined the following delegation pattern:
worker -> master : message
message.type is "delegate"
message.delegate defines the actual message type
message.expect is optional, if not given, the delegate work is silently handled by master (e.g. logging remotely); if given, worker will expect a response message whose
type must equal message.expect; if given expect, the following will be enabled: message.matches defines the matching criteria of the response message, message.timeout defines
the max timeout of the delegate work. message.notification allows delegated work to publish changes detected later.
message.origin keeps the orginal message.
In cluster2, after master receives the message from worker, it would turn it into an event message, and find the proper listener to handle such.
The event handler could be config reader, remote logger, resource externalizer e.g. and they might/might not respond to master based on the expect.
1 change: 1 addition & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Cluster.prototype.listen = function(createApp, cb) {
noWorkers: self.options.noWorkers,
timeout: self.options.timeout || 30 * 1000, // idle socket timeout
connThreshold: self.options.connThreshold || 10000, // recycle workers after this many connections
heartbeatInterval: self.options.heartbeatInterval,
emitter: self
});

Expand Down
54 changes: 54 additions & 0 deletions lib/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,60 @@ var Monitor = module.exports = function Monitor(options) {
});
});

app.get('/admin/v3console/ValidateInternals', function(req, res){
//this will render the validate internal homepage
//including links to logs & cals
//and also link to component status
});

app.get('/admin/v3console/ComponentStatus(\?component=[^&]+)?', function(req, res){

//when component = empty, this will list all the component ids and links associated with them.

//otherwise, we'll collect the component status from all workers
var component = "component", //should come from param
view = "html",//should come from param
workers = [];//should come from master

var promises = _.map(workers, function(worker){
var defer = Q.defer();

worker.send("component", {
component: component,
view: view
});

var timeOut = setTimeout(function(){
defer.reject(new Error("time out"));
});

var handler = function(){
if(_.isEqual(message.type, "component") && _.isEqual(message.component, component)){
clearTimeout(timeOut);
worker.removeEventListener("message", handler);

defer.resolve(message.view);
}
};

worker.on("message", handler);

return defer.promise;
});

Q.allResolved(promises, function(resolves){

//monitor app's logic is to merge the view
res.writeHead(200, {
'Content-Type': _.isEqual(view, "json") ? 'application/json' : "text/html"
});

res.send("view");

res.end();
});
});

return app;
}

Expand Down
112 changes: 79 additions & 33 deletions lib/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ var Process = module.exports = function Process(options) {
return {
pid: process.pid,//using master's pid
uptime: (memoize.uptime || 0) + heartbeat.uptime,//sum
totalmem: (memoize.totalmem || 0) + heartbeat.totalmem,//sum
freemem: (memoize.freemem || 0) + heartbeat.freemem,//sum
totalConnections: (memoize.totalConnections || 0) + heartbeat.totalConnections,//sum
pendingConnections: (memoize.pendingConnections || 0) + heartbeat.pendingConnections,//sum
Expand All @@ -169,44 +170,88 @@ var Process = module.exports = function Process(options) {
self.emitter.emit('heartbeat', {
pid: process.pid,
uptime: aggr.uptime / count,//avg
totalmem: aggr.totalmem / count,//avg
freemem: aggr.freemem / count,//avg
totalConnections: aggr.totalConnections,//total
pendingConnections: aggr.pendingConnections,//total
timedoutConnections: aggr.timedoutConnections//total
});
}, 60000);

}, self.options.heartbeatInterval || 60000);
}
else if(message.type === "read-config"){

var sameOrigin = function(origin, message){
return _.isEqual(_.pick(origin, "domain", "target", "project", "config", "version"),
_.pick(message, "domain", "target", "project", "config", "version"));
},
else if(message.type === "delegate"){//delegate is a proposed work pattern between master & workers, there're jobs workers would like to delegate to master
//and master would use messaging to interact with the actual handler module, and send the result back to all workers; besides, such handler might notify
//the master whenever there're some change of the results to publish to all workers in the future.
var delegate = message.delegate,
expect = message.expect;

if(expect){//there're jobs which expects immediate responses, in case of those, onExpect handler is created, timeout might be applied
var deferred = Q.defer(),
origin = message,
deferred = Q.defer();
matches = message.matches,
targets = message.targets,
isExpected = function(origin, response){
return _.reduce(matches, function(memoize, match){
return memoize && _.isEqual(origin[match], response[match]);
},
true);
},
onExpect = function(message){
if(isExpected(origin, message)){
self.emitter.removeListener(expect, onExpect);
deferred.resolve(message);
}
},
send = function(message){
message.type = expect;
if(targets){
var workers = _.reduce(self.workers, function(memoize, worker){
memoize[worker.pid] = worker;
return memoize;
}, {});
_.each(_.compact(targets), function(target){
var targetWorker = workers[target];
if(targetWorker){
targetWorker.send(message);
}
});
}
else{
self.notifyWorkers(message);
}
},
timeOut = setTimeout(function(){
deferred.reject(new Error("timeout"));
}, message.timeOut || 10000);

self.emitter.on(expect, onExpect);

deferred.promise
.then(function(message){
clearTimeout(timeOut);
send(message);
})
.fail(function(error){
message.error = error;
send(message);
})
.fin(function(){
if(message.notification){//this is for future update notifications, and registered afterwards.
if(!self._notifications){
self._notifications = {};
}
if(!self._notifications[expect]){//make sure notifications won't be repeatedly registered.
self._notifications[expect] = true;
self.emitter.on(expect, function(message){
send(message);
});
}
}
})
.done();
}

self.emitter.on("config-read", function(message){
if(sameOrigin(origin, message)){
deferred.resolve(message);
}
});
var timeOut = setTimeout(function(){
deferred.reject(new Error("timeout"));
}, 10000);//timeout after 10 seconds

//let either raptor-config or node-config module to handle the "read-config" message
//simple protocol is to wait for "config-read" response back within 10 secs;
self.emitter.emit("read-config", message);
deferred.promise
.then(function(message){
clearTimeout(timeOut);
self.notifyWorkers(message);
})
.fail(function(error){
message.error = error;
self.notifyWorkers(message);
})
.done();
self.emitter.emit(delegate, message);
}
});

Expand Down Expand Up @@ -430,24 +475,25 @@ Process.prototype.listen = function() {
}, 100);

// Heartbeat - make sure to clear this on 'close'
// TODO: Other details to include
var heartbeat = setInterval(function () {
var heartbeat = {
pid: process.pid,
uptime: Math.round(process.uptime()),
totalmem: os.totalmem(),
freemem: os.freemem(),
totalConnections: totalConns,
pendingConnections: conns,
timedoutConnections: timedoutConns
};
self.emitter.emit('heartbeat', heartbeat);

self.emitter.emit('heartbeat', heartbeat);
var toMaster = {
type:"heartbeat"
};
_.extend(toMaster, heartbeat);

process.send(toMaster);
}, 60000);
}, self.options.heartbeatInterval || 60000);

_.each(apps, function(app){
app.app.on('close', function() {
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"email": "[email protected]"
}],
"name": "cluster2",
"version": "0.3.9",
"version": "0.4.0",
"repository": {
"type": "git",
"url": "https://github.com/ql-io/cluster2"
Expand All @@ -23,6 +23,7 @@
},
"devDependencies": {
"express": "2.5.11",
"websocket": "",
"nodeunit": "",
"request": ""
},
Expand Down
2 changes: 1 addition & 1 deletion ports
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3198
3303
49 changes: 49 additions & 0 deletions test/cluster-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,52 @@ module.exports = {
emitter.emit('starting');
},

'start, check heartbeat and stop': function(test) {
var emitter = new EventEmitter(), child = start(emitter);

emitter.on('starting', function() {
waitForStart(child, emitter, test, 0, 100);
});

emitter.on('started', function () {

var timeOut = setTimeout(function(){
test.ok(false, "timeout and no heartbeat found");
stop(emitter);
}, 3000);

emitter.on('heartbeat', function(heartbeat){
test.ok(heartbeat.pid);
test.ok(heartbeat.uptime);
test.ok(heartbeat.totalmem);
test.ok(heartbeat.freemem);

clearTimeout(timeOut);
stop(emitter);
});
});

emitter.on('start failure', function (error) {
log('Failed to start ', error.stack || error);
test.ok(false, 'failed to start')
});

emitter.on('stopping', function() {
waitForStop.apply(null, [emitter, test, 0, 100])
});

emitter.on('stopped', function() {
log('Stopped');
// Assert that there are 0 pids.
fs.readdir('./pids', function(err, paths) {
test.equal(paths.length, 0);
});
test.done();
});

emitter.emit('starting');
},

'start, check ecv and stop': function(test) {
var emitter = new EventEmitter(), child = start(emitter);

Expand Down Expand Up @@ -535,6 +581,9 @@ function waitForStart(child, emitter, test) {
clearTimeout(timeOut);
deferred.resolve();
}
if(message.type === 'heartbeat'){
emitter.emit('heartbeat', message);
}
});

deferred.promise.then(function(){
Expand Down
9 changes: 8 additions & 1 deletion test/lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ var c = new Cluster({
connThreshold: 10,
ecv: {
control: true
}
},
heartbeatInterval: 1000
});

c.on('died', function(pid) {
Expand Down Expand Up @@ -82,6 +83,12 @@ c.on('SIGINT', function() {
});
});

c.on('heartbeat', function(heartbeat){

heartbeat.type = 'heartbeat';
process.send(heartbeat);
});

c.listen(function(cb) {
cb(server);
});
Loading