diff --git a/docs/extensions/engines/spark/shutdown-watchdog.md b/docs/extensions/engines/spark/shutdown-watchdog.md
new file mode 100644
index 00000000000..995d8b4878c
--- /dev/null
+++ b/docs/extensions/engines/spark/shutdown-watchdog.md
@@ -0,0 +1,89 @@
+<!--
+- Licensed to the Apache Software Foundation (ASF) under one or more
+- contributor license agreements. See the NOTICE file distributed with
+- this work for additional information regarding copyright ownership.
+- The ASF licenses this file to You under the Apache License, Version 2.0
+- (the "License"); you may not use this file except in compliance with
+- the License. You may obtain a copy of the License at
+-
+-   http://www.apache.org/licenses/LICENSE-2.0
+-
+- Unless required by applicable law or agreed to in writing, software
+- distributed under the License is distributed on an "AS IS" BASIS,
+- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- See the License for the specific language governing permissions and
+- limitations under the License.
+-->
+
+# Shutdown Watchdog Plugin
+
+The shutdown watchdog prevents zombie Spark SQL engines by monitoring graceful shutdown.
+If the driver does not finish shutting down within the configured timeout, the plugin produces
+a JVM thread dump for diagnostics and forcefully terminates the process to unblock resource
+reclamation.
+
+## Build with Apache Maven
+
+The plugin lives in the module `extensions/spark/kyuubi-spark-shutdown-watchdog` and can be built via:
+
+```shell
+build/mvn clean package -DskipTests \
+  -pl extensions/spark/kyuubi-spark-shutdown-watchdog -am
+```
+
+The module still publishes artifacts with the standard Scala suffix
+(`kyuubi-spark-shutdown-watchdog_2.12`, `..._2.13`, etc.) so that it aligns with
+the rest of the project. Maven expands `${scala.binary.version}` automatically,
+so you can run the command above without worrying about Scala version specifics.
+Because the implementation is pure Java, there are no Scala runtime
+dependencies, so building by module path is enough.
+
+After the build succeeds, the jar is located at:
+`./extensions/spark/kyuubi-spark-shutdown-watchdog/target/kyuubi-spark-shutdown-watchdog_${scala.binary.version}-${project.version}.jar`
+
+## Installing
+
+Place the jar on the Spark driver classpath, for example by:
+
+- Copying it to `$SPARK_HOME/jars`, or
+- Pointing Spark to it through `spark.jars`
+
+A complete example that combines these settings appears at the end of this page.
+
+## Enabling the plugin
+
+Add the plugin class to `spark.plugins` when launching the Spark SQL engine:
+
+```properties
+spark.plugins=org.apache.spark.kyuubi.shutdown.watchdog.SparkShutdownWatchdogPlugin
+```
+
+Configure the timeout directly through Spark (see also the general configuration
+table in `docs/configuration/settings.md`):
+
+```properties
+spark.kyuubi.shutdown.watchdog.timeout=60000
+```
+
+The timeout is parsed as a Spark time string: a bare number is interpreted as
+milliseconds, so `60000` and `60s` are equivalent. Tune this value according to how
+long you expect the engine to take during a normal shutdown; set `0` or a negative
+value to disable forced termination. In practice, pick a value greater than the other
+engine shutdown knobs (executor drain, engine pool shutdown, etc.) so that the
+watchdog only fires when everything else has already stalled.
+
+## Configuration
+
+| Configuration Key                       | Default | Description                                                                                                       |
+|-----------------------------------------|---------|-------------------------------------------------------------------------------------------------------------------|
+| spark.kyuubi.shutdown.watchdog.enabled  | true    | Enables/disables the plugin globally.                                                                              |
+| spark.kyuubi.shutdown.watchdog.timeout  | 0       | Maximum wait (milliseconds) for graceful shutdown before forcing termination. `0` or a negative value disables it. |
+
+## Behavior on timeout
+
+When the timeout elapses, the plugin:
+
+1. Emits a detailed JVM thread dump to the Spark driver logs.
+2. Terminates the driver with Spark's standard `SparkExitCode.UNCAUGHT_EXCEPTION` (the same code the
+   driver would use for an uncaught fatal error). If the watchdog itself ever fails, it exits
+   with `SparkExitCode.UNCAUGHT_EXCEPTION_TWICE`.
+
+This keeps the shutdown semantics consistent with the rest of Spark, making it easier for cluster
+managers (YARN/Kubernetes) to treat the forced exit as a regular Spark failure.
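+
+## Complete example
+
+For reference, the snippet below combines the install and enable steps in one set of
+properties. The jar path and the `60s` timeout are illustrative placeholders, not
+values shipped with the plugin; adjust them to your deployment.
+
+```properties
+# Illustrative values; adjust the jar path and timeout to your deployment.
+spark.jars=/path/to/kyuubi-spark-shutdown-watchdog_2.12-<version>.jar
+spark.plugins=org.apache.spark.kyuubi.shutdown.watchdog.SparkShutdownWatchdogPlugin
+spark.kyuubi.shutdown.watchdog.enabled=true
+spark.kyuubi.shutdown.watchdog.timeout=60s
+```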
diff --git a/extensions/spark/kyuubi-spark-shutdown-watchdog/pom.xml b/extensions/spark/kyuubi-spark-shutdown-watchdog/pom.xml
new file mode 100644
index 00000000000..d696ece37bb
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-shutdown-watchdog/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.kyuubi</groupId>
+    <artifactId>kyuubi-parent</artifactId>
+    <version>1.11.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>kyuubi-spark-shutdown-watchdog_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Kyuubi Spark Shutdown Watchdog</name>
+  <url>https://kyuubi.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testResources>
+      <testResource>
+        <directory>${project.basedir}/src/test/resources</directory>
+      </testResource>
+    </testResources>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
diff --git a/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/ShutdownWatchdog.java b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/ShutdownWatchdog.java
new file mode 100644
index 00000000000..37299b49b31
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/ShutdownWatchdog.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.shutdown.watchdog;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntConsumer;
+import org.apache.spark.SparkConf;
+import org.apache.spark.util.SparkExitCode;
+import org.slf4j.Logger;
+
+/**
+ * Timer-backed watchdog that forces the Spark driver to exit if a graceful shutdown stalls.
+ *
+ * <p>Implementation is deliberately pure Java so the plugin has no Scala runtime dependency.
+ */
+public final class ShutdownWatchdog {
+
+  // Align with Spark's standard exit codes for consistency with Spark driver failures.
+  static final int EXIT_CODE_FORCED_TERMINATION = SparkExitCode.UNCAUGHT_EXCEPTION();
+  static final int EXIT_CODE_WATCHDOG_FAILURE = SparkExitCode.UNCAUGHT_EXCEPTION_TWICE();
+
+  private static final AtomicReference<Thread> WATCHDOG_THREAD_REF = new AtomicReference<>();
+  private static volatile IntConsumer exitFn = System::exit;
+
+  private ShutdownWatchdog() {}
+
+  static void setExitFn(IntConsumer fn) {
+    exitFn = fn != null ? fn : System::exit;
+  }
+
+  static void startIfNeeded(SparkConf sparkConf, Logger logger) {
+    Objects.requireNonNull(sparkConf, "sparkConf");
+    Objects.requireNonNull(logger, "logger");
+
+    if (!SparkShutdownWatchdogConf.isEnabled(sparkConf)) {
+      logger.info(
+          "Shutdown Watchdog is disabled via {}=false.",
+          SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY);
+      return;
+    }
+
+    final long timeoutMillis = SparkShutdownWatchdogConf.getTimeoutMillis(sparkConf);
+    if (timeoutMillis <= 0L) {
+      logger.info("Shutdown Watchdog is disabled because timeout <= 0.");
+      return;
+    }
+
+    Thread existing = WATCHDOG_THREAD_REF.get();
+    if (existing != null && existing.isAlive()) {
+      logger.warn("Shutdown Watchdog is already running, ignoring duplicate start request.");
+      return;
+    }
+
+    final Thread watchdogThread =
+        new Thread(() -> runWatchdogLoop(timeoutMillis, logger), "spark-shutdown-watchdog");
+    watchdogThread.setDaemon(true);
+
+    if (!WATCHDOG_THREAD_REF.compareAndSet(existing, watchdogThread)) {
+      logger.warn("Shutdown Watchdog could not be started because another instance won the race.");
+      return;
+    }
+
+    logger.info(
+        "Shutdown Watchdog activated. Driver will be forcefully terminated if graceful shutdown "
+            + "exceeds {} ms.",
+        timeoutMillis);
+
+    watchdogThread.start();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Shutdown Watchdog thread started: {}", watchdogThread.getName());
+    }
+  }
+
+  private static void runWatchdogLoop(long timeoutMillis, Logger logger) {
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Shutdown Watchdog thread monitoring with timeout {} ms.", timeoutMillis);
+      }
+
+      TimeUnit.MILLISECONDS.sleep(timeoutMillis);
+
+      logger.error("EMERGENCY SHUTDOWN TRIGGERED");
+      logger.error("Graceful shutdown exceeded {} ms timeout", timeoutMillis);
+      logger.error("Non-daemon threads are preventing JVM exit");
+      logger.error("Initiating forced termination...");
+
+      logger.error("=== THREAD DUMP FOR DIAGNOSTIC ===");
+      ThreadDumpUtils.dumpToLogger(logger);
+      logger.error("=== END OF THREAD DUMP ===");
+
+      logger.error("Forcefully terminating JVM now...");
+      exitFn.accept(EXIT_CODE_FORCED_TERMINATION);
+    } catch (InterruptedException ie) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Shutdown Watchdog interrupted, assuming normal driver shutdown.", ie);
+      } else {
+        logger.warn("Shutdown Watchdog interrupted, assuming normal driver shutdown.");
+      }
+      Thread.currentThread().interrupt();
+    } catch (Throwable t) {
+      logger.error(
+          "Shutdown Watchdog error: {}: {}", t.getClass().getSimpleName(), t.getMessage(), t);
+      logger.error("Proceeding with emergency termination...");
+      exitFn.accept(EXIT_CODE_WATCHDOG_FAILURE);
+    } finally {
+      WATCHDOG_THREAD_REF.compareAndSet(Thread.currentThread(), null);
+    }
+  }
+
+  /** Test-only helper to check whether the watchdog thread is currently running. */
+  static boolean isRunningForTests() {
+    Thread t = WATCHDOG_THREAD_REF.get();
+    return t != null && t.isAlive();
+  }
+
+  /**
+   * Test-only helper to clean up any leftover watchdog thread and restore the default exit hook.
+   */
+  @VisibleForTesting
+  static void resetForTests() {
+    Thread thread = WATCHDOG_THREAD_REF.getAndSet(null);
+    if (thread != null) {
+      thread.interrupt();
+      try {
+        thread.join(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    exitFn = System::exit;
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/SparkShutdownWatchdogConf.java b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/SparkShutdownWatchdogConf.java
new file mode 100644
index 00000000000..b5e410b5512
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/SparkShutdownWatchdogConf.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.shutdown.watchdog;
+
+import org.apache.spark.SparkConf;
+
+/**
+ * Spark configuration helpers for the shutdown watchdog plugin.
+ *
+ * <p>Implemented in Java to avoid pulling Scala classes into the plugin artifact.
+ */
+final class SparkShutdownWatchdogConf {
+
+  static final String SHUTDOWN_WATCHDOG_ENABLED_KEY = "spark.kyuubi.shutdown.watchdog.enabled";
+  static final boolean SHUTDOWN_WATCHDOG_ENABLED_DEFAULT = true;
+
+  static final String SHUTDOWN_WATCHDOG_TIMEOUT_KEY = "spark.kyuubi.shutdown.watchdog.timeout";
+  private static final String SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK = "0ms";
+
+  private SparkShutdownWatchdogConf() {}
+
+  static boolean isEnabled(SparkConf conf) {
+    return conf.getBoolean(SHUTDOWN_WATCHDOG_ENABLED_KEY, SHUTDOWN_WATCHDOG_ENABLED_DEFAULT);
+  }
+
+  static long getTimeoutMillis(SparkConf conf) {
+    return conf.getTimeAsMs(SHUTDOWN_WATCHDOG_TIMEOUT_KEY, SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK);
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/SparkShutdownWatchdogPlugin.java b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/SparkShutdownWatchdogPlugin.java
new file mode 100644
index 00000000000..47e0af44ee7
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/SparkShutdownWatchdogPlugin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.shutdown.watchdog;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.plugin.DriverPlugin;
+import org.apache.spark.api.plugin.ExecutorPlugin;
+import org.apache.spark.api.plugin.PluginContext;
+import org.apache.spark.api.plugin.SparkPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Spark plugin entry point that wires the ShutdownWatchdog into the driver lifecycle. */
+@SuppressWarnings("unused")
+public class SparkShutdownWatchdogPlugin implements SparkPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkShutdownWatchdogPlugin.class);
+
+  @Override
+  public DriverPlugin driverPlugin() {
+    return new DriverPlugin() {
+      private SparkConf sparkConf;
+
+      @Override
+      public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
+        this.sparkConf = sc != null ? sc.getConf() : null;
+        return Collections.emptyMap();
+      }
+
+      @Override
+      public void shutdown() {
+        // Graceful shutdown has begun once this hook fires; arm the watchdog so a
+        // stalled shutdown cannot hang the JVM forever.
+        if (sparkConf != null) {
+          ShutdownWatchdog.startIfNeeded(sparkConf, LOG);
+          sparkConf = null;
+        } else {
+          LOG.warn("Shutdown Watchdog driver plugin invoked without SparkConf.");
+        }
+      }
+    };
+  }
+
+  @Override
+  public ExecutorPlugin executorPlugin() {
+    return new ExecutorPlugin() {
+      @Override
+      public void init(PluginContext context, Map<String, String> extraConf) {
+        // no-op
+      }
+
+      @Override
+      public void shutdown() {
+        // no-op
+      }
+    };
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/ThreadDumpUtils.java b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/ThreadDumpUtils.java
new file mode 100644
index 00000000000..0db86540d4d
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/main/java/org/apache/spark/kyuubi/shutdown/watchdog/ThreadDumpUtils.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.shutdown.watchdog;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+
+/**
+ * Utility for generating JVM thread dumps, implemented purely in Java to avoid a Scala dependency.
+ */
+public final class ThreadDumpUtils {
+
+  private static final DumpConfig DEFAULT_CONFIG = new DumpConfig();
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
+
+  private ThreadDumpUtils() {}
+
+  /** Immutable configuration for thread dump generation with builder-style helpers. */
+  public static final class DumpConfig {
+
+    private final int stackDepth;
+    private final boolean showDaemonThreads;
+    private final boolean includeLocksInfo;
+    private final boolean includeSynchronizers;
+    private final ThreadSortBy sortThreadsBy;
+
+    public DumpConfig() {
+      this(10, true, true, true, ThreadSortBy.NAME);
+    }
+
+    private DumpConfig(
+        int stackDepth,
+        boolean showDaemonThreads,
+        boolean includeLocksInfo,
+        boolean includeSynchronizers,
+        ThreadSortBy sortThreadsBy) {
+      this.stackDepth = stackDepth;
+      this.showDaemonThreads = showDaemonThreads;
+      this.includeLocksInfo = includeLocksInfo;
+      this.includeSynchronizers = includeSynchronizers;
+      this.sortThreadsBy = Objects.requireNonNull(sortThreadsBy, "sortThreadsBy");
+    }
+
+    public DumpConfig withStackDepth(int newStackDepth) {
+      return new DumpConfig(
+          newStackDepth, showDaemonThreads, includeLocksInfo, includeSynchronizers, sortThreadsBy);
+    }
+
+    public DumpConfig withShowDaemonThreads(boolean showDaemon) {
+      return new DumpConfig(
+          stackDepth, showDaemon, includeLocksInfo, includeSynchronizers, sortThreadsBy);
+    }
+
+    public DumpConfig withIncludeLocksInfo(boolean includeLocks) {
+      return new DumpConfig(
+          stackDepth, showDaemonThreads, includeLocks, includeSynchronizers, sortThreadsBy);
+    }
+
+    public DumpConfig withIncludeSynchronizers(boolean includeSync) {
+      return new DumpConfig(
+          stackDepth, showDaemonThreads, includeLocksInfo, includeSync, sortThreadsBy);
+    }
+
+    public DumpConfig withSortThreadsBy(ThreadSortBy sortBy) {
+      return new DumpConfig(
+          stackDepth, showDaemonThreads, includeLocksInfo, includeSynchronizers, sortBy);
+    }
+
+    public int getStackDepth() {
+      return stackDepth;
+    }
+
+    public boolean isShowDaemonThreads() {
+      return showDaemonThreads;
+    }
+
+    public boolean isIncludeLocksInfo() {
+      return includeLocksInfo;
+    }
+
+    public boolean isIncludeSynchronizers() {
+      return includeSynchronizers;
+    }
+
+    public ThreadSortBy getSortThreadsBy() {
+      return sortThreadsBy;
+    }
+  }
+
+  public enum ThreadSortBy {
+    ID,
+    NAME,
+    STATE
+  }
+
+  public static void dumpToConsole() {
+    dumpToConsole(DEFAULT_CONFIG);
+  }
+
+  public static void dumpToConsole(DumpConfig config) {
+    dumpToStream(System.err, config);
+  }
+
+  public static String dumpToString() {
+    return dumpToString(DEFAULT_CONFIG);
+  }
+
+  public static String dumpToString(DumpConfig config) {
+    StringWriter writer = new StringWriter(8192);
+    PrintWriter printWriter = new PrintWriter(writer);
+    try {
+      dumpToWriter(printWriter, config);
+      return writer.toString();
+    } finally {
+      printWriter.close();
+    }
+  }
+
+  public static void dumpToLogger(Logger logger) {
+    dumpToLogger(logger, DEFAULT_CONFIG);
+  }
+
+  public static void dumpToLogger(Logger logger, DumpConfig config) {
+    Objects.requireNonNull(logger, "logger");
+    try {
+      String dump = dumpToString(config);
+      logger.error("\n{}", dump);
+    } catch (Throwable t) {
+      // Logging failed; fall back to dumping directly to stderr.
+      t.printStackTrace(System.err);
+      dumpToConsole(config);
+    }
+  }
+
+  private static void dumpToStream(PrintStream out, DumpConfig config) {
+    PrintWriter printWriter = new PrintWriter(out, true);
+    dumpToWriter(printWriter, config);
+  }
+
+  private static void dumpToWriter(PrintWriter writer, DumpConfig config) {
+    DumpConfig effectiveConfig = config != null ? config : DEFAULT_CONFIG;
+
+    try {
+      ExtendedThreadInfo[] allExtendedThreads =
+          getAllExtendedThreadInfo(
+              effectiveConfig.isIncludeLocksInfo(), effectiveConfig.isIncludeSynchronizers());
+
+      String timestamp = LocalDateTime.now().format(TIMESTAMP_FORMATTER);
+
+      writeLine(writer, "================== Thread Dump Start ==================");
+      writeLine(writer, "Timestamp: " + timestamp);
+      writeLine(writer, "Total threads: " + allExtendedThreads.length);
+      writeLine(writer);
+
+      checkDeadlocks(writer);
+      showThreadStatistics(allExtendedThreads, writer);
+
+      ThreadBuckets buckets = splitDaemonThreads(allExtendedThreads);
+
+      writeLine(writer);
+      writeLine(writer, "==================== Non-Daemon Threads ====================");
+      writeLine(writer, "(These threads prevent JVM from exiting)");
+      showThreadDetails(buckets.nonDaemonThreads, effectiveConfig, writer);
+
+      if (effectiveConfig.isShowDaemonThreads()) {
+        writeLine(writer);
+        writeLine(writer, "====================== Daemon Threads ======================");
+        showThreadDetails(buckets.daemonThreads, effectiveConfig, writer);
+      }
+
+      writeLine(writer);
+      writeLine(writer, "======================== Summary ========================");
+      showThreadSummary(allExtendedThreads, writer);
+      writeLine(writer, "================== Thread Dump End ==================");
+    } catch (Throwable t) {
+      writeLine(writer, "*** ERROR: Failed to generate thread dump: " + t.getMessage() + " ***");
+      t.printStackTrace(writer);
+      performEmergencyDump(writer);
+    }
+  }
+
+  private static void writeLine(PrintWriter writer) {
+    writer.println();
+  }
+
+  private static void writeLine(PrintWriter writer, String text) {
+    writer.println(text);
+  }
+
+  private static ExtendedThreadInfo[] getAllExtendedThreadInfo(
+      boolean includeLocksInfo, boolean includeSynchronizers) {
+    ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+    ThreadInfo[] allThreadInfos = threadBean.dumpAllThreads(includeLocksInfo, includeSynchronizers);
+
+    Map<Long, Thread> activeThreadsMap = captureActiveThreads();
+    ExtendedThreadInfo[] extendedInfos = new ExtendedThreadInfo[allThreadInfos.length];
+
+    for (int i = 0; i < allThreadInfos.length; i++) {
+      ThreadInfo threadInfo = allThreadInfos[i];
+      Thread thread = activeThreadsMap.get(threadInfo.getThreadId());
+      extendedInfos[i] = new ExtendedThreadInfo(threadInfo, thread);
+    }
+    return extendedInfos;
+  }
+
+  private static Map<Long, Thread> captureActiveThreads() {
+    try {
+      Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+      Map<Long, Thread> result = new LinkedHashMap<>(stackTraces.size());
+      for (Thread thread : stackTraces.keySet()) {
+        result.put(thread.getId(), thread);
+      }
+      return result;
+    } catch (SecurityException se) {
+      return Collections.emptyMap();
+    }
+  }
+
+  private static void checkDeadlocks(PrintWriter writer) {
+    try {
+      ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+      long[] deadlockedThreads = threadBean.findDeadlockedThreads();
+      if (deadlockedThreads != null && deadlockedThreads.length > 0) {
+        writeLine(writer, "*** DEADLOCK DETECTED ***");
+        writeLine(
+            writer,
+            "Deadlocked thread IDs: "
+                + Arrays.stream(deadlockedThreads)
+                    .mapToObj(Long::toString)
+                    .collect(Collectors.joining(", ")));
+        writeLine(writer);
+      }
+    } catch (Throwable t) {
+      writeLine(writer, "Warning: Could not check for deadlocks: " + t.getMessage());
+    }
+  }
+
+  private static void showThreadStatistics(ExtendedThreadInfo[] allThreads, PrintWriter writer) {
+    Map<Thread.State, List<ExtendedThreadInfo>> byState = new EnumMap<>(Thread.State.class);
+    for (ExtendedThreadInfo info : allThreads) {
+      byState.computeIfAbsent(info.getState(), k -> new ArrayList<>()).add(info);
+    }
+
+    ThreadBuckets buckets = splitDaemonThreads(allThreads);
+
+    writeLine(writer, "Thread Statistics:");
+    writeLine(
+        writer,
+        String.format(Locale.ROOT, "  Non-daemon threads: %3d", buckets.nonDaemonThreads.length));
+    writeLine(
+        writer,
+        String.format(Locale.ROOT, "  Daemon threads:     %3d", buckets.daemonThreads.length));
+    writeLine(writer);
+    writeLine(writer, "Threads by state:");
+
+    byState.entrySet().stream()
+        .sorted(Map.Entry.comparingByKey())
+        .forEach(
+            entry ->
+                writeLine(
+                    writer,
+                    String.format(
+                        Locale.ROOT, "  %-15s: %3d", entry.getKey(), entry.getValue().size())));
+  }
+
+  private static void showThreadDetails(
+      ExtendedThreadInfo[] threads, DumpConfig config, PrintWriter writer) {
+    if (threads.length == 0) {
+      writeLine(writer, "  (No threads in this category)");
+      return;
+    }
+
+    ExtendedThreadInfo[] sorted = Arrays.copyOf(threads, threads.length);
+    switch (config.getSortThreadsBy()) {
+      case ID:
+        Arrays.sort(sorted, (a, b) -> Long.compare(a.getId(), b.getId()));
+        break;
+      case NAME:
+        Arrays.sort(sorted, (a, b) -> a.getName().compareTo(b.getName()));
+        break;
+      case STATE:
+        Arrays.sort(
+            sorted,
+            (a, b) -> {
+              int stateCompare = a.getState().compareTo(b.getState());
+              return stateCompare != 0 ? stateCompare : a.getName().compareTo(b.getName());
+            });
+        break;
+      default:
+        break;
+    }
+
+    for (ExtendedThreadInfo info : sorted) {
+      ThreadInfo threadInfo = info.threadInfo;
+
+      String daemonLabel = info.isDaemon() ? "daemon" : "";
+      writeLine(writer);
+      writeLine(
+          writer,
+          String.format(
+              Locale.ROOT,
+              "Thread: \"%s\" #%d %s",
+              threadInfo.getThreadName(),
+              threadInfo.getThreadId(),
+              daemonLabel));
+      writeLine(writer, "  State: " + threadInfo.getThreadState());
+
+      if (config.isIncludeLocksInfo()) {
+        if (threadInfo.getLockName() != null) {
+          writeLine(writer, "  Waiting on: <" + threadInfo.getLockName() + ">");
+        }
+        if (threadInfo.getLockOwnerName() != null) {
+          writeLine(
+              writer,
+              String.format(
+                  Locale.ROOT,
+                  "  Lock owned by \"%s\" #%d",
+                  threadInfo.getLockOwnerName(),
+                  threadInfo.getLockOwnerId()));
+        }
+      }
+
+      StackTraceElement[] stackTrace = threadInfo.getStackTrace();
+      int actualDepth =
+          config.getStackDepth() <= 0
+              ? stackTrace.length
+              : Math.min(config.getStackDepth(), stackTrace.length);
+
+      for (int i = 0; i < actualDepth; i++) {
+        writeLine(writer, "    at " + stackTrace[i]);
+      }
+
+      if (stackTrace.length > actualDepth) {
+        writeLine(
+            writer,
+            String.format(
+                Locale.ROOT,
+                "    ... (%d more stack frames)",
+                stackTrace.length - actualDepth));
+      }
+    }
+  }
+
+  private static void showThreadSummary(ExtendedThreadInfo[] allThreads, PrintWriter writer) {
+    writeLine(writer, "Thread ID | Type | State           | Name");
+    writeLine(writer, "----------|------|-----------------|---------------------------");
+
+    ExtendedThreadInfo[] sorted = Arrays.copyOf(allThreads, allThreads.length);
+    Arrays.sort(sorted, (a, b) -> Long.compare(a.getId(), b.getId()));
+
+    for (ExtendedThreadInfo info : sorted) {
+      char type = info.isDaemon() ? 'D' : 'U';
+      writeLine(
+          writer,
+          String.format(
+              Locale.ROOT,
+              "%8d | %4s | %-15s | %s",
+              info.getId(),
+              String.valueOf(type),
+              info.getState(),
+              info.getName()));
+    }
+  }
+
+  private static void performEmergencyDump(PrintWriter writer) {
+    try {
+      Thread[] basicThreads = Thread.getAllStackTraces().keySet().toArray(new Thread[0]);
+      writeLine(writer, "*** Emergency fallback: Found " + basicThreads.length + " threads ***");
+      for (Thread thread : basicThreads) {
+        String type = thread.isDaemon() ? "daemon" : "user";
+        writeLine(
+            writer,
+            String.format(
+                Locale.ROOT, "Thread: %s [%s] %s", thread.getName(), thread.getState(), type));
+      }
+    } catch (Throwable t) {
+      writeLine(writer, "*** Even emergency fallback failed: " + t.getMessage() + " ***");
+    }
+  }
+
+  private static ThreadBuckets splitDaemonThreads(ExtendedThreadInfo[] allThreads) {
+    List<ExtendedThreadInfo> daemon = new ArrayList<>();
+    List<ExtendedThreadInfo> nonDaemon = new ArrayList<>();
+    for (ExtendedThreadInfo info : allThreads) {
+      if (info.isDaemon()) {
+        daemon.add(info);
+      } else {
+        nonDaemon.add(info);
+      }
+    }
+    return new ThreadBuckets(
+        daemon.toArray(new ExtendedThreadInfo[0]), nonDaemon.toArray(new ExtendedThreadInfo[0]));
+  }
+
+  private static final class ThreadBuckets {
+    final ExtendedThreadInfo[] daemonThreads;
+    final ExtendedThreadInfo[] nonDaemonThreads;
+
+    ThreadBuckets(ExtendedThreadInfo[] daemonThreads, ExtendedThreadInfo[] nonDaemonThreads) {
+      this.daemonThreads = daemonThreads;
+      this.nonDaemonThreads = nonDaemonThreads;
+    }
+  }
+
+  private static final class ExtendedThreadInfo {
+    private final ThreadInfo threadInfo;
+    private final Thread thread;
+
+    ExtendedThreadInfo(ThreadInfo threadInfo, Thread thread) {
+      this.threadInfo = threadInfo;
+      this.thread = thread;
+    }
+
+    boolean isDaemon() {
+      return thread != null && thread.isDaemon();
+    }
+
+    String getName() {
+      return threadInfo.getThreadName();
+    }
+
+    long getId() {
+      return threadInfo.getThreadId();
+    }
+
+    Thread.State getState() {
+      return threadInfo.getThreadState();
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-shutdown-watchdog/src/test/scala/org/apache/spark/kyuubi/shutdown/watchdog/ShutdownWatchdogSuite.scala b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/test/scala/org/apache/spark/kyuubi/shutdown/watchdog/ShutdownWatchdogSuite.scala
new file mode 100644
index 00000000000..6763b93459a
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-shutdown-watchdog/src/test/scala/org/apache/spark/kyuubi/shutdown/watchdog/ShutdownWatchdogSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi.shutdown.watchdog
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.IntConsumer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkFunSuite
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.{Seconds, Span}
+import org.slf4j.helpers.NOPLogger
+
+class ShutdownWatchdogSuite
+  extends SparkFunSuite
+  with BeforeAndAfterEach
+  with Eventually {
+
+  private val logger = NOPLogger.NOP_LOGGER
+
+  override protected def afterEach(): Unit = {
+    ShutdownWatchdog.resetForTests()
+    super.afterEach()
+  }
+
+  test("watchdog does not start when disabled") {
+    val conf = new SparkConf(false)
+      .set(SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY, "false")
+      .set(SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_TIMEOUT_KEY, "1000ms")
+
+    ShutdownWatchdog.startIfNeeded(conf, logger)
+    assert(!ShutdownWatchdog.isRunningForTests())
+  }
+
+  test("watchdog does not start when timeout is non-positive") {
+    val conf = new SparkConf(false)
+      .set(SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY, "true")
+      .set(SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_TIMEOUT_KEY, "0ms")
+
+    ShutdownWatchdog.startIfNeeded(conf, logger)
+    assert(!ShutdownWatchdog.isRunningForTests())
+  }
+
+  test("watchdog triggers emergency exit after timeout") {
+    val conf = new SparkConf(false)
+      .set(SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY, "true")
+      .set(SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_TIMEOUT_KEY, "25ms")
+
+    val exitCode = new AtomicInteger(-1)
+    val exitLatch = new CountDownLatch(1)
+
+    // Replace the real System.exit hook so the test can observe the forced exit.
+    ShutdownWatchdog.setExitFn(new IntConsumer {
+      override def accept(value: Int): Unit = {
+        exitCode.set(value)
+        exitLatch.countDown()
+      }
+    })
+
+    ShutdownWatchdog.startIfNeeded(conf, logger)
+
+    assert(exitLatch.await(5, TimeUnit.SECONDS), "Watchdog did not trigger within timeout")
+    assert(exitCode.get() == ShutdownWatchdog.EXIT_CODE_FORCED_TERMINATION)
+
+    // Ensure the watchdog thread cleaned itself up.
+    eventually(Timeout(Span(2, Seconds))) {
+      assert(!ShutdownWatchdog.isRunningForTests())
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 70bedda8a30..d20e73b4f71 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
     <module>extensions/spark/kyuubi-spark-connector-tpch</module>
     <module>extensions/spark/kyuubi-spark-lineage</module>
    <module>extensions/spark/kyuubi-spark-jvm-quake</module>
+    <module>extensions/spark/kyuubi-spark-shutdown-watchdog</module>
     <module>externals/kyuubi-chat-engine</module>
     <module>externals/kyuubi-download</module>
     <module>externals/kyuubi-flink-sql-engine</module>