diff --git a/constants.js b/constants.js index b3a5d91..f364a83 100644 --- a/constants.js +++ b/constants.js @@ -39,6 +39,7 @@ protocol.RETAIN_MASK = 0x01 /* Length */ protocol.VARBYTEINT_MASK = 0x7F protocol.VARBYTEINT_FIN_MASK = 0x80 +protocol.VARBYTEINT_MAX = 268435455 /* Connack */ protocol.SESSIONPRESENT_MASK = 0x01 diff --git a/parser.js b/parser.js index 228e018..60b01f4 100644 --- a/parser.js +++ b/parser.js @@ -72,8 +72,6 @@ class Parser extends EventEmitter { if (result) { this.packet.length = result.value this._list.consume(result.bytes) - } else { - this._emitError(new Error('Invalid length')) } debug('_parseLength %d', result.value) return !!result @@ -542,6 +540,7 @@ class Parser extends EventEmitter { _parseVarByteNum (fullInfoFlag) { debug('_parseVarByteNum') + const maxBytes = 4 let bytes = 0 let mul = 1 let value = 0 @@ -549,7 +548,7 @@ class Parser extends EventEmitter { let current const padding = this._pos ? this._pos : 0 - while (bytes < 5) { + while (bytes < maxBytes) { current = this._list.readUInt8(padding + bytes++) value += mul * (current & constants.VARBYTEINT_MASK) mul *= 0x80 @@ -563,6 +562,10 @@ class Parser extends EventEmitter { } } + if (!result && bytes === maxBytes && this._list.length >= bytes) { + this._emitError(new Error('Invalid variable byte integer')) + } + if (padding) { this._pos += bytes } diff --git a/test.js b/test.js index 3fc7d0b..8b0dc07 100644 --- a/test.js +++ b/test.js @@ -1,3 +1,5 @@ +const util = require('util') + const test = require('tape') const mqtt = require('./') const WS = require('readable-stream').Writable @@ -32,7 +34,24 @@ function testParseGenerate (name, object, buffer, opts) { }) test(`${name} generate`, t => { - t.equal(mqtt.generate(object, opts).toString('hex'), buffer.toString('hex')) + // For really large buffers, the expanded hex string can be so long as to + // generate an error in nodejs 14.x, so only do the test with extra output + // for relatively small buffers. + const bigLength = 10000 + const generatedBuffer = mqtt.generate(object, opts) + if (generatedBuffer.length < bigLength && buffer.length < bigLength) { + t.equal(generatedBuffer.toString('hex'), buffer.toString('hex')) + } else { + const bufferOkay = generatedBuffer.equals(buffer) + if (bufferOkay) { + t.pass() + } else { + // Output abbreviated representations of the buffers. + t.comment('Expected:\n' + util.inspect(buffer)) + t.comment('Got:\n' + util.inspect(generatedBuffer)) + t.fail('Large buffers not equal') + } + } t.end() }) @@ -205,9 +224,26 @@ test('disabled numbers cache', t => { testGenerateError('Unknown command', {}) testParseError('Not supported', Buffer.from([0, 1, 0]), {}) -testParseError('Invalid length', Buffer.from( + +// Length header field +testParseError('Invalid variable byte integer', Buffer.from( [16, 255, 255, 255, 255] ), {}) +testParseError('Invalid variable byte integer', Buffer.from( + [16, 255, 255, 255, 128] +), {}) +testParseError('Invalid variable byte integer', Buffer.from( + [16, 255, 255, 255, 255, 1] +), {}) +testParseError('Invalid variable byte integer', Buffer.from( + [16, 255, 255, 255, 255, 127] +), {}) +testParseError('Invalid variable byte integer', Buffer.from( + [16, 255, 255, 255, 255, 128] +), {}) +testParseError('Invalid variable byte integer', Buffer.from( + [16, 255, 255, 255, 255, 255, 1] +), {}) testParseGenerate('minimal connect', { cmd: 'connect', @@ -1134,6 +1170,31 @@ testParseGenerate('publish MQTT 5 properties with 0-4 byte varbyte', { 116, 101, 115, 116 // Payload (test) ]), { protocolVersion: 5 }) +testParseGenerate('publish MQTT 5 properties with max value varbyte', { + cmd: 'publish', + retain: true, + qos: 2, + dup: true, + length: 22, + topic: 'test', + payload: Buffer.from('test'), + messageId: 10, + properties: { + payloadFormatIndicator: false, + subscriptionIdentifier: [1, 268435455] + } +}, Buffer.from([ + 61, 22, // Header + 0, 4, // Topic length + 116, 101, 115, 116, // Topic (test) + 0, 10, // Message ID + 9, // properties length + 1, 0, // payloadFormatIndicator + 11, 1, // subscriptionIdentifier + 11, 255, 255, 255, 127, // subscriptionIdentifier (max value) + 116, 101, 115, 116 // Payload (test) +]), { protocolVersion: 5 }) + ; (() => { const buffer = Buffer.alloc(2048) testParseGenerate('2KB publish packet', { @@ -1149,18 +1210,21 @@ testParseGenerate('publish MQTT 5 properties with 0-4 byte varbyte', { 0, 4, // Topic length 116, 101, 115, 116 // Topic (test) ]), buffer])) -})(); (() => { - const buffer = Buffer.alloc(2 * 1024 * 1024) - testParseGenerate('2MB publish packet', { +})() + +; (() => { + const maxLength = 268435455 + const buffer = Buffer.alloc(maxLength - 6) + testParseGenerate('Max payload publish packet', { cmd: 'publish', retain: false, qos: 0, dup: false, - length: 6 + 2 * 1024 * 1024, + length: maxLength, topic: 'test', payload: buffer }, Buffer.concat([Buffer.from([ - 48, 134, 128, 128, 1, // Header + 48, 255, 255, 255, 127, // Header 0, 4, // Topic length 116, 101, 115, 116 // Topic (test) ]), buffer])) @@ -1250,6 +1314,85 @@ test('splitted publish parse', t => { ])), 0, 'remaining bytes') }) +test('split publish longer', t => { + t.plan(3) + + const length = 255 + const topic = 'test' + // Minus two bytes for the topic length specifier + const payloadLength = length - topic.length - 2 + + const parser = mqtt.parser() + const expected = { + cmd: 'publish', + retain: false, + qos: 0, + dup: false, + length: length, + topic: topic, + payload: Buffer.from('a'.repeat(payloadLength)) + } + + parser.on('packet', packet => { + t.deepLooseEqual(packet, expected, 'expected packet') + }) + + t.equal(parser.parse(Buffer.from([ + 48, 255, 1, // Header + 0, topic.length, // Topic length + 116, 101, 115, 116 // Topic (test) + ])), 6, 'remaining bytes') + + t.equal(parser.parse(Buffer.from(Array(payloadLength).fill(97))), + 0, 'remaining bytes') +}) + +test('split length parse', t => { + t.plan(4) + + const length = 255 + const topic = 'test' + const payloadLength = length - topic.length - 2 + + const parser = mqtt.parser() + const expected = { + cmd: 'publish', + retain: false, + qos: 0, + dup: false, + length: length, + topic: topic, + payload: Buffer.from('a'.repeat(payloadLength)) + } + + parser.on('packet', packet => { + t.deepLooseEqual(packet, expected, 'expected packet') + }) + + t.equal(parser.parse(Buffer.from([ + 48, 255 // Header (partial length) + ])), 1, 'remaining bytes') + + t.equal(parser.parse(Buffer.from([ + 1, // Rest of header length + 0, topic.length, // Topic length + 116, 101, 115, 116 // Topic (test) + ])), 6, 'remaining bytes') + + t.equal(parser.parse(Buffer.from(Array(payloadLength).fill(97))), + 0, 'remaining bytes') +}) + +testGenerateError('Invalid variable byte integer: 268435456', { + cmd: 'publish', + retain: false, + qos: 0, + dup: false, + length: (268435455 + 1), + topic: 'test', + payload: Buffer.alloc(268435455 + 1 - 6) +}, {}, 'Length var byte integer over max allowed value throws error') + testGenerateError('Invalid subscriptionIdentifier: 268435456', { cmd: 'publish', retain: true, diff --git a/writeToStream.js b/writeToStream.js index 0a076f6..980294e 100644 --- a/writeToStream.js +++ b/writeToStream.js @@ -780,6 +780,11 @@ function auth (packet, stream, opts) { const varByteIntCache = {} function writeVarByteInt (stream, num) { + if (num > protocol.VARBYTEINT_MAX) { + stream.emit('error', new Error(`Invalid variable byte integer: ${num}`)) + return false + } + let buffer = varByteIntCache[num] if (!buffer) { @@ -787,7 +792,7 @@ function writeVarByteInt (stream, num) { if (num < 16384) varByteIntCache[num] = buffer } debug('writeVarByteInt: writing to stream: %o', buffer) - stream.write(buffer) + return stream.write(buffer) } /**