diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index b6ad5659b1..e42c130779 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -2082,6 +2082,8 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc self.trunc_length = trunc_length expected_physical_type = _primitive_to_physical(iceberg_type) + + # TODO: Refactor to use promotion logic if expected_physical_type != physical_type_string: # Allow promotable physical types # INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts @@ -2089,6 +2091,9 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE" ): pass + # Allow DECIMAL to be stored as FIXED_LEN_BYTE_ARRAY, INT32 or INT64 + elif physical_type_string == "FIXED_LEN_BYTE_ARRAY" and expected_physical_type in ("INT32", "INT64"): + pass else: raise ValueError( f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}" @@ -2506,12 +2511,16 @@ def data_file_statistics_from_parquet_metadata( if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY": scale = stats_col.iceberg_type.scale - col_aggs[field_id].update_min( - unscaled_to_decimal(statistics.min_raw, scale) - ) if statistics.min_raw is not None else None - col_aggs[field_id].update_max( - unscaled_to_decimal(statistics.max_raw, scale) - ) if statistics.max_raw is not None else None + ( + col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale)) + if statistics.min_raw is not None + else None + ) + ( + col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale)) + if statistics.max_raw is not None + else None + ) else: col_aggs[field_id].update_min(statistics.min) col_aggs[field_id].update_max(statistics.max) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 09cd2421ea..114c63a670 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -2188,6 +2188,43 @@ def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveT assert stats.current_max == expected_result +@pytest.mark.parametrize( + "iceberg_type, physical_type_string", + [ + # Exact match + (IntegerType(), "INT32"), + # Allowed INT32 -> INT64 promotion + (LongType(), "INT32"), + # Allowed FLOAT -> DOUBLE promotion + (DoubleType(), "FLOAT"), + # Allowed FIXED_LEN_BYTE_ARRAY -> INT32 + (DecimalType(precision=2, scale=2), "FIXED_LEN_BYTE_ARRAY"), + # Allowed FIXED_LEN_BYTE_ARRAY -> INT64 + (DecimalType(precision=12, scale=2), "FIXED_LEN_BYTE_ARRAY"), + ], +) +def test_stats_aggregator_conditionally_allowed_types_pass(iceberg_type: PrimitiveType, physical_type_string: str) -> None: + stats = StatsAggregator(iceberg_type, physical_type_string) + + assert stats.primitive_type == iceberg_type + assert stats.current_min is None + assert stats.current_max is None + + +@pytest.mark.parametrize( + "iceberg_type, physical_type_string", + [ + # Fail case: INT64 cannot be cast to INT32 + (IntegerType(), "INT64"), + ], +) +def test_stats_aggregator_physical_type_does_not_match_expected_raise_error( + iceberg_type: PrimitiveType, physical_type_string: str +) -> None: + with pytest.raises(ValueError, match="Unexpected physical type"): + StatsAggregator(iceberg_type, physical_type_string) + + def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None: # default packs to 1 bin since the table is small bin_packed = bin_pack_arrow_table(