Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
zeevm authored Jan 16, 2025
2 parents 70983b8 + b153bf3 commit cdcffce
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 47 deletions.
10 changes: 9 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,15 @@ class DeltaLog private(
* `read` and `write`.
*/
private def protocolCheck(tableProtocol: Protocol, readOrWrite: String): Unit = {
val clientSupportedProtocol = Action.supportedProtocolVersion()
val unsupportedTestFeatures =
if (spark.conf.get(DeltaSQLConf.UNSUPPORTED_TESTING_FEATURES_ENABLED)) {
TableFeature.testUnsupportedFeatures.toSeq
} else {
Seq.empty
}

val clientSupportedProtocol =
Action.supportedProtocolVersion(featuresToExclude = unsupportedTestFeatures)
// Depending on the operation, pull related protocol versions out of Protocol objects.
// `getEnabledFeatures` is a pointer to pull reader/writer features out of a Protocol.
val (clientSupportedVersions, tableRequiredVersion, getEnabledFeatures) = readOrWrite match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
}
}

case class TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand {
override def removeFeatureTracesIfNeeded(): Boolean = true
}

case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ sealed abstract class LegacyReaderWriterFeature(
with ReaderWriterFeatureType

object TableFeature {
val isTesting = DeltaUtils.isTesting

/**
* All table features recognized by this client. Update this set when you added a new Table
* Feature.
Expand Down Expand Up @@ -370,7 +372,7 @@ object TableFeature {
VariantTypeTableFeature,
CoordinatedCommitsTableFeature,
CheckpointProtectionTableFeature)
if (DeltaUtils.isTesting && testingFeaturesEnabled) {
if (isTesting && testingFeaturesEnabled) {
features ++= Set(
RedirectReaderWriterFeature,
RedirectWriterOnlyFeature,
Expand All @@ -379,6 +381,7 @@ object TableFeature {
TestWriterFeature,
TestWriterMetadataNoAutoUpdateFeature,
TestReaderWriterFeature,
TestUnsupportedReaderWriterFeature,
TestReaderWriterMetadataAutoUpdateFeature,
TestReaderWriterMetadataNoAutoUpdateFeature,
TestRemovableWriterFeature,
Expand All @@ -396,6 +399,10 @@ object TableFeature {
featureMap
}

/** Test only features that appear unsupported in order to test protocol validations. */
def testUnsupportedFeatures: Set[TableFeature] =
if (isTesting) Set(TestUnsupportedReaderWriterFeature) else Set.empty

private val allDependentFeaturesMap: Map[TableFeature, Set[TableFeature]] = {
val dependentFeatureTuples =
allSupportedFeaturesMap.values.toSeq.flatMap(f => f.requiredFeatures.map(_ -> f))
Expand Down Expand Up @@ -1106,6 +1113,19 @@ object TestRemovableWriterFeature
override def actionUsesFeature(action: Action): Boolean = false
}

/** Test feature that appears unsupported and it is used for testing protocol checks. */
object TestUnsupportedReaderWriterFeature
extends ReaderWriterFeature(name = "testUnsupportedReaderWriter")
with RemovableFeature {

override def validateRemoval(snapshot: Snapshot): Boolean = true

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TestUnsupportedReaderWriterFeaturePreDowngradeCommand(table)

override def actionUsesFeature(action: Action): Boolean = false
}

private[sql] object TestRemovableWriterFeatureWithDependency
extends WriterFeature(name = "testRemovableWriterFeatureWithDependency")
with FeatureAutomaticallyEnabledByMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,15 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val UNSUPPORTED_TESTING_FEATURES_ENABLED =
buildConf("tableFeatures.dev.unsupportedTableFeatures.enabled")
.internal()
.doc(
"""When turned on, it emulates the existence of unsupported features by the client.
|This config is only used for testing purposes.""".stripMargin)
.booleanConf
.createWithDefault(false)

val FAST_DROP_FEATURE_ENABLED =
buildConf("tableFeatures.dev.fastDropFeature.enabled")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import org.apache.spark.util.Utils
class CloneTableSQLSuite extends CloneTableSuiteBase
with DeltaColumnMappingTestUtils
{

override def beforeAll(): Unit = {
super.beforeAll()
disableDeletionVectors(spark.conf)
}

// scalastyle:off argcount
override protected def cloneTable(
source: String,
Expand Down Expand Up @@ -346,7 +352,7 @@ object CloneTableSQLTestUtils {
}

class CloneTableScalaDeletionVectorSuite
extends CloneTableSQLSuite
extends CloneTableScalaSuite
with DeltaSQLCommandTest
with DeltaExcludedTestMixin
with DeletionVectorsTestUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ trait CloneTableSuiteBase extends QueryTest
with DeltaColumnMappingTestUtils
with DeltaSQLCommandTest
with CoordinatedCommitsBaseSuite
with CoordinatedCommitsTestUtils {
with CoordinatedCommitsTestUtils
with DeletionVectorsTestUtils {

protected val TAG_HAS_SHALLOW_CLONE = new Tag("SHALLOW CLONE")
protected val TAG_MODIFY_PROTOCOL = new Tag("CHANGES PROTOCOL")
Expand Down Expand Up @@ -748,7 +749,9 @@ trait CloneTableSuiteBase extends QueryTest
}

testAllClones("CLONE with table properties to disable DV") { (source, target, isShallow) =>
withSQLConf(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true") {
withSQLConf(
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true",
DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> "true") {
spark.range(10).write.format("delta").save(source)
spark.sql(s"DELETE FROM delta.`$source` WHERE id = 1")
}
Expand Down Expand Up @@ -829,7 +832,7 @@ trait CloneTableSuiteBase extends QueryTest
val targetDeltaLog = DeltaLog.forTable(spark, target)
val targetSnapshot = targetDeltaLog.update()
assert(targetSnapshot.metadata.configuration ===
tblProperties ++ sourceSnapshot.metadata.configuration)
sourceSnapshot.metadata.configuration ++ tblProperties)
// Check that the protocol has been upgraded.
assert(StrictProtocolOrdering.fulfillsVersionRequirements(
actual = targetSnapshot.protocol,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession with De
spark.conf.set(DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key, merge.toString)
}

/** Disable persistent deletion vectors in new tables and all supported DML commands. */
def disableDeletionVectors(conf: RuntimeConfig): Unit = {
conf.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, false.toString)
conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, false.toString)
conf.set(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key, false.toString)
}

def enableDeletionVectorsForAllSupportedOperations(spark: SparkSession): Unit =
enableDeletionVectors(spark, delete = true, update = true)

Expand Down
Loading

0 comments on commit cdcffce

Please sign in to comment.