Skip to content

Commit e6a3cfa

Browse files
committed
Add statistics to pages and columns
Statistics are collected by default for all columns unless `statistics: false` is set in the field definition.
1 parent 366375a commit e6a3cfa

File tree

6 files changed

+174
-8
lines changed

6 files changed

+174
-8
lines changed

lib/reader.js

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const parquet_util = require('./util')
77
const parquet_schema = require('./schema')
88
const parquet_codec = require('./codec')
99
const parquet_compression = require('./compression')
10+
const parquet_types = require('./types');
1011

1112
/**
1213
* Parquet File Magic String
@@ -294,6 +295,32 @@ function decodeValues(type, encoding, cursor, count, opts) {
294295
return parquet_codec[encoding].decodeValues(type, cursor, count, opts);
295296
}
296297

298+
299+
function decodeStatisticsValue(value, column) {
300+
if (column.primitiveType !== 'BYTE_ARRAY') {
301+
value = decodeValues(column.primitiveType,'PLAIN',{buffer: Buffer.from(value), offset: 0}, 1, column);
302+
if (value.length === 1) value = value[0];
303+
304+
}
305+
if (column.originalType) {
306+
value = parquet_types.fromPrimitive(column.originalType, value);
307+
}
308+
return value;
309+
}
310+
311+
function decodeStatistics(statistics, column) {
312+
if (!statistics) {
313+
return;
314+
}
315+
statistics.min_value = decodeStatisticsValue(statistics.min_value, column);
316+
statistics.max_value = decodeStatisticsValue(statistics.max_value, column);
317+
318+
statistics.min = statistics.min_value;
319+
statistics.max = statistics.max_value;
320+
321+
return statistics;
322+
}
323+
297324
function decodeDataPages(buffer, opts) {
298325
let cursor = {
299326
buffer: buffer,
@@ -310,19 +337,24 @@ function decodeDataPages(buffer, opts) {
310337

311338
while (cursor.offset < cursor.size) {
312339
const pageHeader = new parquet_thrift.PageHeader();
313-
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
340+
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
314341

315342
const pageType = parquet_util.getThriftEnum(
316343
parquet_thrift.PageType,
317344
pageHeader.type);
318345

319346
let pageData = null;
347+
320348
switch (pageType) {
321349
case 'DATA_PAGE':
350+
pageHeader.data_page_header.statistics = decodeStatistics(pageHeader.data_page_header.statistics, opts.column);
322351
pageData = decodeDataPage(cursor, pageHeader, opts);
352+
323353
break;
324354
case 'DATA_PAGE_V2':
355+
pageHeader.data_page_header_v2.statistics = decodeStatistics(pageHeader.data_page_header_v2.statistics, opts.column);
325356
pageData = decodeDataPageV2(cursor, pageHeader, opts);
357+
326358
break;
327359
default:
328360
throw "invalid page type: " + pageType;

lib/schema.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
108108
rLevelMax: rLevelMax,
109109
dLevelMax: dLevelMax,
110110
isNested: true,
111+
statistics: opts.statistics,
111112
fieldCount: Object.keys(opts.fields).length,
112113
fields: buildFields(
113114
opts.fields,
@@ -150,6 +151,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
150151
path: path.concat([name]),
151152
repetitionType: repetitionType,
152153
encoding: opts.encoding,
154+
statistics: opts.statistics,
153155
compression: opts.compression,
154156
typeLength: opts.typeLength || typeDef.typeLength,
155157
rLevelMax: rLevelMax,

lib/shred.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ exports.shredRecord = function(schema, record, buffer) {
3333
dlevels: [],
3434
rlevels: [],
3535
values: [],
36+
distinct_values: new Set(),
3637
count: 0
3738
};
3839
}
@@ -51,6 +52,7 @@ exports.shredRecord = function(schema, record, buffer) {
5152
dlevels: [],
5253
rlevels: [],
5354
values: [],
55+
distinct_values: new Set(),
5456
count: 0
5557
};
5658
buffer.pages[field.path] = [];
@@ -72,9 +74,11 @@ exports.shredRecord = function(schema, record, buffer) {
7274
buffer.columnData[field.path].values,
7375
recordShredded[field.path].values);
7476

77+
[...recordShredded[field.path].distinct_values].forEach(value => buffer.columnData[field.path].distinct_values.add(value));
78+
7579
buffer.columnData[field.path].count += recordShredded[field.path].count;
7680
}
77-
}
81+
};
7882

7983
function shredRecordInternal(fields, record, data, rlvl, dlvl) {
8084
for (let fieldName in fields) {
@@ -129,6 +133,7 @@ function shredRecordInternal(fields, record, data, rlvl, dlvl) {
129133
rlvl_i,
130134
field.dLevelMax);
131135
} else {
136+
data[field.path].distinct_values.add(values[i]);
132137
data[field.path].values.push(parquet_types.toPrimitive(fieldType, values[i]));
133138
data[field.path].rlevels.push(rlvl_i);
134139
data[field.path].dlevels.push(field.dLevelMax);

lib/util.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,23 @@
22
const fs = require('fs');
33
const thrift = require('thrift');
44

5+
6+
/** We need to use a patched version of TFramedTransport where
7+
* readString returns the original buffer instead of a string if the
8+
* buffer can not be safely encoded as utf8 (see http://bit.ly/2GXeZEF)
9+
*/
10+
11+
class fixedTFramedTransport extends thrift.TFramedTransport {
12+
readString(len) {
13+
this.ensureAvailable(len);
14+
var buffer = this.inBuf.slice(this.readPos, this.readPos + len);
15+
var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len);
16+
this.readPos += len;
17+
return (Buffer.from(str).equals(buffer)) ? str : buffer;
18+
}
19+
}
20+
21+
522
/**
623
* Helper function that serializes a thrift object into a buffer
724
*/
@@ -24,7 +41,7 @@ exports.decodeThrift = function(obj, buf, offset) {
2441
offset = 0;
2542
}
2643

