diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala index 159e1bba5ec..3308f357a7c 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala @@ -25,7 +25,6 @@ import org.apache.gluten.utils.ConfigUtil import org.apache.spark.memory.SparkMemoryUtil import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf} -import org.apache.spark.task.{TaskResource, TaskResources} import org.slf4j.LoggerFactory @@ -38,12 +37,13 @@ trait NativeMemoryManager { def addSpiller(spiller: Spiller): Unit def hold(): Unit def getHandle(): Long + def release(): Unit } object NativeMemoryManager { private class Impl(backendName: String, name: String) - extends NativeMemoryManager - with TaskResource { + extends NativeMemoryManager { + private val nmmName = s"[nmm-$name]" private val LOGGER = LoggerFactory.getLogger(classOf[NativeMemoryManager]) private val spillers = Spillers.appendable() private val mutableStats: mutable.Map[String, MemoryUsageStatsBuilder] = mutable.Map() @@ -81,14 +81,14 @@ object NativeMemoryManager { override def release(): Unit = { if (!released.compareAndSet(false, true)) { throw new GlutenException( - s"Memory manager instance already released: $handle, ${resourceName()}, ${priority()}") + s"Memory manager instance already released: $handle") } def dump(): String = { SparkMemoryUtil.prettyPrintStats( - s"[${resourceName()}]", + nmmName, new KnownNameAndStats() { - override def name: String = resourceName() + override def name: String = nmmName override def stats: MemoryUsageStats = collectUsage() }) } @@ -110,15 +110,9 @@ object NativeMemoryManager { )) } } - override def priority(): Int = { - // Memory managers should be released after all runtimes are released. - // So set the priority lower than runtime resources. - 10 - } - override def resourceName(): String = "nmm" } def apply(backendName: String, name: String): NativeMemoryManager = { - TaskResources.addAnonymousResource(new Impl(backendName, name)) + new Impl(backendName, name) } } diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala index 2e3c4681400..885dd831e84 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -81,7 +81,8 @@ object Runtime { s"Runtime instance already released: $handle, ${resourceName()}, ${priority()}") } RuntimeJniWrapper.releaseRuntime(handle) - + ntm.release() + nmm.release() } override def priority(): Int = 30 diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala index 8abe6d9c9bb..ce0507b69a5 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/threads/NativeThreadManager.scala @@ -18,29 +18,29 @@ package org.apache.gluten.threads import org.apache.gluten.exception.GlutenException -import org.apache.spark.task.{TaskResource, TaskResources} - import java.util.concurrent.atomic.AtomicBoolean /** * Scala wrapper around a native ThreadManager handle. * - * Created once per Spark task and registered as a [[TaskResource]] so it is automatically released - * when the task completes. The ThreadManager wraps a [[NativeThreadInitializer]] that propagates - * task context to native worker threads spawned by folly executors. + * Created once per Spark task by [[org.apache.gluten.runtime.Runtime]]. The ThreadManager wraps a + * [[NativeThreadInitializer]] that propagates task context to native worker threads spawned by + * folly executors. */ trait NativeThreadManager { /** @return opaque native handle passed to RuntimeJniWrapper#createRuntime. */ def getHandle(): Long + + /** Release the native ThreadManager handle. Called by Runtime during task completion. */ + def release(): Unit } object NativeThreadManager { private class Impl( private val backendName: String, private val initializer: NativeThreadInitializer) - extends NativeThreadManager - with TaskResource { + extends NativeThreadManager { private val handle = NativeThreadManagerJniWrapper.create(backendName, initializer) private val released = new AtomicBoolean(false) @@ -49,20 +49,15 @@ object NativeThreadManager { override def release(): Unit = { if (!released.compareAndSet(false, true)) { throw new GlutenException( - s"Thread manager instance already released: $handle, ${resourceName()}, ${priority()}") + s"Thread manager instance already released: $handle") } NativeThreadManagerJniWrapper.release(handle) } - - // Release before MemoryManager (10) but after most other resources. - override def priority(): Int = 20 - - override def resourceName(): String = "ntm" } /** - * Create a new NativeThreadManager and register it with the current Spark task's - * [[TaskResources]] so it is automatically released when the task finishes. + * Create a new NativeThreadManager. The caller (typically Runtime) is responsible for calling + * `release()` when the manager is no longer needed. * * @param backendName * the backend kind string (e.g., "velox"). @@ -70,6 +65,6 @@ object NativeThreadManager { * callback invoked when native worker threads are created / destroyed. */ def apply(backendName: String, initializer: NativeThreadInitializer): NativeThreadManager = { - TaskResources.addAnonymousResource(new Impl(backendName, initializer)) + new Impl(backendName, initializer) } }