Commit c4818e3

Require block manager port to be configured
1 parent c138447 commit c4818e3

File tree

4 files changed: +62, -41 lines


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
@@ -1499,6 +1499,12 @@
     ],
     "sqlState" : "42702"
   },
+  "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT" : {
+    "message" : [
+      "Enabling the executor Kubernetes service requires <blockManagerPortConfigKey> to be set to a positive number, for instance <defaultShuffleServicePort>."
+    ],
+    "sqlState" : "42000"
+  },
   "EXEC_IMMEDIATE_DUPLICATE_ARGUMENT_ALIASES" : {
     "message" : [
       "The USING clause of this EXECUTE IMMEDIATE command contained multiple arguments with same alias (<aliases>), which is invalid; please update the command to specify unique aliases and then try it again."

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 5 deletions
@@ -419,12 +419,13 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_ENABLE_SERVICE =
     ConfigBuilder("spark.kubernetes.executor.enableService")
       .doc("If a Kubernetes service is created for the executor. " +
-        "The executor pod creates a Kubernetes service that allows to connect to executor ports " +
-        "via the Kubernetes service instead of the pod host IP. Connecting to such ports " +
-        "instantly fails with 'connection refused' error once the executor got decommissioned. " +
+        "A Kubernetes service is created for the executor pod that allows to connect to executor " +
+        "ports via the Kubernetes service instead of the pod host IP. Once the executor got " +
+        "decommissioned, connecting to such ports instantly fails with 'connection refused'. " +
         "Connection to the port via the pod host IP instead fails with a 'connection timeout' " +
-        "which is set via NETWORK_TIMEOUT and defaults to 2 minutes. " +
-        "The kubernetes service provides access to the executors shuffle service.")
+        "after NETWORK_TIMEOUT, which defaults to 2 minutes. " +
+        "The executor kubernetes service provides access to the executor's block manager, so " +
+        "BLOCK_MANAGER_PORT has to be given a value greater than zero.")
       .version("4.1.0")
       .booleanConf
       .createWithDefault(false)
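
As a usage sketch (assuming only the standard string-keyed SparkConf API; spark.blockManager.port is the key behind BLOCK_MANAGER_PORT, and 7337 is just an illustrative value), enabling the executor service now has to be paired with an explicit positive block manager port:

  import org.apache.spark.SparkConf

  // Sketch only: with the executor service enabled, the block manager port must
  // be set to a positive value, otherwise building the executor pod spec fails
  // with EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT.
  val conf = new SparkConf()
    .set("spark.kubernetes.executor.enableService", "true")
    .set("spark.blockManager.port", "7337")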

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorServiceFeatureStep.scala

Lines changed: 15 additions & 11 deletions
@@ -22,19 +22,30 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, ServiceBu
 
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.internal.config.SHUFFLE_SERVICE_PORT
+import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, SHUFFLE_SERVICE_PORT}
 
 class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
   private val spark_app_selector_label = "spark-app-selector"
   private val spark_exec_id_label = "spark-exec-id"
   private val service_selector_labels = Set(spark_app_selector_label, spark_exec_id_label)
+  private lazy val selector = conf.labels
+    .filter { case (key, _) => service_selector_labels.contains(key) }
 
   private lazy val sparkAppSelector = getLabel(spark_app_selector_label)
   private lazy val sparkExecId = getLabel(spark_exec_id_label)
   // name length is 8 + 38 + 6 + 10 = 62
   // which fits in KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH = 63
   private lazy val serviceName = s"svc-$sparkAppSelector-exec-$sparkExecId"
 
+  // The executor kubernetes services requires BLOCK_MANAGER_PORT to be set
+  private val blockManagerPortName = "spark-block-manager"
+  private val blockManagerPort = conf.sparkConf.get(BLOCK_MANAGER_PORT)
+  SparkException.require(blockManagerPort > 0,
+    "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT",
+    Map(
+      "blockManagerPortConfigKey" -> BLOCK_MANAGER_PORT.key,
+      "defaultShuffleServicePort" -> SHUFFLE_SERVICE_PORT.defaultValue.get.toString));
+
   private def getLabel(label: String): String = {
     val value = conf.labels.get(label)
     value.getOrElse(

@@ -55,23 +66,16 @@ class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends Kubernete
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    val selector = conf.labels
-      .filter { case (key, _) => service_selector_labels.contains(key) }
-
-    // kubernetes executor service provides access to the executor's shuffle service
-    val portName = "spark-shuffle-service"
-    val port = conf.sparkConf.get(SHUFFLE_SERVICE_PORT)
-
     val service = new ServiceBuilder()
       .withNewMetadata()
         .withName(serviceName)
       .endMetadata()
       .withNewSpec()
        .withSelector(selector.asJava)
        .addNewPort()
-          .withName(portName)
-          .withPort(port)
-          .withNewTargetPort(port)
+          .withName(blockManagerPortName)
+          .withPort(blockManagerPort)
+          .withNewTargetPort(blockManagerPort)
        .endPort()
      .endSpec()
      .build()

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 35 additions & 25 deletions
@@ -22,10 +22,10 @@ import io.fabric8.kubernetes.api.model.Service
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.Mockito.mock
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkIllegalArgumentException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features.KubernetesExecutorCustomFeatureConfigStep
-import org.apache.spark.internal.config.{ConfigEntry, SHUFFLE_SERVICE_PORT}
+import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, ConfigEntry}
 import org.apache.spark.resource.ResourceProfile
 
 class KubernetesExecutorBuilderSuite extends PodBuilderSuite {

@@ -81,29 +81,39 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
   }
 
   test("SPARK-XXXXX: check executor kubernetes spec with service enabled") {
-    Seq(None, Some(1234)).foreach { somePort =>
-      val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
-      somePort.foreach(sparkConf.set(SHUFFLE_SERVICE_PORT, _))
-      val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
-      val secMgr = new SecurityManager(sparkConf)
-      val client = mock(classOf[KubernetesClient])
-      val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
-      val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
-
-      val containerEnvs = spec.pod.container.getEnv.asScala
-      assert(containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
-      val containerEnv = containerEnvs.filter(_.getName === "EXECUTOR_SERVICE_NAME").head
-      assert(containerEnv.getValue === "svc-appId-exec-1")
-
-      assert(spec.executorKubernetesResources.size === 1)
-      val resource = spec.executorKubernetesResources.head
-      assert(resource.getKind === "Service")
-      val service = resource.asInstanceOf[Service]
-      assert(service.getMetadata.getName === "svc-appId-exec-1")
-      assert(service.getSpec.getPorts.size() === 1)
-      val port = service.getSpec.getPorts.get(0)
-      assert(port.getName === "spark-shuffle-service")
-      assert(port.getPort === somePort.getOrElse(SHUFFLE_SERVICE_PORT.defaultValue.get))
+    val sparkConf = baseConf.clone
+      .set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
+      .set(BLOCK_MANAGER_PORT, 1234)
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
+
+    val containerEnvs = spec.pod.container.getEnv.asScala
+    assert(containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
+    val containerEnv = containerEnvs.filter(_.getName === "EXECUTOR_SERVICE_NAME").head
+    assert(containerEnv.getValue === "svc-appId-exec-1")
+
+    assert(spec.executorKubernetesResources.size === 1)
+    val resource = spec.executorKubernetesResources.head
+    assert(resource.getKind === "Service")
+    val service = resource.asInstanceOf[Service]
+    assert(service.getMetadata.getName === "svc-appId-exec-1")
+    assert(service.getSpec.getPorts.size() === 1)
+    val port = service.getSpec.getPorts.get(0)
+    assert(port.getName === "spark-block-manager")
+    assert(port.getPort === 1234)
+  }
+
+  test("SPARK-XXXXX: check executor kubernetes service requires block manager port") {
+    val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assertThrows[SparkIllegalArgumentException] {
+      new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
     }
   }
 }
