Skip to content

Commit aead822

Browse files
liviazhu and anishshri-db
authored and committed
[SPARK-54078][SS] Deflake StateStoreSuite SPARK-40492: maintenance before unload
### What changes were proposed in this pull request?

Fix flakiness in this suite caused by occasional flakiness in the maintenance thread and long maintenance duration. Instead, test the functionality required (maintenance is called before unloading for deactivated instances) by adding a function that allows us to pause/unpause maintenance.

### Why are the changes needed?

Fix test flakiness.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52783 from liviazhu/liviazhu-db/statestore-flakiness.

Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 146c2ba commit aead822

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1343,6 +1343,12 @@ object StateStore extends Logging {
13431343
}
13441344
}
13451345

1346+
// Pause maintenance for testing purposes only.
1347+
@volatile private var maintenancePaused: Boolean = false
1348+
private[spark] def setMaintenancePaused(maintPaused: Boolean): Unit = {
1349+
maintenancePaused = maintPaused
1350+
}
1351+
13461352
/**
13471353
* Execute background maintenance task in all the loaded store providers if they are still
13481354
* the active instances according to the coordinator.
@@ -1352,6 +1358,10 @@ object StateStore extends Logging {
13521358
if (SparkEnv.get == null) {
13531359
throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores")
13541360
}
1361+
if (maintenancePaused) {
1362+
logDebug("Maintenance paused")
1363+
return
1364+
}
13551365

13561366
// Providers that couldn't be processed now and need to be added back to the queue
13571367
val providersToRequeue = new ArrayBuffer[(StateStoreProviderId, StateStoreProvider)]()

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala

Lines changed: 15 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1007,6 +1007,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
10071007
}
10081008
}
10091009

1010+
// Ensure that maintenance is called before unloading
10101011
test("SPARK-40492: maintenance before unload") {
10111012
val conf = new SparkConf()
10121013
.setMaster("local")
@@ -1016,9 +1017,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
10161017
val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0), UUID.randomUUID)
10171018
val sqlConf = getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
10181019
SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get)
1019-
sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
1020-
// Make maintenance interval large so that maintenance is called after deactivating instances.
1021-
sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 1.minute.toMillis)
1020+
sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 10)
1021+
sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
10221022
val storeConf = StateStoreConf(sqlConf)
10231023
val hadoopConf = new Configuration()
10241024

@@ -1058,17 +1058,23 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
10581058
assert(snapshotVersions.nonEmpty, "no snapshot file found")
10591059
}
10601060
}
1061+
// Pause maintenance
1062+
StateStore.setMaintenancePaused(true)
1063+
10611064
// Generate more versions such that there is another snapshot.
10621065
generateStoreVersions()
10631066

10641067
// If driver decides to deactivate all stores related to a query run,
10651068
// then this instance should be unloaded.
10661069
coordinatorRef.deactivateInstances(storeProviderId1.queryRunId)
1070+
1071+
// Resume maintenance which should unload the deactivated store
1072+
StateStore.setMaintenancePaused(false)
10671073
eventually(timeout(timeoutDuration)) {
10681074
assert(!StateStore.isLoaded(storeProviderId1))
10691075
}
10701076

1071-
// Earliest delta file should be scheduled a cleanup during unload.
1077+
// Ensure the earliest delta file should be cleaned up during unload.
10721078
tryWithProviderResource(newStoreProvider(storeProviderId1.storeId)) { provider =>
10731079
eventually(timeout(timeoutDuration)) {
10741080
assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
@@ -2412,9 +2418,11 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
24122418
isSnapshot: Boolean): Boolean = {
24132419
val method = PrivateMethod[Path](Symbol("baseDir"))
24142420
val basePath = provider invokePrivate method()
2415-
val fileName = if (isSnapshot) s"$version.snapshot" else s"$version.delta"
2416-
val filePath = new File(basePath.toString, fileName)
2417-
filePath.exists
2421+
val fileNameHDFS = if (isSnapshot) s"$version.snapshot" else s"$version.delta"
2422+
val filePathHDFS = new File(basePath.toString, fileNameHDFS)
2423+
val fileNameRocks = if (isSnapshot) s"$version.zip" else s"$version.changelog"
2424+
val filePathRocks = new File(basePath.toString, fileNameRocks)
2425+
filePathHDFS.exists || filePathRocks.exists
24182426
}
24192427

24202428
def updateVersionTo(

0 commit comments

Comments (0)