Skip to content

Commit 83c4c56

Browse files
committed
Extend streaming tests with coordinated commits (1/2)
1 parent a920885 commit 83c4c56

10 files changed: +161 −132 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ object CoordinatedCommitsUtils extends DeltaLogging {
165165
protocol: Protocol,
166166
failIfImplUnavailable: Boolean): Option[CommitCoordinatorClient] = {
167167
metadata.coordinatedCommitsCoordinatorName.flatMap { commitCoordinatorStr =>
168-
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature))
168+
assert(protocol.isFeatureSupported(CoordinatedCommitsTableFeature),
169+
"coordinated commits table feature is not supported")
169170
val coordinatorConf = metadata.coordinatedCommitsCoordinatorConf
170171
val coordinatorOpt = CommitCoordinatorProvider.getCommitCoordinatorClientOpt(
171172
commitCoordinatorStr, coordinatorConf, spark)

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta
1818

1919
import java.util.Date
2020

21+
import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
2122
import org.apache.spark.sql.delta.actions.Protocol
2223
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2324
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
@@ -226,9 +227,9 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
226227
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tbl))
227228

228229
val currentTime = new Date().getTime
229-
modifyDeltaTimestamp(deltaLog, 0, currentTime - 100000)
230-
modifyDeltaTimestamp(deltaLog, 1, currentTime)
231-
modifyDeltaTimestamp(deltaLog, 2, currentTime + 100000)
230+
modifyCommitTimestamp(deltaLog, 0, currentTime - 100000)
231+
modifyCommitTimestamp(deltaLog, 1, currentTime)
232+
modifyCommitTimestamp(deltaLog, 2, currentTime + 100000)
232233

233234
val readDf = sql(s"SELECT * FROM table_changes('$tbl', 0, now())")
234235
checkCDCAnswer(

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala

+22-18
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@
1616

1717
package org.apache.spark.sql.delta
1818

19-
import java.io.File
2019
import java.sql.Timestamp
2120
import java.text.SimpleDateFormat
2221
import java.util.Date
2322

2423
import scala.language.implicitConversions
2524

25+
import org.apache.spark.sql.delta.DeltaTestUtils.modifyCommitTimestamp
2626
import org.apache.spark.sql.delta.actions.AddCDCFile
2727
import org.apache.spark.sql.delta.commands.cdc.CDCReader
2828
import org.apache.spark.sql.delta.sources.{DeltaSourceOffset, DeltaSQLConf}
2929
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
3030
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
31-
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
31+
import org.apache.spark.sql.delta.util.JsonUtils
3232
import io.delta.tables._
3333
import org.apache.hadoop.fs.Path
3434

35-
import org.apache.spark.{SparkConf, SparkThrowable}
36-
import org.apache.spark.sql.DataFrame
35+
import org.apache.spark.SparkConf
3736
import org.apache.spark.sql.functions._
3837
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException, StreamTest, Trigger}
3938
import org.apache.spark.sql.types.StructType
@@ -48,16 +47,6 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
4847
override protected def sparkConf: SparkConf = super.sparkConf
4948
.set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")
5049

51-
/** Modify timestamp for a delta commit, used to test timestamp querying */
52-
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
53-
val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
54-
file.setLastModified(time)
55-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
56-
if (crc.exists()) {
57-
crc.setLastModified(time)
58-
}
59-
}
60-
6150
/**
6251
* Create two tests for maxFilesPerTrigger and maxBytesPerTrigger
6352
*/
@@ -198,11 +187,11 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
198187
// version 0
199188
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
200189
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
201-
modifyDeltaTimestamp(deltaLog, 0, 1000)
190+
modifyCommitTimestamp(deltaLog, 0, 1000)
202191

203192
// version 1
204193
Seq(-1).toDF("id").write.mode("append").delta(inputDir.toString)
205-
modifyDeltaTimestamp(deltaLog, 1, 2000)
194+
modifyCommitTimestamp(deltaLog, 1, 2000)
206195

207196
val deltaTable = io.delta.tables.DeltaTable.forPath(inputDir.getAbsolutePath)
208197
val startTs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
@@ -231,7 +220,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
231220
// version 0
232221
Seq(1, 2, 3, 4, 5, 6).toDF("id").write.delta(inputDir.toString)
233222
val deltaLog = DeltaLog.forTable(spark, inputDir.getAbsolutePath)
234-
modifyDeltaTimestamp(deltaLog, 0, 1000)
223+
modifyCommitTimestamp(deltaLog, 0, 1000)
235224

