Commit b7f2ab2

victors-oai and ForVic authored and committed
[SPARK-53335][K8S] Support spark.kubernetes.driver.annotateExitException
### What changes were proposed in this pull request?

This PR aims to provide a way for users to annotate the driver pod with the exception message when an application exits with an exception.

### Why are the changes needed?

For jobs that run on Kubernetes there is no native concept of diagnostics (as there is in YARN), which means that for debugging and triaging errors users _must_ go to the logs. For jobs that run on YARN this is often unnecessary, since the diagnostics contain the root-cause reason for the failure. Additionally, platforms that automate failure insights, or make decisions based on failures, would otherwise need a custom solution for deciding why the application failed (e.g. log and stack-trace parsing). We use a mechanism similar to the one in #23599 to load custom implementations, in order to avoid a dependency on the k8s module from SparkSubmit.

### Does this PR introduce _any_ user-facing change?

Yes, a config, which defaults to false.

### How was this patch tested?

Unit tested + verified in a production k8s cluster.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52068 from ForVic/vsunderl/better_kubernetes_diagnostics.

Lead-authored-by: Victor Sunderland <[email protected]>
Co-authored-by: ForVic <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 45c0a46 commit b7f2ab2
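In practice, a user opts in with the new flag at submit time. A minimal sketch of the configuration (the master URL is a placeholder; normally it arrives via `spark-submit --master`):

```scala
import org.apache.spark.SparkConf

// Placeholder API server URL; any k8s:// master works the same way.
val conf = new SparkConf()
  .setMaster("k8s://https://example.k8s.api:6443")
  .set("spark.kubernetes.driver.annotateExitException", "true")
// If the application later exits with an exception, SparkSubmit stores the
// stringified stack trace on the driver pod under the "spark.exit-exception"
// annotation (truncated, see the setter implementation below).
```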

File tree

6 files changed: +252 −0 lines changed


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 56 additions & 0 deletions

@@ -1035,6 +1035,10 @@ private[spark] class SparkSubmit extends Logging {
           exitCode = e.exitCode
         case _ =>
       }
+      // Store the diagnostics externally if enabled, but still throw to complete the application.
+      if (sparkConf.getBoolean("spark.kubernetes.driver.annotateExitException", false)) {
+        annotateExitException(args, sparkConf, cause)
+      }
       throw cause
     } finally {
       if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&

@@ -1058,6 +1062,24 @@ private[spark] class SparkSubmit extends Logging {
   /** Throw a SparkException with the given error message. */
   private def error(msg: String): Unit = throw new SparkException(msg)

+  /**
+   * Store the exit exception using the SparkDiagnosticsSetter.
+   */
+  private def annotateExitException(
+      args: SparkSubmitArguments,
+      sparkConf: SparkConf,
+      throwable: Throwable): Unit = {
+    // Swallow exceptions when storing diagnostics; this shouldn't fail the application.
+    try {
+      if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass)
+        && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) {
+        SparkSubmitUtils.getSparkDiagnosticsSetters(args.master)
+          .foreach(_.setDiagnostics(throwable, sparkConf))
+      }
+    } catch {
+      case e: Throwable => logDebug(s"Failed to set diagnostics: $e")
+    }
+  }
 }

@@ -1233,6 +1255,23 @@ private[spark] object SparkSubmitUtils {
       case _ => throw new SparkException(s"Spark config without '=': $pair")
     }
   }
+
+  private[deploy] def getSparkDiagnosticsSetters(
+      master: String): Option[SparkDiagnosticsSetter] = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[SparkDiagnosticsSetter], loader)
+        .asScala
+        .filter(_.supports(master))
+
+    serviceLoaders.size match {
+      case x if x > 1 =>
+        throw new SparkException(s"Multiple($x) external SparkDiagnosticsSetter registered.")
+      case 1 =>
+        Some(serviceLoaders.headOption.get)
+      case _ => None
+    }
+  }
 }

@@ -1255,3 +1294,20 @@ private[spark] trait SparkSubmitOperation {

   def supports(master: String): Boolean
 }
+
+/**
+ * Provides a hook to set the application failure details in some external system.
+ */
+private[spark] trait SparkDiagnosticsSetter {
+
+  /**
+   * Set the failure details.
+   */
+  def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit
+
+  /**
+   * Whether this implementation of the SparkDiagnosticsSetter supports setting the exit
+   * exception for this application.
+   */
+  def supports(clusterManagerUrl: String): Boolean
+}
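Because `SparkDiagnosticsSetter` is discovered through `java.util.ServiceLoader`, the same hook could back other cluster managers. A minimal sketch of an alternative implementation (the `StdErrDiagnosticsSetter` class is hypothetical, not part of this commit; since the trait is `private[spark]`, such a class must live under the `org.apache.spark` package):

```scala
package org.apache.spark.deploy

import org.apache.spark.SparkConf

// Hypothetical setter that only logs the failure to stderr; a real one would
// persist it to an external system, as the Kubernetes setter below does.
class StdErrDiagnosticsSetter extends SparkDiagnosticsSetter {

  // Invoked by SparkSubmit after the application throws. Exceptions thrown here
  // are swallowed by annotateExitException, so this should stay best-effort.
  override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = {
    System.err.println(s"Application failed: ${throwable.getMessage}")
  }

  // Claim only masters this setter understands; getSparkDiagnosticsSetters
  // fails if more than one registered setter supports the same master URL.
  override def supports(clusterManagerUrl: String): Boolean =
    clusterManagerUrl.startsWith("spark://")
}
```

Such a class is only discovered if it is listed in a `META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter` resource on the classpath, like the registration file this commit adds next.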
resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.spark.deploy.k8s.SparkKubernetesDiagnosticsSetter
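This one-line registration is what `SparkSubmitUtils.getSparkDiagnosticsSetters` picks up at runtime. A standalone sketch of the equivalent lookup (assuming the kubernetes module is on the classpath):

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._

import org.apache.spark.deploy.SparkDiagnosticsSetter

// Load every registered SparkDiagnosticsSetter and keep those that claim
// support for a k8s master URL; with the file above present, this yields
// the Kubernetes setter.
val setters = ServiceLoader
  .load(classOf[SparkDiagnosticsSetter],
    Thread.currentThread().getContextClassLoader)
  .asScala
  .filter(_.supports("k8s://https://example:6443"))

setters.foreach(s => println(s.getClass.getName))
// Expected: org.apache.spark.deploy.k8s.SparkKubernetesDiagnosticsSetter
```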

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 8 additions & 0 deletions

@@ -803,6 +803,14 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
     .createWithDefaultString("20s")

+  val KUBERNETES_ANNOTATE_EXIT_EXCEPTION =
+    ConfigBuilder("spark.kubernetes.driver.annotateExitException")
+      .doc("If set to true, Spark will store the exit exception of failed applications in" +
+        s" the Kubernetes API server using the $EXIT_EXCEPTION_ANNOTATION annotation.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_LABEL_PREFIX = "spark.kubernetes.driver.service.label."
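Note that `SparkSubmit` in core reads the raw key (`sparkConf.getBoolean("spark.kubernetes.driver.annotateExitException", false)`) rather than this entry, since core cannot depend on the k8s module where the entry lives. Spark-internal code in the k8s module can use the typed entry; a small sketch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config.KUBERNETES_ANNOTATE_EXIT_EXCEPTION

val conf = new SparkConf().set(KUBERNETES_ANNOTATE_EXIT_EXCEPTION.key, "true")
// Typed read through the ConfigEntry; yields false when the key is unset.
val enabled: Boolean = conf.get(KUBERNETES_ANNOTATE_EXIT_EXCEPTION)
```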

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 1 addition & 0 deletions

@@ -107,6 +107,7 @@ object Constants {
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
   val CONNECT_GRPC_BINDING_PORT = "spark.connect.grpc.binding.port"
+  val EXIT_EXCEPTION_ANNOTATION = "spark.exit-exception"

   // Hadoop Configuration
   val HADOOP_CONF_VOLUME = "hadoop-properties"
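Once written, the annotation can be read back with any Kubernetes client, for example the fabric8 client Spark itself uses (pod name and namespace below are placeholders):

```scala
import io.fabric8.kubernetes.client.KubernetesClientBuilder

// Placeholder pod name/namespace; in practice these identify the failed driver pod.
val client = new KubernetesClientBuilder().build()
try {
  val annotations = client.pods()
    .inNamespace("default")
    .withName("my-app-driver")
    .get()
    .getMetadata
    .getAnnotations
  // "spark.exit-exception" is the EXIT_EXCEPTION_ANNOTATION constant above;
  // annotations may be null/missing if the application never failed.
  println(annotations.get("spark.exit-exception"))
} finally {
  client.close()
}
```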
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.hadoop.util.StringUtils

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkDiagnosticsSetter
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION
import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory.ClientType
import org.apache.spark.internal.Logging
import org.apache.spark.util.{SparkStringUtils, Utils}

/**
 * We use this trait and its implementation to allow for mocking the static
 * client creation in tests.
 */
private[spark] trait KubernetesClientProvider {
  def create(conf: SparkConf): KubernetesClient
}

private[spark] class DefaultKubernetesClientProvider extends KubernetesClientProvider {
  override def create(conf: SparkConf): KubernetesClient = {
    SparkKubernetesClientFactory.createKubernetesClient(
      conf.get(KUBERNETES_DRIVER_MASTER_URL),
      Option(conf.get(KUBERNETES_NAMESPACE)),
      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
      ClientType.Driver,
      conf,
      None)
  }
}

private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: KubernetesClientProvider)
  extends SparkDiagnosticsSetter with Logging {

  private val KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB

  def this() = {
    this(new DefaultKubernetesClientProvider)
  }

  override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = {
    val diagnostics = SparkStringUtils.abbreviate(StringUtils.stringifyException(throwable),
      KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES)
    Utils.tryWithResource(clientProvider.create(conf)) { client =>
      conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName =>
        client.pods()
          .inNamespace(conf.get(KUBERNETES_NAMESPACE))
          .withName(podName)
          .edit((p: Pod) => new PodBuilder(p)
            .editOrNewMetadata()
            .addToAnnotations(EXIT_EXCEPTION_ANNOTATION, diagnostics)
            .endMetadata()
            .build())
      }
    }
  }

  override def supports(clusterManagerUrl: String): Boolean = {
    clusterManagerUrl.startsWith("k8s://")
  }
}
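A note on the 64 KiB cap: Kubernetes limits the combined size of all annotations on an object (256 KiB in current releases), so the stack trace is abbreviated before being attached. The same truncation step in isolation, as a sketch:

```scala
import org.apache.hadoop.util.StringUtils
import org.apache.spark.util.SparkStringUtils

// Mirrors setDiagnostics: stringify the full stack trace, then cap it at
// 64 KiB so the annotation stays well under the Kubernetes size limit.
val stackTrace = StringUtils.stringifyException(new RuntimeException("boom"))
val diagnostics = SparkStringUtils.abbreviate(stackTrace, 64 * 1024)
```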
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import java.util.function.UnaryOperator

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.apache.hadoop.util.StringUtils
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfterEach
import org.scalatestplus.mockito.MockitoSugar

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION
import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS

class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite
  with MockitoSugar with BeforeAndAfterEach {

  @Mock
  private var client: KubernetesClient = _
  @Mock
  private var clientProvider: KubernetesClientProvider = _
  @Mock
  private var podOperations: PODS = _
  @Mock
  private var driverPodOperations: PodResource = _

  private val driverPodName: String = "driver-pod"
  private val k8sClusterManagerUrl: String = "k8s://dummy"
  private val namespace: String = "default"

  private var setter: SparkKubernetesDiagnosticsSetter = _

  override def beforeEach(): Unit = {
    MockitoAnnotations.openMocks(this)
    when(client.pods()).thenReturn(podOperations)
    when(podOperations.inNamespace(namespace)).thenReturn(podOperations)
    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
    when(clientProvider.create(any(classOf[SparkConf]))).thenReturn(client)
    setter = new SparkKubernetesDiagnosticsSetter(clientProvider)
  }

  test("supports() should return true only for k8s:// URLs") {
    assert(setter.supports(k8sClusterManagerUrl))
    assert(!setter.supports("yarn"))
    assert(!setter.supports("spark://localhost"))
  }

  test("setDiagnostics should patch pod with diagnostics annotation") {
    val diagnostics = new Throwable("Fake diagnostics stack trace")
    val conf = new SparkConf()
      .set(KUBERNETES_DRIVER_MASTER_URL, k8sClusterManagerUrl)
      .set(KUBERNETES_NAMESPACE, namespace)
      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)

    setter.setDiagnostics(diagnostics, conf)

    val captor: ArgumentCaptor[UnaryOperator[Pod]] =
      ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
    verify(driverPodOperations).edit(captor.capture())

    val fn = captor.getValue
    val initialPod = SparkPod.initialPod().pod
    val editedPod = fn.apply(initialPod)

    assert(editedPod.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION)
      == StringUtils.stringifyException(diagnostics))
  }
}
