@@ -32,16 +32,15 @@ import org.apache.spark.util.Utils
 /**
  * An abstract memory manager that enforces how memory is shared between execution and storage.
  *
- * In this context, execution memory refers to that used for computation in shuffles, joins, sorts
- * and aggregations, while storage memory refers to that used for caching and propagating internal
- * data across the cluster. There exists one MemoryManager per JVM.
+ * In this context, execution memory refers to that used for computation in shuffles, joins,
+ * sorts and aggregations, while storage memory refers to that used for caching and propagating
+ * internal data across the cluster. There exists one MemoryManager per JVM.
  */
 private[spark] abstract class MemoryManager(
     conf: SparkConf,
     numCores: Int,
     onHeapStorageMemory: Long,
-    onHeapExecutionMemory: Long)
-  extends Logging {
+    onHeapExecutionMemory: Long) extends Logging {

   require(onHeapExecutionMemory > 0, "onHeapExecutionMemory must be > 0")

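For orientation while reading the hunks below, here is a minimal sketch of what a concrete subclass of this abstract class must supply. It is hypothetical illustration, not code from this commit: it enforces a hard storage/execution boundary with no borrowing or eviction, and it assumes the pool fields and pool methods (`memoryFree`, `acquireMemory`) that Spark's own `StaticMemoryManager` uses.

```scala
package org.apache.spark.memory

import org.apache.spark.SparkConf
import org.apache.spark.storage.BlockId

// Hypothetical minimal MemoryManager: fixed split, no borrowing, no eviction.
private[spark] class FixedSplitMemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long)
  extends MemoryManager(conf, numCores, onHeapStorageMemory, onHeapExecutionMemory) {

  // Static split: storage never grows at execution's expense.
  override def maxOnHeapStorageMemory: Long = onHeapStorageMemory
  override def maxOffHeapStorageMemory: Long = offHeapStorageMemory

  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    val pool = memoryMode match {
      case MemoryMode.ON_HEAP => onHeapStorageMemoryPool
      case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool
    }
    // Grant only if the request fits in free space, so no eviction is triggered.
    numBytes <= pool.memoryFree && pool.acquireMemory(blockId, numBytes)
  }

  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean =
    acquireStorageMemory(blockId, numBytes, memoryMode)

  override private[memory] def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    }
  }
}
```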
@@ -59,8 +58,7 @@ private[spark] abstract class MemoryManager(
   onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
   onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

-  protected[this] val maxOffHeapMemory =
-    if (conf.get(MEMORY_OFFHEAP_ENABLED)) conf.get(MEMORY_OFFHEAP_SIZE) else 0
+  protected[this] val maxOffHeapMemory = if (conf.get(MEMORY_OFFHEAP_ENABLED)) conf.get(MEMORY_OFFHEAP_SIZE) else 0
   protected[this] val offHeapStorageMemory =
     (maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong

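The two values in this hunk fix the off-heap budget and its storage share once, at construction time. A worked sketch of the arithmetic with illustrative settings (0.5 is the default for `spark.memory.storageFraction`, the config behind `MEMORY_STORAGE_FRACTION`):

```scala
// Illustrative arithmetic for the vals above, assuming
// spark.memory.offHeap.enabled=true and spark.memory.offHeap.size=4g:
val maxOffHeapMemory = 4L * 1024 * 1024 * 1024                          // 4 GiB
val storageFraction = 0.5                                               // spark.memory.storageFraction default
val offHeapStorageMemory = (maxOffHeapMemory * storageFraction).toLong  // 2 GiB for storage
val offHeapExecutionMemory = maxOffHeapMemory - offHeapStorageMemory    // remaining 2 GiB for execution
```

With off-heap disabled, `maxOffHeapMemory` is 0, so both off-heap pools stay empty.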
@@ -69,8 +67,8 @@ private[spark] abstract class MemoryManager(

   /**
    * Total available on heap memory for storage, in bytes. This amount can vary over time,
-   * depending on the MemoryManager implementation. In this model, this is equivalent to the
-   * amount of memory not occupied by execution.
+   * depending on the MemoryManager implementation.
+   * In this model, this is equivalent to the amount of memory not occupied by execution.
    */
   def maxOnHeapStorageMemory: Long

@@ -81,8 +79,8 @@ private[spark] abstract class MemoryManager(
   def maxOffHeapStorageMemory: Long

   /**
-   * Set the [[MemoryStore]] used by this manager to evict cached blocks. This must be set after
-   * construction due to initialization ordering constraints.
+   * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+   * This must be set after construction due to initialization ordering constraints.
    */
   final def setMemoryStore(store: MemoryStore): Unit = synchronized {
     onHeapStorageMemoryPool.setMemoryStore(store)
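The ordering constraint exists because the two objects reference each other: the store asks the manager for space, and the manager evicts through the store. A hedged sketch of the wiring, modeled on how `BlockManager` does it (the collaborators `conf`, `blockInfoManager`, `serializerManager`, and `blockEvictionHandler` are assumed to be in scope):

```scala
// The manager is built first...
val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numCores = 4)
// ...then the store, which needs the manager to request space...
val memoryStore = new MemoryStore(
  conf, blockInfoManager, serializerManager, memoryManager, blockEvictionHandler)
// ...and finally the cycle is closed: the manager can now evict via the store.
memoryManager.setMemoryStore(memoryStore)
```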
@@ -92,20 +90,18 @@ private[spark] abstract class MemoryManager(
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
    *
-   * @return
-   *   whether all N bytes were successfully granted.
+   * @return whether all N bytes were successfully granted.
    */
   def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

   /**
    * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
    *
    * This extra method allows subclasses to differentiate behavior between acquiring storage
-   * memory and acquiring unroll memory. For instance, the memory management model in Spark 1.5
-   * and before places a limit on the amount of space that can be freed from unrolling.
+   * memory and acquiring unroll memory. For instance, the memory management model in Spark
+   * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
    *
-   * @return
-   *   whether all N bytes were successfully granted.
+   * @return whether all N bytes were successfully granted.
    */
   def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

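To see why a separate unroll entry point matters: callers reserve unroll memory incrementally while materializing an iterator, so a block that turns out to be huge fails fast instead of claiming storage memory it can never use. A loose, hypothetical sketch of that pattern (modeled on `MemoryStore`'s putIterator path; the step size and check interval are illustrative only):

```scala
// Hypothetical sketch: reserve unroll memory in fixed steps while consuming
// the iterator; abort as soon as a reservation is not fully granted.
def unrollSketch(mm: MemoryManager, blockId: BlockId, values: Iterator[AnyRef]): Boolean = {
  val step = 1L * 1024 * 1024  // 1 MiB per reservation, illustrative
  var granted = mm.acquireUnrollMemory(blockId, step, MemoryMode.ON_HEAP)
  var count = 0L
  while (granted && values.hasNext) {
    values.next()  // materialize one element under the current reservation
    count += 1
    if (count % 16 == 0) {  // re-reserve periodically as the unrolled size grows
      granted = mm.acquireUnrollMemory(blockId, step, MemoryMode.ON_HEAP)
    }
  }
  granted  // true only if every reservation was fully granted
}
```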
@@ -118,30 +114,30 @@ private[spark] abstract class MemoryManager(
    * active tasks) before it is forced to spill. This can happen if the number of tasks increases
    * but an older task had a lot of memory already.
    */
-  private[memory] def acquireExecutionMemory(
+  private[memory]
+  def acquireExecutionMemory(
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Long

   /**
    * Release numBytes of execution memory belonging to the given task.
    */
-  private[memory] def releaseExecutionMemory(
+  private[memory]
+  def releaseExecutionMemory(
       numBytes: Long,
       taskAttemptId: Long,
       memoryMode: MemoryMode): Unit = synchronized {
     memoryMode match {
       case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
-      case MemoryMode.OFF_HEAP =>
-        offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
+      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
     }
   }

   /**
    * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
    *
-   * @return
-   *   the number of bytes freed.
+   * @return the number of bytes freed.
    */
   private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
     onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
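The contract documented above for `acquireExecutionMemory` (each of N active tasks ends up with between 1/2N and 1/N of the pool before it is forced to spill) is easiest to see with numbers. This arithmetic is illustrative, not code from the patch:

```scala
val poolSize = 1024L * 1024 * 1024              // 1 GiB execution pool, illustrative
val n = 4                                       // active tasks
val guaranteedBeforeSpill = poolSize / (2 * n)  // 128 MiB: minimum share before spilling
val hardCapPerTask = poolSize / n               // 256 MiB: maximum share per task
// When a 5th task becomes active, both bounds shrink (~102 MiB / ~205 MiB),
// but memory already granted to older tasks is not clawed back; newcomers may
// have to spill sooner. That is the situation the doc comment describes.
```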
@@ -188,28 +184,28 @@ private[spark] abstract class MemoryManager(
   }

   /**
-   * On heap execution memory currently in use, in bytes.
+   *  On heap execution memory currently in use, in bytes.
    */
   final def onHeapExecutionMemoryUsed: Long = synchronized {
     onHeapExecutionMemoryPool.memoryUsed
   }

   /**
-   * Off heap execution memory currently in use, in bytes.
+   *  Off heap execution memory currently in use, in bytes.
    */
   final def offHeapExecutionMemoryUsed: Long = synchronized {
     offHeapExecutionMemoryPool.memoryUsed
   }

   /**
-   * On heap storage memory currently in use, in bytes.
+   *  On heap storage memory currently in use, in bytes.
    */
   final def onHeapStorageMemoryUsed: Long = synchronized {
     onHeapStorageMemoryPool.memoryUsed
   }

   /**
-   * Off heap storage memory currently in use, in bytes.
+   *  Off heap storage memory currently in use, in bytes.
    */
   final def offHeapStorageMemoryUsed: Long = synchronized {
     offHeapStorageMemoryPool.memoryUsed
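These four getters are the per-mode views; the class also exposes combined totals (`executionMemoryUsed` / `storageMemoryUsed`, outside the hunks shown here). A trivial sketch of the relationship, written as free functions purely to show the arithmetic:

```scala
// Illustrative only: the combined totals are the sums of the per-mode getters.
def totalExecutionUsed(mm: MemoryManager): Long =
  mm.onHeapExecutionMemoryUsed + mm.offHeapExecutionMemoryUsed
def totalStorageUsed(mm: MemoryManager): Long =
  mm.onHeapStorageMemoryUsed + mm.offHeapStorageMemoryUsed
```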
@@ -231,11 +227,9 @@ private[spark] abstract class MemoryManager(
    */
   final val tungstenMemoryMode: MemoryMode = {
     if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
-      require(
-        conf.get(MEMORY_OFFHEAP_SIZE) > 0,
+      require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
         "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
-      require(
-        Platform.unaligned(),
+      require(Platform.unaligned(),
         "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
       MemoryMode.OFF_HEAP
     } else {
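For reference, a configuration that passes both `require()` checks in this hunk and selects `MemoryMode.OFF_HEAP` (values illustrative; the `else` branch falls back to `MemoryMode.ON_HEAP`):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")  // must be > 0 once off-heap is enabled
// If the platform cannot do unaligned Unsafe access, Platform.unaligned()
// returns false and construction fails; the remedy the error message gives
// is to set spark.memory.offHeap.enabled back to false.
```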
@@ -246,17 +240,17 @@ private[spark] abstract class MemoryManager(
   /**
    * The default page size, in bytes.
    *
-   * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value by
-   * looking at the number of cores available to the process, and the total amount of memory, and
-   * then divide it by a factor of safety.
+   * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
+   * by looking at the number of cores available to the process, and the total amount of memory,
+   * and then divide it by a factor of safety.
    *
    * SPARK-37593 If we are using G1GC, ZGC or ShenandoahGC, it's better to take the
-   * LONG_ARRAY_OFFSET into consideration so that the requested memory size is power of 2 and can
-   * be divided by heap region size to reduce memory waste.
+   * LONG_ARRAY_OFFSET into consideration so that the requested memory size is power of 2
+   * and can be divided by heap region size to reduce memory waste.
    */
   private lazy val defaultPageSizeBytes = {
-    val minPageSize = 1L * 1024 * 1024 // 1MB
-    val maxPageSize = 64L * minPageSize // 64MB
+    val minPageSize = 1L * 1024 * 1024   // 1MB
+    val maxPageSize = 64L * minPageSize  // 64MB
     val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
     // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
     val safetyFactor = 16
@@ -267,7 +261,7 @@ private[spark] abstract class MemoryManager(
     val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
     val chosenPageSize = math.min(maxPageSize, math.max(minPageSize, size))
     if ((Utils.isG1GC || Utils.isZGC || Utils.isShenandoahGC) &&
-        tungstenMemoryMode == MemoryMode.ON_HEAP) {
+      tungstenMemoryMode == MemoryMode.ON_HEAP) {
       chosenPageSize - Platform.LONG_ARRAY_OFFSET
     } else {
       chosenPageSize
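A worked instance of the computation in this hunk, with illustrative inputs (8 cores, 4 GiB of Tungsten memory); only the arithmetic is shown, not the config lookup:

```scala
val maxTungstenMemory = 4L * 1024 * 1024 * 1024  // 4 GiB, illustrative
val cores = 8
val safetyFactor = 16
// 4 GiB / 8 / 16 = 32 MiB; already a power of two, so nextPowerOf2 keeps it.
val size = maxTungstenMemory / cores / safetyFactor  // 33554432 bytes
val chosenPageSize = math.min(64L * 1024 * 1024, math.max(1L * 1024 * 1024, size))
// Under G1GC/ZGC/Shenandoah with on-heap Tungsten, LONG_ARRAY_OFFSET (16 on
// typical HotSpot JVMs) is subtracted so that the page bytes plus the long[]
// header still fit inside a power-of-two heap region.
```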