
Commit a1f3dcb

EnricoMi authored and dongjoon-hyun committed
[SPARK-52509][CORE] Cleanup individual shuffles from fallback storage on RemoveShuffle event
### What changes were proposed in this pull request?

Shuffle data of individual shuffles is deleted from the fallback storage during regular shuffle cleanup.

### Why are the changes needed?

Currently, shuffle data is only removed from the fallback storage on Spark context shutdown. Long-running Spark jobs accumulate shuffle data even though Spark no longer uses it. Those shuffles should be cleaned up while the Spark context is running.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests and a manual test via the [reproduction example](https://gist.github.com/EnricoMi/e9daa1176bce4c1211af3f3c5848112a/3140527bcbedec51ed2c571885db774c880cb941). Run the reproduction example without the ` <<< "$scala"`. In the Spark shell, execute this code:

```scala
import org.apache.spark.sql.SaveMode

val n = 100000000
val j = spark.sparkContext.broadcast(1000)
val x = spark.range(0, n, 1, 100).select($"id".cast("int"))
x.as[Int]
  .mapPartitions { it => if (it.hasNext && it.next < n / 100 * 80) Thread.sleep(2000); it }
  .groupBy($"value" % 1000).as[Int, Int]
  .flatMapSortedGroups($"value") { case (m, it) => if (it.hasNext && it.next == 0) Thread.sleep(10000); it }
  .write.mode(SaveMode.Overwrite).csv("/tmp/spark.csv")
```

This writes some data of shuffle 0 to the fallback storage. Invoking `System.gc()` removes that shuffle directory from the fallback storage. Exiting the Spark shell removes the whole application directory.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51199 from EnricoMi/fallback-storage-cleanup.

Authored-by: Enrico Minack <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
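For orientation, below is a minimal sketch of the kind of configuration this change acts on: fallback storage with the new per-shuffle cleanup enabled. It is illustrative only; the exact settings in the reproduction gist may differ, and the path and interval are placeholders.

```scala
import org.apache.spark.SparkConf

// Illustrative only: enable shuffle-block migration to fallback storage and the
// new per-shuffle cleanup. Path and interval values are placeholders.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  // The fallback location; must end with the path separator.
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://spark-storage/")
  // With this PR, freed shuffles are also deleted from the fallback storage
  // (driven by the periodic GC interval), not only on shutdown.
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
  .set("spark.cleaner.periodicGC.interval", "30min")
```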
1 parent b7f2ab2 commit a1f3dcb

File tree

4 files changed: +73, -19 lines


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -641,7 +641,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _ui.foreach(_.setAppId(_applicationId))
     _env.blockManager.initialize(_applicationId)
-    FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)
+    FallbackStorage.registerBlockManagerIfNeeded(
+      _env.blockManager.master, _conf, _hadoopConfiguration)

     // The metrics system for Driver need to be set spark.app.id to app ID.
     // So it should start after we get app ID from the task scheduler and set spark.app.id.
```

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 12 additions & 8 deletions
```diff
@@ -604,23 +604,27 @@ package object config {
       "cache block replication should be positive.")
     .createWithDefaultString("30s")

+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
+    ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
+      .doc("If true, Spark cleans up its fallback storage data once individual shuffles are " +
+        "freed (interval configured via spark.cleaner.periodicGC.interval), and during " +
+        "shutting down.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
     ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
       .doc("The location for fallback storage during block manager decommissioning. " +
         "For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " +
-        "The storage should be managed by TTL because Spark will not clean it up.")
+        "The storage will not be cleaned up by Spark unless " +
+        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP.key} is true. " +
+        "Use an external clean up mechanism when false, for instance a TTL.")
       .version("3.1.0")
       .stringConf
       .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
       .createOptional

-  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
-    ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
-      .doc("If true, Spark cleans up its fallback storage data during shutting down.")
-      .version("3.2.0")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
     ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
       .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
```

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 18 additions & 6 deletions
```diff
@@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
 import org.apache.spark.util.Utils

 /**
@@ -95,14 +96,20 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
   }
 }

-private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration)
+  extends RpcEndpointRef(conf) {
   // scalastyle:off executioncontextglobal
   import scala.concurrent.ExecutionContext.Implicits.global
   // scalastyle:on executioncontextglobal
   override def address: RpcAddress = null
   override def name: String = "fallback"
   override def send(message: Any): Unit = {}
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    message match {
+      case RemoveShuffle(shuffleId) =>
+        FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
+      case _ => // no-op
+    }
     Future{true.asInstanceOf[T]}
   }
 }
@@ -120,20 +127,25 @@ private[spark] object FallbackStorage extends Logging {
   }

   /** Register the fallback block manager and its RPC endpoint. */
-  def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+  def registerBlockManagerIfNeeded(
+      master: BlockManagerMaster,
+      conf: SparkConf,
+      hadoopConf: Configuration): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
       master.registerBlockManager(
-        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf))
+        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0,
+        new FallbackStorageRpcEndpointRef(conf, hadoopConf))
     }
   }

-  /** Clean up the generated fallback location for this app. */
-  def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
+  /** Clean up the generated fallback location for this app (and shuffle id if given). */
+  def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
         conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
         conf.contains("spark.app.id")) {
-      val fallbackPath =
+      val fallbackPath = shuffleId.foldLeft(
         new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
+      ) { case (path, shuffleId) => new Path(path, shuffleId.toString) }
       val fallbackUri = fallbackPath.toUri
       val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
       // The fallback directory for this app may not be created yet.
```
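As a side note on the `cleanUp` change above, the `foldLeft` over the optional shuffle id simply narrows the deleted path from the whole application directory to a single shuffle directory. A minimal standalone sketch of that path construction (the helper name, app id, and root are illustrative):

```scala
import org.apache.hadoop.fs.Path

// Mirrors the path construction in FallbackStorage.cleanUp:
// with no shuffle id the whole app directory is targeted,
// with Some(id) only <app>/<id> is targeted.
def targetPath(fallbackRoot: String, appId: String, shuffleId: Option[Int]): Path =
  shuffleId.foldLeft(new Path(fallbackRoot, appId)) {
    case (appPath, id) => new Path(appPath, id.toString)
  }

targetPath("s3a://spark-storage/", "app-123", None)     // s3a://spark-storage/app-123
targetPath("s3a://spark-storage/", "app-123", Some(1))  // s3a://spark-storage/app-123/1
```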

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

Lines changed: 41 additions & 4 deletions
```diff
@@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -67,8 +68,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
     val fallbackStorage = new FallbackStorage(conf)
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)

     val bm = mock(classOf[BlockManager])
     val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -118,8 +121,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
     val fallbackStorage = new FallbackStorage(conf)
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)

     val bm = mock(classOf[BlockManager])
     val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -153,7 +158,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     assert(readResult.nioByteBuffer().array().sameElements(content))
   }

-  test("SPARK-34142: fallback storage API - cleanUp") {
+  test("SPARK-34142: fallback storage API - cleanUp app") {
     withTempDir { dir =>
       Seq(true, false).foreach { cleanUp =>
         val appId = s"test$cleanUp"
@@ -165,8 +170,38 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
         val location = new File(dir, appId)
         assert(location.mkdir())
         assert(location.exists())
+        val shuffle = new File(location, "1")
+        assert(shuffle.mkdir())
+        assert(shuffle.exists())
         FallbackStorage.cleanUp(conf, new Configuration())
         assert(location.exists() != cleanUp)
+        assert(shuffle.exists() != cleanUp)
+      }
+    }
+  }
+
+  test("SPARK-34142: fallback storage API - cleanUp shuffle") {
+    withTempDir { dir =>
+      Seq(true, false).foreach { cleanUp =>
+        val appId = s"test$cleanUp"
+        val conf = new SparkConf(false)
+          .set("spark.app.id", appId)
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/")
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp)
+
+        val location = new File(dir, appId)
+        assert(location.mkdir())
+        assert(location.exists())
+        val shuffle1 = new File(location, "1")
+        assert(shuffle1.mkdir())
+        assert(shuffle1.exists())
+        val shuffle2 = new File(location, "2")
+        assert(shuffle2.mkdir())
+        assert(shuffle2.exists())
+        FallbackStorage.cleanUp(conf, new Configuration(), Some(1))
+        assert(location.exists())
+        assert(shuffle1.exists() != cleanUp)
+        assert(shuffle2.exists())
       }
     }
   }
@@ -177,6 +212,8 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)

     val ids = Set((1, 1L, 1))
     val bm = mock(classOf[BlockManager])
@@ -202,7 +239,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {

     when(bm.getPeers(mc.any()))
       .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
     when(bm.master).thenReturn(bmm)
     val blockTransferService = mock(classOf[BlockTransferService])
     when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
```
