From 07fb2fd8fc03bf2b57243531eaf91f2d60f5e460 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Wed, 28 Feb 2018 20:57:30 -0500 Subject: [PATCH 1/2] Fix RLE encode/decode * bitpacking should work for any length of data, not just multiple of 8 (last packed is padded if less than 8) * Improve runs estimation - only start a new run if we are at a mod 8 === 0, otherwise use bitpacking --- lib/codec/rle.js | 64 ++++++++++++++++++++++------------------------- test/codec_rle.js | 30 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 34 deletions(-) diff --git a/lib/codec/rle.js b/lib/codec/rle.js index 947a14f1..04b4bf0e 100644 --- a/lib/codec/rle.js +++ b/lib/codec/rle.js @@ -1,8 +1,8 @@ const varint = require('varint') function encodeRunBitpacked(values, opts) { - if (values.length % 8 !== 0) { - throw 'must be a multiple of 8'; + for (let i = 0; i < values.length % 8; i++) { + values.push(0); } let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8))); @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) { } let buf = Buffer.alloc(0); - let runs = []; - for (let cur = 0; cur < values.length; cur += 8) { - let repeating = true; - for (let i = 1; i < 8; ++i) { - if (values[cur + i] !== values[cur]) { - repeating = false; + let run = []; + let repeats = 0; + + for (let i = 0; i < values.length; i++) { + // If we are at the beginning of a run and the next value is same we start + // collecting repeated values + if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) { + // If we have any data in runs we need to encode them + if (run.length) { + buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]); + run = []; } + repeats = 1; + } else if (repeats > 0 && values[i] === values[i-1]) { + repeats += 1; + } else { + // If values changes we need to post any previous repeated values + if (repeats) { + buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]); + repeats = 0; + } + run.push(values[i]); } - - const append = - runs.length > 0 && - (runs[runs.length - 1][1] !== null) === repeating && - (!repeating || runs[runs.length - 1][1] === values[cur]); - - if (!append) { - runs.push([cur, repeating ? values[cur] : null]); - } - } - - for (let i = values.length - (values.length % 8); i < values.length; ++i) { - runs.push([i, values[i]]); } - for (let i = 0; i < runs.length; ++i) { - const begin = runs[i][0]; - const end = i < runs.length - 1 ? runs[i + 1][0] : values.length; - const rep = runs[i][1]; - - if (rep === null) { - buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]); - } else { - buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]); - } + if (repeats) { + buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]); + } else if (run.length) { + buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]); } if (opts.disableEnvelope) { @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) { buf.copy(envelope, 4); return envelope; -} +}; function decodeRunBitpacked(cursor, count, opts) { if (count % 8 !== 0) { @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) { values.push(...decodeRunRepeated(cursor, count, opts)); } } + values = values.slice(0,count); if (values.length !== count) { throw "invalid RLE encoding"; } return values; -} - +}; \ No newline at end of file diff --git a/test/codec_rle.js b/test/codec_rle.js index 183ca8ab..fe0be6fe 100644 --- a/test/codec_rle.js +++ b/test/codec_rle.js @@ -33,6 +33,36 @@ describe('ParquetCodec::RLE', function() { assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7]); }); + describe('number of values not a multiple of 8', function() { + it('should encode bitpacked values', function() { + let buf = parquet_codec_rle.encodeValues( + 'INT32', + [0, 1, 2, 3, 4, 5, 6, 7, 6, 5], + { + disableEnvelope: true, + bitWidth: 3 + }); + + assert.deepEqual(buf, new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00])); + }); + + it('should decode bitpacked values', function() { + let vals = parquet_codec_rle.decodeValues( + 'INT32', + { + buffer: new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]), + offset: 0, + }, + 10, + { + disableEnvelope: true, + bitWidth: 3 + }); + + assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 6, 5]); + }); + }); + it('should encode repeated values', function() { let buf = parquet_codec_rle.encodeValues( 'INT32', From 0c7948d4fa64acf76e481256422c6f4a6ba56815 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Wed, 28 Feb 2018 21:19:54 -0500 Subject: [PATCH 2/2] Add parquet-mr tests --- .travis.yml | 8 ++++ test/parquet-mr.js | 117 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 test/parquet-mr.js diff --git a/.travis.yml b/.travis.yml index ceefb41a..6b72b8f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,3 +9,11 @@ deploy: tags: true npm_api_key: secure: HK/tFvgj/TtYTJ3s2Bszc1/yJWvbSkLcfY3ki3GEuudMpfzcq134/2fbdZLb+B7Ukg31rdRVFCrSg8k6a1KhztkRr9SnMts5WO2ZGulmzNQ+XsBwdd0Bf7KYamAtqft5qBnSvh+ypBloQJQqq5qazb31971Fwvg5pdkYTQgCQxyIfZlH8nUbOxcYyl4w6Mvz5zsQp2c4OKOdq0FgeU3OqJ05i5lWL/CZWRO9L7+f0Uih5Jr9CuRzBUcVVxIopn1uOX1czug+OudIuUMLxbJwJt69ZpWdTbywLg6wVvA58ozbyialuEx8S1UaehsqHFj29JJWcOw+6TCi5+512DrBZMguiyTkjq5I5kmRcPNPY8dcqJUZUD6eDpKYQemFeg+6vKIvT3spK53VXNoEOIqAAiNTpmfY6JQ17S31gy1TqZldMtWr1HXf95LGlLC+czgMHPi1m6YiUgdDx5N7MFXumdOxiyHNdoitQFyyyS57RS7BG8/5ZMeKIXEfhQ9KU/D5L3KpgNCBmwVR72vF3nb89aVETrvNIbZEgc/cTdYWquezfPibGoGjWVJ4c38nd30s6rmoMBwoDwznaDg87ameoHUKSCSMx3uVXRZ5uR2C4SmTqVbWNKLXszL4iIW54EaLf3M+AYjoAb+EupaPMuEonJukdzkalp03RekYVeIY23U= + +sudo: required + +services: + - docker + +before_install: + - docker pull nathanhowell/parquet-tools \ No newline at end of file diff --git a/test/parquet-mr.js b/test/parquet-mr.js new file mode 100644 index 00000000..67024391 --- /dev/null +++ b/test/parquet-mr.js @@ -0,0 +1,117 @@ +'use strict'; +const chai = require('chai'); +const assert = chai.assert; +const parquet = require('../parquet.js'); +const child_process = require('child_process'); + +// helper function that runs parquet-tools dump inside a docker container and returns the stdout +async function readParquetMr(file) { + return new Promise( (resolve, reject) => { + const dockerCmd = `docker run -v \${PWD}:/home nathanhowell/parquet-tools dump --debug /home/${file}`; + child_process.exec(dockerCmd, (err, stdout, stderr) => { + if (err || stderr) { + reject(err || stderr); + } else { + resolve(stdout); + } + }); + }); +} + +describe('Parquet-mr', function() { + it('should read a simple parquetjs file', async function() { + var schema = new parquet.ParquetSchema({ + name: { type: 'UTF8' }, + quantity: { type: 'INT64' }, + price: { type: 'DOUBLE' }, + }); + + const rows = [ + { name: 'apples', quantity: 10, price: 2.6 }, + { name: 'oranges', quantity: 20, price: 2.7}, + { name: 'kiwi', price: 4.2, quantity: 4}, + ]; + + let writer = await parquet.ParquetWriter.openFile(schema, 'test-mr.parquet'); + + for (let row of rows) { + await writer.appendRow(row); + } + + await writer.close(); + + const result = await readParquetMr('test-mr.parquet'); + assert.equal(result,'row group 0 \n--------------------------------------------------------------------------------\nname: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:51/51/1.00 VC:3 ENC:PLAIN,RLE\nquantity: INT64 UNCOMPRESSED DO:0 FPO:79 SZ:46/46/1.00 VC:3 ENC:PLAIN,RLE\nprice: DOUBLE UNCOMPRESSED DO:0 FPO:154 SZ:46/46/1.00 VC:3 ENC:PLAIN,RLE\n\n name TV=3 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:29 VC:3\n\n quantity TV=3 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:24 VC:3\n\n price TV=3 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:24 VC:3\n\nBINARY name \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:0 V:apples\nvalue 2: R:0 D:0 V:oranges\nvalue 3: R:0 D:0 V:kiwi\n\nINT64 quantity \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:0 V:10\nvalue 2: R:0 D:0 V:20\nvalue 3: R:0 D:0 V:4\n\nDOUBLE price \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:0 V:2.6\nvalue 2: R:0 D:0 V:2.7\nvalue 3: R:0 D:0 V:4.2\n'); + }); + + it('should read a nested field', async function() { + var schema = new parquet.ParquetSchema({ + fruit: { + fields: { + name: { type: 'UTF8'}, + quantity: { type: 'INT32'} + } + } + }); + + let writer = await parquet.ParquetWriter.openFile(schema, 'test2-mr.parquet'); + + await writer.appendRow({ + fruit: { + name: 'apple', + quantity: 9 + } + }); + + await writer.close(); + + const result = await readParquetMr('test2-mr.parquet'); + assert.equal(result,'row group 0 \n--------------------------------------------------------------------------------\nfruit: \n.name: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:31/31/1.00 VC:1 ENC:PLAIN,RLE\n.quantity: INT32 UNCOMPRESSED DO:0 FPO:65 SZ:26/26/1.00 VC:1 ENC:PLAIN,RLE\n\n fruit.name TV=1 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:9 VC:1\n\n fruit.quantity TV=1 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:4 VC:1\n\nBINARY fruit.name \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 1 *** \nvalue 1: R:0 D:0 V:apple\n\nINT32 fruit.quantity \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 1 *** \nvalue 1: R:0 D:0 V:9\n'); + }); + + it('should read a parquetjs file with optional value', async function() { + var schema = new parquet.ParquetSchema({ + name: { type: 'UTF8', optional: true } + }); + + const rows = [ + { name: 'apples' }, + { name: 'oranges' }, + { name: 'kiwi' }, + ]; + + let writer = await parquet.ParquetWriter.openFile(schema, 'test3-mr.parquet'); + for (let row of rows) { + await writer.appendRow(row); + } + + await writer.close(); + + const result = await readParquetMr('test3-mr.parquet'); + assert.equal(result,'row group 0 \n--------------------------------------------------------------------------------\nname: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:53/53/1.00 VC:3 ENC:PLAIN,RLE\n\n name TV=3 RL=0 DL=1\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:31 VC:3\n\nBINARY name \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:1 V:apples\nvalue 2: R:0 D:1 V:oranges\nvalue 3: R:0 D:1 V:kiwi\n'); + }); + + it('should read repeated fields', async function() { + const schema = new parquet.ParquetSchema({ + stock: { + repeated: true, + fields: { + warehouse: { type: 'UTF8' }, + } + } + }); + + let writer = await parquet.ParquetWriter.openFile(schema, 'test4-mr.parquet'); + + await writer.appendRow({ + stock: [ + {warehouse: 'Newark'} + ] + }); + + await writer.close(); + + const result = await readParquetMr('test4-mr.parquet'); + assert.equal(result,'row group 0 \n--------------------------------------------------------------------------------\nstock: \n.warehouse: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:36/36/1.00 VC:1 ENC:PLAIN,RLE\n\n stock.warehouse TV=1 RL=1 DL=1\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:14 VC:1\n\nBINARY stock.warehouse \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 1 *** \nvalue 1: R:0 D:1 V:Newark\n') + }); +});