Skip to content

Commit

Permalink
apacheGH-38271: [C++][Parquet] Support reading parquet files with mul…
Browse files Browse the repository at this point in the history
…tiple gzip members (apache#38272)

### What changes are included in this PR?
Adding support in GZipCodec to decompress concatenated gzip members

### Are these changes tested?
test is attached

### Are there any user-facing changes?
no

* Closes: apache#38271

Lead-authored-by: amassalha <[email protected]>
Co-authored-by: Atheel Massalha <[email protected]>
Signed-off-by: mwish <[email protected]>
  • Loading branch information
amassalha and amassalha authored Nov 29, 2023
1 parent 61a4a80 commit 1d904d6
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 26 deletions.
41 changes: 41 additions & 0 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,47 @@ TEST_P(CodecTest, CodecRoundtrip) {
}
}

TEST(CodecTest, CodecRoundtripGzipMembers) {
std::unique_ptr<Codec> gzip_codec;
ASSERT_OK_AND_ASSIGN(gzip_codec, Codec::Create(Compression::GZIP));

for (int data_size : {0, 10000, 100000}) {
int64_t compressed_size_p1, compressed_size_p2;
uint32_t p1_size = data_size / 4;
uint32_t p2_size = data_size - p1_size;
std::vector<uint8_t> data_full = MakeRandomData(data_size);
std::vector<uint8_t> data_p1(data_full.begin(), data_full.begin() + p1_size);
std::vector<uint8_t> data_p2(data_full.begin() + p1_size, data_full.end());

int max_compressed_len_p1 =
static_cast<int>(gzip_codec->MaxCompressedLen(p1_size, data_p1.data()));
int max_compressed_len_p2 =
static_cast<int>(gzip_codec->MaxCompressedLen(p2_size, data_p2.data()));
std::vector<uint8_t> compressed(max_compressed_len_p1 + max_compressed_len_p2);

// Compress in 2 parts separately
ASSERT_OK_AND_ASSIGN(compressed_size_p1,
gzip_codec->Compress(p1_size, data_p1.data(),
max_compressed_len_p1, compressed.data()));
ASSERT_OK_AND_ASSIGN(
compressed_size_p2,
gzip_codec->Compress(p2_size, data_p2.data(), max_compressed_len_p2,
compressed.data() + compressed_size_p1));
compressed.resize(compressed_size_p1 + compressed_size_p2);

// Decompress the concatenated compressed gzip members
std::vector<uint8_t> decompressed(data_size);
int64_t actual_decompressed_size;
ASSERT_OK_AND_ASSIGN(
actual_decompressed_size,
gzip_codec->Decompress(compressed.size(), compressed.data(), decompressed.size(),
decompressed.data()));

ASSERT_EQ(data_size, actual_decompressed_size);
ASSERT_EQ(data_full, decompressed);
}
}

TEST(TestCodecMisc, SpecifyCompressionLevel) {
struct CombinationOption {
Compression::type codec;
Expand Down
59 changes: 34 additions & 25 deletions cpp/src/arrow/util/compression_zlib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ class GZipCodec : public Codec {

Result<int64_t> Decompress(int64_t input_length, const uint8_t* input,
int64_t output_buffer_length, uint8_t* output) override {
int64_t read_input_bytes = 0;
int64_t decompressed_bytes = 0;

if (!decompressor_initialized_) {
RETURN_NOT_OK(InitDecompressor());
}
Expand All @@ -392,40 +395,46 @@ class GZipCodec : public Codec {
return 0;
}

// Reset the stream for this block
if (inflateReset(&stream_) != Z_OK) {
return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
}
// inflate() will not automatically decode concatenated gzip members, keep calling
// inflate until reading all input data (GH-38271).
while (read_input_bytes < input_length) {
// Reset the stream for this block
if (inflateReset(&stream_) != Z_OK) {
return ZlibErrorPrefix("zlib inflateReset failed: ", stream_.msg);
}

int ret = 0;
// gzip can run in streaming mode or non-streaming mode. We only
// support the non-streaming use case where we present it the entire
// compressed input and a buffer big enough to contain the entire
// compressed output. In the case where we don't know the output,
// we just make a bigger buffer and try the non-streaming mode
// from the beginning again.
while (ret != Z_STREAM_END) {
stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
stream_.avail_in = static_cast<uInt>(input_length);
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = static_cast<uInt>(output_buffer_length);
int ret = 0;
// gzip can run in streaming mode or non-streaming mode. We only
// support the non-streaming use case where we present it the entire
// compressed input and a buffer big enough to contain the entire
// compressed output. In the case where we don't know the output,
// we just make a bigger buffer and try the non-streaming mode
// from the beginning again.
stream_.next_in =
const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input + read_input_bytes));
stream_.avail_in = static_cast<uInt>(input_length - read_input_bytes);
stream_.next_out = reinterpret_cast<Bytef*>(output + decompressed_bytes);
stream_.avail_out = static_cast<uInt>(output_buffer_length - decompressed_bytes);

// We know the output size. In this case, we can use Z_FINISH
// which is more efficient.
ret = inflate(&stream_, Z_FINISH);
if (ret == Z_STREAM_END || ret != Z_OK) break;
if (ret == Z_OK) {
// Failure, buffer was too small
return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
input_length, " OutputLength=", output_buffer_length);
}

// Failure, buffer was too small
return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
input_length, " OutputLength=", output_buffer_length);
}
// Failure for some other reason
if (ret != Z_STREAM_END) {
return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
}

// Failure for some other reason
if (ret != Z_STREAM_END) {
return ZlibErrorPrefix("GZipCodec failed: ", stream_.msg);
read_input_bytes += stream_.total_in;
decompressed_bytes += stream_.total_out;
}

return stream_.total_out;
return decompressed_bytes;
}

int64_t MaxCompressedLen(int64_t input_length,
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/parquet/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ std::string rle_dict_uncompressed_corrupt_checksum() {
return data_file("rle-dict-uncompressed-corrupt-checksum.parquet");
}

std::string concatenated_gzip_members() {
return data_file("concatenated_gzip_members.parquet");
}

// TODO: Assert on definition and repetition levels
template <typename DType, typename ValueType>
void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
Expand Down Expand Up @@ -778,6 +782,25 @@ TEST_F(TestCheckDataPageCrc, CorruptDict) {
}
}

TEST(TestGzipMembersRead, TwoConcatenatedMembers) {
auto file_reader = ParquetFileReader::OpenFile(concatenated_gzip_members(),
/*memory_map=*/false);
auto col_reader = std::dynamic_pointer_cast<TypedColumnReader<Int64Type>>(
file_reader->RowGroup(0)->Column(0));
int64_t num_values = 0;
int64_t num_repdef = 0;
std::vector<int16_t> reps(1024);
std::vector<int16_t> defs(1024);
std::vector<int64_t> vals(1024);

num_repdef =
col_reader->ReadBatch(1024, defs.data(), reps.data(), vals.data(), &num_values);
EXPECT_EQ(num_repdef, 513);
for (int64_t i = 0; i < num_repdef; i++) {
EXPECT_EQ(i + 1, vals[i]);
}
}

TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
// PARQUET-816. Some files generated by older Parquet implementations may
// contain malformed data page metadata, and we can successfully decode them
Expand Down
2 changes: 1 addition & 1 deletion cpp/submodules/parquet-testing

0 comments on commit 1d904d6

Please sign in to comment.