27-
var transport = new thrift.TFramedTransport(buf);
44+
var transport = new fixedTFramedTransport(buf);
2845
transport.readPos = offset;
2946
var protocol = new thrift.TCompactProtocol(transport);
3047
obj.read(protocol);

lib/writer.js

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const parquet_shredder = require('./shred')
77
const parquet_util = require('./util')
88
const parquet_codec = require('./codec')
99
const parquet_compression = require('./compression')
10+
const parquet_types = require('./types');
1011

1112
/**
1213
* Parquet File Magic String
@@ -293,6 +294,27 @@ function encodeValues(type, encoding, values, opts) {
293294
return parquet_codec[encoding].encodeValues(type, values, opts);
294295
}
295296

297+
function encodeStatisticsValue(value, column) {
298+
if (column.originalType) {
299+
value = parquet_types.toPrimitive(column.originalType,value);
300+
}
301+
if (column.primitiveType !== 'BYTE_ARRAY') {
302+
value = encodeValues(column.primitiveType,'PLAIN',[value],column);
303+
}
304+
return value;
305+
}
306+
307+
function encodeStatistics(statistics,column) {
308+
statistics = Object.assign({},statistics);
309+
statistics.min_value = encodeStatisticsValue(statistics.min_value, column);
310+
statistics.max_value = encodeStatisticsValue(statistics.max_value, column);
311+
312+
statistics.max = statistics.max_value;
313+
statistics.min = statistics.min_value;
314+
315+
return new parquet_thrift.Statistics(statistics);
316+
}
317+
296318
function encodePages(schema, rowBuffer, opts) {
297319
if (!rowBuffer.pageRowCount) {
298320
return;
@@ -305,6 +327,23 @@ function encodePages(schema, rowBuffer, opts) {
305327

306328
let page;
307329
const values = rowBuffer.columnData[field.path];
330+
331+
let statistics;
332+
333+
if (field.statistics !== false) {
334+
statistics = {};
335+
[...values.distinct_values].forEach( (v,i) => {
336+
if (i === 0 || v > statistics.max_value) {
337+
statistics.max_value = v;
338+
}
339+
if (i === 0 || v < statistics.min_value) {
340+
statistics.min_value = v;
341+
}
342+
});
343+
344+
statistics.null_count = values.count - values.values.length;
345+
statistics.distinct_count = values.distinct_values.size;
346+
}
308347

309348
if (opts.useDataPageV2) {
310349
page = encodeDataPageV2(
@@ -313,18 +352,27 @@ function encodePages(schema, rowBuffer, opts) {
313352
rowBuffer.pageRowCount,
314353
values.values,
315354
values.rlevels,
316-
values.dlevels);
355+
values.dlevels,
356+
statistics);
317357
} else {
318358
page = encodeDataPage(
319359
field,
320360
values.count,
321361
values.values,
322362
values.rlevels,
323-
values.dlevels);
363+
values.dlevels,
364+
statistics);
324365
}
325366

326-
rowBuffer.pages[field.path].push({page, count: values.values.length });
367+
rowBuffer.pages[field.path].push({
368+
page,
369+
statistics,
370+
distinct_values: values.distinct_values,
371+
count: values.values.length
372+
});
373+
327374

375+
values.distinct_values = new Set();
328376
values.values = [];
329377
values.rlevels = [];
330378
values.dlevels = [];
@@ -337,7 +385,7 @@ function encodePages(schema, rowBuffer, opts) {
337385
/**
338386
* Encode a parquet data page
339387
*/
340-
function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
388+
function encodeDataPage(column, valueCount, values, rlevels, dlevels, statistics) {
341389
/* encode values */
342390
let valuesBuf = encodeValues(
343391
column.primitiveType,
@@ -374,6 +422,9 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
374422
pageHeader.compressed_page_size = pageBody.length;
375423
pageHeader.data_page_header = new parquet_thrift.DataPageHeader();
376424
pageHeader.data_page_header.num_values = rlevels.length;
425+
if (column.statistics !== false) {
426+
pageHeader.data_page_header.statistics = encodeStatistics(statistics, column);
427+
}
377428

378429
pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding];
379430
pageHeader.data_page_header.definition_level_encoding =
@@ -388,7 +439,7 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) {
388439
/**
389440
* Encode a parquet data page (v2)
390441
*/
391-
function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels) {
442+
function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels, statistics) {
392443
/* encode values */
393444
let valuesBuf = encodeValues(
394445
column.primitiveType,
@@ -433,6 +484,10 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels
433484
pageHeader.data_page_header_v2.num_nulls = valueCount - values.length;
434485
pageHeader.data_page_header_v2.num_rows = valueCount;
435486

487+
if (column.statistics !== false) {
488+
pageHeader.data_page_header_v2.statistics = encodeStatistics(statistics, column);
489+
}
490+
436491
pageHeader.uncompressed_page_size =
437492
rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length;
438493

@@ -477,6 +532,34 @@ function encodeColumnChunk(pages, opts) {
477532
metadata.codec = parquet_thrift.CompressionCodec[
478533
opts.useDataPageV2 ? opts.column.compression : 'UNCOMPRESSED'];
479534

535+
/* compile statistics */
536+
let statistics = {};
537+
let distinct_values = new Set();
538+
statistics.null_count = 0;
539+
statistics.distinct_count = 0;
540+
541+
542+
for (let i = 0; i < pages.length; i++) {
543+
let page = pages[i];
544+
545+
if (opts.column.statistics !== false) {
546+
547+
if (page.statistics.max_value > statistics.max_value || i == 0) {
548+
statistics.max_value = page.statistics.max_value;
549+
}
550+
if (page.statistics.min_value < statistics.min_value || i == 0) {
551+
statistics.min_value = page.statistics.min_value;
552+
}
553+
statistics.null_count += page.statistics.null_count;
554+
page.distinct_values.forEach(value => distinct_values.add(value));
555+
}
556+
}
557+
558+
if (opts.column.statistics !== false) {
559+
statistics.distinct_count = distinct_values.size;
560+
metadata.statistics = encodeStatistics(statistics, opts.column);
561+
}
562+
480563
/* list encodings */
481564
let encodingsSet = {};
482565
encodingsSet[PARQUET_RDLVL_ENCODING] = true;

test/integration.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,25 @@ async function verifyPages() {
151151
assert.equal(rowCount, column.column.meta_data.num_values);
152152
}
153153

154+
async function verifyStatistics() {
155+
const column = await sampleColumnHeaders();
156+
const colStats = column.column.meta_data.statistics;
157+
158+
assert.equal(colStats.max_value, 'oranges');
159+
assert.equal(colStats.min_value, 'apples');
160+
assert.equal(colStats.null_count, 0);
161+
assert.equal(colStats.distinct_count, 4);
162+
163+
column.pages.forEach( (d, i) => {
164+
let header = d.data_page_header || d.data_page_header_v2;
165+
let pageStats = header.statistics;
166+
assert.equal(pageStats.null_count,0);
167+
assert.equal(pageStats.distinct_count, 4);
168+
assert.equal(pageStats.max_value, 'oranges');
169+
assert.equal(pageStats.min_value, 'apples');
170+
});
171+
}
172+
154173
async function readTestFile() {
155174
let reader = await parquet.ParquetReader.openFile('fruits.parquet');
156175
assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4);
@@ -345,6 +364,10 @@ describe('Parquet', function() {
345364
it('verify that data is split into pages', function() {
346365
return verifyPages();
347366
});
367+
368+
it('verify statistics', function() {
369+
return verifyStatistics();
370+
});
348371
});
349372

350373
describe('with DataPageHeaderV2', function() {
@@ -362,6 +385,10 @@ describe('Parquet', function() {
362385
return verifyPages();
363386
});
364387

388+
it('verify statistics', function() {
389+
return verifyStatistics();
390+
});
391+
365392
it('write a test file with GZIP compression', function() {
366393
const opts = { useDataPageV2: true, compression: 'GZIP' };
367394
return writeTestFile(opts);

0 commit comments

Comments
 (0)