Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions lib/codec/rle.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const varint = require('varint')

function encodeRunBitpacked(values, opts) {
if (values.length % 8 !== 0) {
throw 'must be a multiple of 8';
for (let i = 0; i < values.length % 8; i++) {
values.push(0);
}

let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8)));
Expand Down Expand Up @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) {
}

let buf = Buffer.alloc(0);
let runs = [];
for (let cur = 0; cur < values.length; cur += 8) {
let repeating = true;
for (let i = 1; i < 8; ++i) {
if (values[cur + i] !== values[cur]) {
repeating = false;
let run = [];
let repeats = 0;

for (let i = 0; i < values.length; i++) {
// If we are at the beginning of a run and the next value is same we start
// collecting repeated values
if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) {
// If we have any data in runs we need to encode them
if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
run = [];
}
repeats = 1;
} else if (repeats > 0 && values[i] === values[i-1]) {
repeats += 1;
} else {
// If values changes we need to post any previous repeated values
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]);
repeats = 0;
}
run.push(values[i]);
}

const append =
runs.length > 0 &&
(runs[runs.length - 1][1] !== null) === repeating &&
(!repeating || runs[runs.length - 1][1] === values[cur]);

if (!append) {
runs.push([cur, repeating ? values[cur] : null]);
}
}

for (let i = values.length - (values.length % 8); i < values.length; ++i) {
runs.push([i, values[i]]);
}

for (let i = 0; i < runs.length; ++i) {
const begin = runs[i][0];
const end = i < runs.length - 1 ? runs[i + 1][0] : values.length;
const rep = runs[i][1];

if (rep === null) {
buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]);
} else {
buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]);
}
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]);
} else if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
}

if (opts.disableEnvelope) {
Expand All @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) {
buf.copy(envelope, 4);

return envelope;
}
};

function decodeRunBitpacked(cursor, count, opts) {
if (count % 8 !== 0) {
Expand Down Expand Up @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) {
values.push(...decodeRunRepeated(cursor, count, opts));
}
}
values = values.slice(0,count);

if (values.length !== count) {
throw "invalid RLE encoding";
}

return values;
}

};
30 changes: 30 additions & 0 deletions test/codec_rle.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,36 @@ describe('ParquetCodec::RLE', function() {
assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7]);
});

describe('number of values not a multiple of 8', function() {
it('should encode bitpacked values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
[0, 1, 2, 3, 4, 5, 6, 7, 6, 5],
{
disableEnvelope: true,
bitWidth: 3
});

assert.deepEqual(buf, new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]));
});

it('should decode bitpacked values', function() {
let vals = parquet_codec_rle.decodeValues(
'INT32',
{
buffer: new Buffer([0x05, 0x88, 0xc6, 0xfa, 0x2e, 0x00, 0x00]),
offset: 0,
},
10,
{
disableEnvelope: true,
bitWidth: 3
});

assert.deepEqual(vals, [0, 1, 2, 3, 4, 5, 6, 7, 6, 5]);
});
});

it('should encode repeated values', function() {
let buf = parquet_codec_rle.encodeValues(
'INT32',
Expand Down