236225
val df1 = spark.readStream
237226
.option(DeltaOptions.CDC_READ_OPTION, "true")
@@ -278,7 +267,7 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
278267
Seq(1, 2, 3).toDF("id").write.delta(inputDir.toString)
279268
val inputPath = inputDir.getAbsolutePath
280269
val deltaLog = DeltaLog.forTable(spark, inputPath)
281-
modifyDeltaTimestamp(deltaLog, 0, 1000)
270+
modifyCommitTimestamp(deltaLog, 0, 1000)
282271

283272
val deltaTable = io.delta.tables.DeltaTable.forPath(inputPath)
284273

@@ -1065,6 +1054,21 @@ class DeltaCDCStreamDeletionVectorSuite extends DeltaCDCStreamSuite
10651054
}
10661055

10671056
class DeltaCDCStreamSuite extends DeltaCDCStreamSuiteBase
1057+
class DeltaCDCStreamWithCoordinatedCommitsBatch1Suite
1058+
extends DeltaCDCStreamSuite {
1059+
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(1)
1060+
}
1061+
1062+
class DeltaCDCStreamWithCoordinatedCommitsBatch10Suite
1063+
extends DeltaCDCStreamSuite {
1064+
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
1065+
}
1066+
1067+
class DeltaCDCStreamWithCoordinatedCommitsBatch100Suite
1068+
extends DeltaCDCStreamSuite {
1069+
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
1070+
}
1071+
10681072
abstract class DeltaCDCStreamColumnMappingSuiteBase extends DeltaCDCStreamSuite
10691073
with ColumnMappingStreamingBlockedWorkflowSuiteBase with DeltaColumnMappingSelectedTestMixin {
10701074

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala

+36-63
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Date
2323
import scala.collection.JavaConverters._
2424

2525
// scalastyle:off import.ordering.noEmptyLine
26-
import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
26+
import org.apache.spark.sql.delta.DeltaTestUtils.{modifyCommitTimestamp, BOOLEAN_DOMAIN}
2727
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
2828
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
2929
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -108,16 +108,6 @@ abstract class DeltaCDCSuiteBase
108108
schemaMode: Option[DeltaBatchCDFSchemaMode] = Some(BatchCDFSchemaLegacy),
109109
readerOptions: Map[String, String] = Map.empty): DataFrame
110110

