Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 6b1caca

Browse files
authored
Use the driver pod IP address for spark.driver.bindAddress (#533)
* Use the driver pod IP address for spark.driver.bindAddress * Addressed comments * Addressed more comments * Fixed broken DriverServiceBootstrapStepSuite
1 parent 0abf0b9 commit 6b1caca

File tree

10 files changed

+95
-84
lines changed

10 files changed

+95
-84
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ package object constants {
5454
// Environment Variables
5555
private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
5656
private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
57+
private[spark] val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
5758
private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
5859
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
5960
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
2020
import org.apache.spark.deploy.k8s.ConfigurationUtils
2121
import org.apache.spark.deploy.k8s.config._
2222
import org.apache.spark.deploy.k8s.constants._
23-
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
23+
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
2424
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
2525
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
2626
import org.apache.spark.launcher.SparkLauncher
@@ -103,7 +103,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
103103
mainClass,
104104
appArgs,
105105
submissionSparkConf)
106-
val driverAddressStep = new DriverAddressConfigurationStep(
106+
val driverAddressStep = new DriverServiceBootstrapStep(
107107
kubernetesResourceNamePrefix,
108108
allDriverLabels,
109109
submissionSparkConf,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStep.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.submit.submitsteps
1818

19-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, PodBuilder, QuantityBuilder}
19+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
2020
import scala.collection.JavaConverters._
2121

2222
import org.apache.spark.SparkConf
@@ -114,6 +114,12 @@ private[spark] class BaseDriverConfigurationStep(
114114
.withName(ENV_DRIVER_ARGS)
115115
.withValue(appArgs.mkString(" "))
116116
.endEnv()
117+
.addNewEnv()
118+
.withName(ENV_DRIVER_BIND_ADDRESS)
119+
.withValueFrom(new EnvVarSourceBuilder()
120+
.withNewFieldRef("v1", "status.podIP")
121+
.build())
122+
.endEnv()
117123
.withNewResources()
118124
.addToRequests("cpu", driverCpuQuantity)
119125
.addToRequests("memory", driverMemoryQuantity)
Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,20 @@ import org.apache.spark.util.Clock
2929
* Allows the driver to be reachable by executor pods through a headless service. The service's
3030
* ports should correspond to the ports that the executor will reach the pod at for RPC.
3131
*/
32-
private[spark] class DriverAddressConfigurationStep(
32+
private[spark] class DriverServiceBootstrapStep(
3333
kubernetesResourceNamePrefix: String,
3434
driverLabels: Map[String, String],
3535
submissionSparkConf: SparkConf,
3636
clock: Clock) extends DriverConfigurationStep with Logging {
37-
import DriverAddressConfigurationStep._
37+
import DriverServiceBootstrapStep._
3838

3939
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
4040
require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
41-
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" +
42-
s" will be managed via a Kubernetes service.")
41+
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind" +
42+
s" address is managed and set to the driver pod's IP address.")
4343
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
44-
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
45-
s" managed via a Kubernetes service.")
44+
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
45+
s" managed via a Kubernetes service.")
4646

4747
val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
4848
val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
@@ -51,8 +51,8 @@ private[spark] class DriverAddressConfigurationStep(
5151
val randomServiceId = clock.getTimeMillis()
5252
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
5353
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
54-
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
55-
s" as the driver service's name.")
54+
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
55+
s" as the driver service's name.")
5656
shorterServiceName
5757
}
5858

@@ -82,19 +82,18 @@ private[spark] class DriverAddressConfigurationStep(
8282
val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
8383
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
8484
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
85-
.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname)
8685
.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
8786
.set("spark.driver.port", driverPort.toString)
8887
.set(
89-
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
88+
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)
9089

9190
driverSpec.copy(
92-
driverSparkConf = resolvedSparkConf,
93-
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
91+
driverSparkConf = resolvedSparkConf,
92+
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
9493
}
9594
}
9695

