Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51187][SQL][SS] Introduce the migration logic of config removal from SPARK-49699 #50314

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions docs/streaming/ss-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](../sql-migration-gui
- Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)
- Since Spark 4.0, new configuration `spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint` (default: `0.3`) controls the amount of additional space allowed in the checkpoint directory to store stale version files for batch deletion inside maintenance task. This is to amortize the cost of listing in cloud store. Setting this to `0` defaults to the old behavior. (See [SPARK-48931](https://issues.apache.org/jira/browse/SPARK-48931) for more details.)
- Since Spark 4.0, when relative path is used to output data in `DataStreamWriter` the resolution to absolute path is done in the Spark Driver and is not deferred to Spark Executor. This is to make Structured Streaming behavior similar to DataFrame API (`DataFrameWriter`). (See [SPARK-50854](https://issues.apache.org/jira/browse/SPARK-50854) for more details.)
- Since Spark 4.0, the deprecated config `spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan` has been removed. (See [SPARK-51187](https://issues.apache.org/jira/browse/SPARK-51187) for more details.)

## Upgrading from Structured Streaming 3.3 to 3.4

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,23 @@ object OffsetSeqMetadata extends Logging {
}
}
}

// SPARK-51187: The incorrect config is not added in the relevantSQLConfs, but the
// metadata in the offset log may have this if the batch ran from Spark 3.5.4.
// We need to pick the value from the metadata and set it in the new config.
// This also leads the further batches to have a correct config in the offset log.
// We leverage the fact that the name of incorrect config has the same postfix with
// the correct config, ".sql.optimizer.pruneFiltersCanPruneStreamingSubplan"

metadata.conf.keys.find { key =>
key.endsWith(".sql.optimizer.pruneFiltersCanPruneStreamingSubplan") &&
key != PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key
} match {
case Some(key) =>
val value = metadata.conf(key)
sessionConf.setConfString(PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key, value)

case _ =>
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
v1
{"nextBatchWatermarkMs":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"id":"3f409b2c-b22b-49f6-b6e4-86c2bdcddaba"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1739419905155,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1739419906627,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"5","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.databricks.sql.optimizer.pruneFiltersCanPruneStreamingSubplan":"false"}}
1
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,80 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}

test("SPARK-51187 validate that the incorrect config introduced in SPARK-49699 still takes " +
"effect when restarting from Spark 3.5.4") {
// Spark 3.5.4 is the only release we accidentally introduced the incorrect config.
// We just need to confirm that current Spark version will apply the fix of SPARK-49699 when
// the streaming query started from Spark 3.5.4. We should consistently apply the fix, instead
// of "on and off", because that may expose more possibility to break.

def isProblematicConfName(name: String): Boolean = {
name.endsWith(".sql.optimizer.pruneFiltersCanPruneStreamingSubplan") &&
name != SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key
}

withTempDir { dir =>
val input = getClass.getResource("/structured-streaming/checkpoint-version-3.5.4")
assert(input != null, "cannot find test resource")
val inputDir = new File(input.toURI)

// Copy test files to tempDir so that we won't modify the original data.
FileUtils.copyDirectory(inputDir, dir)

// Below is the code we extract checkpoint from Spark 3.5.4. We need to make sure the offset
// advancement continues from the last run.
val inputData = MemoryStream[Int]
val df = inputData.toDF()

inputData.addData(1, 2, 3, 4)
inputData.addData(5, 6, 7, 8)

testStream(df)(
StartStream(checkpointLocation = dir.getCanonicalPath),
AddData(inputData, 9, 10, 11, 12),
ProcessAllAvailable(),
AssertOnQuery { q =>
val confValue = q.lastExecution.sparkSession.conf.get(
SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN)
assert(confValue === false,
"The value for the incorrect config in offset metadata should be respected as the " +
"value of the fixed config")

val offsetLog = new OffsetSeqLog(spark, new File(dir, "offsets").getCanonicalPath)
def checkConfigFromMetadata(batchId: Long, expectCorrectConfig: Boolean): Unit = {
val offsetLogForBatch = offsetLog.get(batchId).get
val confInMetadata = offsetLogForBatch.metadata.get.conf
if (expectCorrectConfig) {
assert(confInMetadata.get(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key) ===
Some("false"),
"The new offset log should have the fixed config instead of the incorrect one."
)
assert(!confInMetadata.keys.exists(isProblematicConfName),
"The new offset log should not have the incorrect config.")
} else {
assert(
confInMetadata.find { case (key, _) =>
isProblematicConfName(key)
}.get._2 === "false",
"The offset log in test resource should have the incorrect config to test properly."
)
assert(
!confInMetadata.contains(SQLConf.PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key),
"The offset log in test resource should not have the fixed config."
)
}
}

assert(offsetLog.getLatestBatchId() === Some(2))
checkConfigFromMetadata(0, expectCorrectConfig = false)
checkConfigFromMetadata(1, expectCorrectConfig = false)
checkConfigFromMetadata(2, expectCorrectConfig = true)
true
}
)
}
}

private def checkAppendOutputModeException(df: DataFrame): Unit = {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
Expand Down