Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ deploy:
tags: true
npm_api_key:
secure: HK/tFvgj/TtYTJ3s2Bszc1/yJWvbSkLcfY3ki3GEuudMpfzcq134/2fbdZLb+B7Ukg31rdRVFCrSg8k6a1KhztkRr9SnMts5WO2ZGulmzNQ+XsBwdd0Bf7KYamAtqft5qBnSvh+ypBloQJQqq5qazb31971Fwvg5pdkYTQgCQxyIfZlH8nUbOxcYyl4w6Mvz5zsQp2c4OKOdq0FgeU3OqJ05i5lWL/CZWRO9L7+f0Uih5Jr9CuRzBUcVVxIopn1uOX1czug+OudIuUMLxbJwJt69ZpWdTbywLg6wVvA58ozbyialuEx8S1UaehsqHFj29JJWcOw+6TCi5+512DrBZMguiyTkjq5I5kmRcPNPY8dcqJUZUD6eDpKYQemFeg+6vKIvT3spK53VXNoEOIqAAiNTpmfY6JQ17S31gy1TqZldMtWr1HXf95LGlLC+czgMHPi1m6YiUgdDx5N7MFXumdOxiyHNdoitQFyyyS57RS7BG8/5ZMeKIXEfhQ9KU/D5L3KpgNCBmwVR72vF3nb89aVETrvNIbZEgc/cTdYWquezfPibGoGjWVJ4c38nd30s6rmoMBwoDwznaDg87ameoHUKSCSMx3uVXRZ5uR2C4SmTqVbWNKLXszL4iIW54EaLf3M+AYjoAb+EupaPMuEonJukdzkalp03RekYVeIY23U=

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sudo: required

services:
- docker

before_install:
- docker pull nathanhowell/parquet-tools
64 changes: 30 additions & 34 deletions lib/codec/rle.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const varint = require('varint')

function encodeRunBitpacked(values, opts) {
if (values.length % 8 !== 0) {
throw 'must be a multiple of 8';
for (let i = 0; i < values.length % 8; i++) {
values.push(0);
}

let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8)));
Expand Down Expand Up @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) {
}

let buf = Buffer.alloc(0);
let runs = [];
for (let cur = 0; cur < values.length; cur += 8) {
let repeating = true;
for (let i = 1; i < 8; ++i) {
if (values[cur + i] !== values[cur]) {
repeating = false;
let run = [];
let repeats = 0;

for (let i = 0; i < values.length; i++) {
// If we are at the beginning of a run and the next value is same we start
// collecting repeated values
if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) {
// If we have any data in runs we need to encode them
if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
run = [];
}
repeats = 1;
} else if (repeats > 0 && values[i] === values[i-1]) {
repeats += 1;
} else {
// If values changes we need to post any previous repeated values
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]);
repeats = 0;
}
run.push(values[i]);
}

const append =
runs.length > 0 &&
(runs[runs.length - 1][1] !== null) === repeating &&
(!repeating || runs[runs.length - 1][1] === values[cur]);

if (!append) {
runs.push([cur, repeating ? values[cur] : null]);
}
}

for (let i = values.length - (values.length % 8); i < values.length; ++i) {
runs.push([i, values[i]]);
}

for (let i = 0; i < runs.length; ++i) {
const begin = runs[i][0];
const end = i < runs.length - 1 ? runs[i + 1][0] : values.length;
const rep = runs[i][1];

if (rep === null) {
buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]);
} else {
buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]);
}
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]);
} else if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
}

if (opts.disableEnvelope) {
Expand All @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) {
buf.copy(envelope, 4);

return envelope;
}
};

function decodeRunBitpacked(cursor, count, opts) {
if (count % 8 !== 0) {
Expand Down Expand Up @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) {
values.push(...decodeRunRepeated(cursor, count, opts));
}
}
values = values.slice(0,count);

if (values.length !== count) {
throw "invalid RLE encoding";
}

return values;
}

};
30 changes: 30 additions & 0 deletions test/codec_rle.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ describe('ParquetCodec::RLE', function() {
assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7]);
});

describe('number of values not a multiple of 8', function() {
  it('should encode bitpacked values', function() {
    const buf = parquet_codec_rle.encodeValues(
        'INT32',
        [0, 1, 2, 3, 4, 5, 6, 7, 6, 5],
        {
          disableEnvelope: true,
          bitWidth: 3
        });

    // Buffer.from() replaces the deprecated `new Buffer()` constructor (Node DEP0005).
    assert.deepEqual(buf, Buffer.from([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]));
  });

  it('should decode bitpacked values', function() {
    const vals = parquet_codec_rle.decodeValues(
        'INT32',
        {
          buffer: Buffer.from([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]),
          offset: 0,
        },
        10,
        {
          disableEnvelope: true,
          bitWidth: 3
        });

    assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 6, 5]);
  });
});

