Skip to content

Commit 70a67f1

Browse files
committed
Add statistics to pages and columns
Statistics are collected by default for all columns unless `statistics: false` is set in the field definition.
1 parent 366375a commit 70a67f1

7 files changed

Lines changed: 335 additions & 8 deletions

File tree

lib/reader.js

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const parquet_util = require('./util')
77
const parquet_schema = require('./schema')
88
const parquet_codec = require('./codec')
99
const parquet_compression = require('./compression')
10+
const parquet_types = require('./types');
1011

1112
/**
1213
* Parquet File Magic String
@@ -118,6 +119,18 @@ class ParquetReader {
118119
this.schema = new parquet_schema.ParquetSchema(
119120
decodeSchema(
120121
this.metadata.schema.splice(1)));
122+
123+
/* decode any statistics values */
124+
if (this.metadata.row_groups) {
125+
this.metadata.row_groups.forEach(row => row.columns.forEach( col => {
126+
const stats = col.meta_data.statistics;
127+
if (stats) {
128+
const field = this.schema.findField(col.meta_data.path_in_schema);
129+
stats.max_value = decodeStatisticsValue(stats.max_value, field);
130+
stats.min_value = decodeStatisticsValue(stats.min_value, field);
131+
}
132+
}));
133+
}
121134
}
122135

123136
/**
@@ -294,6 +307,38 @@ function decodeValues(type, encoding, cursor, count, opts) {
294307
return parquet_codec[encoding].decodeValues(type, cursor, count, opts);
295308
}
296309

310+
311+
/**
 * Decode a single statistics bound (min_value/max_value) from its raw
 * on-disk representation into a JS value for the given column.
 *
 * @param {Buffer|string|null|undefined} value - raw statistics bytes as read
 *   from the thrift metadata; may be absent or empty when the writer
 *   omitted the value
 * @param {Object} column - schema field definition (primitiveType,
 *   originalType, ...)
 * @returns {*} the decoded value, or undefined when no value is present
 */
function decodeStatisticsValue(value, column) {
  // Guard against null/undefined in addition to empty buffers/strings;
  // the bare `!value.length` check would throw a TypeError on null.
  if (value == null || !value.length) {
    return undefined;
  }
  if (!column.primitiveType.includes('BYTE_ARRAY')) {
    value = decodeValues(
        column.primitiveType,
        'PLAIN',
        { buffer: Buffer.from(value), offset: 0 },
        1,
        column);
    if (value.length === 1) value = value[0];
  }
  if (column.originalType) {
    value = parquet_types.fromPrimitive(column.originalType, value);
  }
  return value;
}
324+
325+
/**
 * Decode the min/max statistics of a page header or column chunk in place.
 *
 * @param {Object|null|undefined} statistics - thrift statistics record
 * @param {Object} column - schema field definition for the column
 * @returns {Object|undefined} the same record with decoded bounds, or
 *   undefined when no statistics were written
 */
