Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions docs/extensions/engines/spark/shutdown-watchdog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Shutdown Watchdog Plugin

The shutdown watchdog prevents zombie Spark SQL engines by monitoring graceful shutdown.
If the driver does not finish within the configured timeout, the plugin produces a JVM thread
dump for diagnostics and forcefully terminates the process to unblock resource reclamation.

## Build with Apache Maven

The plugin lives in the module `extensions/spark/kyuubi-spark-shutdown-watchdog` and can be built via:

```shell
build/mvn clean package -DskipTests \
-pl extensions/spark/kyuubi-spark-shutdown-watchdog -am
```

The module still publishes artifacts with the standard Scala suffix
(`kyuubi-spark-shutdown-watchdog_2.12`, `..._2.13`, etc.) so that it aligns with
the rest of the project. Maven expands `${scala.binary.version}` automatically,
so you can run the command above without worrying about Scala version specifics.
Because the implementation is pure Java, there are no Scala runtime
dependencies—building by module path is enough.

After the build succeeds the jar is located at:
`./extensions/spark/kyuubi-spark-shutdown-watchdog/target/kyuubi-spark-shutdown-watchdog_${scala.binary.version}-${project.version}.jar`

## Installing

Place the jar on the Spark driver classpath, for example by:

- Copying it to `$SPARK_HOME/jars`, or
- Pointing Spark to it through `spark.jars`

## Enabling the plugin

Add the plugin class to `spark.plugins` when launching the Spark SQL engine:

```properties
spark.plugins=org.apache.spark.kyuubi.shutdown.watchdog.SparkShutdownWatchdogPlugin
```

Configure the timeout directly through Spark (see also the general configuration
table in `docs/configuration/settings.md`):

```properties
spark.kyuubi.shutdown.watchdog.timeout=60000
```

Tune this value according to how long you expect the engine to take during a
normal shutdown; set `0` or a negative value to disable forced termination. In
practice pick a value greater than other engine shutdown knobs (executor drain,
engine pool shutdown, etc.) so that the watchdog only fires when everything else
has already stalled.

## Configuration

| Configuration Key | Default | Description |
|----------------------------------------|---------|------------------------------------------------------------------------------------------------------------|
| spark.kyuubi.shutdown.watchdog.enabled | true | Enables/disables the plugin globally. |
| spark.kyuubi.shutdown.watchdog.timeout | 0 | Maximum wait (milliseconds) for graceful shutdown before forcing termination. `0` or negative disables it. |

## Behavior on timeout

When the timeout elapses the plugin:

1. Emits a detailed JVM thread dump to the Spark driver logs.
2. Terminates the driver with Spark's standard `SparkExitCode.UNCAUGHT_EXCEPTION` (the same code the
driver would use for an uncaught fatal error). If the watchdog itself ever fails it will exit
with `SparkExitCode.UNCAUGHT_EXCEPTION_TWICE`.

This keeps the shutdown semantics consistent with the rest of Spark, making it easier for cluster
managers (YARN/Kubernetes) to treat the forced exit as a regular Spark failure.

58 changes: 58 additions & 0 deletions extensions/spark/kyuubi-spark-shutdown-watchdog/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-parent</artifactId>
<version>1.11.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>kyuubi-spark-shutdown-watchdog_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Spark Shutdown Watchdog</name>
<url>https://kyuubi.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<testResources>
<testResource>
<directory>${project.basedir}/src/test/resources</directory>
</testResource>
</testResources>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kyuubi.shutdown.watchdog;

import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;
import org.apache.spark.SparkConf;
import org.apache.spark.util.SparkExitCode;
import org.slf4j.Logger;

