Skip to content

Fix: error handling for Variable Byte Integer #95 #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 19, 2020
1 change: 1 addition & 0 deletions constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ protocol.RETAIN_MASK = 0x01
/* Length */
protocol.VARBYTEINT_MASK = 0x7F
protocol.VARBYTEINT_FIN_MASK = 0x80
protocol.VARBYTEINT_MAX = 268435455

/* Connack */
protocol.SESSIONPRESENT_MASK = 0x01
Expand Down
9 changes: 6 additions & 3 deletions parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class Parser extends EventEmitter {
if (result) {
this.packet.length = result.value
this._list.consume(result.bytes)
} else {
this._emitError(new Error('Invalid length'))
}
debug('_parseLength %d', result.value)
return !!result
Expand Down Expand Up @@ -542,14 +540,15 @@ class Parser extends EventEmitter {

_parseVarByteNum (fullInfoFlag) {
debug('_parseVarByteNum')
const maxBytes = 4
let bytes = 0
let mul = 1
let value = 0
let result = false
let current
const padding = this._pos ? this._pos : 0

while (bytes < 5) {
while (bytes < maxBytes) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be a less than or equal for maxBytes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the previous ' bytes < 5' was incorrect.

The maximum size of a var byte int is 4 bytes, so 4 bytes need to be examined in the loop.
The bytes variable starts at 0, so to examine 4 bytes, the loop has to be over 0, 1, 2, 3.
If it doesn't meet the ((current & constants.VARBYTEINT_FIN_MASK) === 0) condition for any of those four bytes, then that indicates that it has exceeded the maximum size, i.e. that bit being non-zero means that it is expecting more length bytes, but we are already at the maximum at for bytes === 3.

Here are a couple of tests that demonstrate that buggy aspect present in v6.6.0, failing to emit the required error there:

testParseError('Invalid length', Buffer.from(
  [16, 255, 255, 255, 255, 1]
), {})
testParseError('Invalid length', Buffer.from(
  [16, 255, 255, 255, 255, 127]
), {})

While the following tests do pass in v6.6.0:

testParseError('Invalid length', Buffer.from(
  [16, 255, 255, 255, 255, 128]
), {})
testParseError('Invalid length', Buffer.from(
  [16, 255, 255, 255, 255, 255, 1]
), {})

(Note that the error message is different between v6.6.0 and my changes, i.e. 'Invalid length' vs 'Invalid variable byte integer'.)

With the updated error message, the following tests all pass with my changes:

testParseError('Invalid variable byte integer', Buffer.from(
  [16, 255, 255, 255, 255]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
  [16, 255, 255, 255, 128]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
  [16, 255, 255, 255, 255, 1]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
  [16, 255, 255, 255, 255, 127]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
  [16, 255, 255, 255, 255, 128]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
  [16, 255, 255, 255, 255, 255, 1]
), {})

That seems to be good evidence that my version has the correct while condition and that the previous one was another bug in v6.6.0.

current = this._list.readUInt8(padding + bytes++)
value += mul * (current & constants.VARBYTEINT_MASK)
mul *= 0x80
Expand All @@ -563,6 +562,10 @@ class Parser extends EventEmitter {
}
}

if (!result && bytes === maxBytes && this._list.length >= bytes) {
this._emitError(new Error('Invalid variable byte integer'))
}

if (padding) {
this._pos += bytes
}
Expand Down
157 changes: 150 additions & 7 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const util = require('util')

const test = require('tape')
const mqtt = require('./')
const WS = require('readable-stream').Writable
Expand Down Expand Up @@ -32,7 +34,24 @@ function testParseGenerate (name, object, buffer, opts) {
})

test(`${name} generate`, t => {
t.equal(mqtt.generate(object, opts).toString('hex'), buffer.toString('hex'))
// For really large buffers, the expanded hex string can be so long as to
// generate an error in nodejs 14.x, so only do the test with extra output
// for relatively small buffers.
const bigLength = 10000
const generatedBuffer = mqtt.generate(object, opts)
if (generatedBuffer.length < bigLength && buffer.length < bigLength) {
t.equal(generatedBuffer.toString('hex'), buffer.toString('hex'))
} else {
const bufferOkay = generatedBuffer.equals(buffer)
if (bufferOkay) {
t.pass()
} else {
// Output abbreviated representations of the buffers.
t.comment('Expected:\n' + util.inspect(buffer))
t.comment('Got:\n' + util.inspect(generatedBuffer))
t.fail('Large buffers not equal')
}
}
t.end()
})

Expand Down Expand Up @@ -205,9 +224,26 @@ test('disabled numbers cache', t => {
testGenerateError('Unknown command', {})

testParseError('Not supported', Buffer.from([0, 1, 0]), {})
testParseError('Invalid length', Buffer.from(

// Length header field
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 128]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 1]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 127]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 128]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 255, 1]
), {})