it('should encode repeated values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
Expand Down
117 changes: 117 additions & 0 deletions test/parquet-mr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
'use strict';
const chai = require('chai');
const assert = chai.assert;
const parquet = require('../parquet.js');
const child_process = require('child_process');

// Helper that runs `parquet-tools dump` inside a docker container and
// resolves with the tool's stdout for the given parquet file.
// Rejects when the command fails or writes anything to stderr.
function readParquetMr(file) {
  return new Promise((resolve, reject) => {
    // `${file}` is interpolated by JS; `\${PWD}` is escaped so the shell expands it.
    const dockerCmd = `docker run -v \${PWD}:/home nathanhowell/parquet-tools dump --debug /home/${file}`;
    child_process.exec(dockerCmd, (err, stdout, stderr) => {
      if (err || stderr) {
        // Always reject with an Error instance, never a bare string.
        reject(err || new Error(stderr));
      } else {
        resolve(stdout);
      }
    });
  });
}

describe('Parquet-mr', function() {
  // Round-trip check: write with parquetjs, then dump with the reference
  // Java implementation (parquet-tools) and compare its full debug output.
  it('should read a simple parquetjs file', async function() {
    const schema = new parquet.ParquetSchema({
      name: { type: 'UTF8' },
      quantity: { type: 'INT64' },
      price: { type: 'DOUBLE' },
    });

    const rows = [
      { name: 'apples', quantity: 10, price: 2.6 },
      { name: 'oranges', quantity: 20, price: 2.7 },
      { name: 'kiwi', price: 4.2, quantity: 4 },
    ];

    const writer = await parquet.ParquetWriter.openFile(schema, 'test-mr.parquet');

    for (const row of rows) {
      await writer.appendRow(row);
    }

    await writer.close();

    const result = await readParquetMr('test-mr.parquet');
    assert.equal(result, 'row group 0 \n--------------------------------------------------------------------------------\nname: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:51/51/1.00 VC:3 ENC:PLAIN,RLE\nquantity: INT64 UNCOMPRESSED DO:0 FPO:79 SZ:46/46/1.00 VC:3 ENC:PLAIN,RLE\nprice: DOUBLE UNCOMPRESSED DO:0 FPO:154 SZ:46/46/1.00 VC:3 ENC:PLAIN,RLE\n\n name TV=3 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:29 VC:3\n\n quantity TV=3 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:24 VC:3\n\n price TV=3 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:24 VC:3\n\nBINARY name \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:0 V:apples\nvalue 2: R:0 D:0 V:oranges\nvalue 3: R:0 D:0 V:kiwi\n\nINT64 quantity \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:0 V:10\nvalue 2: R:0 D:0 V:20\nvalue 3: R:0 D:0 V:4\n\nDOUBLE price \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:0 V:2.6\nvalue 2: R:0 D:0 V:2.7\nvalue 3: R:0 D:0 V:4.2\n');
  });

  it('should read a nested field', async function() {
    const schema = new parquet.ParquetSchema({
      fruit: {
        fields: {
          name: { type: 'UTF8' },
          quantity: { type: 'INT32' }
        }
      }
    });

    const writer = await parquet.ParquetWriter.openFile(schema, 'test2-mr.parquet');

    await writer.appendRow({
      fruit: {
        name: 'apple',
        quantity: 9
      }
    });

    await writer.close();

    const result = await readParquetMr('test2-mr.parquet');
    assert.equal(result, 'row group 0 \n--------------------------------------------------------------------------------\nfruit: \n.name: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:31/31/1.00 VC:1 ENC:PLAIN,RLE\n.quantity: INT32 UNCOMPRESSED DO:0 FPO:65 SZ:26/26/1.00 VC:1 ENC:PLAIN,RLE\n\n fruit.name TV=1 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:9 VC:1\n\n fruit.quantity TV=1 RL=0 DL=0\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:4 VC:1\n\nBINARY fruit.name \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 1 *** \nvalue 1: R:0 D:0 V:apple\n\nINT32 fruit.quantity \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 1 *** \nvalue 1: R:0 D:0 V:9\n');
  });

  it('should read a parquetjs file with optional value', async function() {
    const schema = new parquet.ParquetSchema({
      name: { type: 'UTF8', optional: true }
    });

    const rows = [
      { name: 'apples' },
      { name: 'oranges' },
      { name: 'kiwi' },
    ];

    const writer = await parquet.ParquetWriter.openFile(schema, 'test3-mr.parquet');
    for (const row of rows) {
      await writer.appendRow(row);
    }

    await writer.close();

    const result = await readParquetMr('test3-mr.parquet');
    assert.equal(result, 'row group 0 \n--------------------------------------------------------------------------------\nname: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:53/53/1.00 VC:3 ENC:PLAIN,RLE\n\n name TV=3 RL=0 DL=1\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:31 VC:3\n\nBINARY name \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 3 *** \nvalue 1: R:0 D:1 V:apples\nvalue 2: R:0 D:1 V:oranges\nvalue 3: R:0 D:1 V:kiwi\n');
  });

  it('should read repeated fields', async function() {
    const schema = new parquet.ParquetSchema({
      stock: {
        repeated: true,
        fields: {
          warehouse: { type: 'UTF8' },
        }
      }
    });

    const writer = await parquet.ParquetWriter.openFile(schema, 'test4-mr.parquet');

    await writer.appendRow({
      stock: [
        { warehouse: 'Newark' }
      ]
    });

    await writer.close();

    const result = await readParquetMr('test4-mr.parquet');
    assert.equal(result, 'row group 0 \n--------------------------------------------------------------------------------\nstock: \n.warehouse: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:36/36/1.00 VC:1 ENC:PLAIN,RLE\n\n stock.warehouse TV=1 RL=1 DL=1\n ----------------------------------------------------------------------------\n page 0: DLE:RLE RLE:RLE VLE:PLAIN ST:[no stats for this column] SZ:14 VC:1\n\nBINARY stock.warehouse \n--------------------------------------------------------------------------------\n*** row group 1 of 1, values 1 to 1 *** \nvalue 1: R:0 D:1 V:Newark\n');
  });
});