Skip to content

Commit

Permalink
Add support for failover URIs
Browse files Browse the repository at this point in the history
  • Loading branch information
gdaws committed Sep 3, 2013
1 parent 2a76050 commit 87c79da
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Compatible with STOMP 1.0, 1.1 and 1.2 servers.
Send message:

require('stompit')
.broker()
.broker('failover:(tcp://localhost:61613,tcp://localhsot:61614)?randomize=false')
.send('/queue/a', 'hello queue a', function(error){
if(!error){
console.log('message sent');
Expand Down
145 changes: 133 additions & 12 deletions lib/failover.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
var events = require('events');
var util = require('./util');
var connect = require('./connect');
var url = require('url');

function Failover(servers, options){

servers = servers || [{}];

var defaults = {
initialReconnectDelay: 10,
maxReconnectDelay: 30000,
Expand All @@ -22,6 +21,19 @@ function Failover(servers, options){
randomize: true
};

switch(typeof servers){
case 'undefined':
servers = [{}]; // default server
break;
case 'string':
var uriConfig = this._parseFailoverUri(servers);
servers = uriConfig.servers;
options = util.extend(uriConfig.options, options);
break;
default:
break;
}

options = util.extend(defaults, options);

for(var key in defaults){
Expand All @@ -37,20 +49,29 @@ function Failover(servers, options){

util.inherits(Failover, events.EventEmitter);

Failover.prototype._createConnector = function(options){
Failover.prototype._createConnector = function(arg){

var connector;
var config;

if(typeof options === 'function'){
connector = options;
}
else{
connector = function(callback){
return connect(options, callback);
};
switch(typeof arg){
case 'function':
return arg;

case 'string':
config = this._parseServerUri(arg);
break;

case 'object':
config = arg;
break;

default:
throw new Error('invalid type for server config argument');
}

return connector;
return function(callback){
return connect(config, callback);
};
};

Failover.prototype.addServer = function(){
Expand Down Expand Up @@ -147,4 +168,104 @@ Failover.prototype.connect = function(callback){
connect();
};

Failover.prototype._parseFailoverUri = function(uri){

var serverList = uri;
var optionsQueryString = null;

var comps = uri.match(/^failover:\(([^\)]*)\)(\?.*)?$/);

if(comps){
serverList = comps[1];
optionsQueryString = comps[2];
}

var servers = serverList.length > 0 ? serverList.split(',') : [];
var options = optionsQueryString ? options = url.parse(optionsQueryString, true).query : {};

var validateBool = function(name){

if(!options.hasOwnProperty(name)){
return;
}

var value = ('' + options[name]).toLowerCase();

if(value === 'true' || value === '1'){
value = true;
}
else if(value === 'false' || value === '0'){
value = false;
}

if(typeof value !== 'boolean'){
throw new Error('invalid ' + name + ' value \'' + options[name] + '\' (expected boolean)');
}

options[name] = value;
};

var validateFloat = function(name, lowest, greatest){

if(!options.hasOwnProperty(name)){
return;
}

var value = parseFloat(options[name], 10);

if(isNaN(value) || value < lowest || value > greatest){
throw new Error('invalid ' + name + ' value \'' + options[name] + '\' (expected number between ' + lowest + ' and ' + greatest + ')');
}

options[name] = value;
};

validateFloat('initialReconnectDelay', 0, Infinity);
validateFloat('maxReconnectDelay', 0, Infinity);
validateBool('useExponentialBackOff');
validateFloat('reconnectDelayExponent', 0, Infinity);
validateFloat('maxReconnectAttempts', -1, Infinity);
validateFloat('maxReconnects', -1, Infinity);
validateBool('randomize');

return {
servers: servers,
options: options
};
};

Failover.prototype._parseServerUri = function(uri){

var comps = uri.match(/^\s*((\w+):\/\/)?(([^:]+):([^@]+)@)?([\w-.]+)(:(\d+))?\s*$/);

if(!comps){
throw new Error('could not parse server uri \'' + uri + '\'');
}

//var scheme = comps[2];
var login = comps[4];
var passcode = comps[5];
var hostname = comps[6];
var port = comps[8];

var server = {
host: hostname,
connectHeaders: {}
};

if(port !== void 0){
server.port = parseInt(port, 10);
}

if(login !== void 0){
server.connectHeaders.login = login;
}

if(passcode !== void 0){
server.connectHeaders.passcode = passcode;
}

return server;
};

module.exports = Failover;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "stompit",
"description": "STOMP client",
"version": "0.12.1",
"version": "0.13.0",
"keywords": [
"stomp", "messages", "queue"
],
Expand Down
131 changes: 131 additions & 0 deletions test/failover.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,4 +170,135 @@ describe('Failover', function(){
});
});
});

describe("#_parseFailoverUri", function(){

var failover = new Failover([], {});
var parse = failover._parseFailoverUri.bind(failover);

it('should parse a simple uri', function(){
var ret = parse('failover:(primary,secondary)');
assert(typeof ret === 'object');
assert(ret.servers.length === 2);
assert(ret.servers[0] === 'primary');
assert(ret.servers[1] === 'secondary');
});

it('should parse a server list', function(){
var ret = parse('primary,secondary');
assert(typeof ret === 'object');
assert(ret.servers.length === 2);
assert(ret.servers[0] === 'primary');
assert(ret.servers[1] === 'secondary');
});

it('should parse query string', function(){
var ret = parse('failover:(primary)?var1=val1&var2=val2');
assert(typeof ret === 'object');
assert(typeof ret.options === 'object');
assert(ret.options.var1 === 'val1');
assert(ret.options.var2 === 'val2');
});

it('should accept an empty query string', function(){
var ret = parse('failover:(primary)?');
assert(ret.servers.length === 1 && ret.servers[0] === 'primary');
});

it('should cast values of known options', function(){

var ret = parse('failover:(primary)?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&maxReconnectAttempts=-1&maxReconnects=-1&randomize=true');

assert(ret.options.initialReconnectDelay === 10);
assert(ret.options.maxReconnectDelay === 30000);
assert(ret.options.useExponentialBackOff === true);
assert(ret.options.maxReconnectAttempts === -1);
assert(ret.options.maxReconnects === -1);
assert(ret.options.randomize === true);

assert(parse('failover:(primary)?randomize=TRUE').options.randomize === true);
assert(parse('failover:(primary)?randomize=1').options.randomize === true);

assert(parse('failover:(primary)?randomize=FALSE').options.randomize === false);
assert(parse('failover:(primary)?randomize=0').options.randomize === false);
});

it('should throw an error for invalid values of known options', function(){

var expectParseError = function(source){

var thrown = false;

try{
parse(source);
}catch(e){
thrown = true;
}

assert(thrown);
};

expectParseError('failover:(sasf)?initialReconnectDelay=zvxvsdf');
expectParseError('failover:(sasf)?initialReconnectDelay=-2');

expectParseError('failover:(sasf)?maxReconnectDelay=asdf');
expectParseError('failover:(sasf)?maxReconnectDelay=-34');

expectParseError('failover:(sasf)?useExponentialBackOff=asdf');
expectParseError('failover:(sasf)?useExponentialBackOff=-34');

expectParseError('failover:(sasf)?maxReconnectAttempts=asdf');
expectParseError('failover:(sasf)?maxReconnectAttempts=-34');

expectParseError('failover:(sasf)?maxReconnects=asdf');
expectParseError('failover:(sasf)?maxReconnects=-34');

expectParseError('failover:(sasf)?randomize=asdf');
});
});

describe('#_parseServerUri', function(){

var failover = new Failover([], {});
var parse = failover._parseServerUri.bind(failover);

it('should parse a typical uri', function(){
var ret = parse('tcp://localhost:61613');
assert(typeof ret === 'object');
assert(ret.host === 'localhost');
assert(ret.port === 61613);
});

it('should parse without a scheme', function(){
var ret = parse('localhost:1234');
assert(typeof ret === 'object');
assert(ret.host === 'localhost');
assert(ret.port === 1234);
});

it('should parse without a port', function(){
var ret = parse('localhost');
assert(ret.host === 'localhost');
assert(ret.port === void 0);
});

it('should parse login and passcode', function(){

var ret = parse('user:pass@localhost:123');
assert(ret.connectHeaders.login === 'user');
assert(ret.connectHeaders.passcode === 'pass');
assert(ret.host === 'localhost');
assert(ret.port === 123);

ret = parse('tcp://user:pass@localhost');
assert(ret.connectHeaders.login === 'user');
assert(ret.connectHeaders.passcode === 'pass');
assert(ret.host === 'localhost');
assert(ret.port === void 0);
});

it('should ignore leading and trailing whitespace', function(){
assert(parse(' localhost \t').host === 'localhost');
});
});
});

0 comments on commit 87c79da

Please sign in to comment.