From c27f35aa6200441f446b138581a3cd2d0b72616b Mon Sep 17 00:00:00 2001
From: Mohammad Linjawi
Date: Sun, 31 May 2026 12:06:43 +0300
Subject: [PATCH 1/3] [VL][Delta] Add DV scan info extraction utility

---
 .../delta/DeltaDeletionVectorScanInfo.scala   | 208 +++++++++++++++++
 .../DeltaDeletionVectorScanInfoSuite.scala    | 154 +++++++++++++
 .../delta/DeltaDeletionVectorScanInfo.scala   | 218 ++++++++++++++++++
 .../DeltaDeletionVectorScanInfoSuite.scala    | 154 +++++++++++++
 4 files changed, 734 insertions(+)
 create mode 100644 backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
 create mode 100644 backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
 create mode 100644 backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
 create mode 100644 backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala

diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
new file mode 100644
index 00000000000..cad49ea2ffd
--- /dev/null
+++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap}
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import org.apache.hadoop.fs.Path
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+object DeltaDeletionVectorScanInfo {
+  object RowIndexFilterType extends Enumeration {
+    type RowIndexFilterType = Value
+    val KEEP_ALL, IF_CONTAINED, IF_NOT_CONTAINED = Value
+  }
+
+  import RowIndexFilterType._
+
+  final case class DeletionVectorInfo(
+      rowIndexFilterType: RowIndexFilterType,
+      descriptor: Option[DeletionVectorDescriptor],
+      serializedDeletionVector: Array[Byte]) {
+    def hasDeletionVector: Boolean = descriptor.isDefined
+    def cardinality: Long = descriptor.map(_.cardinality).getOrElse(0L)
+  }
+
+  final case class PartitionFileScanInfo(
+      normalizedOtherMetadataColumns: Map[String, Object],
+      deletionVectorInfo: DeletionVectorInfo)
+
+  private val RowIndexFilterIdEncoded =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+  private val RowIndexFilterTypeKey =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  def extract(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    val metadata = otherMetadataColumns(file)
+    val normalizedMetadata = metadata -- Seq(RowIndexFilterIdEncoded, RowIndexFilterTypeKey)
+    val dvInfo = extractDeletionVectorInfo(spark, partitionColumnCount, file, metadata)
+    PartitionFileScanInfo(normalizedMetadata, dvInfo)
+  }
+
+  def extractAll(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
+    files.map(extract(spark, partitionColumnCount, _))
+  }
+
+  def extractAllFromJava(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: java.util.List[PartitionedFile]): java.util.List[PartitionFileScanInfo] = {
+    new JArrayList(extractAll(spark, partitionColumnCount, files.asScala.toSeq).asJava)
+  }
+
+  private def extractDeletionVectorInfo(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      metadata: Map[String, Object]): DeletionVectorInfo = {
+    val descriptorValue = metadata.get(RowIndexFilterIdEncoded)
+    val filterTypeValue = metadata.get(RowIndexFilterTypeKey)
+
+    (descriptorValue, filterTypeValue) match {
+      case (None, None) =>
+        DeletionVectorInfo(KEEP_ALL, None, Array.emptyByteArray)
+      case (Some(encodedDescriptor), Some(filterType)) =>
+        val descriptor = parseDescriptor(encodedDescriptor.toString)
+        val serializedPayload = serializePayload(spark, partitionColumnCount, file, descriptor)
+        DeletionVectorInfo(
+          parseRowIndexFilterType(filterType.toString),
+          Some(descriptor),
+          serializedPayload)
+      case _ =>
+        throw new IllegalStateException(
+          s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must either be present or absent")
+    }
+  }
+
+  private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] = {
+    val otherMetadata =
+      SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
+    if (otherMetadata == null) { // the shim handed back no map at all
+      Map.empty
+    } else {
+      otherMetadata.asScala.toMap
+    }
+  }
+
+  private def parseDescriptor(encodedDescriptor: String): DeletionVectorDescriptor = {
+    try {
+      DeletionVectorDescriptor.deserializeFromBase64(encodedDescriptor)
+    } catch {
+      case NonFatal(e) => // keep the original failure as the cause
+        throw new IllegalArgumentException("Unable to parse Delta deletion vector descriptor", e)
+    }
+  }
+
+  private def parseRowIndexFilterType(filterType: String): RowIndexFilterType = {
+    filterType match {
+      case "IF_CONTAINED" => IF_CONTAINED
+      case "IF_NOT_CONTAINED" => IF_NOT_CONTAINED
+      case "KEEP_ALL" => KEEP_ALL
+      case unexpected => // any other string is an error, not a silent default
+        throw new IllegalStateException(s"Unexpected row index filter type: $unexpected")
+    }
+  }
+
+  private def serializePayload(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      descriptor: DeletionVectorDescriptor): Array[Byte] = {
+    val tablePath = resolveTablePath(spark, partitionColumnCount, file)
+    if (tablePath == null) { // neither lookup in resolveTablePath produced a path
+      throw new IllegalStateException(
+        "Unable to resolve Delta table path while materializing deletion vector payload")
+    }
+    val dvStore = new HadoopFileSystemDVStore(spark.sessionState.newHadoopConf())
+    StoredBitmap
+      .create(descriptor, tablePath)
+      .load(dvStore)
+      .serializeAsByteArray(RoaringBitmapArrayFormat.Portable) // returned as the DV payload
+  }
+
+  private def resolveTablePath(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): Path = {
+    val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent
+    var tablePath = fileParent // first guess: one directory level per partition column
+    for (_ <- 0 until partitionColumnCount if tablePath != null) { // never step past the root
+      tablePath = tablePath.getParent
+    }
+    if (tablePath != null && isDeltaTablePath(spark, tablePath)) {
+      return tablePath
+    }
+    // Fallback: climb from the file's directory until some ancestor holds _delta_log.
+    var candidate = fileParent
+    while (candidate != null && !isDeltaTablePath(spark, candidate)) {
+      candidate = candidate.getParent
+    }
+    if (candidate != null) candidate else tablePath // may be null or an unconfirmed guess
+  }
+
+  private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = {
+    val deltaLogPath = new Path(tablePath, "_delta_log")
+    try {
+      deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath)
+    } catch {
+      case NonFatal(_) => false // any FS error is treated as "not a Delta root"
+    }
+  }
+
+  private def unescapePathName(path: String): String = {
+    if (path == null || path.indexOf('%') < 0) {
+      path
+    } else {
+      val builder = new StringBuilder(path.length)
+      var index = 0
+      while (index < path.length) {
+        if (path.charAt(index) == '%' && index + 2 < path.length) { // needs two chars after '%'
+          val high = Character.digit(path.charAt(index + 1), 16)
+          val low = Character.digit(path.charAt(index + 2), 16)
+          if (high >= 0 && low >= 0) {
+            builder.append(((high << 4) | low).toChar) // one char per %XX; no UTF-8 reassembly
+            index += 3
+          } else { // '%' without two hex digits stays literal
+            builder.append(path.charAt(index))
+            index += 1
+          }
+        } else {
+          builder.append(path.charAt(index))
+          index += 1
+        }
+      }
+      builder.toString()
+    }
+  }
+}
diff --git a/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
new file mode 100644
index 00000000000..cbb70817ba4
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo.RowIndexFilterType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.{DeltaLog, GlutenDeltaParquetFileFormat}
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import io.delta.sql.DeltaSparkSessionExtension
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorScanInfoSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils {
+  // Exercises DeltaDeletionVectorScanInfo.extract against Delta tables written to temp dirs.
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[DeltaCatalog].getName)
+      .set("spark.databricks.delta.snapshotPartitions", "2")
+  }
+
+  test("extracts essential Delta DV scan info from split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+        // Turn deletion vectors on, then delete ids 3 and 4.
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val dataFile = DeltaLog
+          .forTable(spark, new Path(path))
+          .update()
+          .allFiles
+          .collect()
+          .find(_.deletionVector != null)
+          .get // throws if no file carries a deletion vector
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED",
+            "kept_key" -> "kept_value"
+          )
+        )
+        // partitionColumnCount = 0: the table above is written without partitionBy.
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.IF_CONTAINED)
+        assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+        assert(dvInfo.serializedDeletionVector.nonEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("returns keep-all scan info when Delta DV metadata is absent") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map("kept_key" -> "kept_value"))
+        // Neither DV key is present in the split metadata.
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(!dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.KEEP_ALL)
+        assert(dvInfo.cardinality == 0L)
+        assert(dvInfo.serializedDeletionVector.isEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("rejects partial Delta DV split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED"))
+        // Only the filter-type key is set; the encoded-descriptor key is missing.
+        val error = intercept[IllegalStateException] {
+          DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        }
+        assert(error.getMessage.contains("must either be present or absent"))
+    }
+  }
+
+  private def partitionedFileWithMetadata(
+      tablePath: String,
+      relativeFilePath: String,
+      fileSize: Long,
+      metadata: Map[String, Object]): PartitionedFile = {
+    PartitionedFile(
+      partitionValues = InternalRow.empty,
+      filePath = SparkPath.fromPath(new Path(tablePath, relativeFilePath)),
+      start = 0L,
+      length = fileSize, // with start = 0 the split spans the whole file
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = metadata
+    )
+  }
+}
diff --git a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
new file mode 100644
index 00000000000..12df2338784
--- /dev/null
+++ b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArrayFormat, StoredBitmap}
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+import org.apache.hadoop.fs.Path
+
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+import scala.util.control.NonFatal
+
+object DeltaDeletionVectorScanInfo {
+  object RowIndexFilterType extends Enumeration {
+    type RowIndexFilterType = Value
+    val KEEP_ALL, IF_CONTAINED, IF_NOT_CONTAINED = Value
+  }
+
+  import RowIndexFilterType._
+
+  final case class DeletionVectorInfo(
+      rowIndexFilterType: RowIndexFilterType,
+      descriptor: Option[DeletionVectorDescriptor],
+      serializedDeletionVector: Array[Byte]) {
+    def hasDeletionVector: Boolean = descriptor.isDefined
+    def cardinality: Long = descriptor.map(_.cardinality).getOrElse(0L)
+  }
+
+  final case class PartitionFileScanInfo(
+      normalizedOtherMetadataColumns: Map[String, Object],
+      deletionVectorInfo: DeletionVectorInfo)
+
+  private val RowIndexFilterIdEncoded =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED
+  private val RowIndexFilterTypeKey =
+    GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE
+
+  def extract(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): PartitionFileScanInfo = {
+    val metadata = otherMetadataColumns(file)
+    val normalizedMetadata = metadata -- Seq(RowIndexFilterIdEncoded, RowIndexFilterTypeKey)
+    val dvInfo = extractDeletionVectorInfo(spark, partitionColumnCount, file, metadata)
+    PartitionFileScanInfo(normalizedMetadata, dvInfo)
+  }
+
+  def extractAll(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: Seq[PartitionedFile]): Seq[PartitionFileScanInfo] = {
+    files.map(extract(spark, partitionColumnCount, _))
+  }
+
+  def extractAllFromJava(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      files: java.util.List[PartitionedFile]): java.util.List[PartitionFileScanInfo] = {
+    new JArrayList(extractAll(spark, partitionColumnCount, files.asScala.toSeq).asJava)
+  }
+
+  private def extractDeletionVectorInfo(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      metadata: Map[String, Object]): DeletionVectorInfo = {
+    val descriptorValue = metadata.get(RowIndexFilterIdEncoded)
+    val filterTypeValue = metadata.get(RowIndexFilterTypeKey)
+
+    (descriptorValue, filterTypeValue) match {
+      case (None, None) =>
+        DeletionVectorInfo(KEEP_ALL, None, Array.emptyByteArray)
+      case (Some(encodedDescriptor), Some(filterType)) =>
+        val descriptor = parseDescriptor(encodedDescriptor.toString)
+        val serializedPayload = serializePayload(spark, partitionColumnCount, file, descriptor)
+        DeletionVectorInfo(
+          parseRowIndexFilterType(filterType.toString),
+          Some(descriptor),
+          serializedPayload)
+      case _ =>
+        throw new IllegalStateException(
+          s"Both $RowIndexFilterIdEncoded and $RowIndexFilterTypeKey must either be present or absent")
+    }
+  }
+
+  private def otherMetadataColumns(file: PartitionedFile): Map[String, Object] = {
+    val otherMetadata =
+      SparkShimLoader.getSparkShims.getOtherConstantMetadataColumnValues(file)
+    if (otherMetadata == null) { // the shim handed back no map at all
+      Map.empty
+    } else {
+      otherMetadata.asScala.toMap
+    }
+  }
+
+  private def parseDescriptor(encodedDescriptor: String): DeletionVectorDescriptor = {
+    val methods = Seq("deserializeFromBase64", "fromJson") // looked up by name, in this order
+    methods.iterator
+      .map {
+        methodName =>
+          Try {
+            val method = DeletionVectorDescriptor.getClass.getMethod(methodName, classOf[String])
+            method
+              .invoke(DeletionVectorDescriptor, encodedDescriptor)
+              .asInstanceOf[DeletionVectorDescriptor]
+          }.toOption // NOTE(review): the failure reason of each attempt is dropped here
+      }
+      .collectFirst { case Some(descriptor) => descriptor }
+      .getOrElse { // no listed method produced a descriptor
+        throw new IllegalArgumentException("Unable to parse Delta deletion vector descriptor")
+      }
+  }
+
+  private def parseRowIndexFilterType(filterType: String): RowIndexFilterType = {
+    filterType match {
+      case "IF_CONTAINED" => IF_CONTAINED
+      case "IF_NOT_CONTAINED" => IF_NOT_CONTAINED
+      case "KEEP_ALL" => KEEP_ALL
+      case unexpected => // any other string is an error, not a silent default
+        throw new IllegalStateException(s"Unexpected row index filter type: $unexpected")
+    }
+  }
+
+  private def serializePayload(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile,
+      descriptor: DeletionVectorDescriptor): Array[Byte] = {
+    val tablePath = resolveTablePath(spark, partitionColumnCount, file)
+    if (tablePath == null) { // neither lookup in resolveTablePath produced a path
+      throw new IllegalStateException(
+        "Unable to resolve Delta table path while materializing deletion vector payload")
+    }
+    val dvStore = new HadoopFileSystemDVStore(spark.sessionState.newHadoopConf())
+    StoredBitmap
+      .create(descriptor, tablePath)
+      .load(dvStore)
+      .serializeAsByteArray(RoaringBitmapArrayFormat.Portable) // returned as the DV payload
+  }
+
+  private def resolveTablePath(
+      spark: SparkSession,
+      partitionColumnCount: Int,
+      file: PartitionedFile): Path = {
+    val fileParent = new Path(unescapePathName(file.filePath.toString)).getParent
+    var tablePath = fileParent // first guess: one directory level per partition column
+    for (_ <- 0 until partitionColumnCount if tablePath != null) { // never step past the root
+      tablePath = tablePath.getParent
+    }
+    if (tablePath != null && isDeltaTablePath(spark, tablePath)) {
+      return tablePath
+    }
+    // Fallback: climb from the file's directory until some ancestor holds _delta_log.
+    var candidate = fileParent
+    while (candidate != null && !isDeltaTablePath(spark, candidate)) {
+      candidate = candidate.getParent
+    }
+    if (candidate != null) candidate else tablePath // may be null or an unconfirmed guess
+  }
+
+  private def isDeltaTablePath(spark: SparkSession, tablePath: Path): Boolean = {
+    val deltaLogPath = new Path(tablePath, "_delta_log")
+    try {
+      deltaLogPath.getFileSystem(spark.sessionState.newHadoopConf()).exists(deltaLogPath)
+    } catch {
+      case NonFatal(_) => false // any FS error is treated as "not a Delta root"
+    }
+  }
+
+  private def unescapePathName(path: String): String = {
+    if (path == null || path.indexOf('%') < 0) {
+      path
+    } else {
+      val builder = new StringBuilder(path.length)
+      var index = 0
+      while (index < path.length) {
+        if (path.charAt(index) == '%' && index + 2 < path.length) { // needs two chars after '%'
+          val high = Character.digit(path.charAt(index + 1), 16)
+          val low = Character.digit(path.charAt(index + 2), 16)
+          if (high >= 0 && low >= 0) {
+            builder.append(((high << 4) | low).toChar) // one char per %XX; no UTF-8 reassembly
+            index += 3
+          } else { // '%' without two hex digits stays literal
+            builder.append(path.charAt(index))
+            index += 1
+          }
+        } else {
+          builder.append(path.charAt(index))
+          index += 1
+        }
+      }
+      builder.toString()
+    }
+  }
+}
diff --git a/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala b/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
new file mode 100644
index 00000000000..cbb70817ba4
--- /dev/null
+++ b/backends-velox/src-delta40/test/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfoSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.delta
+
+import org.apache.gluten.delta.DeltaDeletionVectorScanInfo.RowIndexFilterType
+
+import org.apache.spark.SparkConf
+import org.apache.spark.paths.SparkPath
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.{DeltaLog, GlutenDeltaParquetFileFormat}
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.tags.ExtendedSQLTest
+
+import io.delta.sql.DeltaSparkSessionExtension
+import org.apache.hadoop.fs.Path
+
+@ExtendedSQLTest
+class DeltaDeletionVectorScanInfoSuite
+  extends QueryTest
+  with SharedSparkSession
+  with DeltaSQLTestUtils {
+  // Exercises DeltaDeletionVectorScanInfo.extract against Delta tables written to temp dirs.
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key, classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[DeltaCatalog].getName)
+      .set("spark.databricks.delta.snapshotPartitions", "2")
+  }
+
+  test("extracts essential Delta DV scan info from split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a"), (2, "b"), (3, "c"), (4, "d"))
+          .toDF("id", "value")
+          .coalesce(1)
+          .write
+          .format("delta")
+          .save(path)
+        // Turn deletion vectors on, then delete ids 3 and 4.
+        spark.sql(
+          s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)")
+        spark.sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 4)")
+
+        val dataFile = DeltaLog
+          .forTable(spark, new Path(path))
+          .update()
+          .allFiles
+          .collect()
+          .find(_.deletionVector != null)
+          .get // throws if no file carries a deletion vector
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_ID_ENCODED ->
+              dataFile.deletionVector.serializeToBase64(),
+            GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED",
+            "kept_key" -> "kept_value"
+          )
+        )
+        // partitionColumnCount = 0: the table above is written without partitionBy.
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.IF_CONTAINED)
+        assert(dvInfo.cardinality == dataFile.deletionVector.cardinality)
+        assert(dvInfo.serializedDeletionVector.nonEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("returns keep-all scan info when Delta DV metadata is absent") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map("kept_key" -> "kept_value"))
+        // Neither DV key is present in the split metadata.
+        val scanInfo = DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        val dvInfo = scanInfo.deletionVectorInfo
+
+        assert(!dvInfo.hasDeletionVector)
+        assert(dvInfo.rowIndexFilterType == RowIndexFilterType.KEEP_ALL)
+        assert(dvInfo.cardinality == 0L)
+        assert(dvInfo.serializedDeletionVector.isEmpty)
+        assert(scanInfo.normalizedOtherMetadataColumns == Map("kept_key" -> "kept_value"))
+    }
+  }
+
+  test("rejects partial Delta DV split metadata") {
+    withTempDir {
+      tempDir =>
+        val path = tempDir.getCanonicalPath
+        Seq((1, "a")).toDF("id", "value").coalesce(1).write.format("delta").save(path)
+
+        val dataFile = DeltaLog.forTable(spark, new Path(path)).update().allFiles.collect().head
+        val partitionedFile = partitionedFileWithMetadata(
+          path,
+          dataFile.path,
+          dataFile.size,
+          Map(GlutenDeltaParquetFileFormat.FILE_ROW_INDEX_FILTER_TYPE -> "IF_CONTAINED"))
+        // Only the filter-type key is set; the encoded-descriptor key is missing.
+        val error = intercept[IllegalStateException] {
+          DeltaDeletionVectorScanInfo.extract(spark, 0, partitionedFile)
+        }
+        assert(error.getMessage.contains("must either be present or absent"))
+    }
+  }
+
+  private def partitionedFileWithMetadata(
+      tablePath: String,
+      relativeFilePath: String,
+      fileSize: Long,
+      metadata: Map[String, Object]): PartitionedFile = {
+    PartitionedFile(
+      partitionValues = InternalRow.empty,
+      filePath = SparkPath.fromPath(new Path(tablePath, relativeFilePath)),
+      start = 0L,
+      length = fileSize, // with start = 0 the split spans the whole file
+      fileSize = fileSize,
+      otherConstantMetadataColumnValues = metadata
+    )
+  }
+}
From 7fb1a1baffcfa25a31cea2091808128e0337a0a7 Mon Sep 17 00:00:00 2001
From: malinjawi
Date: Tue, 2 Jun 2026 14:09:21 +0300
Subject: [PATCH 2/3] [VL][Delta] Hide DV descriptor from scan info API

