Skip to content

Commit 4695f65

Browse files
wangzhigang1999wangzhigang
authored andcommitted
feat: Introduce Shutdown Watchdog for Spark SQL Engine
- Added a new module `kyuubi-spark-shutdown-watchdog` to monitor and manage the graceful shutdown of Spark SQL engines. - Implemented configuration options for enabling the watchdog and setting timeout values. - Updated documentation to include usage instructions and configuration details for the new feature. - Removed the previous `ThreadDumpUtils` from `kyuubi-common` as its functionality is now encapsulated within the new module. - Added unit tests to ensure the watchdog operates correctly under various scenarios.
1 parent 5f4b1f0 commit 4695f65

File tree

8 files changed

+966
-0
lines changed

8 files changed

+966
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<!--
2+
- Licensed to the Apache Software Foundation (ASF) under one or more
3+
- contributor license agreements. See the NOTICE file distributed with
4+
- this work for additional information regarding copyright ownership.
5+
- The ASF licenses this file to You under the Apache License, Version 2.0
6+
- (the "License"); you may not use this file except in compliance with
7+
- the License. You may obtain a copy of the License at
8+
-
9+
- http://www.apache.org/licenses/LICENSE-2.0
10+
-
11+
- Unless required by applicable law or agreed to in writing, software
12+
- distributed under the License is distributed on an "AS IS" BASIS,
13+
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
- See the License for the specific language governing permissions and
15+
- limitations under the License.
16+
-->
17+
18+
# Shutdown Watchdog Plugin
19+
20+
The shutdown watchdog prevents zombie Spark SQL engines by monitoring graceful shutdown.
21+
If the driver does not finish within the configured timeout, the plugin produces a JVM thread
22+
dump for diagnostics and forcefully terminates the process to unblock resource reclamation.
23+
24+
## Build with Apache Maven
25+
26+
The plugin lives in the module `extensions/spark/kyuubi-spark-shutdown-watchdog` and can be built via:
27+
28+
```shell
29+
build/mvn clean package -DskipTests \
30+
-pl extensions/spark/kyuubi-spark-shutdown-watchdog -am
31+
```
32+
33+
The module still publishes artifacts with the standard Scala suffix
34+
(`kyuubi-spark-shutdown-watchdog_2.12`, `..._2.13`, etc.) so that it aligns with
35+
the rest of the project. Maven expands `${scala.binary.version}` automatically,
36+
so you can run the command above without worrying about Scala version specifics.
37+
Because the implementation is pure Java, there are no Scala runtime
38+
dependencies—building by module path is enough.
39+
40+
After the build succeeds the jar is located at:
41+
`./extensions/spark/kyuubi-spark-shutdown-watchdog/target/kyuubi-spark-shutdown-watchdog_${scala.binary.version}-${project.version}.jar`
42+
43+
## Installing
44+
45+
Place the jar on the Spark driver classpath, for example by:
46+
47+
- Copying it to `$SPARK_HOME/jars`, or
48+
- Pointing Spark to it through `spark.jars`
49+
50+
## Enabling the plugin
51+
52+
Add the plugin class to `spark.plugins` when launching the Spark SQL engine:
53+
54+
```properties
55+
spark.plugins=org.apache.spark.kyuubi.shutdown.watchdog.SparkShutdownWatchdogPlugin
56+
```
57+
58+
Configure the timeout directly through Spark (see also the general configuration
59+
table in `docs/configuration/settings.md`):
60+
61+
```properties
62+
spark.kyuubi.shutdown.watchdog.timeout=60000
63+
```
64+
65+
Tune this value according to how long you expect the engine to take during a
66+
normal shutdown; set `0` or a negative value to disable forced termination. In
67+
practice pick a value greater than other engine shutdown knobs (executor drain,
68+
engine pool shutdown, etc.) so that the watchdog only fires when everything else
69+
has already stalled.
70+
71+
## Configuration
72+
73+
| Configuration Key | Default | Description |
74+
|----------------------------------------|---------|------------------------------------------------------------------------------------------------------------|
75+
| spark.kyuubi.shutdown.watchdog.enabled | true | Enables/disables the plugin globally. |
76+
| spark.kyuubi.shutdown.watchdog.timeout | 0 | Maximum wait (milliseconds) for graceful shutdown before forcing termination. `0` or negative disables it. |
77+
78+
## Behavior on timeout
79+
80+
When the timeout elapses the plugin:
81+
82+
1. Emits a detailed JVM thread dump to the Spark driver logs.
83+
2. Terminates the driver with Spark's standard `SparkExitCode.UNCAUGHT_EXCEPTION` (the same code the
84+
driver would use for an uncaught fatal error). If the watchdog itself ever fails it will exit
85+
with `SparkExitCode.UNCAUGHT_EXCEPTION_TWICE`.
86+
87+
This keeps the shutdown semantics consistent with the rest of Spark, making it easier for cluster
88+
managers (YARN/Kubernetes) to treat the forced exit as a regular Spark failure.
89+
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.kyuubi</groupId>
23+
<artifactId>kyuubi-parent</artifactId>
24+
<version>1.11.0-SNAPSHOT</version>
25+
<relativePath>../../../pom.xml</relativePath>
26+
</parent>
27+
28+
<artifactId>kyuubi-spark-shutdown-watchdog_${scala.binary.version}</artifactId>
29+
<packaging>jar</packaging>
30+
<name>Kyuubi Spark Shutdown Watchdog</name>
31+
<url>https://kyuubi.apache.org/</url>
32+
33+
<dependencies>
34+
<dependency>
35+
<groupId>org.apache.spark</groupId>
36+
<artifactId>spark-core_${scala.binary.version}</artifactId>
37+
<scope>provided</scope>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.apache.spark</groupId>
42+
<artifactId>spark-core_${scala.binary.version}</artifactId>
43+
<type>test-jar</type>
44+
<scope>test</scope>
45+
</dependency>
46+
</dependencies>
47+
48+
<build>
49+
<testResources>
50+
<testResource>
51+
<directory>${project.basedir}/src/test/resources</directory>
52+
</testResource>
53+
</testResources>
54+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
55+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
56+
</build>
57+
58+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.kyuubi.shutdown.watchdog;
19+
20+
import com.google.common.annotations.VisibleForTesting;
21+
import java.util.Objects;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.IntConsumer;
25+
import org.apache.spark.SparkConf;
26+
import org.apache.spark.util.SparkExitCode;
27+
import org.slf4j.Logger;
28+
29+
/**
30+
* Timer-backed watchdog that forces the Spark driver to exit if a graceful shutdown stalls.
31+
*
32+
* <p>Implementation is deliberately pure Java so the plugin has no Scala runtime dependency.
33+
*/
34+
public final class ShutdownWatchdog {
35+
36+
// Align with Spark's standard exit codes for consistency with Spark driver failures.
37+
static final int EXIT_CODE_FORCED_TERMINATION = SparkExitCode.UNCAUGHT_EXCEPTION();
38+
static final int EXIT_CODE_WATCHDOG_FAILURE = SparkExitCode.UNCAUGHT_EXCEPTION_TWICE();
39+
40+
private static final AtomicReference<Thread> WATCHDOG_THREAD_REF = new AtomicReference<>();
41+
private static volatile IntConsumer exitFn = System::exit;
42+
43+
private ShutdownWatchdog() {}
44+
45+
static void setExitFn(IntConsumer fn) {
46+
exitFn = fn != null ? fn : System::exit;
47+
}
48+
49+
static void startIfNeeded(SparkConf sparkConf, Logger logger) {
50+
Objects.requireNonNull(sparkConf, "sparkConf");
51+
Objects.requireNonNull(logger, "logger");
52+
53+
if (!SparkShutdownWatchdogConf.isEnabled(sparkConf)) {
54+
logger.info(
55+
"Shutdown Watchdog is disabled via {}=false.",
56+
SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY);
57+
return;
58+
}
59+
60+
final long timeoutMillis = SparkShutdownWatchdogConf.getTimeoutMillis(sparkConf);
61+
if (timeoutMillis <= 0L) {
62+
logger.info("Shutdown Watchdog is disabled because timeout <= 0.");
63+
return;
64+
}
65+
66+
Thread existing = WATCHDOG_THREAD_REF.get();
67+
if (existing != null && existing.isAlive()) {
68+
logger.warn("Shutdown Watchdog is already running, ignoring duplicate start request.");
69+
return;
70+
}
71+
72+
final Thread watchdogThread =
73+
new Thread(() -> runWatchdogLoop(timeoutMillis, logger), "spark-shutdown-watchdog");
74+
watchdogThread.setDaemon(true);
75+
76+
if (!WATCHDOG_THREAD_REF.compareAndSet(existing, watchdogThread)) {
77+
logger.warn("Shutdown Watchdog could not be started because another instance won the race.");
78+
return;
79+
}
80+
81+
logger.info(
82+
"Shutdown Watchdog activated. Driver will be forcefully terminated if graceful shutdown "
83+
+ "exceeds {} ms.",
84+
timeoutMillis);
85+
86+
watchdogThread.start();
87+
88+
if (logger.isDebugEnabled()) {
89+
logger.debug("Shutdown Watchdog thread started: {}", watchdogThread.getName());
90+
}
91+
}
92+
93+
private static void runWatchdogLoop(long timeoutMillis, Logger logger) {
94+
try {
95+
if (logger.isDebugEnabled()) {
96+
logger.debug("Shutdown Watchdog thread monitoring with timeout {} ms.", timeoutMillis);
97+
}
98+
99+
TimeUnit.MILLISECONDS.sleep(timeoutMillis);
100+
101+
logger.error("EMERGENCY SHUTDOWN TRIGGERED");
102+
logger.error("Graceful shutdown exceeded {} ms timeout", timeoutMillis);
103+
logger.error("Non-daemon threads are preventing JVM exit");
104+
logger.error("Initiating forced termination...");
105+
106+
logger.error("=== THREAD DUMP FOR DIAGNOSTIC ===");
107+
ThreadDumpUtils.dumpToLogger(logger);
108+
logger.error("=== END OF THREAD DUMP ===");
109+
110+
logger.error("Forcefully terminating JVM now...");
111+
exitFn.accept(EXIT_CODE_FORCED_TERMINATION);
112+
} catch (InterruptedException ie) {
113+
if (logger.isDebugEnabled()) {
114+
logger.debug("Shutdown Watchdog interrupted, assuming normal driver shutdown.", ie);
115+
} else {
116+
logger.warn("Shutdown Watchdog interrupted, assuming normal driver shutdown.");
117+
}
118+
Thread.currentThread().interrupt();
119+
} catch (Throwable t) {
120+
logger.error(
121+
"Shutdown Watchdog error: {}: {}", t.getClass().getSimpleName(), t.getMessage(), t);
122+
logger.error("Proceeding with emergency termination...");
123+
exitFn.accept(EXIT_CODE_WATCHDOG_FAILURE);
124+
} finally {
125+
WATCHDOG_THREAD_REF.compareAndSet(Thread.currentThread(), null);
126+
}
127+
}
128+
129+
/** Test-only helper to check whether the watchdog thread is currently running. */
130+
static boolean isRunningForTests() {
131+
Thread t = WATCHDOG_THREAD_REF.get();
132+
return t != null && t.isAlive();
133+
}
134+
135+
/**
136+
* Test-only helper to clean up any leftover watchdog thread and restore the default exit hook.
137+
*/
138+
@VisibleForTesting
139+
static void resetForTests() {
140+
Thread thread = WATCHDOG_THREAD_REF.getAndSet(null);
141+
if (thread != null) {
142+
thread.interrupt();
143+
try {
144+
thread.join(1000);
145+
} catch (InterruptedException e) {
146+
Thread.currentThread().interrupt();
147+
}
148+
}
149+
exitFn = System::exit;
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.kyuubi.shutdown.watchdog;
19+
20+
import org.apache.spark.SparkConf;
21+
22+
/**
23+
* Spark configuration helpers for the shutdown watchdog plugin.
24+
*
25+
* <p>Implemented in Java to avoid pulling Scala classes into the plugin artifact.
26+
*/
27+
final class SparkShutdownWatchdogConf {
28+
29+
static final String SHUTDOWN_WATCHDOG_ENABLED_KEY = "spark.kyuubi.shutdown.watchdog.enabled";
30+
static final boolean SHUTDOWN_WATCHDOG_ENABLED_DEFAULT = true;
31+
32+
static final String SHUTDOWN_WATCHDOG_TIMEOUT_KEY = "spark.kyuubi.shutdown.watchdog.timeout";
33+
private static final String SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK = "0ms";
34+
35+
private SparkShutdownWatchdogConf() {}
36+
37+
static boolean isEnabled(SparkConf conf) {
38+
return conf.getBoolean(SHUTDOWN_WATCHDOG_ENABLED_KEY, SHUTDOWN_WATCHDOG_ENABLED_DEFAULT);
39+
}
40+
41+
static long getTimeoutMillis(SparkConf conf) {
42+
return conf.getTimeAsMs(SHUTDOWN_WATCHDOG_TIMEOUT_KEY, SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK);
43+
}
44+
}

0 commit comments

Comments
 (0)