97-
private[spark] object DriverAddressConfigurationStep {
96+
private[spark] object DriverServiceBootstrapStep {
9897
val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
9998
val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
10099
val DRIVER_SVC_POSTFIX = "-driver-svc"

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
1818

1919
import org.apache.spark.{SparkConf, SparkFunSuite}
2020
import org.apache.spark.deploy.k8s.config._
21-
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
21+
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
2222

2323
private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {
2424

@@ -50,7 +50,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
5050
validateStepTypes(
5151
orchestrator,
5252
classOf[BaseDriverConfigurationStep],
53-
classOf[DriverAddressConfigurationStep],
53+
classOf[DriverServiceBootstrapStep],
5454
classOf[DriverKubernetesCredentialsStep],
5555
classOf[DependencyResolutionStep],
5656
classOf[LocalDirectoryMountConfigurationStep])
@@ -74,7 +74,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
7474
validateStepTypes(
7575
orchestrator,
7676
classOf[BaseDriverConfigurationStep],
77-
classOf[DriverAddressConfigurationStep],
77+
classOf[DriverServiceBootstrapStep],
7878
classOf[DriverKubernetesCredentialsStep],
7979
classOf[DependencyResolutionStep],
8080
classOf[LocalDirectoryMountConfigurationStep],
@@ -97,7 +97,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
9797
validateStepTypes(
9898
orchestrator,
9999
classOf[BaseDriverConfigurationStep],
100-
classOf[DriverAddressConfigurationStep],
100+
classOf[DriverServiceBootstrapStep],
101101
classOf[DriverKubernetesCredentialsStep],
102102
classOf[DependencyResolutionStep],
103103
classOf[LocalDirectoryMountConfigurationStep],
@@ -120,7 +120,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
120120
validateStepTypes(
121121
orchestrator,
122122
classOf[BaseDriverConfigurationStep],
123-
classOf[DriverAddressConfigurationStep],
123+
classOf[DriverServiceBootstrapStep],
124124
classOf[DriverKubernetesCredentialsStep],
125125
classOf[DependencyResolutionStep],
126126
classOf[LocalDirectoryMountConfigurationStep],
@@ -144,7 +144,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
144144
validateStepTypes(
145145
orchestrator,
146146
classOf[BaseDriverConfigurationStep],
147-
classOf[DriverAddressConfigurationStep],
147+
classOf[DriverServiceBootstrapStep],
148148
classOf[DriverKubernetesCredentialsStep],
149149
classOf[DependencyResolutionStep],
150150
classOf[LocalDirectoryMountConfigurationStep],
@@ -169,7 +169,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
169169
validateStepTypes(
170170
orchestrator,
171171
classOf[BaseDriverConfigurationStep],
172-
classOf[DriverAddressConfigurationStep],
172+
classOf[DriverServiceBootstrapStep],
173173
classOf[DriverKubernetesCredentialsStep],
174174
classOf[DependencyResolutionStep],
175175
classOf[LocalDirectoryMountConfigurationStep],

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/BaseDriverConfigurationStepSuite.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,23 +65,30 @@ private[spark] class BaseDriverConfigurationStepSuite extends SparkFunSuite {
6565
driverContainer = new ContainerBuilder().build(),
6666
driverSparkConf = new SparkConf(false),
6767
otherKubernetesResources = Seq.empty[HasMetadata])
68-
6968
val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)
69+
7070
assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
7171
assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
7272
assert(preparedDriverSpec.driverContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY)
73+
74+
assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
7375
val envs = preparedDriverSpec.driverContainer
7476
.getEnv
7577
.asScala
7678
.map(env => (env.getName, env.getValue))
7779
.toMap
78-
assert(envs.size === 6)
7980
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
8081
assert(envs(ENV_DRIVER_MEMORY) === "256M")
8182
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
8283
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2")
8384
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
8485
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
86+
87+
assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
88+
envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
89+
envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
90+
envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))
91+
8592
val resourceRequirements = preparedDriverSpec.driverContainer.getResources
8693
val requests = resourceRequirements.getRequests.asScala
8794
assert(requests("cpu").getAmount === "2")

0 commit comments

Comments
 (0)