Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ private[spark] abstract class MemoryManager(
onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val maxOffHeapMemory =
if (conf.get(MEMORY_OFFHEAP_ENABLED)) conf.get(MEMORY_OFFHEAP_SIZE) else 0
protected[this] val offHeapStorageMemory =
(maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
*/
protected def createMemoryManager(
maxOnHeapExecutionMemory: Long,
maxOffHeapExecutionMemory: Long = 0L): MemoryManager
maxOffHeapExecutionMemory: Long = 0L,
isOffHeapEnabled: Boolean = true ): MemoryManager


// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
Expand Down Expand Up @@ -318,7 +320,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
test("off-heap execution allocations cannot exceed limit") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 2L,
maxOffHeapExecutionMemory = 1000L)
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val c = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
Expand All @@ -336,10 +338,10 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
}

test("task peak execution memory usage") {
test("task peak execution memory usage when offheap memory is enabled") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L)
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = true)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
Expand All @@ -353,6 +355,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 500L)
}

test("task peak execution memory usage when offheap memory is disabled") {
val memoryManager = createMemoryManager(
maxOnHeapExecutionMemory = 1000L,
maxOffHeapExecutionMemory = 1000L, isOffHeapEnabled = false)

val tMemManager = new TaskMemoryManager(memoryManager, 1)
val offHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.OFF_HEAP)
val onHeapConsumer = new TestMemoryConsumer(tMemManager, MemoryMode.ON_HEAP)

val result1 = tMemManager.acquireExecutionMemory(500L, offHeapConsumer)
val result2 = tMemManager.acquireExecutionMemory(400L, onHeapConsumer)
assert(result1 === 0L)
assert(result2 === 400L)
assert(tMemManager.getMemoryConsumptionForThisTask === 400L)
assert(tMemManager.getPeakOnHeapExecutionMemory === 400L)
assert(tMemManager.getPeakOffHeapExecutionMemory === 0L)
}
}

private object MemoryManagerSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes

override protected def createMemoryManager(
maxOnHeapExecutionMemory: Long,
maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
maxOffHeapExecutionMemory: Long,
isOffHeapEnabled: Boolean = false ): UnifiedMemoryManager = {
val conf = new SparkConf()
.set(MEMORY_FRACTION, 1.0)
.set(TEST_MEMORY, maxOnHeapExecutionMemory)
.set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
.set(MEMORY_STORAGE_FRACTION, storageFraction)
.set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
UnifiedMemoryManager(conf, numCores = 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
protected def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = {
memoryManager: Option[UnifiedMemoryManager] = None,
isOffHeapEnabled: Boolean = false ): BlockManager = {
conf.set(TEST_MEMORY, maxMem)
conf.set(MEMORY_OFFHEAP_SIZE, maxMem)
conf.set(MEMORY_OFFHEAP_ENABLED, isOffHeapEnabled)
val serializerManager = new SerializerManager(serializer, conf)
val transfer = new NettyBlockTransferService(
conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
Expand All @@ -98,6 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf.set(MEMORY_FRACTION, 1.0)
conf.set(MEMORY_STORAGE_FRACTION, 0.999)
conf.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
conf.set(MEMORY_OFFHEAP_ENABLED, false)

// to make cached peers refresh frequently
conf.set(STORAGE_CACHED_PEERS_TTL, 10)
Expand Down Expand Up @@ -403,7 +406,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite

// As many stores as the replication factor
val stores = (1 to maxReplication).map {
i => makeBlockManager(storeSize, s"store$i")
i => makeBlockManager(storeSize, s"store$i", isOffHeapEnabled = true)
}

storageLevels.foreach { storageLevel =>
Expand Down Expand Up @@ -587,6 +590,7 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
new SparkConf(false)
.set("spark.app.id", "test")
.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(
STORAGE_REPLICATION_POLICY,
classOf[BasicBlockReplicationPolicy].getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
val bmConf = testConf.map(_.setAll(conf.getAll)).getOrElse(conf)
bmConf.set(TEST_MEMORY, maxMem)
bmConf.set(MEMORY_OFFHEAP_SIZE, maxMem)
if(maxMem > 0) {
bmConf.set(MEMORY_OFFHEAP_ENABLED, true)
}
val serializer = new KryoSerializer(bmConf)
val encryptionKey = if (bmConf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(bmConf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
.set(UI_KILL_ENABLED, killEnabled)
.set(UI_TIMELINE_ENABLED, timelineEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
.set(MEMORY_OFFHEAP_ENABLED, true)
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
Expand Down