111-
/** Modify timestamp for a delta commit, used to test timestamp querying */
112-
def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
113-
val file = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
114-
file.setLastModified(time)
115-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
116-
if (crc.exists()) {
117-
crc.setLastModified(time)
118-
}
119-
}
120-
121111
/** Create table utility method */
122112
def ctas(srcTbl: String, dstTbl: String, disableCDC: Boolean = false): Unit = {
123113
val readDf = cdcRead(new TableName(srcTbl), StartingVersion("0"), EndingVersion("1"))
@@ -252,14 +242,14 @@ abstract class DeltaCDCSuiteBase
252242

253243
// modify timestamps
254244
// version 0
255-
modifyDeltaTimestamp(deltaLog, 0, 0)
245+
modifyCommitTimestamp(deltaLog, 0, 0)
256246
val tsAfterV0 = dateFormat.format(new Date(1))
257247

258248
// version 1
259-
modifyDeltaTimestamp(deltaLog, 1, 1000)
249+
modifyCommitTimestamp(deltaLog, 1, 1000)
260250
val tsAfterV1 = dateFormat.format(new Date(1001))
261251

262-
modifyDeltaTimestamp(deltaLog, 2, 2000)
252+
modifyCommitTimestamp(deltaLog, 2, 2000)
263253

264254
val readDf = cdcRead(
265255
new TablePath(tempDir.getAbsolutePath),
@@ -278,9 +268,9 @@ abstract class DeltaCDCSuiteBase
278268
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
279269
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
280270

281-
modifyDeltaTimestamp(deltaLog, 0, 0)
282-
modifyDeltaTimestamp(deltaLog, 1, 10000)
283-
modifyDeltaTimestamp(deltaLog, 2, 20000)
271+
modifyCommitTimestamp(deltaLog, 0, 0)
272+
modifyCommitTimestamp(deltaLog, 1, 10000)
273+
modifyCommitTimestamp(deltaLog, 2, 20000)
284274

285275
val ts0 = dateFormat.format(new Date(2000))
286276
val readDf = cdcRead(
@@ -299,9 +289,9 @@ abstract class DeltaCDCSuiteBase
299289
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
300290
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
301291

302-
modifyDeltaTimestamp(deltaLog, 0, 0)
303-
modifyDeltaTimestamp(deltaLog, 1, 1000)
304-
modifyDeltaTimestamp(deltaLog, 2, 2000)
292+
modifyCommitTimestamp(deltaLog, 0, 0)
293+
modifyCommitTimestamp(deltaLog, 1, 1000)
294+
modifyCommitTimestamp(deltaLog, 2, 2000)
305295

306296
val ts0 = dateFormat.format(new Date(0))
307297
val readDf = cdcRead(
@@ -320,9 +310,9 @@ abstract class DeltaCDCSuiteBase
320310
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
321311
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
322312

323-
modifyDeltaTimestamp(deltaLog, 0, 4000)
324-
modifyDeltaTimestamp(deltaLog, 1, 8000)
325-
modifyDeltaTimestamp(deltaLog, 2, 12000)
313+
modifyCommitTimestamp(deltaLog, 0, 4000)
314+
modifyCommitTimestamp(deltaLog, 1, 8000)
315+
modifyCommitTimestamp(deltaLog, 2, 12000)
326316

327317
val ts0 = dateFormat.format(new Date(1000))
328318
val ts1 = dateFormat.format(new Date(3000))
@@ -341,9 +331,9 @@ abstract class DeltaCDCSuiteBase
341331
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
342332
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
343333

344-
modifyDeltaTimestamp(deltaLog, 0, 0)
345-
modifyDeltaTimestamp(deltaLog, 1, 4000)
346-
modifyDeltaTimestamp(deltaLog, 2, 8000)
334+
modifyCommitTimestamp(deltaLog, 0, 0)
335+
modifyCommitTimestamp(deltaLog, 1, 4000)
336+
modifyCommitTimestamp(deltaLog, 2, 8000)
347337

348338
val ts0 = dateFormat.format(new Date(1000))
349339
val ts1 = dateFormat.format(new Date(3000))
@@ -363,9 +353,9 @@ abstract class DeltaCDCSuiteBase
363353
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
364354
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
365355

366-
modifyDeltaTimestamp(deltaLog, 0, 0)
367-
modifyDeltaTimestamp(deltaLog, 1, 4000)
368-
modifyDeltaTimestamp(deltaLog, 2, 8000)
356+
modifyCommitTimestamp(deltaLog, 0, 0)
357+
modifyCommitTimestamp(deltaLog, 1, 4000)
358+
modifyCommitTimestamp(deltaLog, 2, 8000)
369359

370360
val ts0 = dateFormat.format(new Date(3000))
371361
val ts1 = dateFormat.format(new Date(5000))
@@ -385,9 +375,9 @@ abstract class DeltaCDCSuiteBase
385375
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
386376
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
387377

388-
modifyDeltaTimestamp(deltaLog, 0, 0)
389-
modifyDeltaTimestamp(deltaLog, 1, 4000)
390-
modifyDeltaTimestamp(deltaLog, 2, 8000)
378+
modifyCommitTimestamp(deltaLog, 0, 0)
379+
modifyCommitTimestamp(deltaLog, 1, 4000)
380+
modifyCommitTimestamp(deltaLog, 2, 8000)
391381

392382
val ts0 = dateFormat.format(new Date(3000))
393383
val ts1 = dateFormat.format(new Date(1000))
@@ -406,9 +396,9 @@ abstract class DeltaCDCSuiteBase
406396
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
407397
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
408398

409-
modifyDeltaTimestamp(deltaLog, 0, 0)
410-
modifyDeltaTimestamp(deltaLog, 1, 4000)
411-
modifyDeltaTimestamp(deltaLog, 2, 8000)
399+
modifyCommitTimestamp(deltaLog, 0, 0)
400+
modifyCommitTimestamp(deltaLog, 1, 4000)
401+
modifyCommitTimestamp(deltaLog, 2, 8000)
412402

413403
val ts0 = dateFormat.format(new Date(5000))
414404
val ts1 = dateFormat.format(new Date(3000))
@@ -449,7 +439,7 @@ abstract class DeltaCDCSuiteBase
449439
// Set commit time during Daylight savings time change.
450440
val restoreDate = "2022-11-06 01:42:44"
451441
val timestamp = dateFormat.parse(s"$restoreDate -0800").getTime
452-
modifyDeltaTimestamp(deltaLog, 0, timestamp)
442+
modifyCommitTimestamp(deltaLog, 0, timestamp)
453443

454444
// Verify DST is respected.
455445
val e = intercept[Exception] {
@@ -558,9 +548,9 @@ abstract class DeltaCDCSuiteBase
558548
createTblWithThreeVersions(path = Some(tempDir.getAbsolutePath))
559549
val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
560550

561-
modifyDeltaTimestamp(deltaLog, 0, 0)
562-
modifyDeltaTimestamp(deltaLog, 1, 1000)
563-
modifyDeltaTimestamp(deltaLog, 2, 2000)
551+
modifyCommitTimestamp(deltaLog, 0, 0)
552+
modifyCommitTimestamp(deltaLog, 1, 1000)
553+
modifyCommitTimestamp(deltaLog, 2, 2000)
564554

565555
val ts0 = dateFormat.format(new Date(2000))
566556
val ts1 = dateFormat.format(new Date(1))
@@ -795,13 +785,13 @@ abstract class DeltaCDCSuiteBase
795785

796786
// modify timestamps
797787
// version 0
798-
modifyDeltaTimestamp(deltaLog, 0, 0)
788+
modifyCommitTimestamp(deltaLog, 0, 0)
799789

800790
// version 1
801-
modifyDeltaTimestamp(deltaLog, 1, 1000)
791+
modifyCommitTimestamp(deltaLog, 1, 1000)
802792

803793
// version 2
804-
modifyDeltaTimestamp(deltaLog, 2, 2000)
794+
modifyCommitTimestamp(deltaLog, 2, 2000)
805795
val tsStart = dateFormat.format(new Date(3000))
806796
val tsEnd = dateFormat.format(new Date(4000))
807797

@@ -825,13 +815,13 @@ abstract class DeltaCDCSuiteBase
825815

826816
// modify timestamps
827817
// version 0
828-
modifyDeltaTimestamp(deltaLog, 0, 0)
818+
modifyCommitTimestamp(deltaLog, 0, 0)
829819

830820
// version 1
831-
modifyDeltaTimestamp(deltaLog, 1, 1000)
821+
modifyCommitTimestamp(deltaLog, 1, 1000)
832822

833823
// version 2
834-
modifyDeltaTimestamp(deltaLog, 2, 2000)
824+
modifyCommitTimestamp(deltaLog, 2, 2000)
835825

836826
val tsStart = dateFormat.format(new Date(0))
837827
val tsEnd = dateFormat.format(new Date(4000))
@@ -1107,23 +1097,6 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite
11071097
}
11081098

11091099
class DeltaCDCScalaSuiteWithCoordinatedCommitsBatch10 extends DeltaCDCScalaSuite
1110-
with CoordinatedCommitsBaseSuite {
1111-
1112-
/** Modify timestamp for a delta commit, used to test timestamp querying */
1113-
override def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = {
1114-
val fileProvider = DeltaCommitFileProvider(deltaLog.snapshot)
1115-
val file = new File(fileProvider.deltaFile(version).toUri)
1116-
InCommitTimestampTestUtils.overwriteICTInDeltaFile(
1117-
deltaLog,
1118-
new Path(file.getPath),
1119-
Some(time))
1120-
file.setLastModified(time)
1121-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
1122-
if (crc.exists()) {
1123-
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(time))
1124-
crc.setLastModified(time)
1125-
}
1126-
}
1127-
1100+
with CoordinatedCommitsBaseSuite {
11281101
override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10)
11291102
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala

+2-27
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ import scala.language.implicitConversions
2929
import com.databricks.spark.util.Log4jUsageLogger
3030
import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
3131
import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
32-
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
32+
import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, modifyCommitTimestamp}
3333
import org.apache.spark.sql.delta.catalog.DeltaTableV2
3434
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
3535
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3636
import org.apache.spark.sql.delta.stats.StatsUtils
3737
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
3838
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
39-
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
39+
import org.apache.spark.sql.delta.util.FileNames
4040
import org.scalatest.GivenWhenThen
4141

4242
import org.apache.spark.{SparkConf, SparkException}
@@ -64,31 +64,6 @@ trait DeltaTimeTravelTests extends QueryTest
6464

6565
protected val timeFormatter = new SimpleDateFormat("yyyyMMddHHmmssSSS")
6666

67-
protected def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
68-
val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
69-
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
70-
if (isICTEnabledForNewTables) {
71-
InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, filePath, Some(ts))
72-
if (FileNames.isUnbackfilledDeltaFile(filePath)) {
73-
// Also change the ICT in the backfilled file if it exists.
74-
val backfilledFilePath = FileNames.unsafeDeltaFile(deltaLog.logPath, version)
75-
val fs = backfilledFilePath.getFileSystem(deltaLog.newDeltaHadoopConf())
76-
if (fs.exists(backfilledFilePath)) {
77-
InCommitTimestampTestUtils.overwriteICTInDeltaFile(deltaLog, backfilledFilePath, Some(ts))
78-
}
79-
}
80-
if (crc.exists()) {
81-
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(ts))
82-
}
83-
} else {
84-
val file = new File(filePath.toUri)
85-
file.setLastModified(ts)
86-
if (crc.exists()) {
87-
crc.setLastModified(ts)
88-
}
89-
}
90-
}
91-
9267
protected def versionAsOf(table: String, version: Long): String = {
9368
s"$table version as of $version"
9469
}

0 commit comments

Comments (0)