This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 6cebfb6

mccheah authored and ash211 committed
Use a headless service to give a hostname to the driver. (#483)
* Use a headless service to give a hostname to the driver. Required since SPARK-21642 was added upstream.
* Fix scalastyle.
* Add back import.
* Fix conflict properly.
* Fix orchestrator test.
1 parent 6053455 commit 6cebfb6
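
In short, the submission client now creates a headless Kubernetes Service whose selector matches the driver pod's labels, and points the driver's host and bind address at that service's cluster-DNS name. A minimal sketch of the resulting address, assuming an application whose resource-name prefix is `spark-pi-1234` running in the `default` namespace (both values are illustrative, not taken from the diff):

```scala
// Hypothetical values; the real prefix is generated by the submission client.
val serviceName = "spark-pi-1234-driver-svc" // prefix + "-driver-svc"
val namespace = "default"
val driverHostname = s"$serviceName.$namespace.svc.cluster.local"
// Executors then reach the driver's RPC endpoint at driverHostname:7078
// (DEFAULT_DRIVER_PORT) unless spark.driver.port overrides it.
```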

File tree: 6 files changed, +300 −5 lines


docs/running-on-kubernetes.md

Lines changed: 2 additions & 0 deletions
@@ -24,6 +24,8 @@ should give you a list of pods and configmaps (if any) respectively.
 * You must have a spark distribution with Kubernetes support. This may be obtained from the
 [release tarball](https://github.com/apache-spark-on-k8s/spark/releases) or by
 [building Spark with Kubernetes support](../resource-managers/kubernetes/README.md#building-spark-with-kubernetes-support).
+* You must have [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in
+your cluster.
 
 ## Driver & Executor Images
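
This DNS prerequisite exists because executors now reach the driver by service name rather than by pod IP. As a quick sanity check that cluster DNS resolves service FQDNs, one can resolve the standard `kubernetes` API service from any JVM-equipped pod (a sketch, not part of this change):

```scala
import java.net.InetAddress

// The kubernetes.default service exists in every cluster; if this throws
// UnknownHostException, cluster DNS is not configured correctly.
val addr = InetAddress.getByName("kubernetes.default.svc.cluster.local")
println(s"Cluster DNS works: ${addr.getHostAddress}")
```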

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala

Lines changed: 1 addition & 1 deletion
@@ -44,11 +44,11 @@ package object constants {
   private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"
 
   // Default and fixed ports
-  private[spark] val SUBMISSION_SERVER_PORT = 7077
   private[spark] val DEFAULT_DRIVER_PORT = 7078
   private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
   private[spark] val DEFAULT_UI_PORT = 4040
   private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
+  private[spark] val DRIVER_PORT_NAME = "driver-rpc-port"
   private[spark] val EXECUTOR_PORT_NAME = "executor"
 
   // Environment Variables

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 8 additions & 3 deletions
@@ -20,10 +20,10 @@ import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps._
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
 import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Utils}
 
 /**
  * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.

@@ -97,7 +97,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
       mainClass,
       appArgs,
       submissionSparkConf)
-
+    val driverAddressStep = new DriverAddressConfigurationStep(
+      kubernetesResourceNamePrefix,
+      allDriverLabels,
+      submissionSparkConf,
+      new SystemClock)
     val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
       submissionSparkConf, kubernetesResourceNamePrefix)

@@ -176,6 +180,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
     Seq(
       initialSubmissionStep,
+      driverAddressStep,
       kubernetesCredentialsStep,
       dependencyResolutionStep) ++
       submittedDependenciesBootstrapSteps ++
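
The orchestrator only assembles the step list; each step then transforms a `KubernetesDriverSpec` in turn. A sketch of how such a list would be applied, assuming the client folds the steps over the initial spec (the fold is an illustration of the pattern, not a quote of the client code):

```scala
// Order matters: driverAddressStep runs after the base submission step so the
// resource-name prefix exists, and before credentials and dependencies.
def runSteps(
    steps: Seq[DriverConfigurationStep],
    initialSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
  steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
}
```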

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStep.scala

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless service. The service's
+ * ports should correspond to the ports that the executor will reach the pod at for RPC.
+ */
+private[spark] class DriverAddressConfigurationStep(
+    kubernetesResourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    submissionSparkConf: SparkConf,
+    clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverAddressConfigurationStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+      s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" +
+      s" will be managed via a Kubernetes service.")
+    require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+      s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
+      s" managed via a Kubernetes service.")
+
+    val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+    val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+      preferredServiceName
+    } else {
+      val randomServiceId = clock.getTimeMillis()
+      val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+      logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
+        s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
+        s" as the driver service's name.")
+      shorterServiceName
+    }
+
+    val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
+    val driverBlockManagerPort = submissionSparkConf.getInt(
+        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
+    val driverService = new ServiceBuilder()
+      .withNewMetadata()
+        .withName(resolvedServiceName)
+        .endMetadata()
+      .withNewSpec()
+        .withClusterIP("None")
+        .withSelector(driverLabels.asJava)
+        .addNewPort()
+          .withName(DRIVER_PORT_NAME)
+          .withPort(driverPort)
+          .withNewTargetPort(driverPort)
+          .endPort()
+        .addNewPort()
+          .withName(BLOCK_MANAGER_PORT_NAME)
+          .withPort(driverBlockManagerPort)
+          .withNewTargetPort(driverBlockManagerPort)
+          .endPort()
+        .endSpec()
+      .build()
+
+    val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+    val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+      .set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname)
+      .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
+      .set("spark.driver.port", driverPort.toString)
+      .set(
+        org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
+
+    driverSpec.copy(
+      driverSparkConf = resolvedSparkConf,
+      otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
+  }
+}
+
+private[spark] object DriverAddressConfigurationStep {
+  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
+  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
+  val DRIVER_SVC_POSTFIX = "-driver-svc"
+  val MAX_SERVICE_NAME_LENGTH = 63
+}
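
To make the 63-character rule concrete, here is a standalone sketch of the same name-resolution logic with the companion-object constants inlined (the prefixes below are illustrative):

```scala
val MaxServiceNameLength = 63 // Kubernetes DNS-label limit
val DriverSvcPostfix = "-driver-svc"

def resolveServiceName(prefix: String, nowMillis: Long): String = {
  val preferred = s"$prefix$DriverSvcPostfix"
  if (preferred.length <= MaxServiceNameLength) preferred
  else s"spark-$nowMillis$DriverSvcPostfix" // deterministic given a stubbed Clock
}

resolveServiceName("spark-pi-1510000000000", 10000L)
// => "spark-pi-1510000000000-driver-svc" (33 characters, fits)
resolveServiceName("a" * 60, 10000L)
// => "spark-10000-driver-svc" (the preferred name would be 71 characters)
```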

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala

Lines changed: 6 additions & 1 deletion
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.submit.submitsteps._
+import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
 
 private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
 

@@ -50,6 +50,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
     validateStepTypes(
       orchestrator,
       classOf[BaseDriverConfigurationStep],
+      classOf[DriverAddressConfigurationStep],
       classOf[DriverKubernetesCredentialsStep],
       classOf[DependencyResolutionStep])
   }

@@ -72,6 +73,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
     validateStepTypes(
       orchestrator,
       classOf[BaseDriverConfigurationStep],
+      classOf[DriverAddressConfigurationStep],
       classOf[DriverKubernetesCredentialsStep],
       classOf[DependencyResolutionStep],
       classOf[InitContainerBootstrapStep])

@@ -93,6 +95,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
     validateStepTypes(
       orchestrator,
       classOf[BaseDriverConfigurationStep],
+      classOf[DriverAddressConfigurationStep],
       classOf[DriverKubernetesCredentialsStep],
       classOf[DependencyResolutionStep],
       classOf[PythonStep])

@@ -114,6 +117,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
     validateStepTypes(
       orchestrator,
       classOf[BaseDriverConfigurationStep],
+      classOf[DriverAddressConfigurationStep],
       classOf[DriverKubernetesCredentialsStep],
       classOf[DependencyResolutionStep],
       classOf[MountSmallLocalFilesStep])

@@ -137,6 +141,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
     validateStepTypes(
       orchestrator,
       classOf[BaseDriverConfigurationStep],
+      classOf[DriverAddressConfigurationStep],
       classOf[DriverKubernetesCredentialsStep],
       classOf[DependencyResolutionStep],
       classOf[MountSecretsStep])

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStepSuite.scala

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit.submitsteps
+
+import io.fabric8.kubernetes.api.model.Service
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.util.Clock
+
+private[spark] class DriverAddressConfigurationStepSuite
+    extends SparkFunSuite with BeforeAndAfter {
+
+  private val SHORT_RESOURCE_NAME_PREFIX =
+    "a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH -
+      DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length)
+
+  private val LONG_RESOURCE_NAME_PREFIX =
+    "a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH -
+      DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length + 1)
+
+  private val DRIVER_LABELS = Map(
+    "label1key" -> "label1value",
+    "label2key" -> "label2value")
+
+  @Mock
+  private var clock: Clock = _
+
+  private var sparkConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.initMocks(this)
+    sparkConf = new SparkConf(false)
+  }
+
+  test("Headless service has a port for the driver RPC and the block manager.") {
+    val configurationStep = new DriverAddressConfigurationStep(
+      SHORT_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf
+        .set("spark.driver.port", "9000")
+        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080),
+      clock)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    assert(resolvedDriverSpec.otherKubernetesResources.size === 1)
+    assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service])
+    val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
+    verifyService(
+      9000,
+      8080,
+      s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}",
+      driverService)
+  }
+
+  test("Hostname and ports are set according to the service name.") {
+    val configurationStep = new DriverAddressConfigurationStep(
+      SHORT_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf
+        .set("spark.driver.port", "9000")
+        .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
+        .set(KUBERNETES_NAMESPACE, "my-namespace"),
+      clock)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
+      DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX
+    val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local"
+    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
+  }
+
+  test("Ports should resolve to defaults in SparkConf and in the service.") {
+    val configurationStep = new DriverAddressConfigurationStep(
+      SHORT_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf,
+      clock)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    verifyService(
+      DEFAULT_DRIVER_PORT,
+      DEFAULT_BLOCKMANAGER_PORT,
+      s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}",
+      resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service])
+    assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") ===
+      DEFAULT_DRIVER_PORT.toString)
+    assert(resolvedDriverSpec.driverSparkConf.get(
+      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT)
+  }
+
+  test("Long prefixes should switch to using a generated name.") {
+    val configurationStep = new DriverAddressConfigurationStep(
+      LONG_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
+      clock)
+    when(clock.getTimeMillis()).thenReturn(10000)
+    val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
+    val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
+    val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
+    val expectedServiceName = s"spark-10000${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}"
+    assert(driverService.getMetadata.getName === expectedServiceName)
+    val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local"
+    verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
+  }
+
+  test("Disallow bind address and driver host to be set explicitly.") {
+    val configurationStep = new DriverAddressConfigurationStep(
+      LONG_RESOURCE_NAME_PREFIX,
+      DRIVER_LABELS,
+      sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
+      clock)
+    try {
+      configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
+      fail("The driver bind address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_BIND_ADDRESS_KEY} is" +
+          s" not supported in Kubernetes mode, as the driver's hostname will be managed via" +
+          s" a Kubernetes service.")
+    }
+    sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
+    sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
+    try {
+      configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
+      fail("The driver host address should not be allowed.")
+    } catch {
+      case e: Throwable =>
+        assert(e.getMessage ===
+          s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_HOST_KEY} is" +
+          s" not supported in Kubernetes mode, as the driver's hostname will be managed via" +
+          s" a Kubernetes service.")
+    }
+  }
+
+  private def verifyService(
+      driverPort: Int,
+      blockManagerPort: Int,
+      expectedServiceName: String,
+      service: Service): Unit = {
+    assert(service.getMetadata.getName === expectedServiceName)
+    assert(service.getSpec.getClusterIP === "None")
+    assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
+    assert(service.getSpec.getPorts.size() === 2)
+    val driverServicePorts = service.getSpec.getPorts.asScala
+    assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
+    assert(driverServicePorts.head.getPort.intValue() === driverPort)
+    assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
+    assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
+    assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
+    assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
+  }
+
+  private def verifySparkConfHostNames(
+      driverSparkConf: SparkConf, expectedHostName: String): Unit = {
+    assert(driverSparkConf.get(
+      org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName)
+    assert(driverSparkConf.get(
+      org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) === expectedHostName)
+  }
+}