---
 .../gluten/delta/DeltaDeletionVectorScanInfo.scala  | 13 ++++++-------
 .../gluten/delta/DeltaDeletionVectorScanInfo.scala  | 13 ++++++-------
 2 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
index cad49ea2ffd..9b77b99c040 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala
@@ -42,11 +42,9 @@ object DeltaDeletionVectorScanInfo {
 
   final case class DeletionVectorInfo(
       rowIndexFilterType: RowIndexFilterType,
-      descriptor: Option[DeletionVectorDescriptor],
-      serializedDeletionVector: Array[Byte]) {
-    def hasDeletionVector: Boolean = descriptor.isDefined
-    def cardinality: Long = descriptor.map(_.cardinality).getOrElse(0L)
-  }
+      hasDeletionVector: Boolean,
+      cardinality: Long,
+      serializedDeletionVector: Array[Byte])
 
   final case class PartitionFileScanInfo(
       normalizedOtherMetadataColumns: Map[String, Object],
@@ -91,13 +89,14 @@ object DeltaDeletionVectorScanInfo {
(descriptorValue, filterTypeValue) match { case (None, None) => - DeletionVectorInfo(KEEP_ALL, None, Array.emptyByteArray) + DeletionVectorInfo(KEEP_ALL, false, 0L, Array.emptyByteArray) case (Some(encodedDescriptor), Some(filterType)) => val descriptor = parseDescriptor(encodedDescriptor.toString) val serializedPayload = serializePayload(spark, partitionColumnCount, file, descriptor) DeletionVectorInfo( parseRowIndexFilterType(filterType.toString), - Some(descriptor), + true, + descriptor.cardinality, serializedPayload) case _ => throw new IllegalStateException( diff --git a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala index 12df2338784..7128b032982 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala @@ -43,11 +43,9 @@ object DeltaDeletionVectorScanInfo { final case class DeletionVectorInfo( rowIndexFilterType: RowIndexFilterType, - descriptor: Option[DeletionVectorDescriptor], - serializedDeletionVector: Array[Byte]) { - def hasDeletionVector: Boolean = descriptor.isDefined - def cardinality: Long = descriptor.map(_.cardinality).getOrElse(0L) - } + hasDeletionVector: Boolean, + cardinality: Long, + serializedDeletionVector: Array[Byte]) final case class PartitionFileScanInfo( normalizedOtherMetadataColumns: Map[String, Object], @@ -92,13 +90,14 @@ object DeltaDeletionVectorScanInfo { (descriptorValue, filterTypeValue) match { case (None, None) => - DeletionVectorInfo(KEEP_ALL, None, Array.emptyByteArray) + DeletionVectorInfo(KEEP_ALL, false, 0L, Array.emptyByteArray) case (Some(encodedDescriptor), Some(filterType)) => val descriptor = parseDescriptor(encodedDescriptor.toString) val serializedPayload = serializePayload(spark, partitionColumnCount, file, 
descriptor) DeletionVectorInfo( parseRowIndexFilterType(filterType.toString), - Some(descriptor), + true, + descriptor.cardinality, serializedPayload) case _ => throw new IllegalStateException( From dd507392eb1411efb485be21a3d951bca3f71f67 Mon Sep 17 00:00:00 2001 From: malinjawi Date: Tue, 2 Jun 2026 22:05:46 +0300 Subject: [PATCH 3/3] [VL][Delta] Reorder DV scan info fields --- .../apache/gluten/delta/DeltaDeletionVectorScanInfo.scala | 6 +++--- .../apache/gluten/delta/DeltaDeletionVectorScanInfo.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala index 9b77b99c040..8ada3d755c1 100644 --- a/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala +++ b/backends-velox/src-delta33/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala @@ -41,8 +41,8 @@ object DeltaDeletionVectorScanInfo { import RowIndexFilterType._ final case class DeletionVectorInfo( - rowIndexFilterType: RowIndexFilterType, hasDeletionVector: Boolean, + rowIndexFilterType: RowIndexFilterType, cardinality: Long, serializedDeletionVector: Array[Byte]) @@ -89,13 +89,13 @@ object DeltaDeletionVectorScanInfo { (descriptorValue, filterTypeValue) match { case (None, None) => - DeletionVectorInfo(KEEP_ALL, false, 0L, Array.emptyByteArray) + DeletionVectorInfo(false, KEEP_ALL, 0L, Array.emptyByteArray) case (Some(encodedDescriptor), Some(filterType)) => val descriptor = parseDescriptor(encodedDescriptor.toString) val serializedPayload = serializePayload(spark, partitionColumnCount, file, descriptor) DeletionVectorInfo( - parseRowIndexFilterType(filterType.toString), true, + parseRowIndexFilterType(filterType.toString), descriptor.cardinality, serializedPayload) case _ => diff --git 
a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala index 7128b032982..69a5dc30784 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/gluten/delta/DeltaDeletionVectorScanInfo.scala @@ -42,8 +42,8 @@ object DeltaDeletionVectorScanInfo { import RowIndexFilterType._ final case class DeletionVectorInfo( - rowIndexFilterType: RowIndexFilterType, hasDeletionVector: Boolean, + rowIndexFilterType: RowIndexFilterType, cardinality: Long, serializedDeletionVector: Array[Byte]) @@ -90,13 +90,13 @@ object DeltaDeletionVectorScanInfo { (descriptorValue, filterTypeValue) match { case (None, None) => - DeletionVectorInfo(KEEP_ALL, false, 0L, Array.emptyByteArray) + DeletionVectorInfo(false, KEEP_ALL, 0L, Array.emptyByteArray) case (Some(encodedDescriptor), Some(filterType)) => val descriptor = parseDescriptor(encodedDescriptor.toString) val serializedPayload = serializePayload(spark, partitionColumnCount, file, descriptor) DeletionVectorInfo( - parseRowIndexFilterType(filterType.toString), true, + parseRowIndexFilterType(filterType.toString), descriptor.cardinality, serializedPayload) case _ =>