Skip to content

Commit f89fc23

Browse files
author
wangzhigang
committed
feat: Introduce Shutdown Watchdog for Spark SQL Engine
- Added a new module `kyuubi-spark-shutdown-watchdog` to monitor and manage the graceful shutdown of Spark SQL engines. - Implemented configuration options for enabling the watchdog and setting timeout values. - Updated documentation to include usage instructions and configuration details for the new feature. - Removed the previous `ThreadDumpUtils` from `kyuubi-common` as its functionality is now encapsulated within the new module. - Added unit tests to ensure the watchdog operates correctly under various scenarios.
1 parent 5ea5a34 commit f89fc23

File tree

13 files changed

+968
-629
lines changed

13 files changed

+968
-629
lines changed

docs/configuration/settings.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,6 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
494494
| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed: <ul> <li>RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.</li> <li>DEREGISTER_IMMEDIATELY: deregister the engine immediately.</li> <li>DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.</li></ul> | string | 1.8.1 |
495495
| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 |
496496
| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 |
497-
| kyuubi.session.engine.shutdown.watchdog.timeout | PT1M | The maximum time to wait for the engine to shutdown gracefully before forcing termination. When an engine shutdown is initiated, this watchdog timer starts counting down. If the engine doesn't complete shutdown within this timeout period, it will be forcefully terminated to prevent hanging. Set to 0 or a negative value to disable the forced shutdown mechanism. | duration | 1.11.0 |
498497
| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 |
499498
| kyuubi.session.engine.spark.main.resource | &lt;undefined&gt; | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 |
500499
| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 |
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<!--
2+
- Licensed to the Apache Software Foundation (ASF) under one or more
3+
- contributor license agreements. See the NOTICE file distributed with
4+
- this work for additional information regarding copyright ownership.
5+
- The ASF licenses this file to You under the Apache License, Version 2.0
6+
- (the "License"); you may not use this file except in compliance with
7+
- the License. You may obtain a copy of the License at
8+
-
9+
- http://www.apache.org/licenses/LICENSE-2.0
10+
-
11+
- Unless required by applicable law or agreed to in writing, software
12+
- distributed under the License is distributed on an "AS IS" BASIS,
13+
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
- See the License for the specific language governing permissions and
15+
- limitations under the License.
16+
-->
17+
18+
# Shutdown Watchdog Plugin
19+
20+
The shutdown watchdog prevents zombie Spark SQL engines by monitoring graceful shutdown.
21+
If the driver does not finish within the configured timeout, the plugin produces a JVM thread
22+
dump for diagnostics and forcefully terminates the process to unblock resource reclamation.
23+
24+
## Build with Apache Maven
25+
26+
The plugin lives in the module `extensions/spark/kyuubi-spark-shutdown-watchdog` and can be built via:
27+
28+
```shell
29+
build/mvn clean package -DskipTests \
30+
-pl extensions/spark/kyuubi-spark-shutdown-watchdog -am
31+
```
32+
33+
The module still publishes artifacts with the standard Scala suffix
34+
(`kyuubi-spark-shutdown-watchdog_2.12`, `..._2.13`, etc.) so that it aligns with
35+
the rest of the project. Maven expands `${scala.binary.version}` automatically,
36+
so you can run the command above without worrying about Scala version specifics.
37+
Because the implementation is pure Java, there are no Scala runtime
38+
dependencies—building by module path is enough.
39+
40+
After the build succeeds the jar is located at:
41+
`./extensions/spark/kyuubi-spark-shutdown-watchdog/target/kyuubi-spark-shutdown-watchdog_${scala.binary.version}-${project.version}.jar`
42+
43+
## Installing
44+
45+
Place the jar on the Spark driver classpath, for example by:
46+
47+
- Copying it to `$SPARK_HOME/jars`, or
48+
- Pointing Spark to it through `spark.jars`
49+
50+
## Enabling the plugin
51+
52+
Add the plugin class to `spark.plugins` when launching the Spark SQL engine:
53+
54+
```properties
55+
spark.plugins=org.apache.spark.kyuubi.shutdown.watchdog.SparkShutdownWatchdogPlugin
56+
```
57+
58+
Configure the timeout directly through Spark (see also the general configuration
59+
table in `docs/configuration/settings.md`):
60+
61+
```properties
62+
spark.kyuubi.shutdown.watchdog.timeout=60000
63+
```
64+
65+
Tune this value according to how long you expect the engine to take during a
66+
normal shutdown; set `0` or a negative value to disable forced termination. In
67+
practice pick a value greater than other engine shutdown knobs (executor drain,
68+
engine pool shutdown, etc.) so that the watchdog only fires when everything else
69+
has already stalled.
70+
71+
## Configuration
72+
73+
| Configuration Key | Default | Description |
74+
|----------------------------------------|---------|------------------------------------------------------------------------------------------------------------|
75+
| spark.kyuubi.shutdown.watchdog.enabled | true | Enables/disables the plugin globally. |
76+
| spark.kyuubi.shutdown.watchdog.timeout | 0 | Maximum wait (milliseconds) for graceful shutdown before forcing termination. `0` or negative disables it. |
77+
78+
## Behavior on timeout
79+
80+
When the timeout elapses the plugin:
81+
82+
1. Emits a detailed JVM thread dump to the Spark driver logs.
83+
2. Terminates the driver with Spark's standard `SparkExitCode.UNCAUGHT_EXCEPTION` (the same code the
84+
driver would use for an uncaught fatal error). If the watchdog itself ever fails it will exit
85+
with `SparkExitCode.UNCAUGHT_EXCEPTION_TWICE`.
86+
87+
This keeps the shutdown semantics consistent with the rest of Spark, making it easier for cluster
88+
managers (YARN/Kubernetes) to treat the forced exit as a regular Spark failure.
89+
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<modelVersion>4.0.0</modelVersion>
21+
<parent>
22+
<groupId>org.apache.kyuubi</groupId>
23+
<artifactId>kyuubi-parent</artifactId>
24+
<version>1.11.0-SNAPSHOT</version>
25+
<relativePath>../../../pom.xml</relativePath>
26+
</parent>
27+
28+
<artifactId>kyuubi-spark-shutdown-watchdog_${scala.binary.version}</artifactId>
29+
<packaging>jar</packaging>
30+
<name>Kyuubi Spark Shutdown Watchdog</name>
31+
<url>https://kyuubi.apache.org/</url>
32+
33+
<dependencies>
34+
<dependency>
35+
<groupId>org.apache.spark</groupId>
36+
<artifactId>spark-core_${scala.binary.version}</artifactId>
37+
<scope>provided</scope>
38+
</dependency>
39+
40+
<dependency>
41+
<groupId>org.apache.spark</groupId>
42+
<artifactId>spark-core_${scala.binary.version}</artifactId>
43+
<type>test-jar</type>
44+
<scope>test</scope>
45+
</dependency>
46+
</dependencies>
47+
48+
<build>
49+
<testResources>
50+
<testResource>
51+
<directory>${project.basedir}/src/test/resources</directory>
52+
</testResource>
53+
</testResources>
54+
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
55+
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
56+
</build>
57+
58+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.kyuubi.shutdown.watchdog;
19+
20+
import com.google.common.annotations.VisibleForTesting;
21+
import java.util.Objects;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.IntConsumer;
25+
import org.apache.spark.SparkConf;
26+
import org.apache.spark.util.SparkExitCode;
27+
import org.slf4j.Logger;
28+
29+
/**
30+
* Timer-backed watchdog that forces the Spark driver to exit if a graceful shutdown stalls.
31+
*
32+
* <p>Implementation is deliberately pure Java so the plugin has no Scala runtime dependency.
33+
*/
34+
public final class ShutdownWatchdog {
35+
36+
// Align with Spark's standard exit codes for consistency with Spark driver failures.
37+
static final int EXIT_CODE_FORCED_TERMINATION = SparkExitCode.UNCAUGHT_EXCEPTION();
38+
static final int EXIT_CODE_WATCHDOG_FAILURE = SparkExitCode.UNCAUGHT_EXCEPTION_TWICE();
39+
40+
private static final AtomicReference<Thread> WATCHDOG_THREAD_REF = new AtomicReference<>();
41+
private static volatile IntConsumer exitFn = System::exit;
42+
43+
private ShutdownWatchdog() {}
44+
45+
static void setExitFn(IntConsumer fn) {
46+
exitFn = fn != null ? fn : System::exit;
47+
}
48+
49+
static void startIfNeeded(SparkConf sparkConf, Logger logger) {
50+
Objects.requireNonNull(sparkConf, "sparkConf");
51+
Objects.requireNonNull(logger, "logger");
52+
53+
if (!SparkShutdownWatchdogConf.isEnabled(sparkConf)) {
54+
logger.info(
55+
"Shutdown Watchdog is disabled via {}=false.",
56+
SparkShutdownWatchdogConf.SHUTDOWN_WATCHDOG_ENABLED_KEY);
57+
return;
58+
}
59+
60+
final long timeoutMillis = SparkShutdownWatchdogConf.getTimeoutMillis(sparkConf);
61+
if (timeoutMillis <= 0L) {
62+
logger.info("Shutdown Watchdog is disabled because timeout <= 0.");
63+
return;
64+
}
65+
66+
Thread existing = WATCHDOG_THREAD_REF.get();
67+
if (existing != null && existing.isAlive()) {
68+
logger.warn("Shutdown Watchdog is already running, ignoring duplicate start request.");
69+
return;
70+
}
71+
72+
final Thread watchdogThread =
73+
new Thread(() -> runWatchdogLoop(timeoutMillis, logger), "spark-shutdown-watchdog");
74+
watchdogThread.setDaemon(true);
75+
76+
if (!WATCHDOG_THREAD_REF.compareAndSet(existing, watchdogThread)) {
77+
logger.warn("Shutdown Watchdog could not be started because another instance won the race.");
78+
return;
79+
}
80+
81+
logger.info(
82+
"Shutdown Watchdog activated. Driver will be forcefully terminated if graceful shutdown "
83+
+ "exceeds {} ms.",
84+
timeoutMillis);
85+
86+
watchdogThread.start();
87+
88+
if (logger.isDebugEnabled()) {
89+
logger.debug("Shutdown Watchdog thread started: {}", watchdogThread.getName());
90+
}
91+
}
92+
93+
private static void runWatchdogLoop(long timeoutMillis, Logger logger) {
94+
try {
95+
if (logger.isDebugEnabled()) {
96+
logger.debug("Shutdown Watchdog thread monitoring with timeout {} ms.", timeoutMillis);
97+
}
98+
99+
TimeUnit.MILLISECONDS.sleep(timeoutMillis);
100+
101+
logger.error("EMERGENCY SHUTDOWN TRIGGERED");
102+
logger.error("Graceful shutdown exceeded {} ms timeout", timeoutMillis);
103+
logger.error("Non-daemon threads are preventing JVM exit");
104+
logger.error("Initiating forced termination...");
105+
106+
logger.error("=== THREAD DUMP FOR DIAGNOSTIC ===");
107+
ThreadDumpUtils.dumpToLogger(logger);
108+
logger.error("=== END OF THREAD DUMP ===");
109+
110+
logger.error("Forcefully terminating JVM now...");
111+
exitFn.accept(EXIT_CODE_FORCED_TERMINATION);
112+
} catch (InterruptedException ie) {
113+
if (logger.isDebugEnabled()) {
114+
logger.debug("Shutdown Watchdog interrupted, assuming normal driver shutdown.", ie);
115+
} else {
116+
logger.warn("Shutdown Watchdog interrupted, assuming normal driver shutdown.");
117+
}
118+
Thread.currentThread().interrupt();
119+
} catch (Throwable t) {
120+
logger.error(
121+
"Shutdown Watchdog error: {}: {}", t.getClass().getSimpleName(), t.getMessage(), t);
122+
logger.error("Proceeding with emergency termination...");
123+
exitFn.accept(EXIT_CODE_WATCHDOG_FAILURE);
124+
} finally {
125+
WATCHDOG_THREAD_REF.compareAndSet(Thread.currentThread(), null);
126+
}
127+
}
128+
129+
/** Test-only helper to check whether the watchdog thread is currently running. */
130+
static boolean isRunningForTests() {
131+
Thread t = WATCHDOG_THREAD_REF.get();
132+
return t != null && t.isAlive();
133+
}
134+
135+
/**
136+
* Test-only helper to clean up any leftover watchdog thread and restore the default exit hook.
137+
*/
138+
@VisibleForTesting
139+
static void resetForTests() {
140+
Thread thread = WATCHDOG_THREAD_REF.getAndSet(null);
141+
if (thread != null) {
142+
thread.interrupt();
143+
try {
144+
thread.join(1000);
145+
} catch (InterruptedException e) {
146+
Thread.currentThread().interrupt();
147+
}
148+
}
149+
exitFn = System::exit;
150+
}
151+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.kyuubi.shutdown.watchdog;
19+
20+
import org.apache.spark.SparkConf;
21+
22+
/**
23+
* Spark configuration helpers for the shutdown watchdog plugin.
24+
*
25+
* <p>Implemented in Java to avoid pulling Scala classes into the plugin artifact.
26+
*/
27+
final class SparkShutdownWatchdogConf {
28+
29+
static final String SHUTDOWN_WATCHDOG_ENABLED_KEY = "spark.kyuubi.shutdown.watchdog.enabled";
30+
static final boolean SHUTDOWN_WATCHDOG_ENABLED_DEFAULT = true;
31+
32+
static final String SHUTDOWN_WATCHDOG_TIMEOUT_KEY = "spark.kyuubi.shutdown.watchdog.timeout";
33+
private static final String SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK = "0ms";
34+
35+
private SparkShutdownWatchdogConf() {}
36+
37+
static boolean isEnabled(SparkConf conf) {
38+
return conf.getBoolean(SHUTDOWN_WATCHDOG_ENABLED_KEY, SHUTDOWN_WATCHDOG_ENABLED_DEFAULT);
39+
}
40+
41+
static long getTimeoutMillis(SparkConf conf) {
42+
return conf.getTimeAsMs(SHUTDOWN_WATCHDOG_TIMEOUT_KEY, SHUTDOWN_WATCHDOG_TIMEOUT_FALLBACK);
43+
}
44+
}

0 commit comments

Comments
 (0)