Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.gluten.utils.ConfigUtil

import org.apache.spark.memory.SparkMemoryUtil
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf}
import org.apache.spark.task.{TaskResource, TaskResources}

import org.slf4j.LoggerFactory

Expand All @@ -38,12 +37,13 @@ trait NativeMemoryManager {
def addSpiller(spiller: Spiller): Unit
def hold(): Unit
def getHandle(): Long
def release(): Unit
}

object NativeMemoryManager {
private class Impl(backendName: String, name: String)
extends NativeMemoryManager
with TaskResource {
extends NativeMemoryManager {
private val nmmName = s"[nmm-$name]"
private val LOGGER = LoggerFactory.getLogger(classOf[NativeMemoryManager])
private val spillers = Spillers.appendable()
private val mutableStats: mutable.Map[String, MemoryUsageStatsBuilder] = mutable.Map()
Expand Down Expand Up @@ -81,14 +81,14 @@ object NativeMemoryManager {
override def release(): Unit = {
if (!released.compareAndSet(false, true)) {
throw new GlutenException(
s"Memory manager instance already released: $handle, ${resourceName()}, ${priority()}")
s"Memory manager instance already released: $handle")
}

def dump(): String = {
SparkMemoryUtil.prettyPrintStats(
s"[${resourceName()}]",
nmmName,
new KnownNameAndStats() {
override def name: String = resourceName()
override def name: String = nmmName
override def stats: MemoryUsageStats = collectUsage()
})
Comment thread
zhztheplayer marked this conversation as resolved.
}
Expand All @@ -110,15 +110,9 @@ object NativeMemoryManager {
))
}
}
override def priority(): Int = {
// Memory managers should be released after all runtimes are released.
// So set the priority lower than runtime resources.
10
}
override def resourceName(): String = "nmm"
}

def apply(backendName: String, name: String): NativeMemoryManager = {
TaskResources.addAnonymousResource(new Impl(backendName, name))
new Impl(backendName, name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ object Runtime {
s"Runtime instance already released: $handle, ${resourceName()}, ${priority()}")
}
RuntimeJniWrapper.releaseRuntime(handle)

ntm.release()
nmm.release()
Comment thread
zhztheplayer marked this conversation as resolved.
}

override def priority(): Int = 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,29 @@ package org.apache.gluten.threads

import org.apache.gluten.exception.GlutenException

import org.apache.spark.task.{TaskResource, TaskResources}

import java.util.concurrent.atomic.AtomicBoolean

/**
* Scala wrapper around a native ThreadManager handle.
*
* Created once per Spark task and registered as a [[TaskResource]] so it is automatically released
* when the task completes. The ThreadManager wraps a [[NativeThreadInitializer]] that propagates
* task context to native worker threads spawned by folly executors.
* Created once per Spark task by [[org.apache.gluten.runtime.Runtime]]. The ThreadManager wraps a
* [[NativeThreadInitializer]] that propagates task context to native worker threads spawned by
* folly executors.
*/
trait NativeThreadManager {

/**
 * Accessor for the native handle that backs this manager.
 *
 * @return
 *   opaque native handle passed to RuntimeJniWrapper#createRuntime.
 */
def getHandle(): Long

/**
 * Release the native ThreadManager handle. Called by Runtime during task completion.
 *
 * Release is one-shot: the implementation in this file guards it with an atomic flag and throws
 * a GlutenException when it is invoked a second time on the same instance.
 */
def release(): Unit
}

object NativeThreadManager {
private class Impl(
private val backendName: String,
private val initializer: NativeThreadInitializer)
extends NativeThreadManager
with TaskResource {
extends NativeThreadManager {
private val handle = NativeThreadManagerJniWrapper.create(backendName, initializer)
private val released = new AtomicBoolean(false)

Expand All @@ -49,27 +49,22 @@ object NativeThreadManager {
override def release(): Unit = {
if (!released.compareAndSet(false, true)) {
throw new GlutenException(
s"Thread manager instance already released: $handle, ${resourceName()}, ${priority()}")
s"Thread manager instance already released: $handle")
}
NativeThreadManagerJniWrapper.release(handle)
}

// Release before MemoryManager (10) but after most other resources.
override def priority(): Int = 20

override def resourceName(): String = "ntm"
}

/**
* Create a new NativeThreadManager and register it with the current Spark task's
* [[TaskResources]] so it is automatically released when the task finishes.
* Create a new NativeThreadManager. The caller (typically Runtime) is responsible for calling
* `release()` when the manager is no longer needed.
*
* @param backendName
* the backend kind string (e.g., "velox").
* @param initializer
* callback invoked when native worker threads are created / destroyed.
*/
def apply(backendName: String, initializer: NativeThreadInitializer): NativeThreadManager = {
TaskResources.addAnonymousResource(new Impl(backendName, initializer))
new Impl(backendName, initializer)
}
}
Loading