Skip to content

Commit 4544755

Browse files
committed
feat: allow users to set parquet_max_page_size/parquet.max-page-size to avoid large pages causing node OOM
1 parent 296afe9 commit 4544755

File tree

14 files changed

+184
-8
lines changed

14 files changed

+184
-8
lines changed

lib/trino-parquet/src/main/java/io/trino/parquet/ParquetReaderOptions.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class ParquetReaderOptions
2727
private static final DataSize DEFAULT_MAX_BUFFER_SIZE = DataSize.of(8, MEGABYTE);
2828
private static final DataSize DEFAULT_SMALL_FILE_THRESHOLD = DataSize.of(3, MEGABYTE);
2929
private static final DataSize DEFAULT_MAX_FOOTER_READ_SIZE = DataSize.of(15, MEGABYTE);
30+
private static final DataSize DEFAULT_MAX_PAGE_SIZE = DataSize.of(500, MEGABYTE);
3031

3132
private final boolean ignoreStatistics;
3233
private final DataSize maxReadBlockSize;
@@ -38,6 +39,7 @@ public class ParquetReaderOptions
3839
private final DataSize smallFileThreshold;
3940
private final boolean vectorizedDecodingEnabled;
4041
private final DataSize maxFooterReadSize;
42+
private final DataSize maxPageSize;
4143

4244
private ParquetReaderOptions()
4345
{
@@ -51,6 +53,7 @@ private ParquetReaderOptions()
5153
smallFileThreshold = DEFAULT_SMALL_FILE_THRESHOLD;
5254
vectorizedDecodingEnabled = true;
5355
maxFooterReadSize = DEFAULT_MAX_FOOTER_READ_SIZE;
56+
maxPageSize = DEFAULT_MAX_PAGE_SIZE;
5457
}
5558

5659
private ParquetReaderOptions(
@@ -63,7 +66,8 @@ private ParquetReaderOptions(
6366
boolean useBloomFilter,
6467
DataSize smallFileThreshold,
6568
boolean vectorizedDecodingEnabled,
66-
DataSize maxFooterReadSize)
69+
DataSize maxFooterReadSize,
70+
DataSize maxPageSize)
6771
{
6872
this.ignoreStatistics = ignoreStatistics;
6973
this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
@@ -76,6 +80,7 @@ private ParquetReaderOptions(
7680
this.smallFileThreshold = requireNonNull(smallFileThreshold, "smallFileThreshold is null");
7781
this.vectorizedDecodingEnabled = vectorizedDecodingEnabled;
7882
this.maxFooterReadSize = requireNonNull(maxFooterReadSize, "maxFooterReadSize is null");
83+
this.maxPageSize = requireNonNull(maxPageSize, "maxPageSize is null");
7984
}
8085

8186
public static Builder builder()
@@ -143,6 +148,11 @@ public DataSize getMaxFooterReadSize()
143148
return maxFooterReadSize;
144149
}
145150

151+
public DataSize getMaxPageSize()
152+
{
153+
return maxPageSize;
154+
}
155+
146156
public static class Builder
147157
{
148158
private boolean ignoreStatistics;
@@ -155,6 +165,7 @@ public static class Builder
155165
private DataSize smallFileThreshold;
156166
private boolean vectorizedDecodingEnabled;
157167
private DataSize maxFooterReadSize;
168+
private DataSize maxPageSize;
158169

159170
private Builder(ParquetReaderOptions parquetReaderOptions)
160171
{
@@ -169,6 +180,7 @@ private Builder(ParquetReaderOptions parquetReaderOptions)
169180
this.smallFileThreshold = parquetReaderOptions.smallFileThreshold;
170181
this.vectorizedDecodingEnabled = parquetReaderOptions.vectorizedDecodingEnabled;
171182
this.maxFooterReadSize = parquetReaderOptions.maxFooterReadSize;
183+
this.maxPageSize = parquetReaderOptions.maxPageSize;
172184
}
173185

174186
public Builder withIgnoreStatistics(boolean ignoreStatistics)
@@ -231,6 +243,12 @@ public Builder withMaxFooterReadSize(DataSize maxFooterReadSize)
231243
return this;
232244
}
233245

246+
public Builder withMaxPageSize(DataSize maxPageSize)
247+
{
248+
this.maxPageSize = requireNonNull(maxPageSize, "maxPageSize is null");
249+
return this;
250+
}
251+
234252
public ParquetReaderOptions build()
235253
{
236254
return new ParquetReaderOptions(
@@ -243,7 +261,8 @@ public ParquetReaderOptions build()
243261
useBloomFilter,
244262
smallFileThreshold,
245263
vectorizedDecodingEnabled,
246-
maxFooterReadSize);
264+
maxFooterReadSize,
265+
maxPageSize);
247266
}
248267
}
249268
}

lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public static PageReader createPageReader(
7070
ColumnDescriptor columnDescriptor,
7171
@Nullable OffsetIndex offsetIndex,
7272
Optional<String> fileCreatedBy,
73-
Optional<FileDecryptionContext> decryptionContext)
73+
Optional<FileDecryptionContext> decryptionContext,
74+
Optional<Long> maxPageSize)
7475
{
7576
// Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data.
7677
// Row-group column statistics can be used to identify such cases and switch to faster non-nullable read
@@ -86,7 +87,8 @@ public static PageReader createPageReader(
8687
metadata,
8788
columnChunk,
8889
offsetIndex,
89-
columnDecryptionContext);
90+
columnDecryptionContext,
91+
maxPageSize);
9092

9193
return new PageReader(
9294
dataSourceId,

lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public final class ParquetColumnChunkIterator
5353
private final ChunkedInputStream input;
5454
private final OffsetIndex offsetIndex;
5555
private final Optional<ColumnDecryptionContext> decryptionContext;
56+
private final Optional<Long> maxPageSize;
5657

5758
private long valueCount;
5859
private int dataPageCount;
@@ -67,7 +68,8 @@ public ParquetColumnChunkIterator(
6768
ColumnChunkMetadata metadata,
6869
ChunkedInputStream input,
6970
@Nullable OffsetIndex offsetIndex,
70-
Optional<ColumnDecryptionContext> decryptionContext)
71+
Optional<ColumnDecryptionContext> decryptionContext,
72+
Optional<Long> maxPageSize)
7173
{
7274
this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
7375
this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
@@ -76,6 +78,7 @@ public ParquetColumnChunkIterator(
7678
this.input = requireNonNull(input, "input is null");
7779
this.offsetIndex = offsetIndex;
7880
this.decryptionContext = requireNonNull(decryptionContext, "decryptionContext is null");
81+
this.maxPageSize = requireNonNull(maxPageSize, "maxPageSize is null");
7982
}
8083

8184
@Override
@@ -102,6 +105,17 @@ public Page next()
102105
PageHeader pageHeader = readPageHeader(decryptionContext.map(ColumnDecryptionContext::metadataDecryptor).orElse(null), pageHeaderAAD);
103106
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
104107
int compressedPageSize = pageHeader.getCompressed_page_size();
108+
109+
// Validate page size limit
110+
if (maxPageSize.isPresent() && uncompressedPageSize > maxPageSize.get()) {
111+
throw new ParquetCorruptionException(
112+
dataSourceId,
113+
"Parquet page size %d bytes exceeds maximum allowed size %d bytes for column %s",
114+
uncompressedPageSize,
115+
maxPageSize.get(),
116+
descriptor);
117+
}
118+
105119
Page result = null;
106120
switch (pageHeader.type) {
107121
case DICTIONARY_PAGE:

lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,15 @@ private ColumnChunk readPrimitive(PrimitiveField field)
667667
}
668668
ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
669669
columnReader.setPageReader(
670-
createPageReader(dataSource.getId(), columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy, decryptionContext),
670+
createPageReader(
671+
dataSource.getId(),
672+
columnChunkInputStream,
673+
metadata,
674+
columnDescriptor,
675+
offsetIndex,
676+
fileCreatedBy,
677+
decryptionContext,
678+
Optional.of(options.getMaxPageSize().toBytes())),
671679
Optional.ofNullable(rowRanges));
672680
}
673681
ColumnChunk columnChunk = columnReader.readPrimitive();

lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,77 @@ private static void assertPages(
312312
assertThat(pageReader.readPage()).isNull();
313313
}
314314

