diff --git a/README.md b/README.md index 31ac9ff..3af763c 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Compatible with STOMP 1.0, 1.1 and 1.2 servers. Send message: require('stompit') - .broker() + .broker('failover:(tcp://localhost:61613,tcp://localhsot:61614)?randomize=false') .send('/queue/a', 'hello queue a', function(error){ if(!error){ console.log('message sent'); diff --git a/lib/failover.js b/lib/failover.js index 5caa0c9..bcaeb99 100644 --- a/lib/failover.js +++ b/lib/failover.js @@ -7,11 +7,10 @@ var events = require('events'); var util = require('./util'); var connect = require('./connect'); +var url = require('url'); function Failover(servers, options){ - servers = servers || [{}]; - var defaults = { initialReconnectDelay: 10, maxReconnectDelay: 30000, @@ -22,6 +21,19 @@ function Failover(servers, options){ randomize: true }; + switch(typeof servers){ + case 'undefined': + servers = [{}]; // default server + break; + case 'string': + var uriConfig = this._parseFailoverUri(servers); + servers = uriConfig.servers; + options = util.extend(uriConfig.options, options); + break; + default: + break; + } + options = util.extend(defaults, options); for(var key in defaults){ @@ -37,20 +49,29 @@ function Failover(servers, options){ util.inherits(Failover, events.EventEmitter); -Failover.prototype._createConnector = function(options){ +Failover.prototype._createConnector = function(arg){ - var connector; + var config; - if(typeof options === 'function'){ - connector = options; - } - else{ - connector = function(callback){ - return connect(options, callback); - }; + switch(typeof arg){ + case 'function': + return arg; + + case 'string': + config = this._parseServerUri(arg); + break; + + case 'object': + config = arg; + break; + + default: + throw new Error('invalid type for server config argument'); } - return connector; + return function(callback){ + return connect(config, callback); + }; }; Failover.prototype.addServer = function(){ @@ -147,4 +168,104 @@ Failover.prototype.connect = function(callback){ connect(); }; +Failover.prototype._parseFailoverUri = function(uri){ + + var serverList = uri; + var optionsQueryString = null; + + var comps = uri.match(/^failover:\(([^\)]*)\)(\?.*)?$/); + + if(comps){ + serverList = comps[1]; + optionsQueryString = comps[2]; + } + + var servers = serverList.length > 0 ? serverList.split(',') : []; + var options = optionsQueryString ? options = url.parse(optionsQueryString, true).query : {}; + + var validateBool = function(name){ + + if(!options.hasOwnProperty(name)){ + return; + } + + var value = ('' + options[name]).toLowerCase(); + + if(value === 'true' || value === '1'){ + value = true; + } + else if(value === 'false' || value === '0'){ + value = false; + } + + if(typeof value !== 'boolean'){ + throw new Error('invalid ' + name + ' value \'' + options[name] + '\' (expected boolean)'); + } + + options[name] = value; + }; + + var validateFloat = function(name, lowest, greatest){ + + if(!options.hasOwnProperty(name)){ + return; + } + + var value = parseFloat(options[name], 10); + + if(isNaN(value) || value < lowest || value > greatest){ + throw new Error('invalid ' + name + ' value \'' + options[name] + '\' (expected number between ' + lowest + ' and ' + greatest + ')'); + } + + options[name] = value; + }; + + validateFloat('initialReconnectDelay', 0, Infinity); + validateFloat('maxReconnectDelay', 0, Infinity); + validateBool('useExponentialBackOff'); + validateFloat('reconnectDelayExponent', 0, Infinity); + validateFloat('maxReconnectAttempts', -1, Infinity); + validateFloat('maxReconnects', -1, Infinity); + validateBool('randomize'); + + return { + servers: servers, + options: options + }; +}; + +Failover.prototype._parseServerUri = function(uri){ + + var comps = uri.match(/^\s*((\w+):\/\/)?(([^:]+):([^@]+)@)?([\w-.]+)(:(\d+))?\s*$/); + + if(!comps){ + throw new Error('could not parse server uri \'' + uri + '\''); + } + + //var scheme = comps[2]; + var login = comps[4]; + var passcode = comps[5]; + var hostname = comps[6]; + var port = comps[8]; + + var server = { + host: hostname, + connectHeaders: {} + }; + + if(port !== void 0){ + server.port = parseInt(port, 10); + } + + if(login !== void 0){ + server.connectHeaders.login = login; + } + + if(passcode !== void 0){ + server.connectHeaders.passcode = passcode; + } + + return server; +}; + module.exports = Failover; diff --git a/package.json b/package.json index ae5f5b5..afa6241 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "stompit", "description": "STOMP client", - "version": "0.12.1", + "version": "0.13.0", "keywords": [ "stomp", "messages", "queue" ], diff --git a/test/failover.js b/test/failover.js index b24164f..e78f002 100644 --- a/test/failover.js +++ b/test/failover.js @@ -170,4 +170,135 @@ describe('Failover', function(){ }); }); }); + + describe("#_parseFailoverUri", function(){ + + var failover = new Failover([], {}); + var parse = failover._parseFailoverUri.bind(failover); + + it('should parse a simple uri', function(){ + var ret = parse('failover:(primary,secondary)'); + assert(typeof ret === 'object'); + assert(ret.servers.length === 2); + assert(ret.servers[0] === 'primary'); + assert(ret.servers[1] === 'secondary'); + }); + + it('should parse a server list', function(){ + var ret = parse('primary,secondary'); + assert(typeof ret === 'object'); + assert(ret.servers.length === 2); + assert(ret.servers[0] === 'primary'); + assert(ret.servers[1] === 'secondary'); + }); + + it('should parse query string', function(){ + var ret = parse('failover:(primary)?var1=val1&var2=val2'); + assert(typeof ret === 'object'); + assert(typeof ret.options === 'object'); + assert(ret.options.var1 === 'val1'); + assert(ret.options.var2 === 'val2'); + }); + + it('should accept an empty query string', function(){ + var ret = parse('failover:(primary)?'); + assert(ret.servers.length === 1 && ret.servers[0] === 'primary'); + }); + + it('should cast values of known options', function(){ + + var ret = parse('failover:(primary)?initialReconnectDelay=10&maxReconnectDelay=30000&useExponentialBackOff=true&maxReconnectAttempts=-1&maxReconnects=-1&randomize=true'); + + assert(ret.options.initialReconnectDelay === 10); + assert(ret.options.maxReconnectDelay === 30000); + assert(ret.options.useExponentialBackOff === true); + assert(ret.options.maxReconnectAttempts === -1); + assert(ret.options.maxReconnects === -1); + assert(ret.options.randomize === true); + + assert(parse('failover:(primary)?randomize=TRUE').options.randomize === true); + assert(parse('failover:(primary)?randomize=1').options.randomize === true); + + assert(parse('failover:(primary)?randomize=FALSE').options.randomize === false); + assert(parse('failover:(primary)?randomize=0').options.randomize === false); + }); + + it('should throw an error for invalid values of known options', function(){ + + var expectParseError = function(source){ + + var thrown = false; + + try{ + parse(source); + }catch(e){ + thrown = true; + } + + assert(thrown); + }; + + expectParseError('failover:(sasf)?initialReconnectDelay=zvxvsdf'); + expectParseError('failover:(sasf)?initialReconnectDelay=-2'); + + expectParseError('failover:(sasf)?maxReconnectDelay=asdf'); + expectParseError('failover:(sasf)?maxReconnectDelay=-34'); + + expectParseError('failover:(sasf)?useExponentialBackOff=asdf'); + expectParseError('failover:(sasf)?useExponentialBackOff=-34'); + + expectParseError('failover:(sasf)?maxReconnectAttempts=asdf'); + expectParseError('failover:(sasf)?maxReconnectAttempts=-34'); + + expectParseError('failover:(sasf)?maxReconnects=asdf'); + expectParseError('failover:(sasf)?maxReconnects=-34'); + + expectParseError('failover:(sasf)?randomize=asdf'); + }); + }); + + describe('#_parseServerUri', function(){ + + var failover = new Failover([], {}); + var parse = failover._parseServerUri.bind(failover); + + it('should parse a typical uri', function(){ + var ret = parse('tcp://localhost:61613'); + assert(typeof ret === 'object'); + assert(ret.host === 'localhost'); + assert(ret.port === 61613); + }); + + it('should parse without a scheme', function(){ + var ret = parse('localhost:1234'); + assert(typeof ret === 'object'); + assert(ret.host === 'localhost'); + assert(ret.port === 1234); + }); + + it('should parse without a port', function(){ + var ret = parse('localhost'); + assert(ret.host === 'localhost'); + assert(ret.port === void 0); + }); + + it('should parse login and passcode', function(){ + + var ret = parse('user:pass@localhost:123'); + assert(ret.connectHeaders.login === 'user'); + assert(ret.connectHeaders.passcode === 'pass'); + assert(ret.host === 'localhost'); + assert(ret.port === 123); + + ret = parse('tcp://user:pass@localhost'); + assert(ret.connectHeaders.login === 'user'); + assert(ret.connectHeaders.passcode === 'pass'); + assert(ret.host === 'localhost'); + assert(ret.port === void 0); + }); + + it('should ignore leading and trailing whitespace', function(){ + assert(parse(' localhost \t').host === 'localhost'); + }); + }); });