
Commit e9095ab

Avoid reading unusually large parquet pages
Reading unusually large parquet pages can lead to workers going into full GC and crashing. This change adds a guard rail to fail reads of such files gracefully.
1 parent: 296afe9

File tree

9 files changed, +120 -12 lines

docs/src/main/sphinx/object-storage/file-formats.md

Lines changed: 4 additions & 0 deletions

@@ -115,6 +115,10 @@ with Parquet files performed by supported object storage connectors:
     This prevents workers from going into full GC or crashing due to poorly
     configured Parquet writers.
   - `15MB`
+* - `parquet.max-page-read-size`
+  - Maximum allowed size of a parquet page during reads. Files with parquet pages
+    larger than this will generate an exception on read.
+  - `500MB`
 :::

 [](file-compression) is automatically performed and some details can be
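The new limit is an ordinary catalog property, so deployments whose writers legitimately emit very large pages can raise it. A hypothetical override in a Hive catalog's properties file (the file path is ours; the property name and 500MB default come from the docs entry above):

    # etc/catalog/hive.properties -- hypothetical example
    parquet.max-page-read-size=1GB

Note that the getter added in ParquetReaderConfig.java below carries a @MinDataSize("4MB") bound, so the limit cannot be configured pathologically low.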

lib/trino-parquet/src/main/java/io/trino/parquet/ParquetReaderOptions.java

Lines changed: 21 additions & 2 deletions

@@ -27,6 +27,7 @@ public class ParquetReaderOptions
     private static final DataSize DEFAULT_MAX_BUFFER_SIZE = DataSize.of(8, MEGABYTE);
     private static final DataSize DEFAULT_SMALL_FILE_THRESHOLD = DataSize.of(3, MEGABYTE);
     private static final DataSize DEFAULT_MAX_FOOTER_READ_SIZE = DataSize.of(15, MEGABYTE);
+    private static final DataSize DEFAULT_MAX_PAGE_READ_SIZE = DataSize.of(500, MEGABYTE);

     private final boolean ignoreStatistics;
     private final DataSize maxReadBlockSize;
@@ -38,6 +39,7 @@ public class ParquetReaderOptions
     private final DataSize smallFileThreshold;
     private final boolean vectorizedDecodingEnabled;
     private final DataSize maxFooterReadSize;
+    private final DataSize maxPageReadSize;

     private ParquetReaderOptions()
     {
@@ -51,6 +53,7 @@ private ParquetReaderOptions()
         smallFileThreshold = DEFAULT_SMALL_FILE_THRESHOLD;
         vectorizedDecodingEnabled = true;
         maxFooterReadSize = DEFAULT_MAX_FOOTER_READ_SIZE;
+        maxPageReadSize = DEFAULT_MAX_PAGE_READ_SIZE;
     }

     private ParquetReaderOptions(
@@ -63,7 +66,8 @@ private ParquetReaderOptions(
             boolean useBloomFilter,
             DataSize smallFileThreshold,
             boolean vectorizedDecodingEnabled,
-            DataSize maxFooterReadSize)
+            DataSize maxFooterReadSize,
+            DataSize maxPageReadSize)
     {
         this.ignoreStatistics = ignoreStatistics;
         this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
@@ -76,6 +80,7 @@ private ParquetReaderOptions(
         this.smallFileThreshold = requireNonNull(smallFileThreshold, "smallFileThreshold is null");
         this.vectorizedDecodingEnabled = vectorizedDecodingEnabled;
         this.maxFooterReadSize = requireNonNull(maxFooterReadSize, "maxFooterReadSize is null");
+        this.maxPageReadSize = requireNonNull(maxPageReadSize, "maxPageReadSize is null");
     }

     public static Builder builder()
@@ -143,6 +148,11 @@ public DataSize getMaxFooterReadSize()
         return maxFooterReadSize;
     }

+    public DataSize getMaxPageReadSize()
+    {
+        return maxPageReadSize;
+    }
+
     public static class Builder
     {
         private boolean ignoreStatistics;
@@ -155,6 +165,7 @@ public static class Builder
         private DataSize smallFileThreshold;
         private boolean vectorizedDecodingEnabled;
         private DataSize maxFooterReadSize;
+        private DataSize maxPageReadSize;

         private Builder(ParquetReaderOptions parquetReaderOptions)
         {
@@ -169,6 +180,7 @@ private Builder(ParquetReaderOptions parquetReaderOptions)
             this.smallFileThreshold = parquetReaderOptions.smallFileThreshold;
             this.vectorizedDecodingEnabled = parquetReaderOptions.vectorizedDecodingEnabled;
             this.maxFooterReadSize = parquetReaderOptions.maxFooterReadSize;
+            this.maxPageReadSize = parquetReaderOptions.maxPageReadSize;
         }

         public Builder withIgnoreStatistics(boolean ignoreStatistics)
@@ -231,6 +243,12 @@ public Builder withMaxFooterReadSize(DataSize maxFooterReadSize)
             return this;
         }

+        public Builder withMaxPageReadSize(DataSize maxPageReadSize)
+        {
+            this.maxPageReadSize = requireNonNull(maxPageReadSize, "maxPageReadSize is null");
+            return this;
+        }
+
         public ParquetReaderOptions build()
         {
             return new ParquetReaderOptions(
@@ -243,7 +261,8 @@ public ParquetReaderOptions build()
                     useBloomFilter,
                     smallFileThreshold,
                     vectorizedDecodingEnabled,
-                    maxFooterReadSize);
+                    maxFooterReadSize,
+                    maxPageReadSize);
         }
     }
 }
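For code that embeds the trino-parquet reader directly, the option threads through the existing builder. A minimal sketch, assuming ParquetReaderOptions.builder() starts from the defaults shown above (the wrapper class name is ours):

    import io.airlift.units.DataSize;
    import io.trino.parquet.ParquetReaderOptions;

    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    class PageLimitExample
    {
        static ParquetReaderOptions stricterOptions()
        {
            // Tighten the guard rail below the 500MB default: any page whose
            // declared uncompressed size exceeds 100MB now fails the read.
            return ParquetReaderOptions.builder()
                    .withMaxPageReadSize(DataSize.of(100, MEGABYTE))
                    .build();
        }
    }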

lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java

Lines changed: 4 additions & 2 deletions

@@ -70,7 +70,8 @@ public static PageReader createPageReader(
             ColumnDescriptor columnDescriptor,
             @Nullable OffsetIndex offsetIndex,
             Optional<String> fileCreatedBy,
-            Optional<FileDecryptionContext> decryptionContext)
+            Optional<FileDecryptionContext> decryptionContext,
+            long maxPageSizeInBytes)
     {
         // Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data.
         // Row-group column statistics can be used to identify such cases and switch to faster non-nullable read
@@ -86,7 +87,8 @@ public static PageReader createPageReader(
                 metadata,
                 columnChunk,
                 offsetIndex,
-                columnDecryptionContext);
+                columnDecryptionContext,
+                maxPageSizeInBytes);

         return new PageReader(
                 dataSourceId,

lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetColumnChunkIterator.java

Lines changed: 16 additions & 1 deletion

@@ -39,6 +39,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;

+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
 import static java.util.Objects.requireNonNull;
@@ -53,6 +54,7 @@ public final class ParquetColumnChunkIterator
     private final ChunkedInputStream input;
     private final OffsetIndex offsetIndex;
     private final Optional<ColumnDecryptionContext> decryptionContext;
+    private final long maxPageReadSizeInBytes;

     private long valueCount;
     private int dataPageCount;
@@ -67,7 +69,8 @@ public ParquetColumnChunkIterator(
             ColumnChunkMetadata metadata,
             ChunkedInputStream input,
             @Nullable OffsetIndex offsetIndex,
-            Optional<ColumnDecryptionContext> decryptionContext)
+            Optional<ColumnDecryptionContext> decryptionContext,
+            long maxPageReadSizeInBytes)
     {
         this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
         this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
@@ -76,6 +79,8 @@ public ParquetColumnChunkIterator(
         this.input = requireNonNull(input, "input is null");
         this.offsetIndex = offsetIndex;
         this.decryptionContext = requireNonNull(decryptionContext, "decryptionContext is null");
+        checkArgument(maxPageReadSizeInBytes > 0, "maxPageReadSizeInBytes must be positive");
+        this.maxPageReadSizeInBytes = maxPageReadSizeInBytes;
     }

     @Override
@@ -102,6 +107,16 @@ public Page next()
         PageHeader pageHeader = readPageHeader(decryptionContext.map(ColumnDecryptionContext::metadataDecryptor).orElse(null), pageHeaderAAD);
         int uncompressedPageSize = pageHeader.getUncompressed_page_size();
         int compressedPageSize = pageHeader.getCompressed_page_size();
+
+        if (uncompressedPageSize > maxPageReadSizeInBytes) {
+            throw new ParquetCorruptionException(
+                    dataSourceId,
+                    "Parquet page size %d bytes exceeds maximum allowed size %d bytes for column %s",
+                    uncompressedPageSize,
+                    maxPageReadSizeInBytes,
+                    descriptor);
+        }
+
         Page result = null;
         switch (pageHeader.type) {
             case DICTIONARY_PAGE:
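Two details of this guard are worth calling out: the comparison uses the uncompressed page size declared in the page header, so an oversized page is rejected before its payload is read or decompressed, and because the check sits ahead of the switch on pageHeader.type it covers dictionary pages as well as data pages. The resulting ParquetCorruptionException surfaces as an ordinary read failure rather than a worker-wide allocation and full-GC spiral.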

lib/trino-parquet/src/main/java/io/trino/parquet/reader/ParquetReader.java

Lines changed: 9 additions & 1 deletion

@@ -667,7 +667,15 @@ private ColumnChunk readPrimitive(PrimitiveField field)
             }
             ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
             columnReader.setPageReader(
-                    createPageReader(dataSource.getId(), columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy, decryptionContext),
+                    createPageReader(
+                            dataSource.getId(),
+                            columnChunkInputStream,
+                            metadata,
+                            columnDescriptor,
+                            offsetIndex,
+                            fileCreatedBy,
+                            decryptionContext,
+                            options.getMaxPageReadSize().toBytes()),
                     Optional.ofNullable(rowRanges));
         }
         ColumnChunk columnChunk = columnReader.readPrimitive();

lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestPageReader.java

Lines changed: 38 additions & 1 deletion

@@ -111,6 +111,16 @@ public void singlePage(CompressionCodec compressionCodec, DataPageType dataPageT
                 Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, headerSize + 2)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 2, bytes.length))));
+
+        // verify page size limit - should fail when page size exceeds limit
+        long pageSize = pageHeader.getUncompressed_page_size();
+        assertThatThrownBy(() -> {
+            PageReader pageReader = createPageReader(valueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)), pageSize - 1);
+            pageReader.readPage();
+        })
+                .isInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
+                .hasMessageContaining("exceeds maximum allowed size");
     }

     @Test(dataProvider = "pageParameters")
@@ -151,6 +161,16 @@ public void manyPages(CompressionCodec compressionCodec, DataPageType dataPageTy
                 Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize - 2)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize - 2, pageSize * 2)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length))));
+
+        // verify page size limit - should fail when page size exceeds limit
+        long uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        assertThatThrownBy(() -> {
+            PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)), uncompressedPageSize - 1);
+            pageReader.readPage();
+        })
+                .isInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
+                .hasMessageContaining("exceeds maximum allowed size");
     }

     @Test(dataProvider = "pageParameters")
@@ -204,6 +224,17 @@ public void dictionaryPage(CompressionCodec compressionCodec, DataPageType dataP
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryHeaderSize - 1, dictionaryPageSize - 1)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize - 1, dictionaryPageSize + 1)),
                 Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize + 1, bytes.length))));
+
+        // verify page size limit - should fail when page size exceeds limit
+        long uncompressedPageSize = pageHeader.getUncompressed_page_size();
+        assertThatThrownBy(() -> {
+            PageReader limitedReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)), uncompressedPageSize - 1);
+            limitedReader.readDictionaryPage();
+            limitedReader.readPage();
+        })
+                .isInstanceOf(RuntimeException.class)
+                .hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
+                .hasMessageContaining("exceeds maximum allowed size");
     }

     @Test
@@ -384,6 +415,11 @@ private static byte[] compress(CompressionCodec compressionCodec, byte[] bytes,
     }

     private static PageReader createPageReader(int valueCount, CompressionCodec compressionCodec, boolean hasDictionary, List<Slice> slices)
+    {
+        return createPageReader(valueCount, compressionCodec, hasDictionary, slices, Long.MAX_VALUE);
+    }
+
+    private static PageReader createPageReader(int valueCount, CompressionCodec compressionCodec, boolean hasDictionary, List<Slice> slices, long maxPageSize)
     {
         EncodingStats.Builder encodingStats = new EncodingStats.Builder();
         if (hasDictionary) {
@@ -409,7 +445,8 @@ private static PageReader createPageReader(int valueCount, CompressionCodec comp
                 new ColumnDescriptor(new String[] {}, new PrimitiveType(REQUIRED, INT32, ""), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                maxPageSize);
     }

     private static void assertDataPageEquals(PageHeader pageHeader, byte[] dataPage, byte[] compressedDataPage, DataPage decompressedPage)

lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java

Lines changed: 6 additions & 3 deletions

@@ -146,7 +146,8 @@ public void testWrittenPageSize()
                 new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                Long.MAX_VALUE);

         pageReader.readDictionaryPage();
         assertThat(pageReader.hasNext()).isTrue();
@@ -200,7 +201,8 @@ public void testWrittenPageValueCount()
                 new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                Long.MAX_VALUE);

         pageReader.readDictionaryPage();
         assertThat(pageReader.hasNext()).isTrue();
@@ -220,7 +222,8 @@ public void testWrittenPageValueCount()
                 new ColumnDescriptor(new String[] {"columnb"}, new PrimitiveType(REQUIRED, INT64, "columnb"), 0, 0),
                 null,
                 Optional.empty(),
-                Optional.empty());
+                Optional.empty(),
+                Long.MAX_VALUE);

         pageReader.readDictionaryPage();
         assertThat(pageReader.hasNext()).isTrue();

plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetReaderConfig.java

Lines changed: 17 additions & 0 deletions

@@ -191,6 +191,23 @@ public DataSize getMaxFooterReadSize()
         return options.getMaxFooterReadSize();
     }

+    @Config("parquet.max-page-read-size")
+    @ConfigDescription("Maximum allowed size of a parquet page during reads. Files with parquet pages larger than this will generate an exception on read")
+    public ParquetReaderConfig setMaxPageReadSize(DataSize maxPageSize)
+    {
+        options = ParquetReaderOptions.builder(options)
+                .withMaxPageReadSize(maxPageSize)
+                .build();
+        return this;
+    }
+
+    @NotNull
+    @MinDataSize("4MB")
+    public DataSize getMaxPageReadSize()
+    {
+        return options.getMaxPageReadSize();
+    }
+
     public ParquetReaderOptions toParquetReaderOptions()
     {
         return options;
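End to end, the property maps straight onto the reader options. A test-style sketch of the round trip (the class name is ours), mirroring the mapping test below:

    import io.airlift.units.DataSize;
    import io.trino.plugin.hive.parquet.ParquetReaderConfig;

    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    class PageSizeConfigExample
    {
        public static void main(String[] args)
        {
            // Equivalent to parquet.max-page-read-size=123MB in a catalog file.
            ParquetReaderConfig config = new ParquetReaderConfig()
                    .setMaxPageReadSize(DataSize.of(123, MEGABYTE));

            // The value flows into the reader through ParquetReaderOptions.
            System.out.println(config.toParquetReaderOptions().getMaxPageReadSize()); // 123MB
        }
    }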

plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestParquetReaderConfig.java

Lines changed: 5 additions & 2 deletions

@@ -40,7 +40,8 @@ public void testDefaults()
                 .setUseBloomFilter(true)
                 .setSmallFileThreshold(DataSize.of(3, MEGABYTE))
                 .setVectorizedDecodingEnabled(true)
-                .setMaxFooterReadSize(DataSize.of(15, MEGABYTE)));
+                .setMaxFooterReadSize(DataSize.of(15, MEGABYTE))
+                .setMaxPageReadSize(DataSize.of(500, MEGABYTE)));
     }

     @Test
@@ -57,6 +58,7 @@ public void testExplicitPropertyMappings()
                 .put("parquet.small-file-threshold", "1kB")
                 .put("parquet.experimental.vectorized-decoding.enabled", "false")
                 .put("parquet.max-footer-read-size", "25MB")
+                .put("parquet.max-page-read-size", "123MB")
                 .buildOrThrow();

         ParquetReaderConfig expected = new ParquetReaderConfig()
@@ -69,7 +71,8 @@ public void testExplicitPropertyMappings()
                 .setUseBloomFilter(false)
                 .setSmallFileThreshold(DataSize.of(1, KILOBYTE))
                 .setVectorizedDecodingEnabled(false)
-                .setMaxFooterReadSize(DataSize.of(25, MEGABYTE));
+                .setMaxFooterReadSize(DataSize.of(25, MEGABYTE))
+                .setMaxPageReadSize(DataSize.of(123, MEGABYTE));

         assertFullMapping(properties, expected);
     }