function decodeStatistics(statistics, column) {
  if (!statistics) {
    return;
  }

  // Decode each bound that is present; thrift leaves absent fields as null.
  for (const bound of ['min_value', 'max_value']) {
    if (statistics[bound] !== null) {
      statistics[bound] = decodeStatisticsValue(statistics[bound], column);
    }
  }

  // Expose the decoded bounds under the shorter min/max aliases as well.
  statistics.min = statistics.min_value;
  statistics.max = statistics.max_value;

  return statistics;
}
341+
297342
function decodeDataPages(buffer, opts) {
298343
let cursor = {
299344
buffer: buffer,
@@ -310,19 +355,24 @@ function decodeDataPages(buffer, opts) {
310355

311356
while (cursor.offset < cursor.size) {
312357
const pageHeader = new parquet_thrift.PageHeader();
313-
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
358+
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
314359

315360
const pageType = parquet_util.getThriftEnum(
316361
parquet_thrift.PageType,
317362
pageHeader.type);
318363

319364
let pageData = null;
365+
320366
switch (pageType) {
321367
case 'DATA_PAGE':
368+
pageHeader.data_page_header.statistics = decodeStatistics(pageHeader.data_page_header.statistics, opts.column);
322369
pageData = decodeDataPage(cursor, pageHeader, opts);
370+
323371
break;
324372
case 'DATA_PAGE_V2':
373+
pageHeader.data_page_header_v2.statistics = decodeStatistics(pageHeader.data_page_header_v2.statistics, opts.column);
325374
pageData = decodeDataPageV2(cursor, pageHeader, opts);
375+
326376
break;
327377
default:
328378
throw "invalid page type: " + pageType;

lib/schema.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
108108
rLevelMax: rLevelMax,
109109
dLevelMax: dLevelMax,
110110
isNested: true,
111+
statistics: opts.statistics,
111112
fieldCount: Object.keys(opts.fields).length,
112113
fields: buildFields(
113114
opts.fields,
@@ -150,6 +151,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
150151
path: path.concat([name]),
151152
repetitionType: repetitionType,
152153
encoding: opts.encoding,
154+
statistics: opts.statistics,
153155
compression: opts.compression,
154156
typeLength: opts.typeLength || typeDef.typeLength,
155157
rLevelMax: rLevelMax,

lib/shred.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ exports.shredRecord = function(schema, record, buffer) {
3333
dlevels: [],
3434
rlevels: [],
3535
values: [],
36+
distinct_values: new Set(),
3637
count: 0
3738
};
3839
}
@@ -51,6 +52,7 @@ exports.shredRecord = function(schema, record, buffer) {
5152
dlevels: [],
5253
rlevels: [],
5354
values: [],
55+
distinct_values: new Set(),
5456
count: 0
5557
};
5658
buffer.pages[field.path] = [];
@@ -72,9 +74,11 @@ exports.shredRecord = function(schema, record, buffer) {
7274
buffer.columnData[field.path].values,
7375
recordShredded[field.path].values);
7476

77+
[...recordShredded[field.path].distinct_values].forEach(value => buffer.columnData[field.path].distinct_values.add(value));
78+
7579
buffer.columnData[field.path].count += recordShredded[field.path].count;
7680
}
77-
}
81+
};
7882

7983
function shredRecordInternal(fields, record, data, rlvl, dlvl) {
8084
for (let fieldName in fields) {
@@ -129,6 +133,7 @@ function shredRecordInternal(fields, record, data, rlvl, dlvl) {
129133
rlvl_i,
130134
field.dLevelMax);
131135
} else {
136+
data[field.path].distinct_values.add(values[i]);
132137
data[field.path].values.push(parquet_types.toPrimitive(fieldType, values[i]));
133138
data[field.path].rlevels.push(rlvl_i);
134139
data[field.path].dlevels.push(field.dLevelMax);

lib/util.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,23 @@
22
const fs = require('fs');
33
const thrift = require('thrift');
44

5+
6+
/** Patched TFramedTransport: readString returns the original buffer
 * instead of a string when the bytes cannot be safely encoded as utf8
 * (see http://bit.ly/2GXeZEF), so binary statistics values survive the
 * round trip intact.
 */
class fixedTFramedTransport extends thrift.TFramedTransport {
  readString(len) {
    this.ensureAvailable(len);
    const start = this.readPos;
    const end = start + len;
    const raw = this.inBuf.slice(start, end);
    const decoded = this.inBuf.toString('utf8', start, end);
    this.readPos = end;
    // Only hand back the string when re-encoding it reproduces the
    // original bytes; otherwise the caller gets the untouched buffer.
    return Buffer.from(decoded).equals(raw) ? decoded : raw;
  }
}
20+
21+
522
/**
623
* Helper function that serializes a thrift object into a buffer
724
*/
@@ -24,7 +41,7 @@ exports.decodeThrift = function(obj, buf, offset) {
2441
offset = 0;
2542
}
2643

27-
var transport = new thrift.TFramedTransport(buf);
44+
var transport = new fixedTFramedTransport(buf);
2845
transport.readPos = offset;
2946
var protocol = new thrift.TCompactProtocol(transport);
3047
obj.read(protocol);

lib/writer.js

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const parquet_shredder = require('./shred')
77
const parquet_util = require('./util')
88
const parquet_codec = require('./codec')
99
const parquet_compression = require('./compression')
10+
const parquet_types = require('./types');
1011

1112
/**
1213
* Parquet File Magic String
@@ -293,6 +294,30 @@ function encodeValues(type, encoding, values, opts) {
293294
return parquet_codec[encoding].encodeValues(type, values, opts);
294295
}
295296

297+
function encodeStatisticsValue(value, column) {
298+
if (value === undefined) {
299+
return new Buffer(0);
300+
}
301+
if (column.originalType) {
302+
value = parquet_types.toPrimitive(column.originalType,value);
303+
}
304+
if (column.primitiveType !== 'BYTE_ARRAY') {
305+
value = encodeValues(column.primitiveType,'PLAIN',[value],column);
306+
}
307+
return value;
308+
}
309+
310+
/**
 * Build a thrift Statistics object from a plain statistics record,
 * encoding the min/max bounds for the given column. The caller's record
 * is copied, not modified.
 *
 * @param {Object} statistics - plain record with min_value, max_value,
 *   null_count, distinct_count
 * @param {Object} column - schema field definition for the column
 * @returns {parquet_thrift.Statistics} the encoded thrift statistics
 */
function encodeStatistics(statistics, column) {
  const encoded = { ...statistics };
  encoded.min_value = encodeStatisticsValue(encoded.min_value, column);
  encoded.max_value = encodeStatisticsValue(encoded.max_value, column);

  // Also populate the min/max aliases with the encoded bounds.
  encoded.max = encoded.max_value;
  encoded.min = encoded.min_value;

  return new parquet_thrift.Statistics(encoded);
}
320+
296321
function encodePages(schema, rowBuffer, opts) {
297322
if (!rowBuffer.pageRowCount) {
298323
return;
@@ -305,6 +330,23 @@ function encodePages(schema, rowBuffer, opts) {
305330

306331
let page;
307332
const values = rowBuffer.columnData[field.path];
333+
334+
let statistics;
335+
336+
if (field.statistics !== false) {
337+
statistics = {};
338+
[...values.distinct_values].forEach( (v,i) => {
339+
if (i === 0 || v > statistics.max_value) {
340+
statistics.max_value = v;
341+
}
342+
if (i === 0 || v < statistics.min_value) {
343+
statistics.min_value = v;
344+
}
345+
});
346+
347+
statistics.null_count = values.count - values.values.length;
348+
statistics.distinct_count = values.distinct_values.size;
349+
}
308350

309351
if (opts.useDataPageV2) {
310352
page = encodeDataPageV2(
@@ -313,18 +355,27 @@ function encodePages(schema, rowBuffer, opts) {
313355
rowBuffer.pageRowCount,
314356
values.values,
315357
values.rlevels,
316-
values.dlevels);
358+
values.dlevels,
359+
statistics);
317360
} else {
318361
page = encodeDataPage(
319362
field,
320363
values.count,
321364
values.values,
322365
values.rlevels,
323-
values.dlevels);
366+
values.dlevels,
367+
statistics);
324368
}
325369

326-
rowBuffer.pages[field.path].push({page, count: values.values.length });
370+
rowBuffer.pages[field.path].push({
371+
page,
372+
statistics,
373+
distinct_values: values.distinct_values,
374+
count: values.values.length
375+
});
376+
327377

378+
values.distinct_values = new Set();
328379
values.values = [];
329380
values.rlevels = [];
330381
values.dlevels = [];
@@ -337,7 +388,7 @@ function encodePages(schema, rowBuffer, opts) {
337388
/**
338389
* Encode a parquet data page
339390
*/
340-
function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
391+
function encodeDataPage(column, valueCount, values, rlevels, dlevels, statistics) {
341392
/* encode values */
342393
let valuesBuf = encodeValues(
343394
column.primitiveType,
@@ -374,6 +425,9 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
374425
pageHeader.compressed_page_size = pageBody.length;
375426
pageHeader.data_page_header = new parquet_thrift.DataPageHeader();
376427
pageHeader.data_page_header.num_values = rlevels.length;
428+
if (column.statistics !== false) {
429+
pageHeader.data_page_header.statistics = encodeStatistics(statistics, column);
430+
}
377431

378432
pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding];
379433
pageHeader.data_page_header.definition_level_encoding =
@@ -388,7 +442,7 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
388442
/**
389443
* Encode a parquet data page (v2)
390444
*/
391-
function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels) {
445+
function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels, statistics) {
392446
/* encode values */
393447
let valuesBuf = encodeValues(
394448
column.primitiveType,
@@ -433,6 +487,10 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels
433487
pageHeader.data_page_header_v2.num_nulls = valueCount - values.length;
434488
pageHeader.data_page_header_v2.num_rows = valueCount;
435489

490+
if (column.statistics !== false) {
491+
pageHeader.data_page_header_v2.statistics = encodeStatistics(statistics, column);
492+
}
493+
436494
pageHeader.uncompressed_page_size =
437495
rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length;
438496

@@ -477,6 +535,34 @@ function encodeColumnChunk(pages, opts) {
477535
metadata.codec = parquet_thrift.CompressionCodec[
478536
opts.useDataPageV2 ? opts.column.compression : 'UNCOMPRESSED'];
479537

538+
/* compile statistics */
539+
let statistics = {};
540+
let distinct_values = new Set();
541+
statistics.null_count = 0;
542+
statistics.distinct_count = 0;
543+
544+
545+
for (let i = 0; i < pages.length; i++) {
546+
let page = pages[i];
547+
548+
if (opts.column.statistics !== false) {
549+
550+
if (page.statistics.max_value > statistics.max_value || i == 0) {
551+
statistics.max_value = page.statistics.max_value;
552+
}
553+
if (page.statistics.min_value < statistics.min_value || i == 0) {
554+
statistics.min_value = page.statistics.min_value;
555+
}
556+
statistics.null_count += page.statistics.null_count;
557+
page.distinct_values.forEach(value => distinct_values.add(value));
558+
}
559+
}
560+
561+
if (opts.column.statistics !== false) {
562+
statistics.distinct_count = distinct_values.size;
563+
metadata.statistics = encodeStatistics(statistics, opts.column);
564+
}
565+
480566
/* list encodings */
481567
let encodingsSet = {};
482568
encodingsSet[PARQUET_RDLVL_ENCODING] = true;

test/integration.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,25 @@ async function verifyPages() {
151151
assert.equal(rowCount, column.column.meta_data.num_values);
152152
}
153153

154+
async function verifyStatistics() {
155+
const column = await sampleColumnHeaders();
156+
const colStats = column.column.meta_data.statistics;
157+
158+
assert.equal(colStats.max_value, 'oranges');
159+
assert.equal(colStats.min_value, 'apples');
160+
assert.equal(colStats.null_count, 0);
161+
assert.equal(colStats.distinct_count, 4);
162+
163+
column.pages.forEach( (d, i) => {
164+
let header = d.data_page_header || d.data_page_header_v2;
165+
let pageStats = header.statistics;
166+
assert.equal(pageStats.null_count,0);
167+
assert.equal(pageStats.distinct_count, 4);
168+
assert.equal(pageStats.max_value, 'oranges');
169+
assert.equal(pageStats.min_value, 'apples');
170+
});
171+
}
172+
154173
async function readTestFile() {
155174
let reader = await parquet.ParquetReader.openFile('fruits.parquet');
156175
assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4);
@@ -345,6 +364,10 @@ describe('Parquet', function() {
345364
it('verify that data is split into pages', function() {
346365
return verifyPages();
347366
});
367+
368+
it('verify statistics', function() {
369+
return verifyStatistics();
370+
});
348371
});
349372

350373
describe('with DataPageHeaderV2', function() {
@@ -362,6 +385,10 @@ describe('Parquet', function() {
362385
return verifyPages();
363386
});
364387

388+
it('verify statistics', function() {
389+
return verifyStatistics();
390+
});
391+
365392
it('write a test file with GZIP compression', function() {
366393
const opts = { useDataPageV2: true, compression: 'GZIP' };
367394
return writeTestFile(opts);

0 commit comments

Comments
 (0)