Skip to content

Commit 0a79a8d

Browse files
committed
UTs
1 parent a2f4e27 commit 0a79a8d

File tree

3 files changed

+27
-5
lines changed

3 files changed

+27
-5
lines changed

core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
143143
*/
144144
protected def createMemoryManager(
145145
maxOnHeapExecutionMemory: Long,
146-
maxOffHeapExecutionMemory: Long = 0L): MemoryManager
146+
maxOffHeapExecutionMemory: Long = 0L,
147+
isOffHeapEnabled: Boolean = true ): MemoryManager
148+
147149

148150
// -- Tests of sharing of execution memory between tasks ----------------------------------------
149151
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
@@ -336,10 +338,10 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
336338
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
337339
}
338340

339-
test("task peak execution memory usage") {
341+
test("task peak execution memory usage when offheap memory is enabled") {
340342
val memoryManager = createMemoryManager(
341343
maxOnHeapExecutionMemory = 1000L,
342-
maxOffHeapExecutionMemory = 1000L)
344+
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)
343345

344346
val tMemManager = new TaskMemoryManager(memoryManager, 1)
345347
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
@@ -353,6 +355,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
353355
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
354356
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
355357
}
358+
359+
test("task peak execution memory usage when offheap memory is disabled") {
360+
val memoryManager = createMemoryManager(
361+
maxOnHeapExecutionMemory = 1000L,
362+
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = false)
363+
364+
val tMemManager = new TaskMemoryManager(memoryManager, 1)
365+
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
366+
val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)
367+
368+
val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
369+
val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
370+
assert(result1 === 0L)
371+
assert(result2 === 400L)
372+
assert(tMemManager.getMemoryConsumptionForThisTask === 400L)
373+
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
374+
assert(tMemManager.getPeakOffHeapExecutionMemory === 0L)
375+
}
356376
}
357377

358378
private object MemoryManagerSuite {

core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
4242

4343
override protected def createMemoryManager(
4444
maxOnHeapExecutionMemory: Long,
45-
maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
45+
maxOffHeapExecutionMemory: Long,
46+
isOffHeapEnabled: Boolean = false ): UnifiedMemoryManager = {
4647
val conf = new SparkConf()
4748
.set(MEMORY_FRACTION, 1.0)
4849
.set(TEST_MEMORY, maxOnHeapExecutionMemory)
4950
.set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
5051
.set(MEMORY_STORAGE_FRACTION, storageFraction)
51-
.set(MEMORY_OFFHEAP_ENABLED, true)
52+
.set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
5253
UnifiedMemoryManager(conf, numCores = 1)
5354
}
5455

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
587587
new SparkConf(false)
588588
.set("spark.app.id", "test")
589589
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
590+
.set(MEMORY_OFFHEAP_ENABLED, true)
590591
.set(
591592
STORAGE_REPLICATION_POLICY,
592593
classOf[BasicBlockReplicationPolicy].getName)

0 commit comments

Comments
 (0)