Commit 6bfacf0

Require block manager port to be configured
1 parent dd1e73e commit 6bfacf0

File tree (4 files changed: +62 −41 lines)

common/utils/src/main/resources/error/error-conditions.json
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorServiceFeatureStep.scala
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 6 additions & 0 deletions
@@ -1564,6 +1564,12 @@
     ],
     "sqlState" : "42702"
   },
+  "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT" : {
+    "message" : [
+      "Enabling the executor Kubernetes service requires <blockManagerPortConfigKey> to be set to a positive number, for instance <defaultShuffleServicePort>."
+    ],
+    "sqlState" : "42000"
+  },
   "EXEC_IMMEDIATE_DUPLICATE_ARGUMENT_ALIASES" : {
     "message" : [
       "The USING clause of this EXECUTE IMMEDIATE command contained multiple arguments with same alias (<aliases>), which is invalid; please update the command to specify unique aliases and then try it again."

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 5 deletions
@@ -438,12 +438,13 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_ENABLE_SERVICE =
     ConfigBuilder("spark.kubernetes.executor.enableService")
       .doc("If a Kubernetes service is created for the executor. " +
-        "The executor pod creates a Kubernetes service that allows to connect to executor ports " +
-        "via the Kubernetes service instead of the pod host IP. Connecting to such ports " +
-        "instantly fails with 'connection refused' error once the executor got decommissioned. " +
+        "A Kubernetes service is created for the executor pod that allows connecting to executor " +
+        "ports via the Kubernetes service instead of the pod host IP. Once the executor has been " +
+        "decommissioned, connecting to such ports fails instantly with 'connection refused'. " +
         "Connection to the port via the pod host IP instead fails with a 'connection timeout' " +
-        "which is set via NETWORK_TIMEOUT and defaults to 2 minutes. " +
-        "The kubernetes service provides access to the executors shuffle service.")
+        "after NETWORK_TIMEOUT, which defaults to 2 minutes. " +
+        "The executor Kubernetes service provides access to the executor's block manager, so " +
+        "BLOCK_MANAGER_PORT has to be given a value greater than zero.")
       .version("4.1.0")
       .booleanConf
       .createWithDefault(false)
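
In practice the documented behavior means the two settings now have to be supplied together. A minimal sketch of the required configuration, assuming the plain config keys behind the constants (spark.kubernetes.executor.enableService and spark.blockManager.port, the key of BLOCK_MANAGER_PORT); 7337 is only an example value mirroring the default shuffle service port:

import org.apache.spark.SparkConf

// Sketch only: enable the per-executor Kubernetes service and pin the block
// manager to a positive port so the Service has a concrete port to expose.
// The default of spark.blockManager.port is 0 (random), which now fails fast.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.enableService", "true")
  .set("spark.blockManager.port", "7337")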

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorServiceFeatureStep.scala

Lines changed: 15 additions & 11 deletions
@@ -22,19 +22,30 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, ServiceBu

 import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
-import org.apache.spark.internal.config.SHUFFLE_SERVICE_PORT
+import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, SHUFFLE_SERVICE_PORT}

 class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends KubernetesFeatureConfigStep {
   private val spark_app_selector_label = "spark-app-selector"
   private val spark_exec_id_label = "spark-exec-id"
   private val service_selector_labels = Set(spark_app_selector_label, spark_exec_id_label)
+  private lazy val selector = conf.labels
+    .filter { case (key, _) => service_selector_labels.contains(key) }

   private lazy val sparkAppSelector = getLabel(spark_app_selector_label)
   private lazy val sparkExecId = getLabel(spark_exec_id_label)
   // name length is 8 + 38 + 6 + 10 = 62
   // which fits in KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH = 63
   private lazy val serviceName = s"svc-$sparkAppSelector-exec-$sparkExecId"

+  // The executor Kubernetes service requires BLOCK_MANAGER_PORT to be set
+  private val blockManagerPortName = "spark-block-manager"
+  private val blockManagerPort = conf.sparkConf.get(BLOCK_MANAGER_PORT)
+  SparkException.require(blockManagerPort > 0,
+    "EXECUTOR_KUBERNETES_SERVICE_REQUIRES_BLOCK_MANAGER_PORT",
+    Map(
+      "blockManagerPortConfigKey" -> BLOCK_MANAGER_PORT.key,
+      "defaultShuffleServicePort" -> SHUFFLE_SERVICE_PORT.defaultValue.get.toString))
+
   private def getLabel(label: String): String = {
     val value = conf.labels.get(label)
     value.getOrElse(
@@ -55,23 +66,16 @@ class ExecutorServiceFeatureStep(conf: KubernetesExecutorConf) extends Kubernete
   }

   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    val selector = conf.labels
-      .filter { case (key, _) => service_selector_labels.contains(key) }
-
-    // kubernetes executor service provides access to the executor's shuffle service
-    val portName = "spark-shuffle-service"
-    val port = conf.sparkConf.get(SHUFFLE_SERVICE_PORT)
-
     val service = new ServiceBuilder()
       .withNewMetadata()
         .withName(serviceName)
       .endMetadata()
       .withNewSpec()
         .withSelector(selector.asJava)
         .addNewPort()
-          .withName(portName)
-          .withPort(port)
-          .withNewTargetPort(port)
+          .withName(blockManagerPortName)
+          .withPort(blockManagerPort)
+          .withNewTargetPort(blockManagerPort)
         .endPort()
       .endSpec()
       .build()
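
The reason the port has to be pinned is that the executor's block manager becomes reachable through a stable Service DNS name rather than the pod IP. Below is a hedged sketch of how a client could derive that address; executorBlockManagerAddress is a hypothetical helper, and the <service>.<namespace>.svc form is the standard Kubernetes DNS convention rather than code from this commit.

// Hypothetical helper (not in this commit): build the in-cluster address of an
// executor's block manager, assuming the "svc-<appId>-exec-<execId>" naming
// scheme from ExecutorServiceFeatureStep and a fixed spark.blockManager.port.
def executorBlockManagerAddress(
    appId: String,
    execId: String,
    namespace: String,
    blockManagerPort: Int): String = {
  val serviceName = s"svc-$appId-exec-$execId"
  // <service>.<namespace>.svc resolves via cluster DNS even if the pod is rescheduled
  s"$serviceName.$namespace.svc:$blockManagerPort"
}

// Example: executorBlockManagerAddress("appId", "1", "spark", 7337)
//   == "svc-appId-exec-1.spark.svc:7337"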

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala

Lines changed: 35 additions & 25 deletions
@@ -22,10 +22,10 @@ import io.fabric8.kubernetes.api.model.Service
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.Mockito.mock

-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkIllegalArgumentException}
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features.KubernetesExecutorCustomFeatureConfigStep
-import org.apache.spark.internal.config.{ConfigEntry, SHUFFLE_SERVICE_PORT}
+import org.apache.spark.internal.config.{BLOCK_MANAGER_PORT, ConfigEntry}
 import org.apache.spark.resource.ResourceProfile

 class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
@@ -85,29 +85,39 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
   }

   test("SPARK-XXXXX: check executor kubernetes spec with service enabled") {
-    Seq(None, Some(1234)).foreach { somePort =>
-      val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
-      somePort.foreach(sparkConf.set(SHUFFLE_SERVICE_PORT, _))
-      val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
-      val secMgr = new SecurityManager(sparkConf)
-      val client = mock(classOf[KubernetesClient])
-      val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
-      val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
-
-      val containerEnvs = spec.pod.container.getEnv.asScala
-      assert(containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
-      val containerEnv = containerEnvs.filter(_.getName === "EXECUTOR_SERVICE_NAME").head
-      assert(containerEnv.getValue === "svc-appId-exec-1")
-
-      assert(spec.executorKubernetesResources.size === 1)
-      val resource = spec.executorKubernetesResources.head
-      assert(resource.getKind === "Service")
-      val service = resource.asInstanceOf[Service]
-      assert(service.getMetadata.getName === "svc-appId-exec-1")
-      assert(service.getSpec.getPorts.size() === 1)
-      val port = service.getSpec.getPorts.get(0)
-      assert(port.getName === "spark-shuffle-service")
-      assert(port.getPort === somePort.getOrElse(SHUFFLE_SERVICE_PORT.defaultValue.get))
+    val sparkConf = baseConf.clone
+      .set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
+      .set(BLOCK_MANAGER_PORT, 1234)
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    val spec = new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
+
+    val containerEnvs = spec.pod.container.getEnv.asScala
+    assert(containerEnvs.exists(_.getName === "EXECUTOR_SERVICE_NAME"))
+    val containerEnv = containerEnvs.filter(_.getName === "EXECUTOR_SERVICE_NAME").head
+    assert(containerEnv.getValue === "svc-appId-exec-1")
+
+    assert(spec.executorKubernetesResources.size === 1)
+    val resource = spec.executorKubernetesResources.head
+    assert(resource.getKind === "Service")
+    val service = resource.asInstanceOf[Service]
+    assert(service.getMetadata.getName === "svc-appId-exec-1")
+    assert(service.getSpec.getPorts.size() === 1)
+    val port = service.getSpec.getPorts.get(0)
+    assert(port.getName === "spark-block-manager")
+    assert(port.getPort === 1234)
+  }
+
+  test("SPARK-XXXXX: check executor kubernetes service requires block manager port") {
+    val sparkConf = baseConf.clone.set(Config.KUBERNETES_EXECUTOR_ENABLE_SERVICE, true)
+    val conf = KubernetesTestConf.createExecutorConf(sparkConf = sparkConf)
+    val secMgr = new SecurityManager(sparkConf)
+    val client = mock(classOf[KubernetesClient])
+    val profile = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+    assertThrows[SparkIllegalArgumentException] {
+      new KubernetesExecutorBuilder().buildFromFeatures(conf, secMgr, client, profile)
     }
   }
 }