315+
@Test
316+
public void testPageSizeLimit()
317+
throws Exception
318+
{
319+
// Create a page with 100 bytes uncompressed size
320+
byte[] largeDataPage = new byte[100];
321+
Arrays.fill(largeDataPage, (byte) 42);
322+
323+
int valueCount = 10;
324+
byte[] compressedDataPage = V1.compress(UNCOMPRESSED, largeDataPage);
325+
326+
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, largeDataPage.length, compressedDataPage.length);
327+
V1.setDataPageHeader(pageHeader, valueCount);
328+
329+
ByteArrayOutputStream out = new ByteArrayOutputStream();
330+
Util.writePageHeader(pageHeader, out);
331+
out.write(compressedDataPage);
332+
byte[] bytes = out.toByteArray();
333+
334+
// Create ColumnChunkMetadata
335+
EncodingStats.Builder encodingStats = new EncodingStats.Builder();
336+
PrimitiveType primitiveType = Types.optional(INT32).named("test_column");
337+
ColumnChunkMetadata columnChunkMetaData = ColumnChunkMetadata.get(
338+
ColumnPath.get("test_column"),
339+
primitiveType,
340+
CompressionCodecName.UNCOMPRESSED,
341+
encodingStats.build(),
342+
ImmutableSet.of(),
343+
Statistics.createStats(primitiveType),
344+
0,
345+
0,
346+
valueCount,
347+
0,
348+
0);
349+
350+
// Test with maxPageSize = 50 bytes (less than the 100 bytes page)
351+
assertThatThrownBy(() -> {
352+
PageReader pageReader = PageReader.createPageReader(
353+
new ParquetDataSourceId("test"),
354+
new ChunkedInputStream(ImmutableList.of(new TestingChunkReader(Slices.wrappedBuffer(bytes)))),
355+
columnChunkMetaData,
356+
new ColumnDescriptor(new String[] {"test_column"}, new PrimitiveType(REQUIRED, INT32, "test_column"), 0, 0),
357+
null,
358+
Optional.empty(),
359+
Optional.empty(),
360+
Optional.of(50L)); // maxPageSize = 50 bytes
361+
362+
// Try to read the page - should fail during hasNext() when reading page header
363+
pageReader.readPage();
364+
})
365+
.isInstanceOf(RuntimeException.class)
366+
.hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
367+
.hasMessageContaining("Parquet page size 100 bytes exceeds maximum allowed size 50 bytes");
368+
369+
// Test with maxPageSize = 150 bytes (more than the 100 bytes page) - should succeed
370+
PageReader pageReader = PageReader.createPageReader(
371+
new ParquetDataSourceId("test"),
372+
new ChunkedInputStream(ImmutableList.of(new TestingChunkReader(Slices.wrappedBuffer(bytes)))),
373+
columnChunkMetaData,
374+
new ColumnDescriptor(new String[] {"test_column"}, new PrimitiveType(REQUIRED, INT32, "test_column"), 0, 0),
375+
null,
376+
Optional.empty(),
377+
Optional.empty(),
378+
Optional.of(150L)); // maxPageSize = 150 bytes
379+
380+
assertThat(pageReader.hasNext()).isTrue();
381+
DataPage dataPage = pageReader.readPage();
382+
assertThat(dataPage).isNotNull();
383+
assertThat(dataPage.getUncompressedSize()).isEqualTo(100);
384+
}
385+
315386
@DataProvider
316387
public Object[][] pageParameters()
317388
{
@@ -409,6 +480,7 @@ private static PageReader createPageReader(int valueCount, CompressionCodec comp
409480
new ColumnDescriptor(new String[] {}, new PrimitiveType(REQUIRED, INT32, ""), 0, 0),
410481
null,
411482
Optional.empty(),
483+
Optional.empty(),
412484
Optional.empty());
413485
}
414486

lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public void testWrittenPageSize()
146146
new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
147147
null,
148148
Optional.empty(),
149+
Optional.empty(),
149150
Optional.empty());
150151

151152
pageReader.readDictionaryPage();
@@ -200,6 +201,7 @@ public void testWrittenPageValueCount()
200201
new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
201202
null,
202203
Optional.empty(),
204+
Optional.empty(),
203205
Optional.empty());
204206

205207
pageReader.readDictionaryPage();
@@ -220,6 +222,7 @@ public void testWrittenPageValueCount()
220222
new ColumnDescriptor(new String[] {"columnb"}, new PrimitiveType(REQUIRED, INT64, "columnb"), 0, 0),
221223
null,
222224
Optional.empty(),
225+
Optional.empty(),
223226
Optional.empty());
224227

