
Commit b2ccb28

Max-Cheng authored and raunaqmorarka committed
Avoid reading unusually large parquet pages
Reading unusually large parquet pages can lead to workers going into full GC and crashing. This change adds a guard rail to fail reads of such files gracefully.
1 parent d3a7fd5 commit b2ccb28

9 files changed, +120 -12 lines changed

docs/src/main/sphinx/object-storage/file-formats.md

Lines changed: 4 additions & 0 deletions
@@ -115,6 +115,10 @@ with Parquet files performed by supported object storage connectors:
     This prevents workers from going into full GC or crashing due to poorly
     configured Parquet writers.
   - `15MB`
+* - `parquet.max-page-read-size`
+  - Maximum allowed size of a parquet page during reads. Files with parquet pages
+    larger than this will generate an exception on read.
+  - `500MB`
 :::

 [](file-compression) is automatically performed and some details can be
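
For orientation, here is a hypothetical sketch (not part of this commit) of the new property alongside an existing Parquet reader property. The property keys are the real ones from this change; the class name and the 256MB value are illustrative only, and the shipped default is 500MB.

    import com.google.common.collect.ImmutableMap;

    import java.util.Map;

    // Hypothetical example: "parquet.max-page-read-size" is the key added by this commit.
    final class ParquetReaderPropertiesExample
    {
        static Map<String, String> exampleProperties()
        {
            return ImmutableMap.<String, String>builder()
                    .put("parquet.max-footer-read-size", "15MB")
                    .put("parquet.max-page-read-size", "256MB") // default is 500MB
                    .buildOrThrow();
        }
    }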

lib/trino-parquet/src/main/java/io/trino/parquet/ParquetReaderOptions.java

Lines changed: 21 additions & 2 deletions
@@ -27,6 +27,7 @@ public class ParquetReaderOptions
     private static final DataSize DEFAULT_MAX_BUFFER_SIZE = DataSize.of(8, MEGABYTE);
     private static final DataSize DEFAULT_SMALL_FILE_THRESHOLD = DataSize.of(3, MEGABYTE);
     private static final DataSize DEFAULT_MAX_FOOTER_READ_SIZE = DataSize.of(15, MEGABYTE);
+    private static final DataSize DEFAULT_MAX_PAGE_READ_SIZE = DataSize.of(500, MEGABYTE);

     private final boolean ignoreStatistics;
     private final DataSize maxReadBlockSize;
@@ -38,6 +39,7 @@ public class ParquetReaderOptions
     private final DataSize smallFileThreshold;
     private final boolean vectorizedDecodingEnabled;
     private final DataSize maxFooterReadSize;
+    private final DataSize maxPageReadSize;

     private ParquetReaderOptions()
     {
@@ -51,6 +53,7 @@ private ParquetReaderOptions()
         smallFileThreshold = DEFAULT_SMALL_FILE_THRESHOLD;
         vectorizedDecodingEnabled = true;
         maxFooterReadSize = DEFAULT_MAX_FOOTER_READ_SIZE;
+        maxPageReadSize = DEFAULT_MAX_PAGE_READ_SIZE;
     }

     private ParquetReaderOptions(
@@ -63,7 +66,8 @@ private ParquetReaderOptions(
             boolean useBloomFilter,
             DataSize smallFileThreshold,
             boolean vectorizedDecodingEnabled,
-            DataSize maxFooterReadSize)
+            DataSize maxFooterReadSize,
+            DataSize maxPageReadSize)
     {
         this.ignoreStatistics = ignoreStatistics;
         this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
@@ -76,6 +80,7 @@ private ParquetReaderOptions(
         this.smallFileThreshold = requireNonNull(smallFileThreshold, "smallFileThreshold is null");
         this.vectorizedDecodingEnabled = vectorizedDecodingEnabled;
         this.maxFooterReadSize = requireNonNull(maxFooterReadSize, "maxFooterReadSize is null");
+        this.maxPageReadSize = requireNonNull(maxPageReadSize, "maxPageReadSize is null");
     }

     public static Builder builder()
@@ -143,6 +148,11 @@ public DataSize getMaxFooterReadSize()
         return maxFooterReadSize;
     }

+    public DataSize getMaxPageReadSize()
+    {
+        return maxPageReadSize;
+    }
+
     public static class Builder
     {
         private boolean ignoreStatistics;
@@ -155,6 +165,7 @@ public static class Builder
         private DataSize smallFileThreshold;
         private boolean vectorizedDecodingEnabled;
         private DataSize maxFooterReadSize;
+        private DataSize maxPageReadSize;

         private Builder(ParquetReaderOptions parquetReaderOptions)
         {
@@ -169,6 +180,7 @@ private Builder(ParquetReaderOptions parquetReaderOptions)
             this.smallFileThreshold = parquetReaderOptions.smallFileThreshold;
             this.vectorizedDecodingEnabled = parquetReaderOptions.vectorizedDecodingEnabled;
             this.maxFooterReadSize = parquetReaderOptions.maxFooterReadSize;
+            this.maxPageReadSize = parquetReaderOptions.maxPageReadSize;
         }

         public Builder withIgnoreStatistics(boolean ignoreStatistics)
@@ -231,6 +243,12 @@ public Builder withMaxFooterReadSize(DataSize maxFooterReadSize)
             return this;
         }

+        public Builder withMaxPageReadSize(DataSize maxPageReadSize)
+        {
+            this.maxPageReadSize = requireNonNull(maxPageReadSize, "maxPageSize is null");
+            return this;
+        }
+
         public ParquetReaderOptions build()
         {
             return new ParquetReaderOptions(
@@ -243,7 +261,8 @@ public ParquetReaderOptions build()
                     useBloomFilter,
                     smallFileThreshold,
                     vectorizedDecodingEnabled,
-                    maxFooterReadSize);
+                    maxFooterReadSize,
+                    maxPageReadSize);
         }
     }
 }
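
As a usage sketch (not part of this diff), the new knob is reachable through the builder like any other reader option. The wrapper class name and the 128MB value below are assumptions for illustration; `ParquetReaderOptions.builder()` and `withMaxPageReadSize` come from the code above.

    import io.airlift.units.DataSize;
    import io.trino.parquet.ParquetReaderOptions;

    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    // Hypothetical caller: tighten the page-size limit below the 500MB default.
    final class MaxPageReadSizeExample
    {
        static ParquetReaderOptions tightenedOptions()
        {
            return ParquetReaderOptions.builder()
                    .withMaxPageReadSize(DataSize.of(128, MEGABYTE))
                    .build();
        }
    }

The resulting `options.getMaxPageReadSize().toBytes()` is the plain long that ParquetReader later hands down to `createPageReader`.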

lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java

Lines changed: 4 additions & 2 deletions
@@ -70,7 +70,8 @@ public static PageReader createPageReader(
             ColumnDescriptor columnDescriptor,
             @Nullable OffsetIndex offsetIndex,
             Optional<String> fileCreatedBy,
-            Optional<FileDecryptionContext> decryptionContext)
+            Optional<FileDecryptionContext> decryptionContext,
+            long maxPageSizeInBytes)
     {
         // Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data.
         // Row-group column statistics can be used to identify such cases and switch to faster non-nullable read
@@ -86,7 +87,8 @@ public static PageReader createPageReader(
                 metadata,
                 columnChunk,
                 offsetIndex,
-                columnDecryptionContext);
+                columnDecryptionContext,
+                maxPageSizeInBytes);

         return new PageReader(
                 dataSourceId,

lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java

Lines changed: 16 additions & 1 deletion
@@ -39,6 +39,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;

+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
 import static java.util.Objects.requireNonNull;
@@ -53,6 +54,7 @@ public final class ParquetColumnChunkIterator
     private final ChunkedInputStream input;
     private final OffsetIndex offsetIndex;
     private final Optional<ColumnDecryptionContext> decryptionContext;
+    private final long maxPageReadSizeInBytes;

     private long valueCount;
     private int dataPageCount;
@@ -67,7 +69,8 @@ public ParquetColumnChunkIterator(
             ColumnChunkMetadata metadata,
             ChunkedInputStream input,
             @Nullable OffsetIndex offsetIndex,
-            Optional<ColumnDecryptionContext> decryptionContext)
+            Optional<ColumnDecryptionContext> decryptionContext,
+            long maxPageReadSizeInBytes)
     {
         this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
         this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
@@ -76,6 +79,8 @@ public ParquetColumnChunkIterator(
         this.input = requireNonNull(input, "input is null");
         this.offsetIndex = offsetIndex;
         this.decryptionContext = requireNonNull(decryptionContext, "decryptionContext is null");
+        checkArgument(maxPageReadSizeInBytes > 0, "maxPageReadSizeInBytes must be positive");
+        this.maxPageReadSizeInBytes = maxPageReadSizeInBytes;
     }

     @Override
@@ -102,6 +107,16 @@ public Page next()
         PageHeader pageHeader = readPageHeader(decryptionContext.map(ColumnDecryptionContext::metadataDecryptor).orElse(null), pageHeaderAAD);
         int uncompressedPageSize = pageHeader.getUncompressed_page_size();
         int compressedPageSize = pageHeader.getCompressed_page_size();
+
+        if (uncompressedPageSize > maxPageReadSizeInBytes) {
+            throw new ParquetCorruptionException(
+                    dataSourceId,
+                    "Parquet page size %d bytes exceeds maximum allowed size %d bytes for column %s",
+                    uncompressedPageSize,
+                    maxPageReadSizeInBytes,
+                    descriptor);
+        }
+
         Page result = null;
         switch (pageHeader.type) {
             case DICTIONARY_PAGE:
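
The guard compares the limit against the uncompressed size declared in the page header, so an oversized page is rejected before it is decompressed and decoded, which is how the change turns a likely full GC into a clean, query-level failure. Below is a standalone sketch of that decision; the class and method names are assumed for illustration, and IllegalStateException stands in for the real ParquetCorruptionException.

    // Self-contained sketch of the check added above (assumed names, not library code).
    final class PageSizeGuardSketch
    {
        static void checkDeclaredPageSize(int declaredUncompressedSize, long maxPageReadSizeInBytes, String columnDescription)
        {
            if (declaredUncompressedSize > maxPageReadSizeInBytes) {
                // The real reader throws ParquetCorruptionException here, failing the read gracefully
                // instead of letting the worker exhaust memory while buffering the page.
                throw new IllegalStateException(
                        "Parquet page size %d bytes exceeds maximum allowed size %d bytes for column %s"
                                .formatted(declaredUncompressedSize, maxPageReadSizeInBytes, columnDescription));
            }
        }
    }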

lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java

Lines changed: 9 additions & 1 deletion
@@ -667,7 +667,15 @@ private ColumnChunk readPrimitive(PrimitiveField field)
             }
             ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
             columnReader.setPageReader(
-                    createPageReader(dataSource.getId(), columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy, decryptionContext),
+                    createPageReader(
+                            dataSource.getId(),
+                            columnChunkInputStream,
+                            metadata,
+                            columnDescriptor,
+                            offsetIndex,
+                            fileCreatedBy,
+                            decryptionContext,
+                            options.getMaxPageReadSize().toBytes()),
                     Optional.ofNullable(rowRanges));
         }
         ColumnChunk columnChunk = columnReader.readPrimitive();

lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java

Lines changed: 38 additions & 1 deletion
@@ -111,6 +111,16 @@ public void singlePage(CompressionCodec compressionCodec, DataPageType dataPageT
                 Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, headerSize + 2)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 2, bytes.length))));
+
+        // verify page size limit - should fail when page size exceeds limit
+        long pageSize = pageHeader.getUncompressed_page_size();
+        assertThatThrownBy(() -> {
+            PageReader pageReader = createPageReader(valueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)), pageSize - 1);
+            pageReader.readPage();
+        })
+                .isInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
+                .hasMessageContaining("exceeds maximum allowed size");
     }

     @Test(dataProvider = "pageParameters")
@@ -151,6 +161,16 @@ public void manyPages(CompressionCodec compressionCodec, DataPageType dataPageTy
                 Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize - 2)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize - 2, pageSize * 2)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length))));
+
+        // verify page size limit - should fail when page size exceeds limit
+        long uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        assertThatThrownBy(() -> {
+            PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)), uncompressedPageSize - 1);
+            pageReader.readPage();
+        })
+                .isInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
+                .hasMessageContaining("exceeds maximum allowed size");
     }

     @Test(dataProvider = "pageParameters")
@@ -204,6 +224,17 @@ public void dictionaryPage(CompressionCodec compressionCodec, DataPageType dataP
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryHeaderSize - 1, dictionaryPageSize - 1)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize - 1, dictionaryPageSize + 1)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize + 1, bytes.length))));
+
+        // verify page size limit - should fail when page size exceeds limit
+        long uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        assertThatThrownBy(() -> {
+            PageReader limitedReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)), uncompressedPageSize - 1);
+            limitedReader.readDictionaryPage();
+            limitedReader.readPage();
+        })
+                .isInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
+                .hasMessageContaining("exceeds maximum allowed size");
     }

     @Test
@@ -384,6 +415,11 @@ private static byte[] compress(CompressionCodec compressionCodec, byte[] bytes,
     }

     private static PageReader createPageReader(int valueCount, CompressionCodec compressionCodec, boolean hasDictionary, List<Slice> slices)
+    {
+        return createPageReader(valueCount, compressionCodec, hasDictionary, slices, Long.MAX_VALUE);
+    }
+
+    private static PageReader createPageReader(int valueCount, CompressionCodec compressionCodec, boolean hasDictionary, List<Slice> slices, long maxPageSize)
     {
         EncodingStats.Builder encodingStats = new EncodingStats.Builder();
         if (hasDictionary) {
@@ -409,7 +445,8 @@ private static PageReader createPageReader(int valueCount, CompressionCodec comp
                 new ColumnDescriptor(new String[] {}, new PrimitiveType(REQUIRED, INT32, ""), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                maxPageSize);
     }

     private static void assertDataPageEquals(PageHeader pageHeader, byte[] dataPage, byte[] compressedDataPage, DataPage decompressedPage)

lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java

Lines changed: 6 additions & 3 deletions
@@ -146,7 +146,8 @@ public void testWrittenPageSize()
                 new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                Long.MAX_VALUE);

         pageReader.readDictionaryPage();
         assertThat(pageReader.hasNext()).isTrue();
@@ -200,7 +201,8 @@ public void testWrittenPageValueCount()
                 new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                Long.MAX_VALUE);

         pageReader.readDictionaryPage();
         assertThat(pageReader.hasNext()).isTrue();
@@ -220,7 +222,8 @@ public void testWrittenPageValueCount()
                 new ColumnDescriptor(new String[] {"columnb"}, new PrimitiveType(REQUIRED, INT64, "columnb"), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                Long.MAX_VALUE);

         pageReader.readDictionaryPage();
         assertThat(pageReader.hasNext()).isTrue();

plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java

Lines changed: 17 additions & 0 deletions
@@ -191,6 +191,23 @@ public DataSize getMaxFooterReadSize()
         return options.getMaxFooterReadSize();
     }

+    @Config("parquet.max-page-read-size")
+    @ConfigDescription("Maximum allowed size of a parquet page during reads. Files with parquet pages larger than this will generate an exception on read")
+    public ParquetReaderConfig setMaxPageReadSize(DataSize maxPageSize)
+    {
+        options = ParquetReaderOptions.builder(options)
+                .withMaxPageReadSize(maxPageSize)
+                .build();
+        return this;
+    }
+
+    @NotNull
+    @MinDataSize("4MB")
+    public DataSize getMaxPageReadSize()
+    {
+        return options.getMaxPageReadSize();
+    }
+
     public ParquetReaderOptions toParquetReaderOptions()
     {
         return options;
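
A minimal sketch of how the new property flows from ParquetReaderConfig into ParquetReaderOptions, assuming a caller outside the plugin. Note the getter's @MinDataSize("4MB") bound, so values below 4MB fail config validation. The wrapper class name and the 100MB value are assumptions for illustration.

    import io.airlift.units.DataSize;
    import io.trino.parquet.ParquetReaderOptions;
    import io.trino.plugin.hive.parquet.ParquetReaderConfig;

    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    // Hypothetical usage of the config class changed above.
    final class MaxPageReadSizeConfigExample
    {
        static ParquetReaderOptions optionsWithCustomLimit()
        {
            ParquetReaderConfig config = new ParquetReaderConfig()
                    .setMaxPageReadSize(DataSize.of(100, MEGABYTE)); // must be at least 4MB
            return config.toParquetReaderOptions();
        }
    }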

plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetReaderConfig.java

Lines changed: 5 additions & 2 deletions
@@ -40,7 +40,8 @@ public void testDefaults()
                 .setUseBloomFilter(true)
                 .setSmallFileThreshold(DataSize.of(3, MEGABYTE))
                 .setVectorizedDecodingEnabled(true)
-                .setMaxFooterReadSize(DataSize.of(15, MEGABYTE)));
+                .setMaxFooterReadSize(DataSize.of(15, MEGABYTE))
+                .setMaxPageReadSize(DataSize.of(500, MEGABYTE)));
     }

     @Test
@@ -57,6 +58,7 @@ public void testExplicitPropertyMappings()
                 .put("parquet.small-file-threshold", "1kB")
                 .put("parquet.experimental.vectorized-decoding.enabled", "false")
                 .put("parquet.max-footer-read-size", "25MB")
+                .put("parquet.max-page-read-size", "123MB")
                 .buildOrThrow();

         ParquetReaderConfig expected = new ParquetReaderConfig()
@@ -69,7 +71,8 @@ public void testExplicitPropertyMappings()
                 .setUseBloomFilter(false)
                 .setSmallFileThreshold(DataSize.of(1, KILOBYTE))
                 .setVectorizedDecodingEnabled(false)
-                .setMaxFooterReadSize(DataSize.of(25, MEGABYTE));
+                .setMaxFooterReadSize(DataSize.of(25, MEGABYTE))
+                .setMaxPageReadSize(DataSize.of(123, MEGABYTE));

         assertFullMapping(properties, expected);
     }