testParseGenerate('minimal connect', {
cmd: 'connect',
Expand Down Expand Up @@ -1134,6 +1170,31 @@ testParseGenerate('publish MQTT 5 properties with 0-4 byte varbyte', {
116, 101, 115, 116 // Payload (test)
]), { protocolVersion: 5 })

testParseGenerate('publish MQTT 5 properties with max value varbyte', {
cmd: 'publish',
retain: true,
qos: 2,
dup: true,
length: 22,
topic: 'test',
payload: Buffer.from('test'),
messageId: 10,
properties: {
payloadFormatIndicator: false,
subscriptionIdentifier: [1, 268435455]
}
}, Buffer.from([
61, 22, // Header
0, 4, // Topic length
116, 101, 115, 116, // Topic (test)
0, 10, // Message ID
9, // properties length
1, 0, // payloadFormatIndicator
11, 1, // subscriptionIdentifier
11, 255, 255, 255, 127, // subscriptionIdentifier (max value)
116, 101, 115, 116 // Payload (test)
]), { protocolVersion: 5 })

; (() => {
const buffer = Buffer.alloc(2048)
testParseGenerate('2KB publish packet', {
Expand All @@ -1149,18 +1210,21 @@ testParseGenerate('publish MQTT 5 properties with 0-4 byte varbyte', {
0, 4, // Topic length
116, 101, 115, 116 // Topic (test)
]), buffer]))
})(); (() => {
const buffer = Buffer.alloc(2 * 1024 * 1024)
testParseGenerate('2MB publish packet', {
})()

; (() => {
const maxLength = 268435455
const buffer = Buffer.alloc(maxLength - 6)
testParseGenerate('Max payload publish packet', {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: 6 + 2 * 1024 * 1024,
length: maxLength,
topic: 'test',
payload: buffer
}, Buffer.concat([Buffer.from([
48, 134, 128, 128, 1, // Header
48, 255, 255, 255, 127, // Header
0, 4, // Topic length
116, 101, 115, 116 // Topic (test)
]), buffer]))
Expand Down Expand Up @@ -1250,6 +1314,85 @@ test('splitted publish parse', t => {
])), 0, 'remaining bytes')
})

test('split publish longer', t => {
t.plan(3)

const length = 255
const topic = 'test'
// Minus two bytes for the topic length specifier
const payloadLength = length - topic.length - 2

const parser = mqtt.parser()
const expected = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: length,
topic: topic,
payload: Buffer.from('a'.repeat(payloadLength))
}

parser.on('packet', packet => {
t.deepLooseEqual(packet, expected, 'expected packet')
})

t.equal(parser.parse(Buffer.from([
48, 255, 1, // Header
0, topic.length, // Topic length
116, 101, 115, 116 // Topic (test)
])), 6, 'remaining bytes')

t.equal(parser.parse(Buffer.from(Array(payloadLength).fill(97))),
0, 'remaining bytes')
})

test('split length parse', t => {
t.plan(4)

const length = 255
const topic = 'test'
const payloadLength = length - topic.length - 2

const parser = mqtt.parser()
const expected = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: length,
topic: topic,
payload: Buffer.from('a'.repeat(payloadLength))
}

parser.on('packet', packet => {
t.deepLooseEqual(packet, expected, 'expected packet')
})

t.equal(parser.parse(Buffer.from([
48, 255 // Header (partial length)
])), 1, 'remaining bytes')

t.equal(parser.parse(Buffer.from([
1, // Rest of header length
0, topic.length, // Topic length
116, 101, 115, 116 // Topic (test)
])), 6, 'remaining bytes')

t.equal(parser.parse(Buffer.from(Array(payloadLength).fill(97))),
0, 'remaining bytes')
})

testGenerateError('Invalid variable byte integer: 268435456', {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: (268435455 + 1),
topic: 'test',
payload: Buffer.alloc(268435455 + 1 - 6)
}, {}, 'Length var byte integer over max allowed value throws error')

testGenerateError('Invalid subscriptionIdentifier: 268435456', {
cmd: 'publish',
retain: true,
Expand Down
7 changes: 6 additions & 1 deletion writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -780,14 +780,19 @@ function auth (packet, stream, opts) {

const varByteIntCache = {}
function writeVarByteInt (stream, num) {
if (num > protocol.VARBYTEINT_MAX) {
stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
return false
}

let buffer = varByteIntCache[num]

if (!buffer) {
buffer = genBufVariableByteInt(num)
if (num < 16384) varByteIntCache[num] = buffer
}
debug('writeVarByteInt: writing to stream: %o', buffer)
stream.write(buffer)
return stream.write(buffer)
}

/**
Expand Down