From e7fc6351dc9ead189b9c96694965f460d79b53ba Mon Sep 17 00:00:00 2001
From: Hazmi
Date: Tue, 2 Jun 2026 13:18:28 +0000
Subject: [PATCH] [VL] Fixed iceberg config logic

AbstractIcebergWriteExec: put an Iceberg-derived value into
icebergProperties only when SQLConf has no value for the corresponding
session key. The previous condition was inverted and copied the value
only when the session key was already set.

IcebergWriteExec: take the target file size from
IcebergWriteUtil.getWriteConf(write).targetDataFileSize() and look up
the Parquet page size in IcebergWriteUtil.getTable(write).properties().
The WRITE_TARGET_FILE_SIZE_BYTES imports are no longer needed.

VeloxIcebergSuite: add a test that inserts 1000 rows into a table with
write.target-file-size-bytes=8192 and asserts that the write runs as
VeloxIcebergAppendDataExec, that more than one data file is produced,
and that every data file is smaller than 64 KiB.

---
Notes (not part of the commit message):
- This patch was re-laid-out by hand from a copy whose line breaks had
  been collapsed. Indentation of context lines is reconstructed; use
  `git apply --ignore-whitespace` if a hunk is rejected. The post-image
  blob id on the VeloxIcebergSuite index line predates these edits.
- Dropped the hunk that removed the blank line between the
  org.apache.gluten and org.apache.spark import groups, and added a
  blank line in front of the new test.
- NOTE(review): getParquetPageSizeBytes passes the property *key*
  through normalizeCapacityString and returns the table value as-is,
  and the new test sets the page size to '1024B' rather than a plain
  byte count. Please confirm that this is intended.

 .../execution/AbstractIcebergWriteExec.scala  |  2 +-
 .../enhanced/VeloxIcebergSuite.scala          | 65 +++++++++++++++++++
 .../gluten/execution/IcebergWriteExec.scala   |  9 ++-
 3 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
index d76c4256d4c..2b4d50a73f0 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala
@@ -51,7 +51,7 @@ abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
       MAX_TARGET_FILE_SIZE_SESSION.key -> getTargetFileSizeBytes
     ).foreach {
       case (key, value) =>
-        if (SQLConf.get.getConfString(key, null) != null) {
+        if (SQLConf.get.getConfString(key, null) == null) {
           icebergProperties.put(key, value)
         }
     }
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index e2df119bc0d..a22de347f0a 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -465,4 +465,69 @@ class VeloxIcebergSuite extends IcebergSuite {
       )
     }
   }
+
+  test("iceberg native write respects target file size bytes") {
+    withTable("iceberg_small_target_tbl") {
+      spark.sql("""
+                  |CREATE TABLE iceberg_small_target_tbl (
+                  |  id INT,
+                  |  payload STRING
+                  |) USING iceberg
+                  |TBLPROPERTIES (
+                  |  'write.format.default' = 'parquet',
+                  |  'write.parquet.compression-codec' = 'uncompressed',
+                  |  'write.parquet.row-group-size-bytes' = '4096',
+                  |  'write.parquet.page-size-bytes' = '1024B',
+                  |  'write.target-file-size-bytes' = '8192'
+                  |)
+                  |""".stripMargin)
+
+      checkAnswer(
+        spark.sql("""
+                    |SHOW TBLPROPERTIES iceberg_small_target_tbl
+                    |('write.target-file-size-bytes')
+                    |""".stripMargin),
+        Seq(Row("write.target-file-size-bytes", "8192")))
+
+      val df = spark.sql("""
+                           |INSERT INTO iceberg_small_target_tbl
+                           |SELECT /*+ COALESCE(1) */
+                           |  CAST(id AS INT),
+                           |  concat(
+                           |    CAST(id AS STRING),
+                           |    '-',
+                           |    sha2(CAST(id AS STRING), 256),
+                           |    '-',
+                           |    sha2(CAST(id + 1000 AS STRING), 256)
+                           |  )
+                           |FROM range(1000)
+                           |""".stripMargin)
+
+      val commandPlan =
+        df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
+
+      assert(commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])
+
+      checkAnswer(
+        spark.sql("SELECT COUNT(*) FROM iceberg_small_target_tbl"),
+        Seq(Row(1000L)))
+
+      val files = spark.sql("""
+                              |SELECT file_size_in_bytes
+                              |FROM default.iceberg_small_target_tbl.files
+                              |""".stripMargin).collect().map(_.getLong(0))
+
+      assert(files.nonEmpty)
+
+      assert(
+        files.length > 1,
+        "Expected write.target-file-size-bytes=8192 to create multiple files, " +
+          s"but got files=${files.mkString("[", ", ", "]")}")
+
+      assert(
+        files.max < 64L * 1024L,
+        "Expected small target file size to keep max file size reasonably small, " +
+          s"but got files=${files.mkString("[", ", ", "]")}")
+    }
+  }
 }
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index c93c918a1fa..0544e47c1dd 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 
 import org.apache.iceberg.{FileFormat, PartitionField, PartitionSpec, Schema, TableProperties}
-import org.apache.iceberg.TableProperties.{ORC_COMPRESSION, ORC_COMPRESSION_DEFAULT, PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT, WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT}
+import org.apache.iceberg.TableProperties.{ORC_COMPRESSION, ORC_COMPRESSION_DEFAULT, PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT}
 import org.apache.iceberg.avro.AvroSchemaUtil
 import org.apache.iceberg.spark.source.IcebergWriteUtil
 import org.apache.iceberg.types.Type.TypeID
@@ -50,15 +50,14 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
   }
 
   protected def getParquetPageSizeBytes: String = {
-    val config = IcebergWriteUtil.getWriteProperty(write)
-    config.getOrDefault(
+    val tableProps = IcebergWriteUtil.getTable(write).properties()
+    tableProps.getOrDefault(
       normalizeCapacityString(PARQUET_PAGE_SIZE_BYTES),
       normalizeCapacityString(PARQUET_PAGE_SIZE_BYTES_DEFAULT.toString))
   }
 
   protected def getTargetFileSizeBytes: String = {
-    val config = IcebergWriteUtil.getWriteProperty(write)
-    config.getOrDefault(WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT.toString)
+    IcebergWriteUtil.getWriteConf(write).targetDataFileSize().toString
   }
 
   protected def getPartitionSpec: PartitionSpec = {