diff --git a/node/peer.js b/node/peer.js index 0605b3e68..0816d72f6 100644 --- a/node/peer.js +++ b/node/peer.js @@ -343,37 +343,6 @@ TChannelPeer.prototype.makeOutConnection = function makeOutConnection(socket) { return conn; }; -TChannelPeer.prototype.outPendingWeightedRandom = function outPendingWeightedRandom() { - // Returns a score in the range from 0 to 1, where it is preferable to use - // a peer with a higher score over one with a lower score. - // This range is divided among an infinite set of subranges corresponding - // to peers with the same number of pending requests. - // So, the range (1/2, 1) is reserved for peers with 0 pending connections. - // The range (1/4, 1/2) is reserved for peers with 1 pending connections. - // The range (1/8, 1/4) is reserved for peers with 2 pending connections. - // Ad nauseam. - // Within each equivalence class, each peer receives a uniform random - // value. - // - // The previous score was a weighted random variable: - // random() ** (1 + pending) - // This had the attribute that a less loaded peer was merely more likely to - // be chosen over a more loaded peer. - // We observed with the introduction of a heap, that a less favored peer - // would have its score less frequently re-evaluated. - // An emergent behavior was that scores would, over time, be squeezed - // toward zero and the least favored peer would remain the least favored - // for ever increasing durations. - // - // This remains true with this algorithm, within each equivalence class. - var self = this; - var pending = self.pendingIdentified + self.countOutPending(); - var max = Math.pow(0.5, pending); - var min = max / 2; - var diff = max - min; - return min + diff * self.random(); -}; - TChannelPeer.prototype.countOutPending = function countOutPending() { var self = this; var pending = 0; @@ -434,13 +403,18 @@ PreferOutgoingHandler.prototype.getTier = function getTier() { PreferOutgoingHandler.prototype.getScore = function getScore() { var self = this; - // space: + // domain, per tier of availability: // [0.1, 0.4) peers with no identified outgoing connection // [0.4, 1.0) identified outgoing connections - var random = self.peer.outPendingWeightedRandom(); - var qos = self.getTier(); - self.lastTier = qos; - switch (qos) { + + // within each domain, the score is a function of the number of pending + // requests. 1 for 0 pending, 1/2 for 1 pending, 1/3 for 2 pending, etc. + var pending = self.peer.pendingIdentified + self.peer.countOutPending(); + var pendingScore = 1 / (1 + pending); + + var tier = self.getTier(); + self.lastTier = tier; + switch (tier) { case TIER_ONLY_INCOMING: if (!self.peer.channel.destroyed) { self.peer.connect(); @@ -449,9 +423,9 @@ PreferOutgoingHandler.prototype.getScore = function getScore() { case TIER_UNCONNECTED: /* falls through */ case TIER_FRESH_OUTGOING: - return 0.1 + random * 0.3; + return 0.1 + pendingScore * 0.3; case TIER_READY_OUTGOING: - return 0.4 + random * 0.6; + return 0.4 + pendingScore * 0.6; } }; diff --git a/node/peer_heap.js b/node/peer_heap.js index 5dddcb060..0e9088906 100644 --- a/node/peer_heap.js +++ b/node/peer_heap.js @@ -31,6 +31,8 @@ function PeerHeap() { // TODO: worth it to keep a tail free list like TimeHeap? // self.end = 0; self._stack = []; + // This is a reusable array, used in the non-reentrant chooseEl routine. + self._candidates = []; } PeerHeap.prototype.choose = function choose(threshold, filter) { @@ -57,12 +59,41 @@ PeerHeap.prototype.choose = function choose(threshold, filter) { PeerHeap.prototype._chooseEl = function _chooseEl(threshold) { var self = this; + var i; - var el = self.array[0]; - if (el.score <= threshold) { // TODO: why inclusive? + var score = self.array[0].score; + + if (score <= threshold) { // TODO: why inclusive? return null; } + // Seed the candidate list with the one obvious maximum + self._candidates.push(0); + + // Find all elements atop the heap that share the maximum score + for (i = 0; i < self._candidates.length; i++) { + var left = (2 * i) + 1; + if (left >= self.array.length) { + continue; + } + if (self.array[left].score === score) { + self._candidates.push(left); + } + var right = left + 1; + if (right >= self.array.length) { + continue; + } + if (self.array[right].score === score) { + self._candidates.push(right); + } + } + + // Choose one candidate randomly + i = Math.floor(Math.random() * self._candidates.length); + var el = self.array[i]; + + self._candidates.length = 0; + return el; }; diff --git a/node/test/pool-of-servers.js b/node/test/pool-of-servers.js index 5842e7f6c..f01cc0e7f 100644 --- a/node/test/pool-of-servers.js +++ b/node/test/pool-of-servers.js @@ -25,7 +25,10 @@ var parallel = require('run-parallel'); var allocCluster = require('./lib/alloc-cluster.js'); allocCluster.test('request().send() to a pool of servers', { - numPeers: 5 + numPeers: 5, + channelOptions: { + choosePeerWithHeap: true + } }, function t(cluster, assert) { var client = cluster.channels[0]; @@ -174,7 +177,7 @@ allocCluster.test('request().send() to a pool of servers', { } var keys = Object.keys(byServer); - assert.equal(keys.length, numPeers, 'expected 25 servers'); + assert.equal(keys.length, numPeers, 'expected responses from all servers'); for (var k = 0; k < keys.length; k++) { var count = byServer[keys[k]];