
Pyarrow data type, default to small type and fix large type override #1859

Open · wants to merge 6 commits into base: main
16 changes: 8 additions & 8 deletions mkdocs/docs/api.md
@@ -418,7 +418,7 @@ This produces the following result with `tbl.scan().to_arrow()`:

```python
pyarrow.Table
-city: large_string
+city: string
lat: double
long: double
----
@@ -476,7 +476,7 @@ This produces the following result with `tbl.scan().to_arrow()`:

```python
pyarrow.Table
-city: large_string
+city: string
lat: double
long: double
----
@@ -957,14 +957,14 @@ split_offsets: list<item: int64>
equality_ids: list<item: int32>
child 0, item: int32
sort_order_id: int32
-readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
+readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
-child 4, lower_bound: large_string
-child 5, upper_bound: large_string
+child 4, lower_bound: string
+child 5, upper_bound: string
child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
@@ -998,7 +998,7 @@ equality_ids:[[[],[]]]
sort_order_id:[[[],[]]]
readable_metrics: [
-- is_valid: all not null
--- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: large_string, upper_bound: large_string>
+-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
-- is_valid: all not null
-- child 0 type: int64
[140]
@@ -1008,9 +1008,9 @@ readable_metrics: [
[0]
-- child 3 type: int64
[null]
--- child 4 type: large_string
+-- child 4 type: string
["Amsterdam"]
--- child 5 type: large_string
+-- child 5 type: string
["San Francisco"]
-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
2 changes: 1 addition & 1 deletion mkdocs/docs/configuration.md
@@ -199,7 +199,7 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya

| Key | Example | Description |
| ------------------------------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| pyarrow.use-large-types-on-read | True | Use large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is True. |
+| pyarrow.use-large-types-on-read | False | Force large PyArrow types i.e. [large_string](https://arrow.apache.org/docs/python/generated/pyarrow.large_string.html), [large_binary](https://arrow.apache.org/docs/python/generated/pyarrow.large_binary.html) and [large_list](https://arrow.apache.org/docs/python/generated/pyarrow.large_list.html) field types on table scans. The default value is False. |

<!-- markdown-link-check-enable-->

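For illustration, a minimal sketch of turning this override on for a loaded table. The catalog name and table identifier below are hypothetical; the property key and the `tbl.io.properties` pattern come from this PR's docs and tests.

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")            # hypothetical catalog name
tbl = catalog.load_table("default.cities")   # hypothetical table identifier

# Small Arrow types (string, binary, list) are now the scan default;
# setting the flag to "True" opts back in to the large_* variants.
tbl.io.properties["pyarrow.use-large-types-on-read"] = "True"
arrow_table = tbl.scan().to_arrow()
```

Note that the pyiceberg/io/pyarrow.py changes below also deprecate this property, with removal planned for 0.11.0.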
12 changes: 6 additions & 6 deletions pyiceberg/io/pyarrow.py
@@ -625,7 +625,7 @@ def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:

def list(self, list_type: ListType, element_result: pa.DataType) -> pa.DataType:
element_field = self.field(list_type.element_field, element_result)
-return pa.large_list(value_type=element_field)
+return pa.list_(value_type=element_field)
Contributor

I'm not convinced that we need to change this. We use `schema_to_pyarrow` in many places:

  • `Schema.as_arrow()`: this can be problematic when people already allocate buffers that are larger than what fits in the small ones.
  • `_ConvertToArrowExpression.{visit_in,visit_not_in}`: I checked manually, and it looks like we can mix large and normal types here :) (see the cast sketch right after this list).
  • `ArrowProjectionVisitor` has an issue similar to what you've described in "Arrow: Infer the types when reading" #1669 (comment). I think the other way around is also an issue: if you promote a `large_string`, it would now produce a `binary` and not a `large_binary`.
  • `ArrowScan.to_table()` will return the schema when there is no data; both small and large are okay.
  • `DataScan.to_arrow_batch_reader()`: I think we should always update to the large type. Since this is streaming, we don't know upfront if the small buffers are big enough, therefore it is safe to go with the large ones.
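
A minimal sketch of why mixing works, assuming the data fits 32-bit offsets: `string` and `large_string` hold the same values and differ only in offset width, so they cast back and forth losslessly.

```python
import pyarrow as pa

# string uses int32 offsets, large_string uses int64 offsets; while the data
# fits 32-bit offsets the two representations round-trip without loss.
small = pa.array(["Amsterdam", "Drachten"], type=pa.string())
large = small.cast(pa.large_string())
assert large.cast(pa.string()).equals(small)
```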

Contributor Author

@Fokko Just coming back to this PR. Is there a reason why we'd want to default to `large_list`?

The difference between `list_` and `large_list` is the number of elements supported by the list. According to the `large_list` docs:

> Unless you need to represent data larger than 2**31 elements, you should prefer list_().

2**31 is 2_147_483_648; 2 billion items in a single list seems pretty rare.

I did a small experiment; this works with `list_`:

```python
import pyarrow as pa
import numpy as np

size = 2**31 - 2
pa.array([np.zeros(size, dtype=np.int8)], type=pa.list_(pa.int8()))
```

but this will crash Python and would require `large_list`:

```python
import pyarrow as pa
import numpy as np

size = 2**31 - 1
pa.array([np.zeros(size, dtype=np.int8)], type=pa.list_(pa.int8()))
```


def map(self, map_type: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
key_field = self.field(map_type.key_field, key_result)
@@ -675,7 +675,7 @@ def visit_timestamptz_ns(self, _: TimestamptzNanoType) -> pa.DataType:
return pa.timestamp(unit="ns", tz="UTC")

def visit_string(self, _: StringType) -> pa.DataType:
-return pa.large_string()
+return pa.string()

def visit_uuid(self, _: UUIDType) -> pa.DataType:
return pa.binary(16)
@@ -684,7 +684,7 @@ def visit_unknown(self, _: UnknownType) -> pa.DataType:
return pa.null()

def visit_binary(self, _: BinaryType) -> pa.DataType:
-return pa.large_binary()
+return pa.binary()


def _convert_scalar(value: Any, iceberg_type: IcebergType) -> pa.scalar:
@@ -1612,7 +1612,7 @@ def _table_from_scan_task(task: FileScanTask) -> pa.Table:
removed_in="0.11.0",
help_message=f"Property `{PYARROW_USE_LARGE_TYPES_ON_READ}` will be removed.",
)
-result = result.cast(arrow_schema)
+result = result.cast(_pyarrow_schema_ensure_large_types(arrow_schema))

if self._limit is not None:
return result.slice(0, self._limit)
@@ -1718,8 +1718,8 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
target_schema = schema_to_pyarrow(
promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids
)
-if self._use_large_types is False:
-    target_schema = _pyarrow_schema_ensure_small_types(target_schema)
+if self._use_large_types is True:
+    target_schema = _pyarrow_schema_ensure_large_types(target_schema)
return values.cast(target_schema)
elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type:
if field.field_type == TimestampType():
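
To make the override concrete, here is a rough sketch of the promotion it applies. This is an illustration only, not the real `_pyarrow_schema_ensure_large_types`, which presumably also recurses into structs and maps (omitted here).

```python
import pyarrow as pa

def ensure_large(dt: pa.DataType) -> pa.DataType:
    # Promote variable-width small types to their 64-bit-offset counterparts;
    # everything else passes through unchanged.
    if pa.types.is_string(dt):
        return pa.large_string()
    if pa.types.is_binary(dt):
        return pa.large_binary()
    if pa.types.is_list(dt):
        return pa.large_list(ensure_large(dt.value_type))
    return dt

schema = pa.schema([("city", pa.string()), ("tags", pa.list_(pa.string()))])
promoted = pa.schema([pa.field(f.name, ensure_large(f.type), nullable=f.nullable) for f in schema])
# promoted now reads: city: large_string, tags: large_list<item: large_string>
```
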
10 changes: 5 additions & 5 deletions tests/catalog/test_sql.py
@@ -404,7 +404,7 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: Identifier)
],
schema=pa.schema(
[
-pa.field("foo", pa.large_string(), nullable=True),
+pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=False),
pa.field("baz", pa.bool_(), nullable=True),
pa.field("large", pa.large_string(), nullable=True),
@@ -1462,7 +1462,7 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
{
"foo": ["a", None, "z"],
},
-schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
+schema=pa.schema([pa.field("foo", pa.string(), nullable=True)]),
)

tbl = catalog.create_table(identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)})
@@ -1474,7 +1474,7 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None:
},
schema=pa.schema(
[
-pa.field("foo", pa.large_string(), nullable=True),
+pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]
),
@@ -1514,7 +1514,7 @@ def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> N
{
"foo": ["a", None, "z"],
},
-schema=pa.schema([pa.field("foo", pa.large_string(), nullable=True)]),
+schema=pa.schema([pa.field("foo", pa.string(), nullable=True)]),
)

pa_table_with_column = pa.Table.from_pydict(
@@ -1524,7 +1524,7 @@ def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> N
},
schema=pa.schema(
[
-pa.field("foo", pa.large_string(), nullable=True),
+pa.field("foo", pa.string(), nullable=True),
pa.field("bar", pa.int32(), nullable=True),
]
),
6 changes: 3 additions & 3 deletions tests/conftest.py
@@ -2510,8 +2510,8 @@ def pa_schema() -> "pa.Schema":
return pa.schema(
[
("bool", pa.bool_()),
-("string", pa.large_string()),
-("string_long", pa.large_string()),
+("string", pa.string()),
+("string_long", pa.string()),
("int", pa.int32()),
("long", pa.int64()),
("float", pa.float32()),
@@ -2525,7 +2525,7 @@ def pa_schema() -> "pa.Schema":
# ("time", pa.time64("us")),
# Not natively supported by Arrow
# ("uuid", pa.fixed(16)),
-("binary", pa.large_binary()),
+("binary", pa.binary()),
("fixed", pa.binary(16)),
]
)
15 changes: 9 additions & 6 deletions tests/integration/test_reads.py
@@ -872,9 +872,12 @@ def test_table_scan_keep_types(catalog: Catalog) -> None:


@pytest.mark.integration
+@pytest.mark.filterwarnings(
+    "ignore:Deprecated in 0.10.0, will be removed in 0.11.0. Property `pyarrow.use-large-types-on-read` will be removed.:DeprecationWarning"
+)
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
-def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
-    identifier = "default.test_table_scan_override_with_small_types"
+def test_table_scan_override_with_large_types(catalog: Catalog) -> None:
+    identifier = "default.test_table_scan_override_with_large_types"
arrow_table = pa.Table.from_arrays(
[
pa.array(["a", "b", "c"]),
@@ -900,15 +903,15 @@ def test_table_scan_override_with_small_types(catalog: Catalog) -> None:
with tbl.update_schema() as update_schema:
update_schema.update_column("string-to-binary", BinaryType())

-tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "False"
+tbl.io.properties[PYARROW_USE_LARGE_TYPES_ON_READ] = "True"
result_table = tbl.scan().to_arrow()

expected_schema = pa.schema(
[
-pa.field("string", pa.string()),
+pa.field("string", pa.large_string()),
pa.field("string-to-binary", pa.large_binary()),
-pa.field("binary", pa.binary()),
-pa.field("list", pa.list_(pa.string())),
+pa.field("binary", pa.large_binary()),
+pa.field("list", pa.large_list(pa.large_string())),
]
)
assert result_table.schema.equals(expected_schema)
2 changes: 1 addition & 1 deletion tests/integration/test_writes/test_partitioned_writes.py
@@ -891,7 +891,7 @@ def test_unsupported_transform(

with pytest.raises(
ValueError,
-match="FeatureUnsupported => Unsupported data type for truncate transform: LargeBinary",
+match="FeatureUnsupported => Unsupported data type for truncate transform: Binary",
):
tbl.append(arrow_table_with_null)
