Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2082,13 +2082,18 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc
self.trunc_length = trunc_length

expected_physical_type = _primitive_to_physical(iceberg_type)

# TODO: Refactor to use promotion logic
if expected_physical_type != physical_type_string:
# Allow promotable physical types
# INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
if (physical_type_string == "INT32" and expected_physical_type == "INT64") or (
physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
):
pass
# Allow DECIMAL to be stored as FIXED_LEN_BYTE_ARRAY, INT32 or INT64
elif physical_type_string == "FIXED_LEN_BYTE_ARRAY" and expected_physical_type in ("INT32", "INT64"):
pass
else:
raise ValueError(
f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
Expand Down Expand Up @@ -2506,12 +2511,16 @@ def data_file_statistics_from_parquet_metadata(

if isinstance(stats_col.iceberg_type, DecimalType) and statistics.physical_type != "FIXED_LEN_BYTE_ARRAY":
scale = stats_col.iceberg_type.scale
col_aggs[field_id].update_min(
unscaled_to_decimal(statistics.min_raw, scale)
) if statistics.min_raw is not None else None
col_aggs[field_id].update_max(
unscaled_to_decimal(statistics.max_raw, scale)
) if statistics.max_raw is not None else None
(
col_aggs[field_id].update_min(unscaled_to_decimal(statistics.min_raw, scale))
if statistics.min_raw is not None
else None
)
(
col_aggs[field_id].update_max(unscaled_to_decimal(statistics.max_raw, scale))
if statistics.max_raw is not None
else None
)
else:
col_aggs[field_id].update_min(statistics.min)
col_aggs[field_id].update_max(statistics.max)
Expand Down
37 changes: 37 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2188,6 +2188,43 @@ def test_stats_aggregator_update_max(vals: List[Any], primitive_type: PrimitiveT
assert stats.current_max == expected_result


@pytest.mark.parametrize(
    "iceberg_type, physical_type_string",
    [
        (IntegerType(), "INT32"),  # exact physical-type match
        (LongType(), "INT32"),  # safe promotion: INT32 -> INT64
        (DoubleType(), "FLOAT"),  # safe promotion: FLOAT -> DOUBLE
        (DecimalType(precision=2, scale=2), "FIXED_LEN_BYTE_ARRAY"),  # decimal stored as fixed bytes (INT32-sized precision)
        (DecimalType(precision=12, scale=2), "FIXED_LEN_BYTE_ARRAY"),  # decimal stored as fixed bytes (INT64-sized precision)
    ],
)
def test_stats_aggregator_conditionally_allowed_types_pass(iceberg_type: PrimitiveType, physical_type_string: str) -> None:
    """StatsAggregator accepts physical types that either match or safely promote to the expected one."""
    aggregator = StatsAggregator(iceberg_type, physical_type_string)

    # Construction succeeds and the aggregator starts out empty: the iceberg
    # type is retained and no min/max has been observed yet.
    assert aggregator.primitive_type == iceberg_type
    assert aggregator.current_min is None
    assert aggregator.current_max is None


@pytest.mark.parametrize(
    "iceberg_type, physical_type_string",
    [
        (IntegerType(), "INT64"),  # narrowing INT64 -> INT32 is not a safe cast
    ],
)
def test_stats_aggregator_physical_type_does_not_match_expected_raise_error(
    iceberg_type: PrimitiveType, physical_type_string: str
) -> None:
    """Constructing a StatsAggregator with an incompatible physical type raises ValueError."""
    with pytest.raises(ValueError, match="Unexpected physical type"):
        StatsAggregator(iceberg_type, physical_type_string)


def test_bin_pack_arrow_table(arrow_table_with_null: pa.Table) -> None:
# default packs to 1 bin since the table is small
bin_packed = bin_pack_arrow_table(
Expand Down