225228
pageReader.readDictionaryPage();

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.rowPositionColumnHandle;
9696
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
9797
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
98+
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxPageSize;
9899
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockRowCount;
99100
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetMaxReadBlockSize;
100101
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getParquetSmallFileThreshold;
@@ -226,6 +227,7 @@ public ConnectorPageSource createPageSource(
226227
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session))
227228
.withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session))
228229
.withSmallFileThreshold(getParquetSmallFileThreshold(session))
230+
.withMaxPageSize(getParquetMaxPageSize(session))
229231
.withUseColumnIndex(split.getDeletionVector().isEmpty() && isParquetUseColumnIndex(session))
230232
.withIgnoreStatistics(isParquetIgnoreStatistics(session))
231233
.withVectorizedDecodingEnabled(isParquetVectorizedDecodingEnabled(session))

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public final class DeltaLakeSessionProperties
5656
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
5757
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
5858
private static final String PARQUET_SMALL_FILE_THRESHOLD = "parquet_small_file_threshold";
59+
private static final String PARQUET_MAX_PAGE_SIZE = "parquet_max_page_size";
5960
private static final String PARQUET_USE_COLUMN_INDEX = "parquet_use_column_index";
6061
private static final String PARQUET_IGNORE_STATISTICS = "parquet_ignore_statistics";
6162
private static final String PARQUET_VECTORIZED_DECODING_ENABLED = "parquet_vectorized_decoding_enabled";
@@ -126,6 +127,11 @@ public DeltaLakeSessionProperties(
126127
parquetReaderConfig.getSmallFileThreshold(),
127128
value -> validateMaxDataSize(PARQUET_SMALL_FILE_THRESHOLD, value, DataSize.valueOf(PARQUET_READER_MAX_SMALL_FILE_THRESHOLD)),
128129
false),
130+
dataSizeProperty(
131+
PARQUET_MAX_PAGE_SIZE,
132+
"Parquet: Maximum size for a single Parquet page. Prevents OOM from extremely large pages",
133+
parquetReaderConfig.getMaxPageSize(),
134+
false),
129135
booleanProperty(
130136
PARQUET_USE_COLUMN_INDEX,
131137
"Use Parquet column index",
@@ -275,6 +281,11 @@ public static DataSize getParquetSmallFileThreshold(ConnectorSession session)
275281
return session.getProperty(PARQUET_SMALL_FILE_THRESHOLD, DataSize.class);
276282
}
277283

284+
public static DataSize getParquetMaxPageSize(ConnectorSession session)
285+
{
286+
return session.getProperty(PARQUET_MAX_PAGE_SIZE, DataSize.class);
287+
}
288+
278289
public static boolean isParquetUseColumnIndex(ConnectorSession session)
279290
{
280291
return session.getProperty(PARQUET_USE_COLUMN_INDEX, Boolean.class);

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public final class HiveSessionProperties
9898
private static final String PARQUET_MAX_READ_BLOCK_ROW_COUNT = "parquet_max_read_block_row_count";
9999
private static final String PARQUET_SMALL_FILE_THRESHOLD = "parquet_small_file_threshold";
100100
private static final String PARQUET_VECTORIZED_DECODING_ENABLED = "parquet_vectorized_decoding_enabled";
101+
private static final String PARQUET_MAX_PAGE_SIZE = "parquet_max_page_size";
101102
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
102103
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
103104
private static final String PARQUET_WRITER_PAGE_VALUE_COUNT = "parquet_writer_page_value_count";
@@ -366,6 +367,11 @@ public HiveSessionProperties(
366367
"Enable using Java Vector API for faster decoding of parquet files",
367368
parquetReaderConfig.isVectorizedDecodingEnabled(),
368369
false),
370+
dataSizeProperty(
371+
PARQUET_MAX_PAGE_SIZE,
372+
"Parquet: Maximum size for a single Parquet page. Prevents OOM from extremely large pages",
373+
parquetReaderConfig.getMaxPageSize(),
374+
false),
369375
dataSizeProperty(
370376
PARQUET_WRITER_BLOCK_SIZE,
371377
"Parquet: Writer block size",
@@ -749,6 +755,11 @@ public static DataSize getParquetSmallFileThreshold(ConnectorSession session)
749755
return session.getProperty(PARQUET_SMALL_FILE_THRESHOLD, DataSize.class);
750756
}
751757

758+
public static DataSize getParquetMaxPageSize(ConnectorSession session)
759+
{
760+
return session.getProperty(PARQUET_MAX_PAGE_SIZE, DataSize.class);
761+
}
762+
752763
public static boolean isParquetVectorizedDecodingEnabled(ConnectorSession session)
753764
{
754765
return session.getProperty(PARQUET_VECTORIZED_DECODING_ENABLED, Boolean.class);

plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import static io.trino.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
9898
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
9999
import static io.trino.plugin.hive.HivePageSourceProvider.getProjection;
100+
import static io.trino.plugin.hive.HiveSessionProperties.getParquetMaxPageSize;
100101
import static io.trino.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockRowCount;
101102
import static io.trino.plugin.hive.HiveSessionProperties.getParquetMaxReadBlockSize;
102103
import static io.trino.plugin.hive.HiveSessionProperties.getParquetSmallFileThreshold;
@@ -203,6 +204,7 @@ public Optional<ConnectorPageSource> createPageSource(
203204
.withUseColumnIndex(isParquetUseColumnIndex(session))
204205
.withBloomFilter(useParquetBloomFilter(session))
205206
.withVectorizedDecodingEnabled(isParquetVectorizedDecodingEnabled(session))
207+
.withMaxPageSize(getParquetMaxPageSize(session))
206208
.build(),
207209
Optional.empty(),
208210
fileDecryptionProperties,

0 commit comments

Comments (0)