diff --git a/README.md b/README.md index bf3d3b6..484095a 100644 --- a/README.md +++ b/README.md @@ -242,3 +242,17 @@ below. To flip the monitor back into an enabled state, send a `POST` request to `http://localhost:3000/enable`. + +NOTE for 0.4.0 version +The major change is to support a general work delegation pattern between workers & master. In a few scenarios, we've seen duplicate work +done by each worker that could instead be delegated to master, avoiding the duplication of effort. To make it general enough, we +defined the following delegation pattern: +worker -> master : message +message.type is "delegate" +message.delegate defines the actual message type +message.expect is optional; if not given, the delegate work is silently handled by master (e.g. logging remotely); if given, worker will expect a response message whose +type must equal message.expect. When expect is given, the following are also honored: message.matches lists the property names that must be equal between the request and the response message, message.timeOut defines +the max wait for the delegate work in milliseconds (default 10000). message.notification allows delegated work to publish changes detected later. +message.origin keeps the original message. +In cluster2, after master receives the message from worker, it turns it into an event message and finds the proper listener to handle it. +The event handler could be, e.g., a config reader, remote logger or resource externalizer, and it may or may not respond to master depending on expect. 
\ No newline at end of file diff --git a/lib/index.js b/lib/index.js index 79f3593..d799daa 100644 --- a/lib/index.js +++ b/lib/index.js @@ -76,6 +76,7 @@ Cluster.prototype.listen = function(createApp, cb) { noWorkers: self.options.noWorkers, timeout: self.options.timeout || 30 * 1000, // idle socket timeout connThreshold: self.options.connThreshold || 10000, // recycle workers after this many connections + heartbeatInterval: self.options.heartbeatInterval, emitter: self }); diff --git a/lib/monitor.js b/lib/monitor.js index f0ef750..1738025 100644 --- a/lib/monitor.js +++ b/lib/monitor.js @@ -100,6 +100,60 @@ var Monitor = module.exports = function Monitor(options) { }); }); + app.get('/admin/v3console/ValidateInternals', function(req, res){ + //this will render the validate internal homepage + //including links to logs & cals + //and also link to component status + }); + + app.get('/admin/v3console/ComponentStatus(\?component=[^&]+)?', function(req, res){ + + //when component = empty, this will list all the component ids and links associated with them. + + //otherwise, we'll collect the component status from all workers + var component = "component", //should come from param + view = "html",//should come from param + workers = [];//should come from master + + var promises = _.map(workers, function(worker){ + var defer = Q.defer(); + + worker.send("component", { + component: component, + view: view + }); + + var timeOut = setTimeout(function(){ + defer.reject(new Error("time out")); + }); + + var handler = function(){ + if(_.isEqual(message.type, "component") && _.isEqual(message.component, component)){ + clearTimeout(timeOut); + worker.removeEventListener("message", handler); + + defer.resolve(message.view); + } + }; + + worker.on("message", handler); + + return defer.promise; + }); + + Q.allResolved(promises, function(resolves){ + + //monitor app's logic is to merge the view + res.writeHead(200, { + 'Content-Type': _.isEqual(view, "json") ? 
'application/json' : "text/html" + }); + + res.send("view"); + + res.end(); + }); + }); + return app; } diff --git a/lib/process.js b/lib/process.js index 16e01be..b2b05cf 100644 --- a/lib/process.js +++ b/lib/process.js @@ -153,6 +153,7 @@ var Process = module.exports = function Process(options) { return { pid: process.pid,//using master's pid uptime: (memoize.uptime || 0) + heartbeat.uptime,//sum + totalmem: (memoize.totalmem || 0) + heartbeat.totalmem,//sum freemem: (memoize.freemem || 0) + heartbeat.freemem,//sum totalConnections: (memoize.totalConnections || 0) + heartbeat.totalConnections,//sum pendingConnections: (memoize.pendingConnections || 0) + heartbeat.pendingConnections,//sum @@ -169,44 +170,88 @@ var Process = module.exports = function Process(options) { self.emitter.emit('heartbeat', { pid: process.pid, uptime: aggr.uptime / count,//avg + totalmem: aggr.totalmem / count,//avg freemem: aggr.freemem / count,//avg totalConnections: aggr.totalConnections,//total pendingConnections: aggr.pendingConnections,//total timedoutConnections: aggr.timedoutConnections//total }); - }, 60000); + + }, self.options.heartbeatInterval || 60000); } - else if(message.type === "read-config"){ - - var sameOrigin = function(origin, message){ - return _.isEqual(_.pick(origin, "domain", "target", "project", "config", "version"), - _.pick(message, "domain", "target", "project", "config", "version")); - }, + else if(message.type === "delegate"){//delegate is a proposed work pattern between master & workers, there're jobs workers would like to delegate to master + //and master would use messaging to interact with the actual handler module, and send the result back to all workers; besides, such handler might notify + //the master whenever there're some change of the results to publish to all workers in the future. 
+ var delegate = message.delegate, + expect = message.expect; + + if(expect){//there're jobs which expects immediate responses, in case of those, onExpect handler is created, timeout might be applied + var deferred = Q.defer(), origin = message, - deferred = Q.defer(); + matches = message.matches, + targets = message.targets, + isExpected = function(origin, response){ + return _.reduce(matches, function(memoize, match){ + return memoize && _.isEqual(origin[match], response[match]); + }, + true); + }, + onExpect = function(message){ + if(isExpected(origin, message)){ + self.emitter.removeListener(expect, onExpect); + deferred.resolve(message); + } + }, + send = function(message){ + message.type = expect; + if(targets){ + var workers = _.reduce(self.workers, function(memoize, worker){ + memoize[worker.pid] = worker; + return memoize; + }, {}); + _.each(_.compact(targets), function(target){ + var targetWorker = workers[target]; + if(targetWorker){ + targetWorker.send(message); + } + }); + } + else{ + self.notifyWorkers(message); + } + }, + timeOut = setTimeout(function(){ + deferred.reject(new Error("timeout")); + }, message.timeOut || 10000); + + self.emitter.on(expect, onExpect); + + deferred.promise + .then(function(message){ + clearTimeout(timeOut); + send(message); + }) + .fail(function(error){ + message.error = error; + send(message); + }) + .fin(function(){ + if(message.notification){//this is for future update notifications, and registered afterwards. + if(!self._notifications){ + self._notifications = {}; + } + if(!self._notifications[expect]){//make sure notifications won't be repeatedly registered. 
+ self._notifications[expect] = true; + self.emitter.on(expect, function(message){ + send(message); + }); + } + } + }) + .done(); + } - self.emitter.on("config-read", function(message){ - if(sameOrigin(origin, message)){ - deferred.resolve(message); - } - }); - var timeOut = setTimeout(function(){ - deferred.reject(new Error("timeout")); - }, 10000);//timeout after 10 seconds - - //let either raptor-config or node-config module to handle the "read-config" message - //simple protocol is to wait for "config-read" response back within 10 secs; - self.emitter.emit("read-config", message); - deferred.promise - .then(function(message){ - clearTimeout(timeOut); - self.notifyWorkers(message); - }) - .fail(function(error){ - message.error = error; - self.notifyWorkers(message); - }) - .done(); + self.emitter.emit(delegate, message); } }); @@ -430,24 +475,25 @@ Process.prototype.listen = function() { }, 100); // Heartbeat - make sure to clear this on 'close' - // TODO: Other details to include var heartbeat = setInterval(function () { var heartbeat = { pid: process.pid, uptime: Math.round(process.uptime()), + totalmem: os.totalmem(), freemem: os.freemem(), totalConnections: totalConns, pendingConnections: conns, timedoutConnections: timedoutConns }; - self.emitter.emit('heartbeat', heartbeat); + self.emitter.emit('heartbeat', heartbeat); var toMaster = { type:"heartbeat" }; _.extend(toMaster, heartbeat); + process.send(toMaster); - }, 60000); + }, self.options.heartbeatInterval || 60000); _.each(apps, function(app){ app.app.on('close', function() { diff --git a/package.json b/package.json index 374dbad..1fe4f4a 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "email": "subbu@ebaysf.com" }], "name": "cluster2", - "version": "0.3.9", + "version": "0.4.0", "repository": { "type": "git", "url": "https://github.com/ql-io/cluster2" @@ -23,6 +23,7 @@ }, "devDependencies": { "express": "2.5.11", + "websocket": "", "nodeunit": "", "request": "" }, diff --git a/ports 
b/ports index a8f999a..71f48c0 100644 --- a/ports +++ b/ports @@ -1 +1 @@ -3198 \ No newline at end of file +3303 \ No newline at end of file diff --git a/test/cluster-test.js b/test/cluster-test.js index e957404..f6b7df8 100644 --- a/test/cluster-test.js +++ b/test/cluster-test.js @@ -91,6 +91,52 @@ module.exports = { emitter.emit('starting'); }, + 'start, check heartbeat and stop': function(test) { + var emitter = new EventEmitter(), child = start(emitter); + + emitter.on('starting', function() { + waitForStart(child, emitter, test, 0, 100); + }); + + emitter.on('started', function () { + + var timeOut = setTimeout(function(){ + test.ok(false, "timeout and no heartbeat found"); + stop(emitter); + }, 3000); + + emitter.on('heartbeat', function(heartbeat){ + test.ok(heartbeat.pid); + test.ok(heartbeat.uptime); + test.ok(heartbeat.totalmem); + test.ok(heartbeat.freemem); + + clearTimeout(timeOut); + stop(emitter); + }); + }); + + emitter.on('start failure', function (error) { + log('Failed to start ', error.stack || error); + test.ok(false, 'failed to start') + }); + + emitter.on('stopping', function() { + waitForStop.apply(null, [emitter, test, 0, 100]) + }); + + emitter.on('stopped', function() { + log('Stopped'); + // Assert that there are 0 pids. 
+ fs.readdir('./pids', function(err, paths) { + test.equal(paths.length, 0); + }); + test.done(); + }); + + emitter.emit('starting'); + }, + 'start, check ecv and stop': function(test) { var emitter = new EventEmitter(), child = start(emitter); @@ -535,6 +581,9 @@ function waitForStart(child, emitter, test) { clearTimeout(timeOut); deferred.resolve(); } + if(message.type === 'heartbeat'){ + emitter.emit('heartbeat', message); + } }); deferred.promise.then(function(){ diff --git a/test/lib/server.js b/test/lib/server.js index 079fbe6..75cb8b1 100644 --- a/test/lib/server.js +++ b/test/lib/server.js @@ -39,7 +39,8 @@ var c = new Cluster({ connThreshold: 10, ecv: { control: true - } + }, + heartbeatInterval: 1000 }); c.on('died', function(pid) { @@ -82,6 +83,12 @@ c.on('SIGINT', function() { }); }); +c.on('heartbeat', function(heartbeat){ + + heartbeat.type = 'heartbeat'; + process.send(heartbeat); +}); + c.listen(function(cb) { cb(server); }); diff --git a/test/lib/wsserver.js b/test/lib/wsserver.js new file mode 100644 index 0000000..4b36154 --- /dev/null +++ b/test/lib/wsserver.js @@ -0,0 +1,135 @@ +/* + * Copyright 2012 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +var Cluster = require('../../lib/index.js'), + WebSocketServer = require('websocket').server, + http = require("http"); + +var server = http.createServer(function(request, response) { + console.log((new Date()) + ' Received request for ' + request.url); + response.writeHead(200); + response.end(); +}); + +var wsServer = new WebSocketServer({ + httpServer: server, + // You should not use autoAcceptConnections for production + // applications, as it defeats all standard cross-origin protection + // facilities built into the protocol and the browser. You should + // *always* verify the connection's origin and decide whether or not + // to accept it. + autoAcceptConnections: false +}); + +function originIsAllowed(origin) { + // put logic here to detect whether the specified origin is allowed. + return true; +} + +wsServer.on('request', function(request) { + if (!originIsAllowed(request.origin)) { + // Make sure we only accept requests from an allowed origin + request.reject(); + console.log((new Date()) + ' Connection from origin ' + request.origin + ' rejected.'); + return; + } + + var connection = request.accept('echo-protocol', request.origin); + console.log((new Date()) + ' Connection accepted.'); + connection.on('message', function(message) { + + if (message.type === 'utf8') { + console.log('Process:' + process.pid + ' Received Message: ' + message.utf8Data); + connection.sendUTF(message.utf8Data); + } + else if (message.type === 'binary') { + console.log('Process:' + process.pid + ' Received Binary Message of ' + message.binaryData.length + ' bytes'); + connection.sendBytes(message.binaryData); + } + }); + connection.on('close', function(reasonCode, description) { + console.log((new Date()) + ' Peer ' + connection.remoteAddress + ' disconnected.'); + + wsServer.close(); + }); +}); + +server.on('close', function() { + serving = false; +}) + +var c = new Cluster({ + timeout: 300 * 1000, + port: process.env["port"] || 3000, + monPort: process.env["monPort"] 
|| 10000 - process.env["port"] || 3001, + cluster: true, + noWorkers: process.env["noWorkers"] || 2, + connThreshold: 10, + ecv: { + control: true + }, + heartbeatInterval: 1000 +}); + +c.on('died', function(pid) { + console.log('Worker ' + pid + ' died'); + process.send({ + dead: true + }) +}); + +c.on('forked', function(pid) { + console.log('Worker ' + pid + ' forked'); +}); + +c.on('listening', function(pid){ + console.log('Worker ' + pid + ' listening'); + process.send({ + ready: true + }); +}); + +c.on('SIGKILL', function() { + console.log('Got SIGKILL'); + process.send({ + 'signal':'SIGKILL' + }); +}); + +c.on('SIGTERM', function(event) { + console.log('Got SIGTERM - shutting down'); + console.log(event); + process.send({ + 'signal':'SIGTERM' + }); +}); + +c.on('SIGINT', function() { + console.log('Got SIGINT'); + process.send({ + 'signal':'SIGINT' + }); +}); + +c.on('heartbeat', function(heartbeat){ + + heartbeat.type = 'heartbeat'; + process.send(heartbeat); +}); + +c.listen(function(cb) { + cb(server); +}); diff --git a/test/wscluster-test.js b/test/wscluster-test.js new file mode 100644 index 0000000..7276f50 --- /dev/null +++ b/test/wscluster-test.js @@ -0,0 +1,255 @@ +/* + * Copyright 2012 eBay Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +'use strict'; + +var spawn = require('child_process').spawn, + request = require('request'), + fs = require('fs'), + os = require('os'), + EventEmitter = require('events').EventEmitter, + util = require('util'), + _ = require('underscore'), + Q = require('q'), + WebSocketClient = require("websocket").client; + +var debug = false; +function log() { + if(debug) { + console.log.apply(null, (arguments || []).join('')); + } +} +var port = 3000, + monPort = 10000 - port; + +module.exports = { + + setUp: function (callback) { + //to ensure that occupying ports won't cause all test cases to fail + fs.exists("./ports", function(exists){ + if(!exists){ + fs.writeFileSync("./ports", port); + } + fs.readFile("./ports", + function(err, data){ + port = parseInt(data, 10) + 1; + if(port >= 5000){ + port = 3000; + } + monPort = 10000 - port; + fs.writeFile("./ports", "" + port, { + encoding : "utf8" + }, + function(){ + callback(); + }); + }); + }); + }, + + 'start and then stop': function(test) { + var emitter = new EventEmitter(), child = start(emitter); + + emitter.on('starting', function() { + waitForStart(child, emitter, test, 0, 100); + }); + + emitter.on('started', function () { + //create 10 clients and verify that the websocket has been opened and served by different cluster workers. 
+ _.each(_.range(0, 10), function(e){ + var client = new WebSocketClient(); + client.on('connectFailed', function(error) { + console.log('Connect Error: ' + error.toString()); + }); + + client.on('connect', function(connection) { + console.log('WebSocket client connected'); + connection.on('error', function(error) { + console.log("Connection Error: " + error.toString()); + }); + connection.on('close', function() { + console.log('echo-protocol Connection Closed'); + }); + connection.on('message', function(message) { + if (message.type === 'utf8') { + console.log("Received: '" + message.utf8Data + "'"); + } + }); + + var now = new Date().getTime(); + function sendNumber() { + if (connection.connected) { + var number = Math.round(Math.random() * 0xFFFFFF); + connection.sendUTF(number.toString()); + } + if(new Date().getTime() - now < 10000){ + setTimeout(sendNumber, 1000); + } + else{ + stop(emitter); + } + } + sendNumber(); + }); + + client.connect('ws://localhost:' + port + '/', 'echo-protocol'); + }); + }); + + emitter.on('start failure', function (error) { + log('Failed to start ', error.stack || error); + test.ok(false, 'failed to start') + }); + + emitter.on('stopping', function() { + waitForStop.apply(null, [emitter, test, 0, 100]) + }); + + emitter.on('stopped', function() { + log('Stopped'); + // Assert that there are 0 pids. 
+ fs.readdir('./pids', function(err, paths) { + test.equal(paths.length, 0); + }); + test.done(); + }); + + emitter.emit('starting'); + } +} + +// Start the cluster +function start(emitter) { + log('Starting'); + var env = {}; + _.extend(env, process.env); + _.extend(env, { + port:port, + monPort:monPort + }); + var start = spawn('node', ['test/lib/wsserver.js'], { + env: env, + stdio: ['pipe', 1, 2, 'ipc']//enable piped stdout, and ipc for messaging + }); + start.on('exit', function (code, signal) { + log('Process exited with signal ', signal, ' and code ', code); + }); + + return start; +} + +function stop(emitter) { + log('Stopping'); + var stop = spawn('node', ['test/lib/stop.js']); + stop.on('exit', function (code, signal) { + log('Process exited with signal ', signal, ' and code ', code); + }); + + stop.stdout.setEncoding('utf8'); + stop.stdout.on('data', function (data) { + log(data); + }); + emitter.emit('stopping'); +} + +function shutdown(emitter) { + log('Shutting down'); + var shutdown = spawn('node', ['test/lib/shutdown.js']); + shutdown.on('exit', function (code, signal) { + log('Process exited with signal ', signal, ' and code ', code); + }); + + shutdown.stdout.setEncoding('utf8'); + shutdown.stdout.on('data', function (data) { + log(data); + }); + emitter.emit('stopping'); +} + +/* + function waitForStart(child, emitter, test, current, max) { + current++; + if(current < max) { + request(util.format('http://localhost:%d', port), function (error, response, body) { + log('Waiting for server to start'); + if(error) { + log('Error: ', error.stack || error); + if(error.code === 'ECONNREFUSED') { + setTimeout(function () { + waitForStart.apply(null, [child, emitter, test, current, max]) + }, 100); + } + } + else { + emitter.emit('started'); + } + }); + } + else { + test.ok(false, 'Server did not start. 
Giving up'); + test.done(); + } + } + */ + +function waitForStart(child, emitter, test) { + + var deferred = Q.defer(); + var timeOut = setTimeout(function(){ + deferred.reject(new Error("timeout")); + }, 3000); + + child.on("message", function(message){ + if(message.ready){ + clearTimeout(timeOut); + deferred.resolve(); + } + if(message.type === 'heartbeat'){ + emitter.emit('heartbeat', message); + } + }); + + deferred.promise.then(function(){ + emitter.emit("started"); + }) + .fail(function(error){ + test.ok(false, error); + test.done(); + }); +} + +function waitForStop(emitter, test, current, max) { + current++; + if(current < max) { + request(util.format('http://localhost:%d', port), function (error, response, body) { + log('Waiting for server to stop'); + if(error) { + emitter.emit('stopped'); + } + else { + setTimeout(function () { + waitForStop.apply(null, [emitter, test, current, max]) + }, 100); + } + }); + } + else { + test.ok(false, 'Server did not start. Giving up'); + test.done(); + } +} + +