|
22 | 22 | #include <gmock/gmock.h> |
23 | 23 | #include <gtest/gtest.h> |
24 | 24 |
|
| 25 | +#include "arrow/array.h" |
25 | 26 | #include "arrow/io/buffered.h" |
26 | 27 | #include "arrow/io/file.h" |
27 | 28 | #include "arrow/testing/gtest_util.h" |
@@ -2380,5 +2381,87 @@ TYPED_TEST(TestColumnWriterMaxRowsPerPage, RequiredLargeChunk) { |
2380 | 2381 | } |
2381 | 2382 | } |
2382 | 2383 |
|
| 2384 | +class TestArrowWriteSerialize : public ::testing::Test { |
| 2385 | + public: |
| 2386 | + void SetUp() { |
| 2387 | + // Create a Parquet schema |
| 2388 | + // Int8 maps to Int32 in Parquet with INT_8 converted type |
| 2389 | + auto node = schema::PrimitiveNode::Make("int8_col", Repetition::OPTIONAL, Type::INT32, |
| 2390 | + ConvertedType::INT_8); |
| 2391 | + schema_descriptor_ = std::make_shared<ColumnDescriptor>(node, 1, 0); |
| 2392 | + sink_ = CreateOutputStream(); |
| 2393 | + writer_properties_ = default_writer_properties(); |
| 2394 | + } |
| 2395 | + |
| 2396 | + std::shared_ptr<TypedColumnWriter<Int32Type>> BuildWriter() { |
| 2397 | + metadata_ = |
| 2398 | + ColumnChunkMetaDataBuilder::Make(writer_properties_, schema_descriptor_.get()); |
| 2399 | + std::unique_ptr<PageWriter> pager = |
| 2400 | + PageWriter::Open(sink_, Compression::UNCOMPRESSED, metadata_.get()); |
| 2401 | + std::shared_ptr<ColumnWriter> writer = |
| 2402 | + ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get()); |
| 2403 | + return std::static_pointer_cast<TypedColumnWriter<Int32Type>>(writer); |
| 2404 | + } |
| 2405 | + |
| 2406 | + std::shared_ptr<TypedColumnReader<Int32Type>> BuildReader(int64_t num_rows) { |
| 2407 | + EXPECT_OK_AND_ASSIGN(auto buffer, sink_->Finish()); |
| 2408 | + auto source = std::make_shared<::arrow::io::BufferReader>(buffer); |
| 2409 | + ReaderProperties reader_properties; |
| 2410 | + std::unique_ptr<PageReader> page_reader = PageReader::Open( |
| 2411 | + std::move(source), num_rows, Compression::UNCOMPRESSED, reader_properties); |
| 2412 | + std::shared_ptr<ColumnReader> reader = |
| 2413 | + ColumnReader::Make(schema_descriptor_.get(), std::move(page_reader)); |
| 2414 | + return std::static_pointer_cast<TypedColumnReader<Int32Type>>(reader); |
| 2415 | + } |
| 2416 | + |
| 2417 | + protected: |
| 2418 | + std::shared_ptr<ColumnDescriptor> schema_descriptor_; |
| 2419 | + std::shared_ptr<::arrow::io::BufferOutputStream> sink_; |
| 2420 | + std::shared_ptr<WriterProperties> writer_properties_; |
| 2421 | + std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_; |
| 2422 | +}; |
| 2423 | + |
| 2424 | +TEST_F(TestArrowWriteSerialize, AllNulls) { |
| 2425 | + std::shared_ptr<::arrow::Buffer> null_bitmap; |
| 2426 | + ASSERT_OK_AND_ASSIGN(null_bitmap, ::arrow::AllocateBitmap(100)); |
| 2427 | + // Set all bits to 0 (null) |
| 2428 | + ::arrow::bit_util::SetBitsTo(null_bitmap->mutable_data(), 0, 100, false); |
| 2429 | + |
| 2430 | + std::shared_ptr<::arrow::Buffer> data_buffer = nullptr; |
| 2431 | + |
| 2432 | + auto array_data = |
| 2433 | + ::arrow::ArrayData::Make(::arrow::int8(), 100, {null_bitmap, data_buffer}, 100); |
| 2434 | + auto array = ::arrow::MakeArray(array_data); |
| 2435 | + |
| 2436 | + auto typed_writer = this->BuildWriter(); |
| 2437 | + |
| 2438 | + std::vector<int16_t> def_levels(100, 0); |
| 2439 | + std::vector<int16_t> rep_levels(100, 0); |
| 2440 | + |
| 2441 | + auto arrow_writer_properties = default_arrow_writer_properties(); |
| 2442 | + ArrowWriteContext ctx(::arrow::default_memory_pool(), arrow_writer_properties.get()); |
| 2443 | + |
| 2444 | + ASSERT_OK(typed_writer->WriteArrow(def_levels.data(), rep_levels.data(), 100, *array, |
| 2445 | + &ctx, true)); |
| 2446 | + |
| 2447 | + typed_writer->Close(); |
| 2448 | + |
| 2449 | + auto typed_reader = this->BuildReader(100); |
| 2450 | + int64_t values_read = 0; |
| 2451 | + std::vector<int16_t> def_levels_out(100); |
| 2452 | + std::vector<int16_t> rep_levels_out(100); |
| 2453 | + std::vector<int32_t> values_out(100); |
| 2454 | + |
| 2455 | + int64_t rows_read = typed_reader->ReadBatch( |
| 2456 | + 100, def_levels_out.data(), rep_levels_out.data(), values_out.data(), &values_read); |
| 2457 | + |
| 2458 | + ASSERT_EQ(100, rows_read); |
| 2459 | + ASSERT_EQ(0, values_read); |
| 2460 | + |
| 2461 | + for (int i = 0; i < 100; ++i) { |
| 2462 | + ASSERT_EQ(0, def_levels_out[i]); |
| 2463 | + } |
| 2464 | +} |
| 2465 | + |
2383 | 2466 | } // namespace test |
2384 | 2467 | } // namespace parquet |
0 commit comments