From 71e672d6170d20f9dcd903d30fb97e55a7eaabb7 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou <93710326+andreaschat-db@users.noreply.github.com> Date: Fri, 17 Jan 2025 17:05:59 +0100 Subject: [PATCH] [Spark] Create DV tombstones when dropping Deletion Vectors #117451 (#4062) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Drop feature with checkpoint protection allows dropped features to live in the table history. This has consequences in the vacuum command. This is because vacuum only reads the latest snapshot, and uses that to decide what can be deleted or not. This could lead clients that do not support past feature(s) to incorrectly vacuum auxiliary files, unless the latest snapshot can prevent this from happening using only features from the reduced protocol. The way that the table snapshot prevents deletion of files is by referencing files that were recently removed from the table (in the last 7 days) in RemoveFile actions or "tombstones". This is problematic when a newer feature extends the functionality of these tombstones. For instance, the Deletion Vectors feature has `RemoveFile` actions that contain a reference both to the main file as well as the auxiliary Deletion Vector file. But after the downgrade, clients will not see that auxiliary DV file reference anymore, and will not preserve Deletion Vectors that were only removed from the table within the past 7 days. This PR resolves this issue by ensuring that the auxiliary files are referenced by adding RemoveFile actions during the pre-downgrade process of Deletion Vectors. ## How was this patch tested? Added new tests in `DeltaFastDropFeatureSuite`. ## Does this PR introduce _any_ user-facing changes? No. --- .../org/apache/spark/sql/delta/Checksum.scala | 17 +- .../spark/sql/delta/DeltaOperations.scala | 15 + .../org/apache/spark/sql/delta/DeltaUDF.scala | 21 ++ .../PreDowngradeTableFeatureCommand.scala | 210 ++++++++--- .../apache/spark/sql/delta/TableFeature.scala | 8 + .../actions/DeletionVectorDescriptor.scala | 40 +- .../delta/actions/TableFeatureSupport.scala | 2 +- .../spark/sql/delta/actions/actions.scala | 4 + .../sql/delta/commands/VacuumCommand.scala | 12 +- .../commands/alterDeltaTableCommands.scala | 6 +- .../sql/delta/sources/DeltaSQLConf.scala | 30 ++ .../sql/delta/DeltaFastDropFeatureSuite.scala | 357 +++++++++++++++++- 12 files changed, 658 insertions(+), 64 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 867fc2f038a..786494d62cd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -213,6 +213,8 @@ trait RecordChecksum extends DeltaLogging { // Incrementally compute the new version checksum, if the old one is available. 
val ignoreAddFilesInOperation = RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName) + val ignoreRemoveFilesInOperation = + RecordChecksum.operationNamesWhereRemoveFilesIgnoredForIncrementalCrc.contains(operationName) val persistentDVsOnTableReadable = DeletionVectorUtils.deletionVectorsReadable(protocol, metadata) val persistentDVsOnTableWritable = @@ -226,6 +228,7 @@ trait RecordChecksum extends DeltaLogging { oldSnapshot, actions, ignoreAddFilesInOperation, + ignoreRemoveFilesInOperation, includeAddFilesInCrc, persistentDVsOnTableReadable, persistentDVsOnTableWritable @@ -244,6 +247,8 @@ trait RecordChecksum extends DeltaLogging { * @param actions used to incrementally compute new checksum. * @param ignoreAddFiles for transactions whose add file actions refer to already-existing files * e.g., [[DeltaOperations.ComputeStats]] transactions. + * @param ignoreRemoveFiles for transactions that generate RemoveFiles for auxiliary files + * e.g., [[DeltaOperations.AddDeletionVectorsTombstones]]. * @param persistentDVsOnTableReadable Indicates whether commands modifying this table are allowed * to read deletion vectors. * @param persistentDVsOnTableWritable Indicates whether commands modifying this table are allowed @@ -260,6 +265,7 @@ trait RecordChecksum extends DeltaLogging { oldSnapshot: Option[Snapshot], actions: Seq[Action], ignoreAddFiles: Boolean, + ignoreRemoveFiles: Boolean, includeAllFilesInCRC: Boolean, persistentDVsOnTableReadable: Boolean, persistentDVsOnTableWritable: Boolean @@ -319,6 +325,8 @@ trait RecordChecksum extends DeltaLogging { numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ + dvCount) deletedRecordCountsHistogramOpt.foreach(_.insert(dvCardinality)) + case _: RemoveFile if ignoreRemoveFiles => () + // extendedFileMetadata == true implies fields partitionValues, size, and tags are present case r: RemoveFile if r.extendedFileMetadata == Some(true) => val size = r.size.get @@ -597,7 +605,7 @@ trait RecordChecksum extends DeltaLogging { object RecordChecksum { // Operations where we should ignore AddFiles in the incremental checksum computation. - val operationNamesWhereAddFilesIgnoredForIncrementalCrc = Set( + private[delta] val operationNamesWhereAddFilesIgnoredForIncrementalCrc = Set( // The transaction that computes stats is special -- it re-adds files that already exist, in // order to update their min/max stats. We should not count those against the totals. DeltaOperations.ComputeStats(Seq.empty).name, @@ -605,6 +613,13 @@ object RecordChecksum { // Incremental commits should ignore backfill commits. DeltaOperations.RowTrackingBackfill().name ) + + // Operations where we should ignore RemoveFiles in the incremental checksum computation. + private[delta] val operationNamesWhereRemoveFilesIgnoredForIncrementalCrc = Set( + // Deletion vector tombstones are only required to protect DVs from vacuum. They should be + // ignored in checksum calculation. + DeltaOperations.AddDeletionVectorsTombstones.name + ) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index cc02b247cf9..351025883d5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -487,6 +487,21 @@ object DeltaOperations { override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) } + + /** + * Recorded when dropping deletion vectors. 
Deletion Vector tombstones directly reference + * deletion vector files within the retention period. This is to protect them from deletion + * against oblivious writers when vacuuming. + */ + object AddDeletionVectorsTombstones extends Operation("Deletion Vector Tombstones") { + override val parameters: Map[String, Any] = Map.empty + + // This operation should only introduce RemoveFile actions. + override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true + + override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false) + } + /** Recorded when columns are added. */ case class AddColumns( colsToAdd: Seq[QualifiedColTypeWithPositionForLog]) extends Operation("ADD COLUMNS") { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala index 018c9a506f4..33dba57cec8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.delta +import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram @@ -50,6 +51,17 @@ object DeltaUDF { f: Array[Long] => DeletedRecordCountsHistogram): UserDefinedFunction = createUdfFromTemplateUnsafe(deletedRecordCountsHistogramFromArrayLongTemplate, f, udf(f)) + def stringFromDeletionVectorDescriptor( + f: DeletionVectorDescriptor => String): UserDefinedFunction = + createUdfFromTemplateUnsafe(stringFromDeletionVectorDescriptorTemplate, f, udf(f)) + + def booleanFromDeletionVectorDescriptor( + f: DeletionVectorDescriptor => Boolean): UserDefinedFunction = + createUdfFromTemplateUnsafe(booleanFromDeletionVectorDescriptorTemplate, f, udf(f)) + + def booleanFromString(s: String => Boolean): UserDefinedFunction = + createUdfFromTemplateUnsafe(booleanFromStringTemplate, s, udf(s)) + def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction = createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f)) @@ -74,6 +86,15 @@ object DeltaUDF { udf((_: Array[Long]) => DeletedRecordCountsHistogram(Array.empty)) .asInstanceOf[SparkUserDefinedFunction] + private lazy val stringFromDeletionVectorDescriptorTemplate = + udf((_: DeletionVectorDescriptor) => "").asInstanceOf[SparkUserDefinedFunction] + + private lazy val booleanFromDeletionVectorDescriptorTemplate = + udf((_: DeletionVectorDescriptor) => false).asInstanceOf[SparkUserDefinedFunction] + + private lazy val booleanFromStringTemplate = + udf((_: String) => false).asInstanceOf[SparkUserDefinedFunction] + private lazy val booleanFromMapTemplate = udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction] diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 13213c0bada..23db6c4e503 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit import scala.util.control.NonFatal +import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, RemoveFile} import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, 
AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand @@ -27,10 +28,14 @@ import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} import org.apache.spark.sql.util.ScalaExtensions._ +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedTable +import org.apache.spark.sql.functions.{approx_count_distinct, col, not} + /** * A base class for implementing a preparation command for removing table features. @@ -45,14 +50,14 @@ sealed abstract class PreDowngradeTableFeatureCommand { * Returns true when it performs a cleaning action. When no action was required * it returns false. */ - def removeFeatureTracesIfNeeded(): Boolean + def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean } case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand with DeltaLogging { // To remove the feature we only need to remove the table property. - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { // Make sure feature data/metadata exist before proceeding. if (TestRemovableWriterFeature.validateRemoval(table.initialSnapshot)) return false @@ -62,21 +67,21 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2) val properties = Seq(TestRemovableWriterFeature.TABLE_PROP_KEY) AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) true } } case class TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand { - override def removeFeatureTracesIfNeeded(): Boolean = true + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = true } case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand with DeltaLogging { // To remove the feature we only need to remove the table property. - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { // Make sure feature data/metadata exist before proceeding. if (TestRemovableWriterWithHistoryTruncationFeature.validateRemoval(table.initialSnapshot)) { return false @@ -84,7 +89,7 @@ case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: Delt val properties = Seq(TestRemovableWriterWithHistoryTruncationFeature.TABLE_PROP_KEY) AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) true } } @@ -93,7 +98,7 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand with DeltaLogging { // To remove the feature we only need to remove the table property. 
- override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { // Make sure feature data/metadata exist before proceeding. if (TestRemovableReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false @@ -103,7 +108,7 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) val properties = Seq(TestRemovableReaderWriterFeature.TABLE_PROP_KEY) AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) true } } @@ -111,12 +116,12 @@ case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand { /** Return true if we removed the property, false if no action was needed. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { if (TestRemovableLegacyWriterFeature.validateRemoval(table.initialSnapshot)) return false val properties = Seq(TestRemovableLegacyWriterFeature.TABLE_PROP_KEY) AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) true } } @@ -124,25 +129,133 @@ case class TestLegacyWriterFeaturePreDowngradeCommand(table: DeltaTableV2) case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand { /** Return true if we removed the property, false if no action was needed. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { if (TestRemovableLegacyReaderWriterFeature.validateRemoval(table.initialSnapshot)) return false val properties = Seq(TestRemovableLegacyReaderWriterFeature.TABLE_PROP_KEY) AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) true } } -class DeletionVectorsRemovalMetrics( +private[delta] class DeletionVectorsRemovalMetrics( val numDeletionVectorsToRemove: Long, val numDeletionVectorRowsToRemove: Long, + var dvTombstonesWithinRetentionPeriod: Long = 0L, + var addDVTombstonesTime: Long = 0L, var downgradeTimeMs: Long = 0L) case class DeletionVectorsPreDowngradeCommand(table: DeltaTableV2) extends PreDowngradeTableFeatureCommand with DeltaLogging { + /** + * Create RemoveFiles (tombstones) that directly reference deletion vector within the retention + * period. These protect the latter from accidental removal from clients that do not support + * deletion vectors. + * + * Note, we always create the DV tombstones even for the drop feature with history + * truncation implementation. This is to protect against a corner case where the user run + * drop feature with fastDropFeature.enabled = false and then run again with + * fastDropFeature.enabled = true. + * + * @param checkIfSnapshotUpdatedSinceTs The timestamp to use for updating the snapshot. + * @param metrics The deletion vectors removal metrics. This function only updates the DV + * tombstone related metrics. 
+ */ + private def generateDVTombstones( + spark: SparkSession, + checkIfSnapshotUpdatedSinceTs: Long, + metrics: DeletionVectorsRemovalMetrics): Unit = { + import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.delta.implicits._ + + if (!spark.conf.get(DeltaSQLConf.FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES)) return + + val startTimeNs = System.nanoTime() + val snapshotToUse = table.deltaLog.update( + checkIfUpdatedSinceTs = Some(checkIfSnapshotUpdatedSinceTs)) + + val deletionVectorPath = DeletionVectorDescriptor.urlEncodedPath( + deletionVectorCol = col("deletionVector"), + tablePath = table.deltaLog.dataPath) + val isInlineDeletionVector = DeletionVectorDescriptor.isInline(col("deletionVector")) + + // snapshotToUse.tombstones returns only the tombstones within the retention period. The + // default tombstone retention period is 7 days. Note that if a RemoveFile contains a + // DeletionVectorDescriptor, it is guaranteed not to be a DV Tombstone. Furthermore, we + // use distinct to deduplicate the DV references. This is because we merge DVs, and as a + // result, several AddFiles may point to the same DV file. + val removeFilesWithDVs = snapshotToUse.tombstones + .filter(col("deletionVector").isNotNull) + .filter(not(isInlineDeletionVector)) + .select(deletionVectorPath.as("path")) + .distinct() + + // This is a union of the DV tombstones and the regular data file tombstones without DVs (we + // cannot tell the difference). We use it to identify which DV tombstones are already created. + val filesWithoutDVs = snapshotToUse.tombstones + .filter(col("deletionVector").isNull) + .select("path") + + val dvTombstonePathsToAdd = removeFilesWithDVs + .join(filesWithoutDVs, "path", "left_anti") + .as[String] + + val actionsToCommit = dvTombstonePathsToAdd.toLocalIterator().asScala.map { dvPath => + // Disable scala style rules to ignore the warning that RemoveFile actions should never be + // instantiated directly. + // scalastyle:off + RemoveFile( + path = dvPath, + deletionTimestamp = Some(table.deltaLog.clock.getTimeMillis()), + dataChange = false) + // scalastyle:on + } + + // We pay some overhead here to estimate the memory required to hold the results. + // Above some threshold we use commitLarge. This allows us to use an iterator instead of + // materializing results in memory. However, it comes with some disadvantages: if there is a + // conflict the commit is not retried. + // A cheaper alternative would be to use snapshot.numDeletionVectorsOpt + // (right before the reorg in drop feature) but this does not capture deduplication as well as + // any reorgs that occurred before dropping DVs. + // We assume 1024 bytes are required per RemoveFile.
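+ // With the default dvTombstoneCountThreshold of 10,000, the materialized RemoveFiles are + // therefore capped at roughly 10 MB (10,000 x 1024 bytes) before we fall back to commitLarge.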
+ val tombstonesToAddCount = + dvTombstonePathsToAdd.select(approx_count_distinct("path")).as[Long].first + + val tombstoneCountThreshold = + spark.conf.get(DeltaSQLConf.FAST_DROP_FEATURE_DV_TOMBSTONE_COUNT_THRESHOLD) + + if (tombstonesToAddCount > tombstoneCountThreshold) { + table.startTransaction(Some(snapshotToUse)).commitLarge( + spark, + nonProtocolMetadataActions = actionsToCommit, + op = DeltaOperations.AddDeletionVectorsTombstones, + newProtocolOpt = None, + context = Map.empty, + metrics = Map("dvTombstonesWithinRetentionPeriod" -> tombstonesToAddCount.toString)) + } else { + table.startTransaction(Some(snapshotToUse)) + .commit(actionsToCommit.toList, DeltaOperations.AddDeletionVectorsTombstones) + } + + metrics.addDVTombstonesTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs) + metrics.dvTombstonesWithinRetentionPeriod = tombstonesToAddCount + } + + private def reorgTable(spark: SparkSession) = { + // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog & + // table ID won't be used by DeltaReorgTableCommand. + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog + val tableId = Seq(table.name()).asIdentifier + + DeltaReorgTableCommand(target = ResolvedTable.create(catalog, tableId, table))(Nil) + .run(table.spark) + } + /** * We first remove the table feature property to prevent any transactions from committing * new DVs. This will cause any concurrent transactions to fail. Then, we run PURGE @@ -152,38 +265,41 @@ case class DeletionVectorsPreDowngradeCommand(table: DeltaTableV2) * * @return Returns true if it removed DV metadata property and/or DVs. False otherwise. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded( + spark: SparkSession): Boolean = { + val startTimeNs = table.deltaLog.clock.nanoTime() + // Latest snapshot looks clean. No action is required. We may proceed // to the protocol downgrade phase. - if (DeletionVectorsTableFeature.validateRemoval(table.initialSnapshot)) return false + val snapshot = table.update() + val tracesFound = !DeletionVectorsTableFeature.validateRemoval(snapshot) + if (tracesFound) { + val properties = Seq(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key) + AlterTableUnsetPropertiesDeltaCommand( + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) - val startTimeNs = System.nanoTime() - val properties = Seq(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key) - AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) + reorgTable(spark) + } - val snapshot = table.update() val metrics = new DeletionVectorsRemovalMetrics( numDeletionVectorsToRemove = snapshot.numDeletionVectorsOpt.getOrElse(0L), numDeletionVectorRowsToRemove = snapshot.numDeletedRecordsOpt.getOrElse(0L)) - // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog & - // table ID won't be used by DeltaReorgTableCommand. - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog - val tableId = Seq(table.name()).asIdentifier + reorgTable(spark) - DeltaReorgTableCommand(target = ResolvedTable.create(catalog, tableId, table))(Nil) - .run(table.spark) + // Even if there are no DV traces in the table, we check if there are missing DV tombstones.
+ // This is to protect against an edge case where all DV traces are cleaned before invoking + // the drop feature command. + generateDVTombstones(spark, startTimeNs, metrics) metrics.downgradeTimeMs = - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs) + TimeUnit.NANOSECONDS.toMillis(table.deltaLog.clock.nanoTime() - startTimeNs) recordDeltaEvent( table.deltaLog, opType = "delta.deletionVectorsFeatureRemovalMetrics", data = metrics) - true + tracesFound } } @@ -197,13 +313,13 @@ case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2) * @return True if it changed checkpoint policy metadata property to classic. * False otherwise. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { if (V2CheckpointTableFeature.validateRemoval(table.initialSnapshot)) return false val startTimeNs = System.nanoTime() val properties = Map(DeltaConfigs.CHECKPOINT_POLICY.key -> CheckpointPolicy.Classic.name) - AlterTableSetPropertiesDeltaCommand(table, properties).run(table.spark) + AlterTableSetPropertiesDeltaCommand(table, properties).run(spark) recordDeltaEvent( table.deltaLog, @@ -231,7 +347,7 @@ case class InCommitTimestampsPreDowngradeCommand(table: DeltaTableV2) * @return true if any change to the metadata (the three properties listed above) was made. * False otherwise. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { val startTimeNs = System.nanoTime() val currentMetadata = table.initialSnapshot.metadata val currentTableProperties = currentMetadata.configuration @@ -283,7 +399,7 @@ case class VacuumProtocolCheckPreDowngradeCommand(table: DeltaTableV2) * For downgrading the [[VacuumProtocolCheckTableFeature]], we don't need remove any traces, we * just need to remove the feature from the [[Protocol]]. */ - override def removeFeatureTracesIfNeeded(): Boolean = false + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = false } case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2) @@ -302,7 +418,7 @@ case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2) * if there were any unbackfilled commits that were backfilled. * false otherwise. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { val startTimeNs = System.nanoTime() var traceRemovalNeeded = false @@ -318,7 +434,7 @@ case class CoordinatedCommitsPreDowngradeCommand(table: DeltaTableV2) CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS, ifExists = true, fromDropFeatureCommand = true - ).run(table.spark) + ).run(spark) } catch { case NonFatal(e) => exceptionOpt = Some(e) @@ -366,14 +482,14 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) * * @return Return true if files were rewritten or metadata was removed. False otherwise. 
*/ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { if (TypeWideningTableFeature.validateRemoval(table.initialSnapshot)) return false val startTimeNs = System.nanoTime() val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key) AlterTableUnsetPropertiesDeltaCommand( - table, properties, ifExists = true, fromDropFeatureCommand = true).run(table.spark) - val numFilesRewritten = rewriteFilesIfNeeded() + table, properties, ifExists = true, fromDropFeatureCommand = true).run(spark) + val numFilesRewritten = rewriteFilesIfNeeded(spark) val metadataRemoved = removeMetadataIfNeeded() recordDeltaEvent( @@ -393,7 +509,7 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) * schema. These are all files not added or modified after the last type change. * @return Return the number of files rewritten. */ - private def rewriteFilesIfNeeded(): Long = { + private def rewriteFilesIfNeeded(spark: SparkSession): Long = { if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) { return 0L } @@ -401,7 +517,7 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) // Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog & // table ID won't be used by DeltaReorgTableCommand. import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog + val catalog = spark.sessionState.catalogManager.currentCatalog.asTableCatalog val tableId = Seq(table.name()).asIdentifier val reorg = DeltaReorgTableCommand( @@ -409,7 +525,7 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) )(Nil) - val rows = reorg.run(table.spark) + val rows = reorg.run(spark) val metrics = rows.head.getAs[OptimizeMetrics](1) metrics.numFilesRemoved } @@ -447,9 +563,7 @@ case class ColumnMappingPreDowngradeCommand(table: DeltaTableV2) * @return Returns true if it removed table property and/or has rewritten the data. * False otherwise. */ - override def removeFeatureTracesIfNeeded(): Boolean = { - val spark = table.spark - + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { // Latest snapshot looks clean. No action is required. We may proceed // to the protocol downgrade phase. if (ColumnMappingTableFeature.validateRemoval(table.initialSnapshot)) return false @@ -476,7 +590,7 @@ case class CheckConstraintsPreDowngradeTableFeatureCommand(table: DeltaTableV2) * representation). Instead, we ask the user to explicitly drop the constraints before the table * feature can be dropped. */ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { val checkConstraintNames = Constraints.getCheckConstraintNames(table.initialSnapshot.metadata) if (checkConstraintNames.isEmpty) return false throw DeltaErrors.cannotDropCheckConstraintFeature(checkConstraintNames) @@ -507,13 +621,13 @@ case class CheckpointProtectionPreDowngradeCommand(table: DeltaTableV2) * expiration. This allows the drop process to proceed immediately after we cleanup the history * prior to requireCheckpointProtectionBeforeVersion. 
*/ - override def removeFeatureTracesIfNeeded(): Boolean = { + override def removeFeatureTracesIfNeeded(spark: SparkSession): Boolean = { val snapshot = table.initialSnapshot if (!historyPriorToCheckpointProtectionVersionIsTruncated(snapshot)) { // Add a checkpoint here to make sure we can cleanup up everything before this commit. // This is because metadata cleanup operations, can only clean up to the latest checkpoint. - createEmptyCommitAndCheckpoint(table, System.nanoTime()) + createEmptyCommitAndCheckpoint(table, table.deltaLog.clock.nanoTime()) table.deltaLog.cleanUpExpiredLogs( snapshot, @@ -529,7 +643,7 @@ case class CheckpointProtectionPreDowngradeCommand(table: DeltaTableV2) // If history is truncated we do not need the property anymore. val property = DeltaConfigs.REQUIRE_CHECKPOINT_PROTECTION_BEFORE_VERSION.key AlterTableUnsetPropertiesDeltaCommand( - table, Seq(property), ifExists = true, fromDropFeatureCommand = true).run(table.spark) + table, Seq(property), ifExists = true, fromDropFeatureCommand = true).run(spark) // We did not do any changes that require history expiration. It is ok if the removed property // exists in history. diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index ebef075a075..8eb00061775 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -660,6 +660,14 @@ object DeletionVectorsTableFeature DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata) } + /** + * Validate whether all deletion vector traces are removed from the snapshot. + * + * Note, we do not need to validate whether DV tombstones exist. These are added in the + * pre-downgrade stage and always cover all DVs within the retention period. This invariant can + * never change unless we enable again DVs. If DVs are enabled before the protocol downgrade + * we will abort the operation. + */ override def validateRemoval(snapshot: Snapshot): Boolean = { val dvsWritable = DeletionVectorUtils.deletionVectorsWritable(snapshot) val dvsExist = snapshot.numDeletionVectorsOpt.getOrElse(0L) > 0 diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/DeletionVectorDescriptor.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/DeletionVectorDescriptor.scala index fc31885993f..ba7a46f0910 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/DeletionVectorDescriptor.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/DeletionVectorDescriptor.scala @@ -21,6 +21,7 @@ import java.net.URI import java.util.{Base64, UUID} import org.apache.spark.sql.delta.DeltaErrors +import org.apache.spark.sql.delta.DeltaUDF import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.{Codec, DeltaEncoder, JsonUtils} import com.fasterxml.jackson.annotation.JsonIgnore @@ -32,7 +33,6 @@ import org.apache.spark.sql.{Column, Encoder} import org.apache.spark.sql.functions.{concat, lit, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils /** Information about a deletion vector attached to a file action. */ case class DeletionVectorDescriptor( @@ -128,6 +128,10 @@ case class DeletionVectorDescriptor( } } + /** Returns the url encoded absolute path of the deletion vector. 
*/ + def urlEncodedPath(tablePath: Path): String = + SparkPath.fromPath(absolutePath(tablePath)).urlEncoded + /** * Produce a copy of this DV, but using an absolute path. * @@ -136,10 +140,9 @@ case class DeletionVectorDescriptor( def copyWithAbsolutePath(tableLocation: Path): DeletionVectorDescriptor = { storageType match { case UUID_DV_MARKER => - val absolutePath = this.absolutePath(tableLocation) this.copy( storageType = PATH_DV_MARKER, - pathOrInlineDv = SparkPath.fromPath(absolutePath).urlEncoded) + pathOrInlineDv = urlEncodedPath(tableLocation)) case PATH_DV_MARKER | INLINE_DV_MARKER => this.copy() } } @@ -215,6 +218,10 @@ object DeletionVectorDescriptor { final val INLINE_DV_MARKER: String = "i" final val UUID_DV_MARKER: String = "u" + private final val deletionVectorFileNameRegex = + raw"${new Path(DELETION_VECTOR_FILE_NAME_CORE).toUri}_([^.]+)\.bin".r + private final val deletionVectorFileNamePattern = deletionVectorFileNameRegex.pattern + final lazy val STRUCT_TYPE: StructType = Action.addFileSchema("deletionVector").dataType.asInstanceOf[StructType] @@ -262,6 +269,33 @@ object DeletionVectorDescriptor { sizeInBytes = data.length, cardinality = cardinality) + /** + * Returns whether the path points to a deletion vector file. + * Note, external writers are not required to create DV files with the same naming conventions. + * This function is intended for testing. */ + private[delta] def isDeletionVectorPath(path: Path): Boolean = + deletionVectorFileNamePattern.matcher(path.getName).matches() + + /** Only for testing. */ + private[delta] def isDeletionVectorPath(path: String): Boolean = + isDeletionVectorPath(new Path(path)) + + /** Same as above but as a column expression. Only for testing. */ + private[delta] def isDeletionVectorPath(pathCol: Column): Column = + DeltaUDF.booleanFromString(isDeletionVectorPath)(pathCol) + + /** Returns a boolean column that corresponds to whether each deletion vector is inline. */ + def isInline(dv: Column): Column = + DeltaUDF.booleanFromDeletionVectorDescriptor(_.isInline)(dv) + + /** + * Returns a column with the url encoded deletion vector paths. + * WARNING: It throws an exception if it encounters any inline DVs. The caller is responsible + * for handling these separately. + */ + def urlEncodedPath(deletionVectorCol: Column, tablePath: Path): Column = + DeltaUDF.stringFromDeletionVectorDescriptor(_.urlEncodedPath(tablePath))(deletionVectorCol) + + /** * This produces the same output as [[DeletionVectorDescriptor.uniqueId]] but as a column * expression, so it can be used directly in a Spark query.
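For illustration, a minimal sketch of how the new column helpers compose — assuming a `snapshot: Snapshot` and the table's `dataPath: Path` — mirroring the filter that `generateDVTombstones` uses to collect the url-encoded paths of non-inline DV files referenced by recent tombstones:

```scala
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.functions.{col, not}

// Url-encoded paths of every non-inline DV file referenced by a RemoveFile that is still
// within the tombstone retention period.
val recentDvPaths = snapshot.tombstones
  .filter(col("deletionVector").isNotNull)
  .filter(not(DeletionVectorDescriptor.isInline(col("deletionVector"))))
  .select(DeletionVectorDescriptor.urlEncodedPath(col("deletionVector"), dataPath).as("path"))
  .distinct()
```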
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index ec779263398..7b75ac42c80 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -555,7 +555,7 @@ object DropTableFeatureUtils extends DeltaLogging { retryOnFailure: Boolean = false): Boolean = { val log = table.deltaLog val snapshot = log.update(checkIfUpdatedSinceTs = Some(snapshotRefreshStartTs)) - val emptyCommitTS = System.nanoTime() + val emptyCommitTS = table.deltaLog.clock.nanoTime() log.startTransaction(table.catalogTable, Some(snapshot)) .commit(Nil, DeltaOperations.EmptyCommit) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index cea72046b88..d136b6b4486 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -959,6 +959,10 @@ case class RemoveFile( @JsonIgnore override def getFileSize: Long = size.getOrElse(0L) + /** Only for testing. */ + @JsonIgnore + private [delta] def isDVTombstone: Boolean = DeletionVectorDescriptor.isDeletionVectorPath(new Path(path)) + } // scalastyle:on diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala index bf9ad780ca9..fbefd6d2970 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.delta.actions.{AddCDCFile, AddFile, FileAction, Remo import org.apache.spark.sql.delta.catalog.DeltaTableV2 import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, DeltaFileOperations, FileNames, JsonUtils} +import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, DeltaFileOperations, FileNames, JsonUtils, Utils => DeltaUtils} import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive import org.apache.spark.sql.delta.util.FileNames._ import com.fasterxml.jackson.databind.annotation.JsonDeserialize @@ -131,6 +131,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { } val relativizeIgnoreError = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR) + val dvDiscoveryDisabled = DeltaUtils.isTesting && spark.sessionState.conf.getConf( + DeltaSQLConf.FAST_DROP_FEATURE_DV_DISCOVERY_IN_VACUUM_DISABLED) val canonicalizedBasePath = SparkPath.fromPathString(basePath).urlEncoded snapshot.stateDS.mapPartitions { actions => @@ -148,7 +150,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { fa, fs, reservoirBase, - relativizeIgnoreError + relativizeIgnoreError, + dvDiscoveryDisabled ) case _ => Nil } @@ -864,7 +867,8 @@ trait VacuumCommandImpl extends DeltaCommand { action: FileAction, fs: FileSystem, basePath: Path, - relativizeIgnoreError: Boolean + relativizeIgnoreError: Boolean, + dvDiscoveryDisabled: Boolean ): Seq[String] = { val paths = getActionRelativePath(action, fs, basePath, relativizeIgnoreError) .map { @@ -873,7 +877,7 @@ trait VacuumCommandImpl extends 
DeltaCommand { }.getOrElse(Seq.empty) val deletionVectorPath = - getDeletionVectorRelativePathAndSize(action).map(_._1) + if (dvDiscoveryDisabled) None else getDeletionVectorRelativePathAndSize(action).map(_._1) paths ++ deletionVectorPath.toSeq } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index 9849e5377e4..f684714c39f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -385,9 +385,9 @@ case class AlterTableDropFeatureDeltaCommand( // Note, for features that cannot be disabled we solely rely for correctness on // validateRemoval. val requiresHistoryValidation = removableFeature.requiresHistoryProtection - val startTimeNs = System.nanoTime() + val startTimeNs = table.deltaLog.clock.nanoTime() val preDowngradeMadeChanges = - removableFeature.preDowngradeCommand(table).removeFeatureTracesIfNeeded() + removableFeature.preDowngradeCommand(table).removeFeatureTracesIfNeeded(sparkSession) if (requiresHistoryValidation) { // Generate a checkpoint after the cleanup that is based on commits that do not use // the feature. This intends to help slow-moving tables to qualify for history truncation @@ -483,7 +483,7 @@ case class AlterTableDropFeatureDeltaCommand( val deltaLog = table.deltaLog recordDeltaOperation(deltaLog, "delta.ddl.alter.dropFeatureWithCheckpointProtection") { var startTimeNs = System.nanoTime() - removableFeature.preDowngradeCommand(table).removeFeatureTracesIfNeeded() + removableFeature.preDowngradeCommand(table).removeFeatureTracesIfNeeded(sparkSession) // Create and validate the barrier checkpoints. The checkpoints are created on top of // empty commits. However, this is not guaranteed. Other txns might interleave the empty diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index ece1f59a1d0..583157e6411 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -450,6 +450,36 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(false) + val FAST_DROP_FEATURE_DV_DISCOVERY_IN_VACUUM_DISABLED = + buildConf("tableFeatures.dev.fastDropFeature.DVDiscoveryInVacuum.disabled") + .internal() + .doc( + """Whether to disable DV discovery in Vacuum. + |This config is only intended for testing purposes.""".stripMargin) + .booleanConf + .createWithDefault(false) + + val FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES = + buildConf("tableFeatures.dev.fastDropFeature.generateDVTombstones.enabled") + .internal() + .doc( + """Whether to generate DV tombstones when dropping deletion vectors. + |These make sure deletion vector files won't accidentally be vacuumed by clients + |that do not support DVs.""".stripMargin) + .booleanConf + .createWithDefaultFunction(() => SQLConf.get.getConf(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED)) + + val FAST_DROP_FEATURE_DV_TOMBSTONE_COUNT_THRESHOLD = + buildConf("tableFeatures.dev.fastDropFeature.dvTombstoneCountThreshold") + .doc( + """The maximum number of DV tombstones we are allowed to store in memory when dropping + |deletion vectors. When the resulting number of DV tombstones is higher, we use + |a special commit for large outputs.
This does not materialize results to memory + |but does not retry in case of a conflict.""".stripMargin) + .intConf + .checkValue(_ >= 0, "DVTombstoneCountThreshold must not be negative.") + .createWithDefault(10000) + val DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH = buildConf("maxSnapshotLineageLength") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaFastDropFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaFastDropFeatureSuite.scala index 6c5cf56ba46..adcd6f16bd4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaFastDropFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaFastDropFeatureSuite.scala @@ -20,14 +20,22 @@ import java.io.File import java.text.SimpleDateFormat import java.util.concurrent.TimeUnit -import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN -import org.apache.spark.sql.delta.actions.Protocol +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, Protocol, RemoveFile} +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.commands.AlterTableUnsetPropertiesDeltaCommand +import org.apache.spark.sql.delta.commands.DeltaReorgTableCommand +import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest -import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.delta.util.DeltaFileOperations import org.apache.hadoop.fs.Path +import org.apache.spark.SparkException +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.analysis.ResolvedTable +import org.apache.spark.sql.functions.{col, not} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.ManualClock @@ -41,6 +49,7 @@ class DeltaFastDropFeatureSuite override def beforeAll(): Unit = { super.beforeAll() spark.conf.set(DeltaSQLConf.FAST_DROP_FEATURE_ENABLED.key, true.toString) + spark.conf.set(DeltaSQLConf.FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES.key, true.toString) enableDeletionVectors(spark, false, false, false) } @@ -91,12 +100,13 @@ class DeltaFastDropFeatureSuite protected def setModificationTimes( log: DeltaLog, + startTime: Long = System.currentTimeMillis(), startVersion: Long, endVersion: Long, daysToAdd: Int): Unit = { val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf()) for (version <- startVersion to endVersion) { - setModificationTime(log, System.currentTimeMillis(), version.toInt, daysToAdd, fs) + setModificationTime(log, startTime, version.toInt, daysToAdd, fs) } } @@ -575,4 +585,343 @@ class DeltaFastDropFeatureSuite } } } + + private def createTableWithDeletionVectors(deltaLog: DeltaLog): Unit = { + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> true.toString, + DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> true.toString) { + val dir = deltaLog.dataPath + val targetDF = spark.range(start = 0, end = 100, step = 1, numPartitions = 4) + targetDF.write.format("delta").save(dir.toString) + + val targetTable = io.delta.tables.DeltaTable.forPath(dir.toString) + + // Add some DVs. + targetTable.delete("id < 5 or id >= 95") + + // Add some more DVs for the same set of files. + targetTable.delete("id < 10 or id >= 90") + + // Assert that DVs exist. 
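+ // Each delete touches only the first and last of the four data files, so two files carry DVs.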
+ assert(deltaLog.update().numDeletionVectorsOpt === Some(2L)) + } + } + + private def dropDeletionVectors(deltaLog: DeltaLog, truncateHistory: Boolean = false): Unit = { + sql(s"""ALTER TABLE delta.`${deltaLog.dataPath}` + |DROP FEATURE deletionVectors + |${if (truncateHistory) "TRUNCATE HISTORY" else ""} + |""".stripMargin) + + val snapshot = deltaLog.update() + val protocol = snapshot.protocol + assert(snapshot.numDeletionVectorsOpt.getOrElse(0L) === 0) + assert(snapshot.numDeletedRecordsOpt.getOrElse(0L) === 0) + assert(truncateHistory || + !protocol.readerFeatureNames.contains(DeletionVectorsTableFeature.name)) + } + + private def validateTombstones(log: DeltaLog): Unit = { + import org.apache.spark.sql.delta.implicits._ + + val snapshot = log.update() + val dvPath = DeletionVectorDescriptor.urlEncodedPath(col("deletionVector"), log.dataPath) + val isDVTombstone = DeletionVectorDescriptor.isDeletionVectorPath(col("path")) + val isInlineDeletionVector = DeletionVectorDescriptor.isInline(col("deletionVector")) + + val uniqueDvsFromParquetRemoveFiles = snapshot + .tombstones + .filter("deletionVector IS NOT NULL") + .filter(not(isInlineDeletionVector)) + .filter(not(isDVTombstone)) + .select(dvPath) + .distinct() + .as[String] + + val dvTombstones = snapshot + .tombstones + .filter(isDVTombstone) + .select("path") + .as[String] + + val dvTombstonesSet = dvTombstones.collect().toSet + + assert(dvTombstonesSet.nonEmpty) + assert(uniqueDvsFromParquetRemoveFiles.collect().toSet === dvTombstonesSet) + } + + for (withCommitLarge <- BOOLEAN_DOMAIN) + test("DV tombstones are created when dropping DVs" + + s"withCommitLarge: $withCommitLarge") { + val threshold = if (withCommitLarge) 0 else 10000 + withSQLConf( + DeltaSQLConf.FAST_DROP_FEATURE_DV_TOMBSTONE_COUNT_THRESHOLD.key -> threshold.toString) { + withTempPath { dir => + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + dropDeletionVectors(deltaLog) + validateTombstones(deltaLog) + + val targetTable = io.delta.tables.DeltaTable.forPath(dir.toString) + assert(targetTable.toDF.collect().length === 80) + // DV Tombstones are recorded in the snapshot state. + assert(deltaLog.update().numOfRemoves === 8) + } + } + } + + test("DV tombstones are generated when no action is taken in pre-downgrade") { + withTempPath { dir => + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + + // Remove all DV traces in advance. Table will look clean in DROP FEATURE. + val table = DeltaTableV2(spark, deltaLog.dataPath) + val properties = Seq(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key) + AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(spark) + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val catalog = spark.sessionState.catalogManager.currentCatalog.asTableCatalog + val tableId = Seq(table.name()).asIdentifier + DeltaReorgTableCommand(ResolvedTable.create(catalog, tableId, table))(Nil).run(spark) + assert(deltaLog.update().numDeletedRecordsOpt.forall(_ == 0)) + + dropDeletionVectors(deltaLog) + validateTombstones(deltaLog) + } + } + + test("We only create missing DV tombstones when dropping DVs") { + withTempPath { dir => + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + dropDeletionVectors(deltaLog) + validateTombstones(deltaLog) + + // Re enable the feature and add more DVs. 
The delete touches a new file as well as a file + // that already contains a DV within the retention period. + sql( + s"""ALTER TABLE delta.`${dir.getAbsolutePath}` + |SET TBLPROPERTIES ( + |delta.feature.${DeletionVectorsTableFeature.name} = 'enabled', + |${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key} = 'true' + |)""".stripMargin) + + withSQLConf(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> true.toString) { + val targetTable = io.delta.tables.DeltaTable.forPath(dir.toString) + targetTable.delete("id > 20 and id <= 30") + assert(deltaLog.update().numDeletionVectorsOpt === Some(2L)) + } + + sql(s"ALTER TABLE delta.`${dir.getAbsolutePath}` DROP FEATURE deletionVectors") + + validateTombstones(deltaLog) + } + } + + test("We do not create tombstones when there are no RemoveFiles within the retention period") { + withTempPath { dir => + val clock = new ManualClock(System.currentTimeMillis()) + val deltaLog = DeltaLog.forTable(spark, new Path(dir.getAbsolutePath), clock) + createTableWithDeletionVectors(deltaLog) + + // Pretend tombstone retention period has passed (default 1 week). + val clockAdvanceMillis = DeltaLog.tombstoneRetentionMillis(deltaLog.update().metadata) + clock.advance(clockAdvanceMillis + TimeUnit.DAYS.toMillis(3)) + + dropDeletionVectors(deltaLog) + + assert(deltaLog.update().tombstones.collect().forall(_.isDVTombstone == false)) + } + } + + test("We create DV tombstones when mixing drop feature implementations") { + // When using the TRUNCATE HISTORY option we fallback to the legacy implementation. + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + + val e = intercept[DeltaTableFeatureException] { + dropDeletionVectors(deltaLog, truncateHistory = true) + } + checkError( + e, + "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD", + parameters = Map( + "feature" -> "deletionVectors", + "logRetentionPeriodKey" -> "delta.logRetentionDuration", + "logRetentionPeriod" -> "30 days", + "truncateHistoryLogRetentionPeriod" -> "24 hours")) + + validateTombstones(deltaLog) + dropDeletionVectors(deltaLog, truncateHistory = false) + validateTombstones(deltaLog) + } + } + + test("DV tombstones are not created for inline DVs") { + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> true.toString) { + withTempPath { dir => + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + val targetTable = () => io.delta.tables.DeltaTable.forPath(dir.toString) + + spark.range(start = 0, end = 100, step = 1, numPartitions = 1) + .write.format("delta").save(dir.toString) + + def removeRowsWithInlineDV(add: AddFile, markedRows: Long*): (AddFile, RemoveFile) = { + val bitmap = RoaringBitmapArray(markedRows: _*) + val serializedBitmap = bitmap.serializeAsByteArray(RoaringBitmapArrayFormat.Portable) + val cardinality = markedRows.size + val dv = DeletionVectorDescriptor.inlineInLog(serializedBitmap, cardinality) + + add.removeRows( + deletionVector = dv, + updateStats = true) + } + + // There should be a single AddFile. 
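+ // (the table was written with numPartitions = 1, so allFiles contains exactly one entry)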
+ val snapshot = deltaLog.update() + val addFile = snapshot.allFiles.first() + val (newAddFile, newRemoveFile) = removeRowsWithInlineDV(addFile, 3, 34, 67) + val actionsToCommit: Seq[Action] = Seq(newAddFile, newRemoveFile) + + deltaLog.startTransaction(catalogTableOpt = None, snapshotOpt = Some(snapshot)) + .commit(actionsToCommit, new DeltaOperations.TestOperation) + + // Verify the inline DV is reflected in the table state. + assert(deltaLog.update().numDeletedRecordsOpt.exists(_ === 3)) + assert(deltaLog.update().numDeletionVectorsOpt.exists(_ === 1)) + + assert(targetTable().toDF.collect().length === 97) + + dropDeletionVectors(deltaLog) + // No DV tombstones should have been created. + assert(deltaLog.update().tombstones.collect().forall(_.isDVTombstone == false)) + + assert(targetTable().toDF.collect().length === 97) + assert(deltaLog.update().numDeletedRecordsOpt.forall(_ === 0)) + assert(deltaLog.update().numDeletionVectorsOpt.forall(_ === 0)) + } + } + + for (generateDVTombstones <- BOOLEAN_DOMAIN) + test(s"Vacuum does not delete deletion vector files." + + s"generateDVTombstones: $generateDVTombstones") { + val targetDF = spark.range(start = 0, end = 100, step = 1, numPartitions = 4) + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> true.toString, + DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> true.toString, + DeltaSQLConf.FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES.key -> generateDVTombstones.toString, + // With this config we pretend the client does not support DVs. Therefore, it will not + // discover DVs from the RemoveFile actions. + DeltaSQLConf.FAST_DROP_FEATURE_DV_DISCOVERY_IN_VACUUM_DISABLED.key -> true.toString) { + withTempPath { dir => + val targetLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + targetDF.write.format("delta").save(dir.toString) + val targetTable = io.delta.tables.DeltaTable.forPath(dir.toString) + + // Add some DVs. + targetTable.delete("id < 5 or id >= 95") + val versionWithDVs = targetLog.update().version + + // Unfortunately, there is no point in advancing the clock because the deletion timestamps + // in the RemoveFiles do not use the clock. Instead, we set the creation time back 10 days + // for all files created so far. These will be eligible for vacuum. + val fs = targetLog.logPath.getFileSystem(targetLog.newDeltaHadoopConf()) + val allFiles = DeltaFileOperations.localListDirs( + hadoopConf = targetLog.newDeltaHadoopConf(), + dirs = Seq(dir.getCanonicalPath), + recursive = false) + allFiles.foreach { p => + fs.setTimes(p.getHadoopPath, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(10), 0) + } + + // Add new DVs for the same set of files. + targetTable.delete("id < 10 or id >= 90") + + // Assert that DVs exist. + assert(targetLog.update().numDeletionVectorsOpt === Some(2L)) + + sql(s"ALTER TABLE delta.`${dir.getAbsolutePath}` DROP FEATURE deletionVectors") + + val snapshot = targetLog.update() + val protocol = snapshot.protocol + assert(snapshot.numDeletionVectorsOpt.getOrElse(0L) === 0) + assert(snapshot.numDeletedRecordsOpt.getOrElse(0L) === 0) + assert(!protocol.readerFeatureNames.contains(DeletionVectorsTableFeature.name)) + + targetTable.delete("id < 15 or id >= 85") + + // The DV files are outside the retention period. However, the DVs are still referenced in + // the history. Normally we should not delete any DVs.
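+ // With DV discovery disabled above, this VACUUM behaves like a client that is oblivious to + // deletion vectors, so only the DV tombstones (when generated) keep the DV files alive.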
+ sql(s"VACUUM '${dir.getAbsolutePath}'") + + val query = + sql(s"SELECT * FROM delta.`${dir.getAbsolutePath}` VERSION AS OF $versionWithDVs") + + if (generateDVTombstones) { + // At version 1 we only deleted 10 rows. + assert(query.collect().length === 90) + } else { + val e = intercept[SparkException] { + query.collect() + } + val msg = e.getCause.getMessage + assert(msg.contains("RowIndexFilterFileNotFoundException") || + msg.contains(".bin does not exist")) + } + } + } + } + + test("DV tombstones do not generate CDC") { + import org.apache.spark.sql.delta.commands.cdc.CDCReader + withTempPath { dir => + withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> true.toString) { + val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + val versionBeforeDrop = deltaLog.update().version + dropDeletionVectors(deltaLog) + + val deleteCountInDropFeature = CDCReader + .changesToBatchDF(deltaLog, versionBeforeDrop + 1, deltaLog.update().version, spark) + .filter(s"${CDCReader.CDC_TYPE_COLUMN_NAME} = '${CDCReader.CDC_TYPE_DELETE_STRING}'") + .count() + assert(deleteCountInDropFeature === 0) + } + } + } + + for (incrementalCommitEnabled <- BOOLEAN_DOMAIN) + test("Checksum computation does not take into account DV tombstones" + + s"incrementalCommitEnabled: $incrementalCommitEnabled") { + withTempPaths(2) { dirs => + var checksumWithDVTombstones: VersionChecksum = null + withSQLConf( + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString, + DeltaSQLConf.FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES.key -> true.toString) { + val deltaLog = DeltaLog.forTable(spark, dirs.head.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + dropDeletionVectors(deltaLog) + val snapshot = deltaLog.update() + checksumWithDVTombstones = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum) + } + + var checksumWithoutDVTombstones: VersionChecksum = null + withSQLConf( + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString, + DeltaSQLConf.FAST_DROP_FEATURE_GENERATE_DV_TOMBSTONES.key -> false.toString) { + val deltaLog = DeltaLog.forTable(spark, dirs.last.getAbsolutePath) + createTableWithDeletionVectors(deltaLog) + dropDeletionVectors(deltaLog) + val snapshot = deltaLog.update() + checksumWithoutDVTombstones = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum) + } + + // DV tombstones do not affect the number of files. + assert(checksumWithoutDVTombstones.numFiles === checksumWithDVTombstones.numFiles) + } + } }