Skip to content

Commit 26e26a0

Browse files
committed
Rtc/Chat: major refactor and client request fixes.
- lots of fixes around client requests and seqAppend atomic operation. - properly set initial sequence count for seqAppend key/value. - seqAppend was sending two requests for every command except for the first. - move docker spawning into rtctwst module. - start one slimerjs docker instance at a time. This avoids strange problem where initial pages disconnect from twst server right away. - in wait_chat_propagate, send all messages from the leader instead of spreading them out. The spread messages have trouble finishing once we reach 17 nodes in size.
1 parent 05ef488 commit 26e26a0

File tree

7 files changed

+222
-155
lines changed

7 files changed

+222
-155
lines changed

chat.js

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ function updateHistory() {
1111
}
1212

1313
function applyCmd(stateMachine, cmd) {
14-
//log("cmd:", JSON.stringify(cmd));
14+
//log("applyCmd:", JSON.stringify(cmd));
1515
switch (cmd.op) {
1616
case 'get':
1717
stateMachine[cmd.key];
@@ -25,35 +25,36 @@ function applyCmd(stateMachine, cmd) {
2525
// the target key is not a sequence.
2626
if (!'key' in cmd) { throw new Error("seqAppend missing 'key'") }
2727
if (!'value' in cmd) { throw new Error("seqAppend missing 'value'") }
28-
if (cmd.key in stateMachine) {
29-
if (!'cnt' in cmd) { throw new Error("seqAppend missing 'cnt'") }
30-
var seq = stateMachine[cmd.key];
31-
if (!('cnt' in seq && 'value' in seq)) {
32-
throw new Error("seqAppend on non-sequence");
33-
}
34-
if (cmd.cnt !== seq.cnt) {
35-
return [false, seq.cnt];
36-
} else {
37-
seq.value.push(cmd.value);
38-
seq.cnt += 1;
39-
setTimeout(updateHistory, 1);
40-
return [true, seq.cnt];
41-
}
28+
if (!'cnt' in cmd) { throw new Error("seqAppend missing 'cnt'") }
29+
30+
// Add the sequence key if it's not already there
31+
if (!(cmd.key in stateMachine)) {
32+
stateMachine[cmd.key] = {cnt: 0, value: []};
33+
}
34+
35+
var seq = stateMachine[cmd.key];
36+
if (!('cnt' in seq && 'value' in seq)) {
37+
throw new Error("seqAppend on non-sequence");
38+
}
39+
if (cmd.cnt !== seq.cnt) {
40+
return [false, seq.cnt];
4241
} else {
43-
stateMachine[cmd.key] = {cnt: 0, value: [cmd.value]};
42+
seq.value.push(cmd.value);
43+
seq.cnt += 1;
4444
setTimeout(updateHistory, 1);
45-
return [true, 0];
45+
return [true, seq.cnt];
4646
}
4747
}
4848
}
4949

5050
var sendTimeout = 100,
51+
sendTimer = null,
5152
curSeqCnt = 0,
5253
curSend = null,
5354
pendingSends = [];
5455

5556
function flushSends() {
56-
setTimeout(flushSends, sendTimeout);
57+
sendTimer = setTimeout(flushSends, sendTimeout);
5758
if (curSend || pendingSends.length === 0) { return; }
5859
var curSend = pendingSends.shift(),
5960
req = {op: 'seqAppend',
@@ -63,7 +64,12 @@ function flushSends() {
6364
clientRequest(req, function(result) {
6465
//console.log("result:", JSON.stringify(result));
6566
if (result.status !== 'success' || result.result[0] === false) {
67+
console.log("retrying seqAppend with cnt: " + result.result[1]);
6668
pendingSends.unshift(curSend);
69+
} else {
70+
// After a successful send, try again immediately
71+
clearTimeout(sendTimer);
72+
sendTimer = setTimeout(flushSends, 1);
6773
}
6874
curSend = null;
6975
curSeqCnt = result.result[1];

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@
2323
"express": "~4.11.2",
2424
"peer": "~0.2.8",
2525
"minimist": "1.1.0",
26-
"twst": "0.0.7"
26+
"twst": "0.0.8"
2727
}
2828
}

rtc.js

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -103,48 +103,58 @@ function rtcSend(targetId, rpcName, args, callback) {
103103
}
104104
}
105105
function rtcReceive(json) {
106-
var resp = JSON.parse(json),
107-
rpcName = resp[0],
108-
otherNodeId = resp[1],
109-
args = resp[2];
110-
rpcCounts[rpcName]++;
111-
112-
// Call the rpc indicated
113-
node[rpcName](args);
106+
if (node) {
107+
var resp = JSON.parse(json),
108+
rpcName = resp[0],
109+
otherNodeId = resp[1],
110+
args = resp[2];
111+
rpcCounts[rpcName]++;
112+
113+
// Call the rpc indicated
114+
node[rpcName](args);
115+
} else {
116+
console.warn("rtcReceive called before Raft cluster started");
117+
}
114118
}
115119

116120
// Wrap async clientRequest/clientRequestResponse messages into
117121
// a callback based clientRequest call
118122
var curLeaderId = null;
119-
var pendingClientRequest = null;
123+
var pendingClientRequests = [];
120124
function clientRequest(args, callback) {
121-
//log("clientRequest:", args);
122-
if (pendingClientRequest) {
123-
// TODO: fix this
124-
throw new Error("outstanding clientRequest");
125-
}
125+
//log("clientRequest:", JSON.stringify(args));
126126
args['responseId'] = nodeId;
127-
pendingClientRequest = {args: args, callback: callback};
128-
if (curLeaderId === null || curLeaderId === nodeId) {
129-
node.clientRequest(args);
130-
} else {
131-
rtcSend(curLeaderId, 'clientRequest', args);
127+
pendingClientRequests.push({args: args, callback: callback});
128+
clientRequestSend();
129+
}
130+
function clientRequestSend() {
131+
if (pendingClientRequests.length > 0) {
132+
var args = pendingClientRequests[0].args;
133+
if (curLeaderId === null || curLeaderId === nodeId) {
134+
//console.log("node.clientRequest:", JSON.stringify(args));
135+
node.clientRequest(args);
136+
} else {
137+
rtcSend(curLeaderId, 'clientRequest', args);
138+
}
132139
}
133140
}
141+
// TODO: if request response doesn't make it through then pending won't
142+
// be sent until next actual request and there will be a permanently
143+
// delayed request.
134144
function clientRequestResponse(result) {
135-
//log("clientRequestResponse:", result);
145+
//log("clientRequestResponse:", JSON.stringify(result));
136146
if (result.status === 'NOT_LEADER') {
137147
curLeaderId = result.leaderHint;
138-
if (pendingClientRequest) {
139-
var args = pendingClientRequest.args;
140-
//log("curLeaderId:", curLeaderId, nodeMap[curLeaderId]);
141-
rtcSend(curLeaderId, 'clientRequest', args);
142-
}
143-
} else {
144-
var callback = pendingClientRequest.callback,
145-
args = pendingClientRequest.args;
146-
pendingClientRequest = null;
148+
setTimeout(clientRequestSend, 1);
149+
} else if (result.status === 'success') {
150+
var callback = pendingClientRequests[0].callback,
151+
args = pendingClientRequests[0].args;
152+
pendingClientRequests.shift();
147153
callback(result);
154+
//setTimeout(clientRequestSend, 1);
155+
} else {
156+
console.log('Retrying failed clientRequest: ' + result.result);
157+
setTimeout(clientRequestSend, 1);
148158
}
149159
}
150160

@@ -156,7 +166,7 @@ function addRemoveServersAsync() {
156166
var changes = 0;
157167

158168
if (node && node._self.state === 'leader') {
159-
//log("addRemoveServersAsync, nodeMap IDs: " + Object.keys(nodeMap) +
169+
//log("addRemoveServersAsync, nodeMap IDs: " + Object.keys(nodeMap) +
160170
// ", serverMap IDs: " + Object.keys(node._self.serverMap));
161171

162172
// If an ID is in map1 but not in map2 then call rpc with

test/rtctwst.js

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
var spawn = require('child_process').spawn,
1+
var child_process = require('child_process'),
22
Twst = require('twst').Twst,
33
RtcTwst = null;
44

55
// Subclass Twst
66
exports.RtcTwst = RtcTwst = function(opts) {
77
Twst.call(this, opts);
8+
this.nextPageIndex = 0;
9+
810
//rtwst.on('return', function(idx, msg) { console.log('RETURN:', idx, msg); });
911
//rtwst.on('callback', function(idx, msg) { console.log('CALLBACK:', idx, msg); });
1012
this.on('error', function(idx, msg) {
@@ -22,6 +24,8 @@ exports.RtcTwst = RtcTwst = function(opts) {
2224
this.on('close', function(idx, msg) {
2325
console.log(idx + ' CLOSE:', msg.data);
2426
});
27+
28+
if (opts.startPages) { this.startPages(opts); }
2529
};
2630
RtcTwst.prototype = Object.create(Twst.prototype);
2731
RtcTwst.prototype.constructor = RtcTwst;
@@ -36,33 +40,63 @@ RtcTwst.prototype.cleanup_exit = function(exitCode) {
3640

3741
RtcTwst.prototype.dockerPage = function(url, opts) {
3842
opts = opts || {};
39-
var prefix = opts.prefix || "";
40-
var timeout = opts.timeout || 60000;
41-
var docker_args = ['run', '-it', '--rm',
42-
'-v', process.cwd() + '/test:/test',
43-
'slimerjs-wily',
44-
'slimerjs', '/test/launch.js',
45-
url,
46-
timeout];
43+
var self = this,
44+
prefix = opts.prefix || "",
45+
timeout = opts.timeout || 60000;
46+
docker_args = ['run', '-it', '-d', // '--rm',
47+
'-v', process.cwd() + '/test:/test',
48+
'slimerjs-wily',
49+
'slimerjs', '/test/launch.js',
50+
url,
51+
timeout];
4752

4853
console.log(prefix + 'launching slimerjs in docker');
49-
var page = spawn('docker', docker_args);
54+
var out = child_process.execFileSync('docker', docker_args,
55+
{encoding: 'utf8'}),
56+
id = out.replace(/[\r\n]*$/,'');
57+
//var page = child_process.spawn('docker', docker_args);
58+
console.log('launched docker container ' + id);
5059

51-
page.stdout.on('data', function(chunk) {
52-
if (opts.verbose) {
53-
var line = chunk.toString('utf8').replace(/\r\n$/,'');
54-
if (line === '') { return; }
55-
console.log(prefix + line);
60+
var page = child_process.spawn('docker', ['logs', '-t', '-f', id]);
61+
page.docker_id = id;
62+
page.on('close', function (code) {
63+
console.log(prefix + 'docker logs exited with code ' + code);
64+
if (opts.nocleanup) {
65+
process.exit(exitCode);
66+
} else {
67+
//this.cleanup_exit(code);
5668
}
5769
});
5870

59-
page.on('close', function (code) {
60-
console.log(prefix + 'docker container exited with code ' + code);
61-
cleanup_exit(this, code);
62-
});
71+
if (opts.verbose) {
72+
page.stdout.on('data', function(chunk) {
73+
var line = chunk.toString('utf8').replace(/\r\n$/,'');
74+
if (line === '') { return; }
75+
console.log(prefix + line);
76+
});
77+
}
6378
return page;
6479
}
6580

81+
RtcTwst.prototype.startPages = function(opts) {
82+
var self = this,
83+
cur_client_cnt = Object.keys(self.clients).length,
84+
prefix = opts.prefix + this.nextPageIndex + ': ';
85+
//console.log("startPages:", opts.clientCount, this.nextPageIndex, cur_client_cnt);
86+
if (cur_client_cnt >= opts.clientCount) {
87+
opts.pagesCallback();
88+
return;
89+
}
90+
if (this.nextPageIndex <= cur_client_cnt) {
91+
console.log('Starting docker client ' + this.nextPageIndex);
92+
self.dockerPage(opts.url, {prefix: prefix,
93+
timeout: opts.timeout,
94+
verbose: opts.verbose});
95+
this.nextPageIndex += 1;
96+
}
97+
setTimeout(function() { self.startPages(opts); }, 100);
98+
}
99+
66100
////////////////////////////////////
67101
// RTC specific
68102

@@ -135,7 +169,8 @@ RtcTwst.prototype.wait_cluster_up = function(timeout, callback) {
135169
} else if (elapsed > timeout) {
136170
callback(false, nodes, elapsed);
137171
} else {
138-
setTimeout(checkfn, 500);
172+
//setTimeout(checkfn, 500);
173+
setTimeout(checkfn, 100);
139174
}
140175
});
141176
}
@@ -164,6 +199,7 @@ RtcTwst.prototype.wait_cluster_predicate = function(timeout, predicate, callback
164199
}
165200
}
166201

202+
//console.log('Predicate data: ' + JSON.stringify(data));
167203
console.log('Predicate false: ' + JSON.stringify(falseStates) +
168204
', true: ' + JSON.stringify(trueStates) +
169205
', true count: ' + trueCnt);

0 commit comments

Comments
 (0)