2 changes: 1 addition & 1 deletion dev/spark/Dockerfile
@@ -18,7 +18,7 @@ ARG BASE_IMAGE_SPARK_VERSION=4.0.1
FROM apache/spark:${BASE_IMAGE_SPARK_VERSION}

# Dependency versions - keep these compatible
-ARG ICEBERG_VERSION=1.10.0
+ARG ICEBERG_VERSION=1.10.1
ARG ICEBERG_SPARK_RUNTIME_VERSION=4.0_2.13
ARG SPARK_VERSION=4.0.1
ARG HADOOP_VERSION=3.4.1
6 changes: 5 additions & 1 deletion pyiceberg/io/pyarrow.py
@@ -789,7 +789,11 @@ def visit_string(self, _: StringType) -> pa.DataType:
return pa.large_string()

def visit_uuid(self, _: UUIDType) -> pa.DataType:
-return pa.uuid()
+# TODO: Change to uuid when PyArrow implements filtering for UUID types
+# Using binary(16) instead of pa.uuid() because filtering is not
+# implemented for UUID types in PyArrow
+# (context: https://github.com/apache/iceberg-python/issues/2372)
+return pa.binary(16)

Contributor:
Not sure if this is the right fix.

We explicitly changed binary(16) to uuid in this PR:
https://github.com/apache/iceberg-python/pull/2007/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR687

The current change reverts it.

Collaborator (Author):
I believe that my other comment clarifies this change.

Contributor:
Which comment? I would prefer to keep it UUID and fix this on the Java side.

Python and Rust Arrow implementations don't recognize Java's UUID metadata.

Most implementations don't really look at the Arrow/Parquet logical annotations, so they treat uuid (which is a fixed[16] with a UUID label on it) as plain fixed-length bytes and cast it to whatever type is compatible with the query engine. Spark has proven problematic because it doesn't have a native UUID type; it handles UUIDs internally as strings.

Collaborator (Author):
I was referring to this comment, #2881 (comment), which points to issue #2372, where I explain the problem. In that issue your suggestion was to change to fixed[16].

@kevinjqliu added the error we're getting now in the other comment. The problem is filtering a Parquet file with UUID, not reading it.

E   pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (extension<arrow.uuid>, extension<arrow.uuid>)

pyarrow/error.pxi:92: ArrowNotImplementedError
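
For context, a minimal standalone sketch of the limitation described above (assuming PyArrow >= 18, where pa.uuid() is the canonical UUID extension type; not part of this PR): the 'equal' kernel works on the fixed_size_binary(16) storage, but not on the arrow.uuid extension type wrapping it.

```python
import uuid

import pyarrow as pa
import pyarrow.compute as pc

u = uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")

# Plain fixed-size binary values: comparison kernels are implemented.
bin_arr = pa.array([u.bytes], type=pa.binary(16))
print(pc.equal(bin_arr, bin_arr))  # -> [true]

# The same storage wrapped in the arrow.uuid extension type: no kernel.
uuid_arr = pa.ExtensionArray.from_storage(pa.uuid(), bin_arr)
try:
    pc.equal(uuid_arr, uuid_arr)
except pa.ArrowNotImplementedError as exc:
    # Function 'equal' has no kernel matching input types
    # (extension<arrow.uuid>, extension<arrow.uuid>)
    print(exc)
```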

Collaborator (Author):
I updated the PR body to add the filtering error.

Contributor (@Fokko, Jan 12, 2026):
Ah, I see. I think that's reasonable.

Let's add a comment in the code, referring to #2372, explaining why we use binary(16) rather than uuid().

Collaborator (Author):
@Fokko done in a7bdfca


def visit_unknown(self, _: UnknownType) -> pa.DataType:
"""Type `UnknownType` can be promoted to any primitive type in V3+ tables per the Iceberg spec."""
2 changes: 1 addition & 1 deletion tests/integration/test_add_files.py
@@ -828,7 +828,7 @@ def test_add_files_with_valid_upcast(
pa.field("list", pa.list_(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True),
pa.field("uuid", pa.uuid(), nullable=True),
pa.field("uuid", pa.binary(16), nullable=True),
)
)
)
8 changes: 4 additions & 4 deletions tests/integration/test_reads.py
@@ -610,15 +610,15 @@ def test_partitioned_tables(catalog: Catalog) -> None:
def test_unpartitioned_uuid_table(catalog: Catalog) -> None:
unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned")
arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow()
-assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")]
+assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967").bytes]

arrow_table_neq = unpartitioned_uuid.scan(
row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'"
).to_arrow()
assert arrow_table_neq["uuid_col"].to_pylist() == [
uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226"),
uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"),
uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e"),
uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226").bytes,
uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b").bytes,
uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e").bytes,
]
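
These assertions rely on a UUID round-tripping losslessly through its 16-byte representation, which is what the binary(16) column now stores; a small standard-library sketch (not part of the PR):

```python
import uuid

u = uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")
raw = u.bytes                        # 16 big-endian bytes, as stored in binary(16)
assert len(raw) == 16
assert uuid.UUID(bytes=raw) == u     # reconstruct the UUID from the stored bytes
assert str(uuid.UUID(bytes=raw)) == "102cb62f-e6f8-4eb0-9973-d9b012ff0967"
```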


64 changes: 62 additions & 2 deletions tests/integration/test_writes/test_writes.py
@@ -1500,7 +1500,7 @@ def test_table_write_schema_with_valid_upcast(
pa.field("list", pa.list_(pa.int64()), nullable=False),
pa.field("map", pa.map_(pa.string(), pa.int64()), nullable=False),
pa.field("double", pa.float64(), nullable=True), # can support upcasting float to double
pa.field("uuid", pa.uuid(), nullable=True),
pa.field("uuid", pa.binary(16), nullable=True),
)
)
)
@@ -2138,7 +2138,7 @@ def test_uuid_partitioning(session_catalog: Catalog, spark: SparkSession, transf
tbl.append(arr_table)

lhs = [r[0] for r in spark.table(identifier).collect()]
-rhs = [str(u.as_py()) for u in tbl.scan().to_arrow()["uuid"].combine_chunks()]
+rhs = [str(uuid.UUID(bytes=u.as_py())) for u in tbl.scan().to_arrow()["uuid"].combine_chunks()]
assert lhs == rhs


@@ -2530,3 +2530,63 @@ def test_v3_write_and_read_row_lineage(spark: SparkSession, session_catalog: Cat
assert tbl.metadata.next_row_id == initial_next_row_id + len(test_data), (
"Expected next_row_id to be incremented by the number of added rows"
)


@pytest.mark.integration
def test_write_uuid_in_pyiceberg_and_scan(session_catalog: Catalog, spark: SparkSession) -> None:
"""Test UUID compatibility between PyIceberg and Spark.

UUIDs must be written as binary(16) for Spark compatibility since Java Arrow
metadata differs from Python Arrow metadata for UUID types.
"""
identifier = "default.test_write_uuid_in_pyiceberg_and_scan"

catalog = load_catalog("default", type="in-memory")
catalog.create_namespace("ns")

schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))

test_data_with_null = {
"uuid_col": [
uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
None,
uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
]
}

try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

table = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema)

arrow_table = pa.table(test_data_with_null, schema=schema.as_arrow())

# Write with pyarrow
table.append(arrow_table)

# Write with pyspark
spark.sql(
f"""
INSERT INTO {identifier} VALUES ("22222222-2222-2222-2222-222222222222")
"""
)
df = spark.table(identifier)

table.refresh()

assert df.count() == 4
assert len(table.scan().to_arrow()) == 4

result = df.where("uuid_col = '00000000-0000-0000-0000-000000000000'")
assert result.count() == 1

Contributor:
I ran this test on the current main branch with 1.10.1 and this is the stacktrace. It is different from the stacktrace in #2007:

E                   pyspark.errors.exceptions.connect.SparkException: Job aborted due to stage failure: Task 0 in stage 142.0 failed 1 times, most recent failure: Lost task 0.0 in stage 142.0 (TID 287) (fcaa97ba83c2 executor driver): java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.nio.ByteBuffer (java.util.UUID and java.nio.ByteBuffer are in module java.base of loader 'bootstrap')
E                       at java.base/java.nio.ByteBuffer.compareTo(ByteBuffer.java:267)
E                       at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
E                       at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
E                       at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:306)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
E                       at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:162)
E                       at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:390)
E                       at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:409)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
E                       at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
E                       at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
E                       at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
E                       at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:126)
E                       at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
E                       at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
E                       at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
E                       at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
E                       at scala.Option.exists(Option.scala:406)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E                       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E                       at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E                       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
E                       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
E                       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
E                       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E                       at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
E                       at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E                       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
E                       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
E                       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
E                       at org.apache.spark.scheduler.Task.run(Task.scala:147)
E                       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
E                       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
E                       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
E                       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
E                       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
E                       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
E                       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
E                       at java.base/java.lang.Thread.run(Thread.java:840)
E                   
E                   Driver stacktrace:
E                   
E                   JVM stacktrace:
E                   org.apache.spark.SparkException
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
E                       at scala.Option.getOrElse(Option.scala:201)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
E                       at scala.collection.immutable.List.foreach(List.scala:334)
E                       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
E                       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
E                       at scala.Option.foreach(Option.scala:437)
E                       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
E                       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
E                       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
E                       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
E                       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
E                       at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
E                       at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
E                       at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
E                       at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
E                       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E                       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
E                       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
E                       at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$2(SparkConnectPlanExecution.scala:155)
E                       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
E                       at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
E                       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
E                       at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
E                       at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
E                       at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
E                       at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
E                       at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
E                       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
E                       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
E                       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
E                       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
E                       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
E                       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
E                       at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.processAsArrowBatches(SparkConnectPlanExecution.scala:154)
E                       at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:78)
E                       at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:314)
E                       at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
E                       at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
E                       at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
E                       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
E                       at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
E                       at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
E                       at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
E                       at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
E                       at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
E                       at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
E                       at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
E                       at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
E                       at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
E                       at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)
E                   Caused by: java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.nio.ByteBuffer (java.util.UUID and java.nio.ByteBuffer are in module java.base of loader 'bootstrap')
E                       at java.nio.ByteBuffer.compareTo(ByteBuffer.java:267)
E                       at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
E                       at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
E                       at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:306)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
E                       at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:162)
E                       at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:390)
E                       at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:409)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
E                       at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
E                       at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
E                       at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
E                       at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
E                       at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:126)
E                       at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
E                       at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
E                       at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
E                       at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
E                       at scala.Option.exists(Option.scala:406)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
E                       at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E                       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E                       at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E                       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(:-1)
E                       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(:-1)
E                       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(:-1)
E                       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E                       at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
E                       at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E                       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
E                       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
E                       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
E                       at org.apache.spark.scheduler.Task.run(Task.scala:147)
E                       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
E                       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
E                       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
E                       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
E                       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
E                       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
E                       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
E                       at java.lang.Thread.run(Thread.java:840)

.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py:1882: SparkException

Contributor:
I also downloaded the 2 data files:

➜  Downloads parquet schema 00000-0-562a1d32-f5da-4e09-836a-b7d0d4e737e6.parquet
{
  "type" : "record",
  "name" : "schema",
  "fields" : [ {
    "name" : "uuid_col",
    "type" : [ "null", {
      "type" : "string",
      "logicalType" : "uuid"
    } ],
    "default" : null
  } ]
}
➜  Downloads parquet schema 00000-284-30c509c4-f8e9-46fb-a1f1-169e1c928e00-0-00001.parquet
{
  "type" : "record",
  "name" : "table",
  "fields" : [ {
    "name" : "uuid_col",
    "type" : {
      "type" : "string",
      "logicalType" : "uuid"
    }
  } ]
}

Collaborator (Author):

The stacktrace has changed because of the fix that I made in the Java implementation. This PR (apache/iceberg#14027) has more details about the problem, and in issue #2372 I explain the problem from the PyIceberg side and why we are changing back to binary(16).

Contributor:
The Parquet files look wrong, and I'm not sure what happened there. UUID should annotate FIXED_LEN_BYTE_ARRAY:

[image: expected UUID annotation on FIXED_LEN_BYTE_ARRAY]

Collaborator (Author):

The pa.binary(16) change fixed the comparison issue but broke Parquet spec compliance by removing the UUID logical-type annotation. We could go back to uuid() in the visitor and raise an exception with a clearer message when the user tries to filter on a UUID column, since PyArrow does not implement filtering for UUID extension types.
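
For illustration, a minimal sketch of that spec-compliance gap (assuming PyArrow; the in-memory buffer and column name are invented for the example): writing the column as binary(16) produces a plain FIXED_LEN_BYTE_ARRAY(16) with no UUID logical-type annotation in the Parquet schema.

```python
import io
import uuid

import pyarrow as pa
import pyarrow.parquet as pq

u = uuid.UUID("00000000-0000-0000-0000-000000000000")

# Write a single UUID value as fixed-size binary, mirroring the visitor change.
buf = io.BytesIO()
pq.write_table(pa.table({"uuid_col": pa.array([u.bytes], type=pa.binary(16))}), buf)
buf.seek(0)

# The physical schema shows FIXED_LEN_BYTE_ARRAY(16) without a UUID annotation.
print(pq.ParquetFile(buf).schema)
```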


result = df.where("uuid_col = '22222222-2222-2222-2222-222222222222'")
assert result.count() == 1

result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("00000000-0000-0000-0000-000000000000").bytes)).to_arrow()
assert len(result) == 1

result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("22222222-2222-2222-2222-222222222222").bytes)).to_arrow()
assert len(result) == 1