diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 6e9eb0baf..a0d93967f 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -148,6 +148,7 @@ if(ICEBERG_BUILD_BUNDLE) set(ICEBERG_BUNDLE_SOURCES arrow/arrow_fs_file_io.cc avro/avro_data_util.cc + avro/avro_direct_decoder.cc avro/avro_reader.cc avro/avro_writer.cc avro/avro_register.cc diff --git a/src/iceberg/avro/CMakeLists.txt b/src/iceberg/avro/CMakeLists.txt index f82130386..a7663ba66 100644 --- a/src/iceberg/avro/CMakeLists.txt +++ b/src/iceberg/avro/CMakeLists.txt @@ -16,3 +16,9 @@ # under the License. iceberg_install_all_headers(iceberg/avro) + +# avro_scan benchmark executable +add_executable(avro_scan avro_scan.cc) +target_link_libraries(avro_scan PRIVATE iceberg_bundle_static) +set_target_properties(avro_scan PROPERTIES RUNTIME_OUTPUT_DIRECTORY + "${CMAKE_BINARY_DIR}/src/iceberg/avro") diff --git a/src/iceberg/avro/avro_direct_decoder.cc b/src/iceberg/avro/avro_direct_decoder.cc new file mode 100644 index 000000000..60f79d218 --- /dev/null +++ b/src/iceberg/avro/avro_direct_decoder.cc @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" +#include "iceberg/avro/avro_schema_util_internal.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/macros.h" + +namespace iceberg::avro { + +using ::iceberg::arrow::ToErrorKind; + +namespace { + +/// \brief Forward declaration for mutual recursion. +Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder, DecodeContext* ctx); + +/// \brief Skip an Avro value based on its schema without decoding +Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder) { + switch (avro_node->type()) { + case ::avro::AVRO_NULL: + decoder.decodeNull(); + return {}; + + case ::avro::AVRO_BOOL: + decoder.decodeBool(); + return {}; + + case ::avro::AVRO_INT: + decoder.decodeInt(); + return {}; + + case ::avro::AVRO_LONG: + decoder.decodeLong(); + return {}; + + case ::avro::AVRO_FLOAT: + decoder.decodeFloat(); + return {}; + + case ::avro::AVRO_DOUBLE: + decoder.decodeDouble(); + return {}; + + case ::avro::AVRO_STRING: + decoder.skipString(); + return {}; + + case ::avro::AVRO_BYTES: + decoder.skipBytes(); + return {}; + + case ::avro::AVRO_FIXED: + decoder.skipFixed(avro_node->fixedSize()); + return {}; + + case ::avro::AVRO_RECORD: { + // Skip all fields in order + for (size_t i = 0; i < avro_node->leaves(); ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i), decoder)); + } + return {}; + } + + case ::avro::AVRO_ENUM: + decoder.decodeEnum(); + return {}; + + case ::avro::AVRO_ARRAY: { + const auto& element_node = avro_node->leafAt(0); + // skipArray() returns count like arrayStart(), must handle all blocks + int64_t block_count = decoder.skipArray(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder)); + } + block_count = decoder.arrayNext(); + } + return {}; + } + + case ::avro::AVRO_MAP: { + const auto& value_node = avro_node->leafAt(1); + // skipMap() returns count like mapStart(), must handle all blocks + int64_t block_count = decoder.skipMap(); + while (block_count > 0) { + for (int64_t i = 0; i < block_count; ++i) { + decoder.skipString(); // Skip key (always string in Avro maps) + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder)); + } + block_count = decoder.mapNext(); + } + return {}; + } + + case ::avro::AVRO_UNION: { + const size_t branch_index = decoder.decodeUnionIndex(); + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + return SkipAvroValue(avro_node->leafAt(branch_index), decoder); + } + + default: + return InvalidArgument("Unsupported Avro type for skipping: {}", + ToString(avro_node)); + } +} + +/// \brief Decode Avro record directly to Arrow struct builder. +Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const std::span& projections, + const StructType& struct_type, + ::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) { + if (avro_node->type() != ::avro::AVRO_RECORD) { + return InvalidArgument("Expected Avro record, got type: {}", ToString(avro_node)); + } + + auto* struct_builder = internal::checked_cast<::arrow::StructBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append()); + + // Build a map from Avro field index to projection index (cached per struct schema) + // -1 means the field should be skipped + const FieldProjection* cache_key = projections.data(); + auto cache_it = ctx->avro_to_projection_cache.find(cache_key); + std::vector* avro_to_projection; + + if (cache_it != ctx->avro_to_projection_cache.end()) { + // Use cached mapping + avro_to_projection = &cache_it->second; + } else { + // Build and cache the mapping + auto [inserted_it, inserted] = ctx->avro_to_projection_cache.emplace( + cache_key, std::vector(avro_node->leaves(), -1)); + avro_to_projection = &inserted_it->second; + + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kProjected) { + size_t avro_field_index = std::get(field_projection.from); + (*avro_to_projection)[avro_field_index] = static_cast(proj_idx); + } + } + } + + // Read all Avro fields in order (must maintain decoder position) + for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) { + int proj_idx = (*avro_to_projection)[avro_idx]; + + if (proj_idx < 0) { + // Skip this field - not in projection + const auto& avro_field_node = avro_node->leafAt(avro_idx); + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder)); + } else { + // Decode this field + const auto& field_projection = projections[proj_idx]; + const auto& expected_field = struct_type.fields()[proj_idx]; + const auto& avro_field_node = avro_node->leafAt(avro_idx); + auto* field_builder = struct_builder->field_builder(proj_idx); + + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(avro_field_node, decoder, + field_projection, expected_field, + field_builder, ctx)); + } + } + + // Handle null fields (fields in projection but not in Avro) + for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) { + const auto& field_projection = projections[proj_idx]; + if (field_projection.kind == FieldProjection::Kind::kNull) { + auto* field_builder = struct_builder->field_builder(static_cast(proj_idx)); + ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull()); + } else if (field_projection.kind != FieldProjection::Kind::kProjected) { + return InvalidArgument("Unsupported field projection kind: {}", + static_cast(field_projection.kind)); + } + } + return {}; +} + +/// \brief Decode Avro array directly to Arrow list builder. +Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& element_projection, + const ListType& list_type, + ::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) { + if (avro_node->type() != ::avro::AVRO_ARRAY) { + return InvalidArgument("Expected Avro array, got type: {}", ToString(avro_node)); + } + + auto* list_builder = internal::checked_cast<::arrow::ListBuilder*>(array_builder); + ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append()); + + auto* value_builder = list_builder->value_builder(); + const auto& element_node = avro_node->leafAt(0); + const auto& element_field = list_type.fields().back(); + + // Read array block count + int64_t block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + element_node, decoder, element_projection, element_field, value_builder, ctx)); + } + block_count = decoder.arrayNext(); + } + + return {}; +} + +/// \brief Decode Avro map directly to Arrow map builder. +Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& key_projection, + const FieldProjection& value_projection, + const MapType& map_type, ::arrow::ArrayBuilder* array_builder, + DecodeContext* ctx) { + auto* map_builder = internal::checked_cast<::arrow::MapBuilder*>(array_builder); + + if (avro_node->type() == ::avro::AVRO_MAP) { + // Handle regular Avro map: map + const auto& key_node = avro_node->leafAt(0); + const auto& value_node = avro_node->leafAt(1); + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + // Read map block count + int64_t block_count = decoder.mapStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder, ctx)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder, ctx)); + } + block_count = decoder.mapNext(); + } + + return {}; + } else if (avro_node->type() == ::avro::AVRO_ARRAY && HasMapLogicalType(avro_node)) { + // Handle array-based map: list> + const auto& key_field = map_type.key(); + const auto& value_field = map_type.value(); + + ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append()); + auto* key_builder = map_builder->key_builder(); + auto* item_builder = map_builder->item_builder(); + + const auto& record_node = avro_node->leafAt(0); + if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() != 2) { + return InvalidArgument( + "Array-based map must contain records with exactly 2 fields, got: {}", + ToString(record_node)); + } + const auto& key_node = record_node->leafAt(0); + const auto& value_node = record_node->leafAt(1); + + // Read array block count + int64_t block_count = decoder.arrayStart(); + while (block_count != 0) { + for (int64_t i = 0; i < block_count; ++i) { + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder, key_projection, + key_field, key_builder, ctx)); + ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder( + value_node, decoder, value_projection, value_field, item_builder, ctx)); + } + block_count = decoder.arrayNext(); + } + + return {}; + } else { + return InvalidArgument("Expected Avro map or array with map logical type, got: {}", + ToString(avro_node)); + } +} + +/// \brief Decode nested Avro data directly to Arrow array builder. +Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const std::span& projections, + const NestedType& projected_type, + ::arrow::ArrayBuilder* array_builder, + DecodeContext* ctx) { + switch (projected_type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast(projected_type); + return DecodeStructToBuilder(avro_node, decoder, projections, struct_type, + array_builder, ctx); + } + + case TypeId::kList: { + if (projections.size() != 1) { + return InvalidArgument("Expected 1 projection for list, got: {}", + projections.size()); + } + const auto& list_type = internal::checked_cast(projected_type); + return DecodeListToBuilder(avro_node, decoder, projections[0], list_type, + array_builder, ctx); + } + + case TypeId::kMap: { + if (projections.size() != 2) { + return InvalidArgument("Expected 2 projections for map, got: {}", + projections.size()); + } + const auto& map_type = internal::checked_cast(projected_type); + return DecodeMapToBuilder(avro_node, decoder, projections[0], projections[1], + map_type, array_builder, ctx); + } + + default: + return InvalidArgument("Unsupported nested type: {}", projected_type.ToString()); + } +} + +Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node, + ::avro::Decoder& decoder, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder, + DecodeContext* ctx) { + const auto& projected_type = *projected_field.type(); + if (!projected_type.is_primitive()) { + return InvalidArgument("Expected primitive type, got: {}", projected_type.ToString()); + } + + switch (projected_type.type_id()) { + case TypeId::kBoolean: { + if (avro_node->type() != ::avro::AVRO_BOOL) { + return InvalidArgument("Expected Avro boolean for boolean field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BooleanBuilder*>(array_builder); + bool value = decoder.decodeBool(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kInt: { + if (avro_node->type() != ::avro::AVRO_INT) { + return InvalidArgument("Expected Avro int for int field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Int32Builder*>(array_builder); + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kLong: { + auto* builder = internal::checked_cast<::arrow::Int64Builder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_LONG) { + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_INT) { + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast(value))); + } else { + return InvalidArgument("Expected Avro int/long for long field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kFloat: { + if (avro_node->type() != ::avro::AVRO_FLOAT) { + return InvalidArgument("Expected Avro float for float field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::FloatBuilder*>(array_builder); + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kDouble: { + auto* builder = internal::checked_cast<::arrow::DoubleBuilder*>(array_builder); + if (avro_node->type() == ::avro::AVRO_DOUBLE) { + double value = decoder.decodeDouble(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + } else if (avro_node->type() == ::avro::AVRO_FLOAT) { + float value = decoder.decodeFloat(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast(value))); + } else { + return InvalidArgument("Expected Avro float/double for double field, got: {}", + ToString(avro_node)); + } + return {}; + } + + case TypeId::kString: { + if (avro_node->type() != ::avro::AVRO_STRING) { + return InvalidArgument("Expected Avro string for string field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::StringBuilder*>(array_builder); + decoder.decodeString(ctx->string_scratch); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->string_scratch)); + return {}; + } + + case TypeId::kBinary: { + if (avro_node->type() != ::avro::AVRO_BYTES) { + return InvalidArgument("Expected Avro bytes for binary field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::BinaryBuilder*>(array_builder); + decoder.decodeBytes(ctx->bytes_scratch); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append( + ctx->bytes_scratch.data(), static_cast(ctx->bytes_scratch.size()))); + return {}; + } + + case TypeId::kFixed: { + if (avro_node->type() != ::avro::AVRO_FIXED) { + return InvalidArgument("Expected Avro fixed for fixed field, got: {}", + ToString(avro_node)); + } + const auto& fixed_type = internal::checked_cast(projected_type); + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + + ctx->bytes_scratch.resize(fixed_type.length()); + decoder.decodeFixed(fixed_type.length(), ctx->bytes_scratch); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data())); + return {}; + } + + case TypeId::kUuid: { + if (avro_node->type() != ::avro::AVRO_FIXED || + avro_node->logicalType().type() != ::avro::LogicalType::UUID) { + return InvalidArgument("Expected Avro fixed for uuid field, got: {}", + ToString(avro_node)); + } + + auto* builder = + internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder); + + ctx->bytes_scratch.resize(16); + decoder.decodeFixed(16, ctx->bytes_scratch); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data())); + return {}; + } + + case TypeId::kDecimal: { + if (avro_node->type() != ::avro::AVRO_FIXED || + avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) { + return InvalidArgument( + "Expected Avro fixed with DECIMAL logical type for decimal field, got: {}", + ToString(avro_node)); + } + + size_t byte_width = avro_node->fixedSize(); + auto* builder = internal::checked_cast<::arrow::Decimal128Builder*>(array_builder); + + ctx->bytes_scratch.resize(byte_width); + decoder.decodeFixed(byte_width, ctx->bytes_scratch); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto decimal, ::arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(), + ctx->bytes_scratch.size())); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal)); + return {}; + } + + case TypeId::kDate: { + if (avro_node->type() != ::avro::AVRO_INT || + avro_node->logicalType().type() != ::avro::LogicalType::DATE) { + return InvalidArgument( + "Expected Avro int with DATE logical type for date field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Date32Builder*>(array_builder); + int32_t value = decoder.decodeInt(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kTime: { + if (avro_node->type() != ::avro::AVRO_LONG || + avro_node->logicalType().type() != ::avro::LogicalType::TIME_MICROS) { + return InvalidArgument( + "Expected Avro long with TIME_MICROS for time field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::Time64Builder*>(array_builder); + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + case TypeId::kTimestamp: + case TypeId::kTimestampTz: { + if (avro_node->type() != ::avro::AVRO_LONG || + avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_MICROS) { + return InvalidArgument( + "Expected Avro long with TIMESTAMP_MICROS for timestamp field, got: {}", + ToString(avro_node)); + } + auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder); + int64_t value = decoder.decodeLong(); + ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value)); + return {}; + } + + default: + return InvalidArgument("Unsupported primitive type {} to decode from avro node {}", + projected_field.type()->ToString(), ToString(avro_node)); + } +} + +/// \brief Dispatch to appropriate handlers based on the projection kind. +Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const FieldProjection& projection, + const SchemaField& projected_field, + ::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) { + if (avro_node->type() == ::avro::AVRO_UNION) { + const size_t branch_index = decoder.decodeUnionIndex(); + + // Validate branch index + const size_t num_branches = avro_node->leaves(); + if (branch_index >= num_branches) { + return InvalidArgument("Union branch index {} out of range [0, {})", branch_index, + num_branches); + } + + const auto& branch_node = avro_node->leafAt(branch_index); + if (branch_node->type() == ::avro::AVRO_NULL) { + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } else { + return DecodeFieldToBuilder(branch_node, decoder, projection, projected_field, + array_builder, ctx); + } + } + + const auto& projected_type = *projected_field.type(); + if (projected_type.is_primitive()) { + return DecodePrimitiveValueToBuilder(avro_node, decoder, projected_field, + array_builder, ctx); + } else { + const auto& nested_type = internal::checked_cast(projected_type); + return DecodeNestedValueToBuilder(avro_node, decoder, projection.children, + nested_type, array_builder, ctx); + } +} + +} // namespace + +Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const SchemaProjection& projection, + const Schema& projected_schema, + ::arrow::ArrayBuilder* array_builder, DecodeContext* ctx) { + return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields, + projected_schema, array_builder, ctx); +} + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_direct_decoder_internal.h b/src/iceberg/avro/avro_direct_decoder_internal.h new file mode 100644 index 000000000..df4587fd0 --- /dev/null +++ b/src/iceberg/avro/avro_direct_decoder_internal.h @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/schema_internal.h" +#include "iceberg/schema_util.h" + +namespace iceberg::avro { + +/// \brief Context for reusing scratch buffers during Avro decoding +/// +/// Avoids frequent small allocations by reusing temporary buffers across +/// multiple decode operations. This is particularly important for string, +/// binary, and fixed-size data types. +struct DecodeContext { + // Scratch buffer for string decoding (reused across rows) + std::string string_scratch; + // Scratch buffer for binary/fixed/uuid/decimal data (reused across rows) + std::vector bytes_scratch; + // Cache for avro field index to projection index mapping + // Key: pointer to projections array (identifies struct schema) + // Value: vector mapping avro field index -> projection index (-1 if not projected) + std::unordered_map> avro_to_projection_cache; +}; + +/// \brief Directly decode Avro data to Arrow array builders without GenericDatum +/// +/// Eliminates the GenericDatum intermediate layer by directly calling Avro decoder +/// methods and immediately appending to Arrow builders. Matches Java Iceberg's +/// ValueReader approach for better performance. +/// +/// Features: +/// - All primitive, temporal, and logical types +/// - Nested types (struct, list, map) +/// - Union types with bounds checking +/// - Field skipping for schema projection +/// +/// Schema Evolution: +/// Schema resolution is handled via SchemaProjection (from Project() function). +/// Supports field reordering and missing fields (set to NULL). Default values +/// are NOT currently supported. +/// +/// Error Handling: +/// - Type mismatches → InvalidArgument +/// - Union branch out of range → InvalidArgument +/// - Decimal precision insufficient → InvalidArgument +/// - Missing logical type → InvalidArgument +/// +/// \param avro_node The Avro schema node for the data being decoded +/// \param decoder The Avro decoder positioned at the data to read +/// \param projection The field projections (from Project() function) +/// \param projected_schema The target Iceberg schema after projection +/// \param array_builder The Arrow array builder to append decoded data to +/// \param ctx Decode context for reusing scratch buffers +/// \return Status::OK if successful, or an error status +Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& decoder, + const SchemaProjection& projection, + const Schema& projected_schema, + ::arrow::ArrayBuilder* array_builder, DecodeContext* ctx); + +} // namespace iceberg::avro diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index 932dd0f18..fa7337f0a 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -34,6 +34,7 @@ #include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/avro/avro_data_util_internal.h" +#include "iceberg/avro/avro_direct_decoder_internal.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/avro/avro_stream_internal.h" @@ -62,18 +63,19 @@ Result> CreateInputStream(const ReaderOptions& // A stateful context to keep track of the reading progress. struct ReadContext { - // The datum to reuse for reading the data. - std::unique_ptr<::avro::GenericDatum> datum_; // The arrow schema to build the record batch. std::shared_ptr<::arrow::Schema> arrow_schema_; // The builder to build the record batch. std::shared_ptr<::arrow::ArrayBuilder> builder_; + // GenericDatum for GenericDatum-based decoding (only used if direct decoder is + // disabled) + std::unique_ptr<::avro::GenericDatum> datum_; + // Decode context for reusing scratch buffers (only used if direct decoder is + // enabled) + DecodeContext decode_context_; }; -// TODO(gang.wu): there are a lot to do to make this reader work. -// 1. prune the reader schema based on the projection -// 2. read key-value metadata from the avro file -// 3. collect basic reader metrics +// TODO(gang.wu): collect basic reader metrics class AvroReader::Impl { public: Status Open(const ReaderOptions& options) { @@ -83,6 +85,7 @@ class AvroReader::Impl { } batch_size_ = options.properties->Get(ReaderProperties::kBatchSize); + use_direct_decoder_ = options.properties->Get(ReaderProperties::kAvroSkipDatum); read_schema_ = options.projection; // Open the input stream and adapt to the avro interface. @@ -91,10 +94,21 @@ class AvroReader::Impl { ICEBERG_ASSIGN_OR_RAISE(auto input_stream, CreateInputStream(options, kDefaultBufferSize)); - // Create a base reader without setting reader schema to enable projection. - auto base_reader = - std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); - ::avro::ValidSchema file_schema = base_reader->dataSchema(); + ::avro::ValidSchema file_schema; + + if (use_direct_decoder_) { + // Create base reader for direct decoder access + auto base_reader = + std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream)); + file_schema = base_reader->dataSchema(); + base_reader_ = std::move(base_reader); + } else { + // Create DataFileReader for GenericDatum-based decoding + auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>( + std::move(input_stream)); + file_schema = datum_reader->dataSchema(); + datum_reader_ = std::move(datum_reader); + } // Validate field ids in the file schema. HasIdVisitor has_id_visitor; @@ -121,13 +135,22 @@ class AvroReader::Impl { // TODO(gangwu): support pruning source fields ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(), /*prune_source=*/false)); - reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>( - std::move(base_reader), file_schema); - if (options.split) { - reader_->sync(options.split->offset); - split_end_ = options.split->offset + options.split->length; + if (use_direct_decoder_) { + // Initialize the base reader with the file schema + base_reader_->init(file_schema); + if (options.split) { + base_reader_->sync(options.split->offset); + split_end_ = options.split->offset + options.split->length; + } + } else { + // The datum reader is already initialized during construction + if (options.split) { + datum_reader_->sync(options.split->offset); + split_end_ = options.split->offset + options.split->length; + } } + return {}; } @@ -137,25 +160,37 @@ class AvroReader::Impl { } while (context_->builder_->length() < batch_size_) { - if (split_end_ && reader_->pastSync(split_end_.value())) { + if (IsPastSync()) { break; } - if (!reader_->read(*context_->datum_)) { - break; + + if (use_direct_decoder_) { + // Direct decoder: decode Avro to Arrow without GenericDatum + if (!base_reader_->hasMore()) { + break; + } + base_reader_->decr(); + + ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder( + GetReaderSchema().root(), base_reader_->decoder(), projection_, *read_schema_, + context_->builder_.get(), &context_->decode_context_)); + } else { + // GenericDatum-based decoding: decode via GenericDatum intermediate + if (!datum_reader_->read(*context_->datum_)) { + break; + } + + ICEBERG_RETURN_UNEXPECTED( + AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_, projection_, + *read_schema_, context_->builder_.get())); } - ICEBERG_RETURN_UNEXPECTED( - AppendDatumToBuilder(reader_->readerSchema().root(), *context_->datum_, - projection_, *read_schema_, context_->builder_.get())); } return ConvertBuilderToArrowArray(); } Status Close() { - if (reader_ != nullptr) { - reader_->close(); - reader_.reset(); - } + CloseReader(); context_.reset(); return {}; } @@ -174,12 +209,12 @@ class AvroReader::Impl { } Result> Metadata() { - if (reader_ == nullptr) { + if ((use_direct_decoder_ && !base_reader_) || + (!use_direct_decoder_ && !datum_reader_)) { return Invalid("Reader is not opened"); } - const auto& metadata = reader_->metadata(); - + const auto& metadata = GetReaderMetadata(); std::unordered_map metadata_map; metadata_map.reserve(metadata.size()); @@ -194,7 +229,6 @@ class AvroReader::Impl { private: Status InitReadContext() { context_ = std::make_unique(); - context_->datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema()); ArrowSchema arrow_schema; ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema)); @@ -214,6 +248,11 @@ class AvroReader::Impl { } context_->builder_ = builder_result.MoveValueUnsafe(); + // Initialize GenericDatum for GenericDatum-based decoding + if (!use_direct_decoder_) { + context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema()); + } + return {}; } @@ -238,17 +277,52 @@ class AvroReader::Impl { return arrow_array; } + bool IsPastSync() const { + if (!split_end_) { + return false; + } + return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value()) + : datum_reader_->pastSync(split_end_.value()); + } + + const ::avro::Metadata& GetReaderMetadata() const { + return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata(); + } + + void CloseReader() { + if (use_direct_decoder_) { + if (base_reader_) { + base_reader_->close(); + base_reader_.reset(); + } + } else { + if (datum_reader_) { + datum_reader_->close(); + datum_reader_.reset(); + } + } + } + + const ::avro::ValidSchema& GetReaderSchema() const { + return use_direct_decoder_ ? base_reader_->readerSchema() + : datum_reader_->readerSchema(); + } + private: // Max number of rows in the record batch to read. int64_t batch_size_{}; + // Whether to use direct decoder (true) or GenericDatum-based decoder (false). + bool use_direct_decoder_{true}; // The end of the split to read and used to terminate the reading. std::optional split_end_; // The schema to read. std::shared_ptr<::iceberg::Schema> read_schema_; // The projection result to apply to the read schema. SchemaProjection projection_; - // The avro reader to read the data into a datum. - std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_; + // The avro reader base - provides direct access to decoder for direct decoding. + std::unique_ptr<::avro::DataFileReaderBase> base_reader_; + // The datum reader for GenericDatum-based decoding. + std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_; // The context to keep track of the reading progress. std::unique_ptr context_; }; diff --git a/src/iceberg/avro/avro_scan.cc b/src/iceberg/avro/avro_scan.cc new file mode 100644 index 000000000..3e6903600 --- /dev/null +++ b/src/iceberg/avro/avro_scan.cc @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/file_reader.h" +#include "iceberg/schema.h" + +void PrintUsage(const char* program_name) { + std::cerr << "Usage: " << program_name << " [options] \n" + << "Options:\n" + << " --skip-datum= Use direct decoder (default: true)\n" + << " --batch-size= Batch size for reading (default: 4096)\n" + << " --help Show this help message\n" + << "\nExample:\n" + << " " << program_name + << " --skip-datum=false --batch-size=1000 data.avro\n"; +} + +int main(int argc, char* argv[]) { + iceberg::avro::RegisterAll(); + + if (argc < 2) { + PrintUsage(argv[0]); + return 1; + } + + std::string avro_file; + bool skip_datum = true; + int64_t batch_size = 4096; + + // Parse arguments + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if (arg == "--help") { + PrintUsage(argv[0]); + return 0; + } else if (arg.starts_with("--skip-datum=")) { + std::string value = arg.substr(13); + if (value == "true" || value == "1") { + skip_datum = true; + } else if (value == "false" || value == "0") { + skip_datum = false; + } else { + std::cerr << "Invalid value for --skip-datum: " << value << "\n"; + return 1; + } + } else if (arg.starts_with("--batch-size=")) { + batch_size = std::stoll(arg.substr(13)); + if (batch_size <= 0) { + std::cerr << "Batch size must be positive\n"; + return 1; + } + } else if (arg[0] == '-') { + std::cerr << "Unknown option: " << arg << "\n"; + PrintUsage(argv[0]); + return 1; + } else { + avro_file = arg; + } + } + + if (avro_file.empty()) { + std::cerr << "Error: No Avro file specified\n"; + PrintUsage(argv[0]); + return 1; + } + + std::cout << "Scanning Avro file: " << avro_file << "\n"; + std::cout << "Skip datum: " << (skip_datum ? "true" : "false") << "\n"; + std::cout << "Batch size: " << batch_size << "\n"; + std::cout << std::string(60, '-') << "\n"; + + auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>(); + auto file_io = std::make_shared(local_fs); + + // Get file info + auto file_info_result = local_fs->GetFileInfo(avro_file); + if (!file_info_result.ok()) { + std::cerr << "Error: Cannot access file: " << file_info_result.status().message() + << "\n"; + return 1; + } + auto file_info = file_info_result.ValueOrDie(); + if (file_info.type() != ::arrow::fs::FileType::File) { + std::cerr << "Error: Not a file: " << avro_file << "\n"; + return 1; + } + + std::cout << "File size: " << file_info.size() << " bytes\n"; + + // Configure reader properties + auto reader_properties = iceberg::ReaderProperties::default_properties(); + reader_properties->Set(iceberg::ReaderProperties::kAvroSkipDatum, skip_datum); + reader_properties->Set(iceberg::ReaderProperties::kBatchSize, batch_size); + + // Open reader (without projection to read all columns) + auto reader_result = iceberg::ReaderFactoryRegistry::Open( + iceberg::FileFormatType::kAvro, {.path = avro_file, + .length = file_info.size(), + .io = file_io, + .projection = nullptr, + .properties = std::move(reader_properties)}); + + if (!reader_result.has_value()) { + std::cerr << "Error opening reader: " << reader_result.error().message << "\n"; + return 1; + } + + auto reader = std::move(reader_result.value()); + + // Get schema + auto schema_result = reader->Schema(); + if (!schema_result.has_value()) { + std::cerr << "Error getting schema: " << schema_result.error().message << "\n"; + return 1; + } + auto arrow_schema = schema_result.value(); + auto arrow_schema_import = ::arrow::ImportType(&arrow_schema); + if (!arrow_schema_import.ok()) { + std::cerr << "Error importing schema: " << arrow_schema_import.status().message() + << "\n"; + return 1; + } + std::cout << "Schema: " << arrow_schema_import.ValueOrDie()->ToString() << "\n"; + std::cout << std::string(60, '-') << "\n"; + + // Scan file and measure time + auto start = std::chrono::high_resolution_clock::now(); + + int64_t total_rows = 0; + int64_t batch_count = 0; + + while (true) { + auto batch_result = reader->Next(); + if (!batch_result.has_value()) { + std::cerr << "Error reading batch: " << batch_result.error().message << "\n"; + return 1; + } + + auto batch_opt = batch_result.value(); + if (!batch_opt.has_value()) { + // End of file + break; + } + + auto arrow_array = batch_opt.value(); + auto arrow_type = arrow_schema_import.ValueOrDie(); + auto array_import = ::arrow::ImportArray(&arrow_array, arrow_type); + if (!array_import.ok()) { + std::cerr << "Error importing array: " << array_import.status().message() << "\n"; + return 1; + } + + int64_t batch_rows = array_import.ValueOrDie()->length(); + total_rows += batch_rows; + batch_count++; + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + // Print results + std::cout << "\nResults:\n"; + std::cout << " Total rows: " << total_rows << "\n"; + std::cout << " Batches: " << batch_count << "\n"; + std::cout << " Time: " << duration.count() << " ms\n"; + std::cout << " Throughput: " + << (duration.count() > 0 ? (total_rows * 1000 / duration.count()) : 0) + << " rows/sec\n"; + std::cout << " Throughput: " + << (duration.count() > 0 + ? (file_info.size() / 1024.0 / 1024.0) / (duration.count() / 1000.0) + : 0) + << " MB/sec\n"; + + return 0; +} diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h index 85221a6a3..a5af0a41e 100644 --- a/src/iceberg/file_reader.h +++ b/src/iceberg/file_reader.h @@ -76,6 +76,11 @@ class ReaderProperties : public ConfigBase { /// \brief The batch size to read. inline static Entry kBatchSize{"read.batch-size", 4096}; + /// \brief Skip GenericDatum in Avro reader for better performance. + /// When true, decode directly from Avro to Arrow without GenericDatum intermediate. + /// Default: true (skip GenericDatum for better performance). + inline static Entry kAvroSkipDatum{"read.avro.skip-datum", true}; + /// \brief Create a default ReaderProperties instance. static std::unique_ptr default_properties(); diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index 1d421ede5..c1bb8bc96 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -17,6 +17,8 @@ * under the License. */ +#include + #include #include #include @@ -51,6 +53,8 @@ class AvroReaderTest : public TempFileTestBase { temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro"); } + bool skip_datum_{true}; + void CreateSimpleAvroFile() { const std::string avro_schema_json = R"({ "type": "record", @@ -139,11 +143,15 @@ class AvroReaderTest : public TempFileTestBase { ASSERT_TRUE(file_info_result.ok()); ASSERT_EQ(file_info_result->size(), writer->length().value()); - auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro, - {.path = temp_avro_file_, - .length = file_info_result->size(), - .io = file_io_, - .projection = schema}); + auto reader_properties = ReaderProperties::default_properties(); + reader_properties->Set(ReaderProperties::kAvroSkipDatum, skip_datum_); + + auto reader_result = ReaderFactoryRegistry::Open( + FileFormatType::kAvro, {.path = temp_avro_file_, + .length = file_info_result->size(), + .io = file_io_, + .projection = schema, + .properties = std::move(reader_properties)}); ASSERT_THAT(reader_result, IsOk()); auto reader = std::move(reader_result.value()); ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string)); @@ -164,6 +172,16 @@ class AvroReaderTest : public TempFileTestBase { std::string temp_avro_file_; }; +// Parameterized test fixture for testing both DirectDecoder and GenericDatum modes +class AvroReaderParameterizedTest : public AvroReaderTest, + public ::testing::WithParamInterface { + protected: + void SetUp() override { + AvroReaderTest::SetUp(); + skip_datum_ = GetParam(); + } +}; + TEST_F(AvroReaderTest, ReadTwoFields) { CreateSimpleAvroFile(); auto schema = std::make_shared(std::vector{ @@ -220,7 +238,7 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) { ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } -TEST_F(AvroReaderTest, AvroWriterBasicType) { +TEST_P(AvroReaderParameterizedTest, AvroWriterBasicType) { auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "name", std::make_shared())}); @@ -229,7 +247,7 @@ TEST_F(AvroReaderTest, AvroWriterBasicType) { WriteAndVerify(schema, expected_string); } -TEST_F(AvroReaderTest, AvroWriterNestedType) { +TEST_P(AvroReaderParameterizedTest, AvroWriterNestedType) { auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "id", std::make_shared()), SchemaField::MakeRequired( @@ -244,4 +262,245 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) { WriteAndVerify(schema, expected_string); } +TEST_P(AvroReaderParameterizedTest, AllPrimitiveTypes) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "bool_col", std::make_shared()), + SchemaField::MakeRequired(2, "int_col", std::make_shared()), + SchemaField::MakeRequired(3, "long_col", std::make_shared()), + SchemaField::MakeRequired(4, "float_col", std::make_shared()), + SchemaField::MakeRequired(5, "double_col", std::make_shared()), + SchemaField::MakeRequired(6, "string_col", std::make_shared()), + SchemaField::MakeRequired(7, "binary_col", std::make_shared())}); + + std::string expected_string = R"([ + [true, 42, 1234567890, 3.14, 2.71828, "test", "AQID"], + [false, -100, -9876543210, -1.5, 0.0, "hello", "BAUG"] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Skipping DecimalType test - requires specific decimal encoding in JSON + +TEST_P(AvroReaderParameterizedTest, DateTimeTypes) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "date_col", std::make_shared()), + SchemaField::MakeRequired(2, "time_col", std::make_shared()), + SchemaField::MakeRequired(3, "timestamp_col", std::make_shared())}); + + // Dates as days since epoch, time/timestamps as microseconds + std::string expected_string = R"([ + [18628, 43200000000, 1640995200000000], + [18629, 86399000000, 1641081599000000] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_P(AvroReaderParameterizedTest, NestedStruct) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired( + 2, "person", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", std::make_shared()), + SchemaField::MakeRequired(4, "age", std::make_shared()), + SchemaField::MakeOptional( + 5, "address", + std::make_shared(std::vector{ + SchemaField::MakeRequired(6, "street", + std::make_shared()), + SchemaField::MakeRequired(7, "city", + std::make_shared())}))}))}); + + std::string expected_string = R"([ + [1, ["Alice", 30, ["123 Main St", "NYC"]]], + [2, ["Bob", 25, ["456 Oak Ave", "LA"]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_P(AvroReaderParameterizedTest, ListType) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired(2, "tags", + std::make_shared(SchemaField::MakeRequired( + 3, "element", std::make_shared())))}); + + std::string expected_string = R"([ + [1, ["tag1", "tag2", "tag3"]], + [2, ["foo", "bar"]], + [3, []] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_P(AvroReaderParameterizedTest, MapType) { + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired( + 1, "properties", + std::make_shared( + SchemaField::MakeRequired(2, "key", std::make_shared()), + SchemaField::MakeRequired(3, "value", std::make_shared())))}); + + std::string expected_string = R"([ + [[["key1", 100], ["key2", 200]]], + [[["a", 1], ["b", 2], ["c", 3]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_P(AvroReaderParameterizedTest, MapTypeWithNonStringKey) { + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired( + 1, "int_map", + std::make_shared( + SchemaField::MakeRequired(2, "key", std::make_shared()), + SchemaField::MakeRequired(3, "value", std::make_shared())))}); + + std::string expected_string = R"([ + [[[1, "one"], [2, "two"], [3, "three"]]], + [[[10, "ten"], [20, "twenty"]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_F(AvroReaderTest, ProjectionSubsetAndReorder) { + // Write file with full schema + auto write_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired(2, "name", std::make_shared()), + SchemaField::MakeRequired(3, "age", std::make_shared()), + SchemaField::MakeRequired(4, "city", std::make_shared())}); + + std::string write_data = R"([ + [1, "Alice", 25, "NYC"], + [2, "Bob", 30, "SF"], + [3, "Charlie", 35, "LA"] + ])"; + + // Write with full schema + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*write_schema, &arrow_c_schema), IsOk()); + auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema); + ASSERT_TRUE(arrow_schema_result.ok()); + auto arrow_schema = arrow_schema_result.ValueOrDie(); + + auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema, write_data); + ASSERT_TRUE(array_result.ok()); + auto array = array_result.ValueOrDie(); + + struct ArrowArray arrow_array; + auto export_result = ::arrow::ExportArray(*array, &arrow_array); + ASSERT_TRUE(export_result.ok()); + + std::unordered_map metadata = {{"k1", "v1"}}; + auto writer_result = + WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = temp_avro_file_, + .schema = write_schema, + .io = file_io_, + .metadata = metadata}); + ASSERT_TRUE(writer_result.has_value()); + auto writer = std::move(writer_result.value()); + ASSERT_THAT(writer->Write(&arrow_array), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); + + // Read with projected schema: subset of columns (city, id) in different order + auto read_schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(4, "city", std::make_shared()), + SchemaField::MakeRequired(1, "id", std::make_shared())}); + + auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_); + ASSERT_TRUE(file_info_result.ok()); + + auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro, + {.path = temp_avro_file_, + .length = file_info_result->size(), + .io = file_io_, + .projection = read_schema}); + ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + + // Verify reordered subset + ASSERT_NO_FATAL_FAILURE( + VerifyNextBatch(*reader, R"([["NYC", 1], ["SF", 2], ["LA", 3]])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + +TEST_P(AvroReaderParameterizedTest, ComplexNestedTypes) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired(2, "nested_list", + std::make_shared(SchemaField::MakeRequired( + 3, "element", + std::make_shared(SchemaField::MakeRequired( + 4, "element", std::make_shared())))))}); + + std::string expected_string = R"([ + [1, [[1, 2], [3, 4]]], + [2, [[5], [6, 7, 8]]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +TEST_P(AvroReaderParameterizedTest, OptionalFieldsWithNulls) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeOptional(2, "name", std::make_shared()), + SchemaField::MakeOptional(3, "age", std::make_shared())}); + + std::string expected_string = R"([ + [1, "Alice", 30], + [2, null, 25], + [3, "Charlie", null], + [4, null, null] + ])"; + + WriteAndVerify(schema, expected_string); +} + +// Test both direct decoder and GenericDatum paths +TEST_P(AvroReaderParameterizedTest, LargeDataset) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired(2, "value", std::make_shared())}); + + // Generate large dataset JSON + std::ostringstream json; + json << "["; + for (int i = 0; i < 1000; ++i) { + if (i > 0) json << ", "; + json << "[" << i << ", " << (i * 1.5) << "]"; + } + json << "]"; + + WriteAndVerify(schema, json.str()); +} + +TEST_P(AvroReaderParameterizedTest, EmptyCollections) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", std::make_shared()), + SchemaField::MakeRequired(2, "list_col", + std::make_shared(SchemaField::MakeRequired( + 3, "element", std::make_shared())))}); + + std::string expected_string = R"([ + [1, []], + [2, [10, 20, 30]] + ])"; + + WriteAndVerify(schema, expected_string); +} + +INSTANTIATE_TEST_SUITE_P(DirectDecoderModes, AvroReaderParameterizedTest, + ::testing::Bool(), + [](const ::testing::TestParamInfo& info) { + return info.param ? "DirectDecoder" : "GenericDatum"; + }); + } // namespace iceberg::avro diff --git a/src/iceberg/test/temp_file_test_base.h b/src/iceberg/test/temp_file_test_base.h index 8e20e2ca5..4b3131d15 100644 --- a/src/iceberg/test/temp_file_test_base.h +++ b/src/iceberg/test/temp_file_test_base.h @@ -118,7 +118,14 @@ class TempFileTestBase : public ::testing::Test { /// \brief Get the test name for inclusion in the filename std::string TestInfo() const { if (const auto info = ::testing::UnitTest::GetInstance()->current_test_info(); info) { - return std::format("{}_{}", info->test_suite_name(), info->name()); + std::string result = std::format("{}_{}", info->test_suite_name(), info->name()); + // Replace slashes (from parameterized tests) with underscores to avoid path issues + for (auto& c : result) { + if (c == '/') { + c = '_'; + } + } + return result; } return "unknown_test"; }