4 changes: 4 additions & 0 deletions docs/src/main/sphinx/object-storage/file-formats.md
@@ -115,6 +115,10 @@ with Parquet files performed by supported object storage connectors:
This prevents workers from going into full GC or crashing due to poorly
configured Parquet writers.
- `15MB`
* - `parquet.max-page-read-size`
- Maximum allowed size of a Parquet page during reads. Files with Parquet pages
larger than this limit generate an exception on read.
- `500MB`
:::

[](file-compression) is automatically performed and some details can be
@@ -27,6 +27,7 @@ public class ParquetReaderOptions
private static final DataSize DEFAULT_MAX_BUFFER_SIZE = DataSize.of(8, MEGABYTE);
private static final DataSize DEFAULT_SMALL_FILE_THRESHOLD = DataSize.of(3, MEGABYTE);
private static final DataSize DEFAULT_MAX_FOOTER_READ_SIZE = DataSize.of(15, MEGABYTE);
private static final DataSize DEFAULT_MAX_PAGE_READ_SIZE = DataSize.of(500, MEGABYTE);

private final boolean ignoreStatistics;
private final DataSize maxReadBlockSize;
@@ -38,6 +39,7 @@ public class ParquetReaderOptions
private final DataSize smallFileThreshold;
private final boolean vectorizedDecodingEnabled;
private final DataSize maxFooterReadSize;
private final DataSize maxPageReadSize;

private ParquetReaderOptions()
{
@@ -51,6 +53,7 @@ private ParquetReaderOptions()
smallFileThreshold = DEFAULT_SMALL_FILE_THRESHOLD;
vectorizedDecodingEnabled = true;
maxFooterReadSize = DEFAULT_MAX_FOOTER_READ_SIZE;
maxPageReadSize = DEFAULT_MAX_PAGE_READ_SIZE;
}

private ParquetReaderOptions(
@@ -63,7 +66,8 @@ private ParquetReaderOptions(
boolean useBloomFilter,
DataSize smallFileThreshold,
boolean vectorizedDecodingEnabled,
DataSize maxFooterReadSize)
DataSize maxFooterReadSize,
DataSize maxPageReadSize)
{
this.ignoreStatistics = ignoreStatistics;
this.maxReadBlockSize = requireNonNull(maxReadBlockSize, "maxReadBlockSize is null");
@@ -76,6 +80,7 @@ private ParquetReaderOptions(
this.smallFileThreshold = requireNonNull(smallFileThreshold, "smallFileThreshold is null");
this.vectorizedDecodingEnabled = vectorizedDecodingEnabled;
this.maxFooterReadSize = requireNonNull(maxFooterReadSize, "maxFooterReadSize is null");
this.maxPageReadSize = requireNonNull(maxPageReadSize, "maxPageReadSize is null");
}

public static Builder builder()
@@ -143,6 +148,11 @@ public DataSize getMaxFooterReadSize()
return maxFooterReadSize;
}

public DataSize getMaxPageReadSize()
{
return maxPageReadSize;
}

public static class Builder
{
private boolean ignoreStatistics;
@@ -155,6 +165,7 @@ public static class Builder
private DataSize smallFileThreshold;
private boolean vectorizedDecodingEnabled;
private DataSize maxFooterReadSize;
private DataSize maxPageReadSize;

private Builder(ParquetReaderOptions parquetReaderOptions)
{
@@ -169,6 +180,7 @@ private Builder(ParquetReaderOptions parquetReaderOptions)
this.smallFileThreshold = parquetReaderOptions.smallFileThreshold;
this.vectorizedDecodingEnabled = parquetReaderOptions.vectorizedDecodingEnabled;
this.maxFooterReadSize = parquetReaderOptions.maxFooterReadSize;
this.maxPageReadSize = parquetReaderOptions.maxPageReadSize;
}

public Builder withIgnoreStatistics(boolean ignoreStatistics)
@@ -231,6 +243,12 @@ public Builder withMaxFooterReadSize(DataSize maxFooterReadSize)
return this;
}

public Builder withMaxPageReadSize(DataSize maxPageReadSize)
{
this.maxPageReadSize = requireNonNull(maxPageReadSize, "maxPageReadSize is null");
return this;
}

public ParquetReaderOptions build()
{
return new ParquetReaderOptions(
@@ -243,7 +261,8 @@ public ParquetReaderOptions build()
useBloomFilter,
smallFileThreshold,
vectorizedDecodingEnabled,
maxFooterReadSize);
maxFooterReadSize,
maxPageReadSize);
}
}
}
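For orientation, here is a minimal usage sketch of the new option through the builder API shown in this file. The no-argument `builder()` entry point, `withMaxPageReadSize`, and `getMaxPageReadSize` all appear in the hunks above; the package import and the wrapper class are illustrative assumptions, not part of this change.

```java
import io.airlift.units.DataSize;
import io.trino.parquet.ParquetReaderOptions;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

// Hypothetical caller: tighten the page limit below the 500MB default.
public class MaxPageReadSizeExample
{
    public static void main(String[] args)
    {
        ParquetReaderOptions options = ParquetReaderOptions.builder()
                .withMaxPageReadSize(DataSize.of(256, MEGABYTE))
                .build();

        // ParquetReader converts the limit to bytes before handing it to the page iterator.
        System.out.println(options.getMaxPageReadSize().toBytes()); // 268435456
    }
}
```

As the later hunks show, `ParquetReader` forwards `options.getMaxPageReadSize().toBytes()` to `createPageReader`, and `ParquetColumnChunkIterator` throws a `ParquetCorruptionException` for any page whose uncompressed size exceeds that value.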
@@ -70,7 +70,8 @@ public static PageReader createPageReader(
ColumnDescriptor columnDescriptor,
@Nullable OffsetIndex offsetIndex,
Optional<String> fileCreatedBy,
Optional<FileDecryptionContext> decryptionContext)
Optional<FileDecryptionContext> decryptionContext,
long maxPageSizeInBytes)
{
// Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data.
// Row-group column statistics can be used to identify such cases and switch to faster non-nullable read
@@ -86,7 +87,8 @@
metadata,
columnChunk,
offsetIndex,
columnDecryptionContext);
columnDecryptionContext,
maxPageSizeInBytes);

return new PageReader(
dataSourceId,
@@ -39,6 +39,7 @@
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding;
import static java.util.Objects.requireNonNull;
@@ -53,6 +54,7 @@ public final class ParquetColumnChunkIterator
private final ChunkedInputStream input;
private final OffsetIndex offsetIndex;
private final Optional<ColumnDecryptionContext> decryptionContext;
private final long maxPageReadSizeInBytes;

private long valueCount;
private int dataPageCount;
@@ -67,7 +69,8 @@ public ParquetColumnChunkIterator(
ColumnChunkMetadata metadata,
ChunkedInputStream input,
@Nullable OffsetIndex offsetIndex,
Optional<ColumnDecryptionContext> decryptionContext)
Optional<ColumnDecryptionContext> decryptionContext,
long maxPageReadSizeInBytes)
{
this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
this.fileCreatedBy = requireNonNull(fileCreatedBy, "fileCreatedBy is null");
@@ -76,6 +79,8 @@
this.input = requireNonNull(input, "input is null");
this.offsetIndex = offsetIndex;
this.decryptionContext = requireNonNull(decryptionContext, "decryptionContext is null");
checkArgument(maxPageReadSizeInBytes > 0, "maxPageReadSizeInBytes must be positive");
this.maxPageReadSizeInBytes = maxPageReadSizeInBytes;
}

@Override
@@ -102,6 +107,16 @@ public Page next()
PageHeader pageHeader = readPageHeader(decryptionContext.map(ColumnDecryptionContext::metadataDecryptor).orElse(null), pageHeaderAAD);
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();

if (uncompressedPageSize > maxPageReadSizeInBytes) {
throw new ParquetCorruptionException(
dataSourceId,
"Parquet page size %d bytes exceeds maximum allowed size %d bytes for column %s",
uncompressedPageSize,
maxPageReadSizeInBytes,
descriptor);
}

Page result = null;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
@@ -667,7 +667,15 @@ private ColumnChunk readPrimitive(PrimitiveField field)
}
ChunkedInputStream columnChunkInputStream = chunkReaders.get(new ChunkKey(fieldId, currentRowGroup));
columnReader.setPageReader(
createPageReader(dataSource.getId(), columnChunkInputStream, metadata, columnDescriptor, offsetIndex, fileCreatedBy, decryptionContext),
createPageReader(
dataSource.getId(),
columnChunkInputStream,
metadata,
columnDescriptor,
offsetIndex,
fileCreatedBy,
decryptionContext,
options.getMaxPageReadSize().toBytes()),
Optional.ofNullable(rowRanges));
}
ColumnChunk columnChunk = columnReader.readPrimitive();
@@ -111,6 +111,16 @@ public void singlePage(CompressionCodec compressionCodec, DataPageType dataPageT
Slices.wrappedBuffer(Arrays.copyOf(bytes, headerSize + 1)),
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 1, headerSize + 2)),
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, headerSize + 2, bytes.length))));

// verify page size limit - should fail when page size exceeds limit
long pageSize = pageHeader.getUncompressed_page_size();
assertThatThrownBy(() -> {
PageReader pageReader = createPageReader(valueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)), pageSize - 1);
pageReader.readPage();
})
.isInstanceOf(RuntimeException.class)
.hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
.hasMessageContaining("exceeds maximum allowed size");
}

@Test(dataProvider = "pageParameters")
@@ -151,6 +161,16 @@ public void manyPages(CompressionCodec compressionCodec, DataPageType dataPageTy
Slices.wrappedBuffer(Arrays.copyOf(bytes, pageSize - 2)),
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize - 2, pageSize * 2)),
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, pageSize * 2, bytes.length))));

// verify page size limit - should fail when page size exceeds limit
long uncompressedPageSize = pageHeader.getUncompressed_page_size();
assertThatThrownBy(() -> {
PageReader pageReader = createPageReader(totalValueCount, compressionCodec, false, ImmutableList.of(Slices.wrappedBuffer(bytes)), uncompressedPageSize - 1);
pageReader.readPage();
})
.isInstanceOf(RuntimeException.class)
.hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
.hasMessageContaining("exceeds maximum allowed size");
}

@Test(dataProvider = "pageParameters")
@@ -204,6 +224,17 @@ public void dictionaryPage(CompressionCodec compressionCodec, DataPageType dataP
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryHeaderSize - 1, dictionaryPageSize - 1)),
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize - 1, dictionaryPageSize + 1)),
Slices.wrappedBuffer(Arrays.copyOfRange(bytes, dictionaryPageSize + 1, bytes.length))));

// verify page size limit - should fail when page size exceeds limit
long uncompressedPageSize = pageHeader.getUncompressed_page_size();
assertThatThrownBy(() -> {
PageReader limitedReader = createPageReader(totalValueCount, compressionCodec, true, ImmutableList.of(Slices.wrappedBuffer(bytes)), uncompressedPageSize - 1);
limitedReader.readDictionaryPage();
limitedReader.readPage();
})
.isInstanceOf(RuntimeException.class)
.hasRootCauseInstanceOf(io.trino.parquet.ParquetCorruptionException.class)
.hasMessageContaining("exceeds maximum allowed size");
}

@Test
@@ -384,6 +415,11 @@ private static byte[] compress(CompressionCodec compressionCodec, byte[] bytes,
}

private static PageReader createPageReader(int valueCount, CompressionCodec compressionCodec, boolean hasDictionary, List<Slice> slices)
{
return createPageReader(valueCount, compressionCodec, hasDictionary, slices, Long.MAX_VALUE);
}

private static PageReader createPageReader(int valueCount, CompressionCodec compressionCodec, boolean hasDictionary, List<Slice> slices, long maxPageSize)
{
EncodingStats.Builder encodingStats = new EncodingStats.Builder();
if (hasDictionary) {
@@ -409,7 +445,8 @@ private static PageReader createPageReader(int valueCount, CompressionCodec comp
new ColumnDescriptor(new String[] {}, new PrimitiveType(REQUIRED, INT32, ""), 0, 0),
null,
Optional.empty(),
Optional.empty());
Optional.empty(),
maxPageSize);
}

private static void assertDataPageEquals(PageHeader pageHeader, byte[] dataPage, byte[] compressedDataPage, DataPage decompressedPage)
@@ -146,7 +146,8 @@ public void testWrittenPageSize()
new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
null,
Optional.empty(),
Optional.empty());
Optional.empty(),
Long.MAX_VALUE);

pageReader.readDictionaryPage();
assertThat(pageReader.hasNext()).isTrue();
@@ -200,7 +201,8 @@ public void testWrittenPageValueCount()
new ColumnDescriptor(new String[] {"columna"}, new PrimitiveType(REQUIRED, INT32, "columna"), 0, 0),
null,
Optional.empty(),
Optional.empty());
Optional.empty(),
Long.MAX_VALUE);

pageReader.readDictionaryPage();
assertThat(pageReader.hasNext()).isTrue();
@@ -220,7 +222,8 @@ public void testWrittenPageValueCount()
new ColumnDescriptor(new String[] {"columnb"}, new PrimitiveType(REQUIRED, INT64, "columnb"), 0, 0),
null,
Optional.empty(),
Optional.empty());
Optional.empty(),
Long.MAX_VALUE);

pageReader.readDictionaryPage();
assertThat(pageReader.hasNext()).isTrue();
@@ -191,6 +191,23 @@ public DataSize getMaxFooterReadSize()
return options.getMaxFooterReadSize();
}

@Config("parquet.max-page-read-size")
@ConfigDescription("Maximum allowed size of a Parquet page during reads. Files with Parquet pages larger than this limit generate an exception on read")
public ParquetReaderConfig setMaxPageReadSize(DataSize maxPageSize)
{
options = ParquetReaderOptions.builder(options)
.withMaxPageReadSize(maxPageSize)
.build();
return this;
}

@NotNull
@MinDataSize("4MB")
public DataSize getMaxPageReadSize()
{
return options.getMaxPageReadSize();
}
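A hedged fragment showing how the new catalog property maps onto the reader options through this config class. The setter, the `@MinDataSize("4MB")` bound, and `toParquetReaderOptions()` are all visible above; the concrete 256MB value is illustrative, and imports are omitted because the file's package is not shown in this diff.

```java
// Programmatic equivalent of setting parquet.max-page-read-size=256MB in a catalog
// properties file; values below the @MinDataSize("4MB") bound declared above are
// rejected by config validation.
ParquetReaderConfig config = new ParquetReaderConfig()
        .setMaxPageReadSize(DataSize.of(256, MEGABYTE));

// The connector then hands the resulting options to the Parquet reader.
ParquetReaderOptions options = config.toParquetReaderOptions();
```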

public ParquetReaderOptions toParquetReaderOptions()
{
return options;
@@ -40,7 +40,8 @@ public void testDefaults()
.setUseBloomFilter(true)
.setSmallFileThreshold(DataSize.of(3, MEGABYTE))
.setVectorizedDecodingEnabled(true)
.setMaxFooterReadSize(DataSize.of(15, MEGABYTE)));
.setMaxFooterReadSize(DataSize.of(15, MEGABYTE))
.setMaxPageReadSize(DataSize.of(500, MEGABYTE)));
}

@Test
@@ -57,6 +58,7 @@ public void testExplicitPropertyMappings()
.put("parquet.small-file-threshold", "1kB")
.put("parquet.experimental.vectorized-decoding.enabled", "false")
.put("parquet.max-footer-read-size", "25MB")
.put("parquet.max-page-read-size", "123MB")
.buildOrThrow();

ParquetReaderConfig expected = new ParquetReaderConfig()
@@ -69,7 +71,8 @@
.setUseBloomFilter(false)
.setSmallFileThreshold(DataSize.of(1, KILOBYTE))
.setVectorizedDecodingEnabled(false)
.setMaxFooterReadSize(DataSize.of(25, MEGABYTE));
.setMaxFooterReadSize(DataSize.of(25, MEGABYTE))
.setMaxPageReadSize(DataSize.of(123, MEGABYTE));

assertFullMapping(properties, expected);
}