[Spark] Create DV tombstones when dropping Deletion Vectors #117451 (#4062)


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


Drop feature with checkpoint protection allows dropped features to remain in
the table history. This has consequences for the VACUUM command, because
VACUUM reads only the latest snapshot and uses it to decide which files can
be deleted. Clients that do not support the dropped feature(s) could
therefore incorrectly vacuum auxiliary files, unless the latest snapshot
prevents this using only features from the reduced protocol. The snapshot
prevents deletion of files by referencing files that were recently removed
from the table (within the last 7 days) in RemoveFile actions, or
"tombstones".

This is problematic when a newer feature extends the functionality of these
tombstones. For instance, with the Deletion Vectors feature, a `RemoveFile`
action references both the main data file and the auxiliary Deletion Vector
file. After the downgrade, clients no longer see that auxiliary DV file
reference, and therefore will not preserve Deletion Vectors that were removed
from the table within the past 7 days.

This PR resolves the issue by adding RemoveFile actions that directly
reference the auxiliary DV files during the pre-downgrade process of the
Deletion Vectors feature.
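
A rough sketch of the idea, assuming the `RemoveFile` fields shown above and
the existing `DeletionVectorDescriptor.absolutePath` and `isOnDisk` helpers
(the helper function itself is illustrative, not the exact implementation):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.RemoveFile

// For every recent RemoveFile that still references an on-disk DV, emit a
// second RemoveFile whose path is the DV file itself, so that clients on the
// reduced protocol protect it from VACUUM without understanding DVs.
def dvTombstones(recentRemoves: Seq[RemoveFile], tablePath: Path): Seq[RemoveFile] =
  recentRemoves
    .filter(r => r.deletionVector != null && r.deletionVector.isOnDisk)
    .map { r =>
      RemoveFile(
        path = r.deletionVector.absolutePath(tablePath).toString,
        deletionTimestamp = r.deletionTimestamp,
        dataChange = false)
    }
```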

## How was this patch tested?

Added new tests in `DeltaFastDropFeatureSuite`.

## Does this PR introduce _any_ user-facing changes?

No.
andreaschat-db authored Jan 17, 2025
1 parent 9245817 commit 71e672d
Showing 12 changed files with 658 additions and 64 deletions.
17 changes: 16 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
@@ -213,6 +213,8 @@ trait RecordChecksum extends DeltaLogging {
// Incrementally compute the new version checksum, if the old one is available.
val ignoreAddFilesInOperation =
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
val ignoreRemoveFilesInOperation =
RecordChecksum.operationNamesWhereRemoveFilesIgnoredForIncrementalCrc.contains(operationName)
val persistentDVsOnTableReadable =
DeletionVectorUtils.deletionVectorsReadable(protocol, metadata)
val persistentDVsOnTableWritable =
@@ -226,6 +228,7 @@ trait RecordChecksum extends DeltaLogging {
oldSnapshot,
actions,
ignoreAddFilesInOperation,
ignoreRemoveFilesInOperation,
includeAddFilesInCrc,
persistentDVsOnTableReadable,
persistentDVsOnTableWritable
@@ -244,6 +247,8 @@ trait RecordChecksum extends DeltaLogging {
* @param actions used to incrementally compute new checksum.
* @param ignoreAddFiles for transactions whose add file actions refer to already-existing files
* e.g., [[DeltaOperations.ComputeStats]] transactions.
* @param ignoreRemoveFiles for transactions that generate RemoveFiles for auxiliary files
* e.g., [[DeltaOperations.AddDeletionVectorsTombstones]].
* @param persistentDVsOnTableReadable Indicates whether commands modifying this table are allowed
* to read deletion vectors.
* @param persistentDVsOnTableWritable Indicates whether commands modifying this table are allowed
@@ -260,6 +265,7 @@ trait RecordChecksum extends DeltaLogging {
oldSnapshot: Option[Snapshot],
actions: Seq[Action],
ignoreAddFiles: Boolean,
ignoreRemoveFiles: Boolean,
includeAllFilesInCRC: Boolean,
persistentDVsOnTableReadable: Boolean,
persistentDVsOnTableWritable: Boolean
@@ -319,6 +325,8 @@ trait RecordChecksum extends DeltaLogging {
numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ + dvCount)
deletedRecordCountsHistogramOpt.foreach(_.insert(dvCardinality))

case _: RemoveFile if ignoreRemoveFiles => ()

// extendedFileMetadata == true implies fields partitionValues, size, and tags are present
case r: RemoveFile if r.extendedFileMetadata == Some(true) =>
val size = r.size.get
@@ -597,14 +605,21 @@ trait RecordChecksum extends DeltaLogging {

object RecordChecksum {
// Operations where we should ignore AddFiles in the incremental checksum computation.
val operationNamesWhereAddFilesIgnoredForIncrementalCrc = Set(
private[delta] val operationNamesWhereAddFilesIgnoredForIncrementalCrc = Set(
// The transaction that computes stats is special -- it re-adds files that already exist, in
// order to update their min/max stats. We should not count those against the totals.
DeltaOperations.ComputeStats(Seq.empty).name,
// Backfill/Tagging re-adds existing AddFiles without changing the underlying data files.
// Incremental commits should ignore backfill commits.
DeltaOperations.RowTrackingBackfill().name
)

// Operations where we should ignore RemoveFiles in the incremental checksum computation.
private[delta] val operationNamesWhereRemoveFilesIgnoredForIncrementalCrc = Set(
// Deletion vector tombstones are only required to protect DVs from vacuum. They should be
// ignored in checksum calculation.
DeltaOperations.AddDeletionVectorsTombstones.name
)
}

/**
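To make the gating concrete: a DV tombstone's `RemoveFile` points at an
auxiliary DV file rather than a tracked data file, so subtracting its size and
file count from the table state would corrupt the checksum. A small sketch
using only names visible in the diff above:

```scala
// The operation name decides whether RemoveFiles are ignored when
// incrementally deriving the checksum for a commit.
val opName = DeltaOperations.AddDeletionVectorsTombstones.name
val ignoreRemoveFiles =
  RecordChecksum.operationNamesWhereRemoveFilesIgnoredForIncrementalCrc
    .contains(opName) // true for DV tombstone commits
```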
spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -487,6 +487,21 @@ object DeltaOperations

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/**
 * Recorded when dropping deletion vectors. Deletion Vector tombstones directly reference
 * deletion vector files that are still within the retention period. This protects them from
 * deletion by writers that are oblivious to the feature when vacuuming.
*/
object AddDeletionVectorsTombstones extends Operation("Deletion Vector Tombstones") {
override val parameters: Map[String, Any] = Map.empty

// This operation should only introduce RemoveFile actions.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true

override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
}

/** Recorded when columns are added. */
case class AddColumns(
colsToAdd: Seq[QualifiedColTypeWithPositionForLog]) extends Operation("ADD COLUMNS") {
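As a usage sketch (hypothetical wiring, assuming the standard
`OptimisticTransaction.commit(actions, op)` API and the `dvTombstones` helper
sketched earlier), the pre-downgrade step would commit the tombstones under
this operation so that the checksum logic above recognizes and skips them:

```scala
// Commit DV tombstones under the dedicated operation; RecordChecksum sees the
// operation name and ignores these RemoveFiles in the incremental CRC.
val txn = deltaLog.startTransaction()
txn.commit(dvTombstones, DeltaOperations.AddDeletionVectorsTombstones)
```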
21 changes: 21 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram

@@ -50,6 +51,17 @@ object DeltaUDF {
f: Array[Long] => DeletedRecordCountsHistogram): UserDefinedFunction =
createUdfFromTemplateUnsafe(deletedRecordCountsHistogramFromArrayLongTemplate, f, udf(f))

def stringFromDeletionVectorDescriptor(
f: DeletionVectorDescriptor => String): UserDefinedFunction =
createUdfFromTemplateUnsafe(stringFromDeletionVectorDescriptorTemplate, f, udf(f))

def booleanFromDeletionVectorDescriptor(
f: DeletionVectorDescriptor => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromDeletionVectorDescriptorTemplate, f, udf(f))

def booleanFromString(s: String => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromStringTemplate, s, udf(s))

def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f))

@@ -74,6 +86,15 @@ object DeltaUDF {
udf((_: Array[Long]) => DeletedRecordCountsHistogram(Array.empty))
.asInstanceOf[SparkUserDefinedFunction]

private lazy val stringFromDeletionVectorDescriptorTemplate =
udf((_: DeletionVectorDescriptor) => "").asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromDeletionVectorDescriptorTemplate =
udf((_: DeletionVectorDescriptor) => false).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromStringTemplate =
udf((_: String) => false).asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromMapTemplate =
udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction]

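For context, a hypothetical use of the new UDF factories, assuming a DataFrame
of RemoveFile actions (`removeFilesDf` is illustrative) with a
`deletionVector` column matching the `DeletionVectorDescriptor` schema
(`isOnDisk` is an existing helper on the descriptor):

```scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor

// Keep only removes whose deletion vector is persisted on disk; inline DVs
// have no auxiliary file and need no tombstone.
val hasOnDiskDv = DeltaUDF.booleanFromDeletionVectorDescriptor(
  (dv: DeletionVectorDescriptor) => dv != null && dv.isOnDisk)
val dvRemoves = removeFilesDf.filter(hasOnDiskDv(col("deletionVector")))
```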