/**
* Timer-backed watchdog that forces the Spark driver to exit if a graceful shutdown stalls.
*
* <p>Implementation is deliberately pure Java so the plugin has no Scala runtime dependency.
*/
public final class ShutdownWatchdog {

// Align with Spark's standard exit codes for consistency with Spark driver failures.
static final int EXIT_CODE_FORCED_TERMINATION = SparkExitCode.UNCAUGHT_EXCEPTION();
static final int EXIT_CODE_WATCHDOG_FAILURE = SparkExitCode.UNCAUGHT_EXCEPTION_TWICE();

private static final AtomicReference<Thread> WATCHDOG_THREAD_REF = new AtomicReference<>();
private static volatile IntConsumer exitFn = System::exit;

private ShutdownWatchdog() {}

static void setExitFn(IntConsumer fn) {
exitFn = fn != null ? fn : System::exit;
}

static void startIfNeeded(SparkConf sparkConf, Logger logger) {
Objects.requireNonNull(sparkConf, "sparkConf");
Objects.requireNonNull(logger, "logger");

if (!SparkShutdownWatchdogConf.isEnabled(sparkConf)) {
logger.info(
"Shutdown Watchdog is disabled via {}=false.",
SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY);
return;
}

final long timeoutMillis = SparkShutdownWatchdogConf.getTimeoutMillis(sparkConf);
if (timeoutMillis <= 0L) {
logger.info("Shutdown Watchdog is disabled because timeout <= 0.");
return;
}

Thread existing = WATCHDOG_THREAD_REF.get();
if (existing != null && existing.isAlive()) {
logger.warn("Shutdown Watchdog is already running, ignoring duplicate start request.");
return;
}

final Thread watchdogThread =
new Thread(() -> runWatchdogLoop(timeoutMillis, logger), "spark-shutdown-watchdog");
watchdogThread.setDaemon(true);

if (!WATCHDOG_THREAD_REF.compareAndSet(existing, watchdogThread)) {
logger.warn("Shutdown Watchdog could not be started because another instance won the race.");
return;
}

logger.info(
"Shutdown Watchdog activated. Driver will be forcefully terminated if graceful shutdown "
+ "exceeds {} ms.",
timeoutMillis);

watchdogThread.start();

if (logger.isDebugEnabled()) {
logger.debug("Shutdown Watchdog thread started: {}", watchdogThread.getName());
}
}

private static void runWatchdogLoop(long timeoutMillis, Logger logger) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Shutdown Watchdog thread monitoring with timeout {} ms.", timeoutMillis);
}

TimeUnit.MILLISECONDS.sleep(timeoutMillis);

logger.error("EMERGENCY SHUTDOWN TRIGGERED");
logger.error("Graceful shutdown exceeded {} ms timeout", timeoutMillis);
logger.error("Non-daemon threads are preventing JVM exit");
logger.error("Initiating forced termination...");

logger.error("=== THREAD DUMP FOR DIAGNOSTIC ===");
ThreadDumpUtils.dumpToLogger(logger);
logger.error("=== END OF THREAD DUMP ===");

logger.error("Forcefully terminating JVM now...");
exitFn.accept(EXIT_CODE_FORCED_TERMINATION);
} catch (InterruptedException ie) {
if (logger.isDebugEnabled()) {
logger.debug("Shutdown Watchdog interrupted, assuming normal driver shutdown.", ie);
} else {
logger.warn("Shutdown Watchdog interrupted, assuming normal driver shutdown.");
}
Thread.currentThread().interrupt();
} catch (Throwable t) {
logger.error(
"Shutdown Watchdog error: {}: {}", t.getClass().getSimpleName(), t.getMessage(), t);
logger.error("Proceeding with emergency termination...");
exitFn.accept(EXIT_CODE_WATCHDOG_FAILURE);
} finally {
WATCHDOG_THREAD_REF.compareAndSet(Thread.currentThread(), null);
}
}

/** Test-only helper to check whether the watchdog thread is currently running. */
static boolean isRunningForTests() {
Thread t = WATCHDOG_THREAD_REF.get();
return t != null && t.isAlive();
}

/**
* Test-only helper to clean up any leftover watchdog thread and restore the default exit hook.
*/
@VisibleForTesting
static void resetForTests() {
Thread thread = WATCHDOG_THREAD_REF.getAndSet(null);
if (thread != null) {
thread.interrupt();
try {
thread.join(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
exitFn = System::exit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.kyuubi.shutdown.watchdog;

import org.apache.spark.SparkConf;

/**
* Spark configuration helpers for the shutdown watchdog plugin.
*
* <p>Implemented in Java to avoid pulling Scala classes into the plugin artifact.
*/
final class SparkShutdownWatchdogConf {

static final String SHUTDOWN_WATCHDOG_ENABLED_KEY = "spark.kyuubi.shutdown.watchdog.enabled";
static final boolean SHUTDOWN_WATCHDOG_ENABLED_DEFAULT = true;

static final String SHUTDOWN_WATCHDOG_TIMEOUT_KEY = "spark.kyuubi.shutdown.watchdog.timeout";
private static final String SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK = "0ms";

private SparkShutdownWatchdogConf() {}

static boolean isEnabled(SparkConf conf) {
return conf.getBoolean(SHUTDOWN_WATCHDOG_ENABLED_KEY, SHUTDOWN_WATCHDOG_ENABLED_DEFAULT);
}

static long getTimeoutMillis(SparkConf conf) {
return conf.getTimeAsMs(SHUTDOWN_WATCHDOG_TIMEOUT_KEY, SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK);
}
}
Loading
Loading