This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Use a headless service to give a hostname to the driver. #483

Merged: 8 commits, Sep 8, 2017
2 changes: 2 additions & 0 deletions docs/running-on-kubernetes.md
@@ -24,6 +24,8 @@ should give you a list of pods and configmaps (if any) respectively.
* You must have a spark distribution with Kubernetes support. This may be obtained from the
[release tarball](https://github.com/apache-spark-on-k8s/spark/releases) or by
[building Spark with Kubernetes support](../resource-managers/kubernetes/README.md#building-spark-with-kubernetes-support).
* You must have [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in
your cluster.
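
A quick way to confirm cluster DNS is working is to resolve the FQDN of the built-in `kubernetes` service from inside a pod. A minimal sketch (the probe object is illustrative, not part of this change):

```scala
// Hypothetical DNS probe; run inside any pod in the cluster. Resolving the
// built-in `kubernetes` service FQDN throws UnknownHostException when
// cluster DNS is not configured.
import java.net.InetAddress

object ClusterDnsProbe extends App {
  val fqdn = "kubernetes.default.svc.cluster.local"
  println(s"$fqdn -> ${InetAddress.getByName(fqdn).getHostAddress}")
}
```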

## Driver & Executor Images

@@ -44,11 +44,11 @@ package object constants {
private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"

// Default and fixed ports
private[spark] val SUBMISSION_SERVER_PORT = 7077
private[spark] val DEFAULT_DRIVER_PORT = 7078
private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079
private[spark] val DEFAULT_UI_PORT = 4040
private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager"
private[spark] val DRIVER_PORT_NAME = "driver-rpc-port"
private[spark] val EXECUTOR_PORT_NAME = "executor"

// Environment Variables
@@ -20,10 +20,10 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}
import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
import org.apache.spark.util.{SystemClock, Utils}

/**
* Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
@@ -97,7 +97,11 @@ private[spark] class DriverConfigurationStepsOrchestrator(
mainClass,
appArgs,
submissionSparkConf)

val driverAddressStep = new DriverAddressConfigurationStep(
kubernetesResourceNamePrefix,
allDriverLabels,
submissionSparkConf,
new SystemClock)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)

@@ -176,6 +180,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(

Seq(
initialSubmissionStep,
driverAddressStep,
kubernetesCredentialsStep,
dependencyResolutionStep) ++
submittedDependenciesBootstrapSteps ++
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import io.fabric8.kubernetes.api.model.ServiceBuilder
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Clock

/**
 * Allows the driver to be reachable by executor pods through a headless service. The service's
 * ports should correspond to the ports on which executors reach the driver for RPC.
*/
private[spark] class DriverAddressConfigurationStep(
kubernetesResourceNamePrefix: String,
driverLabels: Map[String, String],
submissionSparkConf: SparkConf,
clock: Clock) extends DriverConfigurationStep with Logging {
import DriverAddressConfigurationStep._

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" +
s" will be managed via a Kubernetes service.")
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
s" managed via a Kubernetes service.")

val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
[Review thread]
Author: Because the name prefix is derived from the app name, but we don't want the app name to be constrained by service-name limits. Nevertheless, this is still far from ideal, and I would appreciate suggestions on how we can do this better.

Reviewer: Because the service name isn't nearly as visible to users as pod names are, I don't think naming is all that important here. What you have seems fine to me!

Member: We might expose the driver UI using the DNS name now, so I think it would be good to make sure it's intelligible.

Author: There isn't a great way to shorten the name given a long app name, though. It should be fine to make the name less intelligible in the corner cases where the given app name is long.

preferredServiceName
} else {
val randomServiceId = clock.getTimeMillis()
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
s" as the driver service's name.")
shorterServiceName
}

val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
val driverBlockManagerPort = submissionSparkConf.getInt(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
val driverService = new ServiceBuilder()
.withNewMetadata()
.withName(resolvedServiceName)
.endMetadata()
.withNewSpec()
.withClusterIP("None")
.withSelector(driverLabels.asJava)
.addNewPort()
.withName(DRIVER_PORT_NAME)
.withPort(driverPort)
.withNewTargetPort(driverPort)
.endPort()
.addNewPort()
.withName(BLOCK_MANAGER_PORT_NAME)
.withPort(driverBlockManagerPort)
.withNewTargetPort(driverBlockManagerPort)
.endPort()
.endSpec()
.build()

val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
[Review thread]
Author: Does this entire string need to be <= 63 characters?

Reviewer: Just each component between the dots.

val resolvedSparkConf = driverSpec.driverSparkConf.clone()
.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname)
.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname)
.set("spark.driver.port", driverPort.toString)
.set(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort)

driverSpec.copy(
driverSparkConf = resolvedSparkConf,
otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService))
}
}

private[spark] object DriverAddressConfigurationStep {
val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
val DRIVER_SVC_POSTFIX = "-driver-svc"
val MAX_SERVICE_NAME_LENGTH = 63
}
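
The two review threads above can be made concrete: the Service name is a single DNS label and must stay within 63 characters, which is what forces the timestamp-based fallback, while the resolved hostname only needs each dot-separated component to respect that limit, not the string as a whole. A minimal sketch of both checks (object and helper names are hypothetical):

```scala
// Illustrative only; not part of this change. RFC 1035 bounds each DNS
// label (a dot-separated component) at 63 characters, not the whole name.
object DnsNameSketch {
  val MaxLabelLength = 63

  // A Kubernetes Service name is a single DNS label.
  def serviceNameFits(name: String): Boolean =
    name.nonEmpty && name.length <= MaxLabelLength

  // A fully qualified name may exceed 63 characters overall, as long as
  // every individual label fits.
  def hostnameLabelsFit(fqdn: String): Boolean =
    fqdn.split('.').forall(label => label.nonEmpty && label.length <= MaxLabelLength)

  def main(args: Array[String]): Unit = {
    val service = "a" * 52 + "-driver-svc"                // exactly 63 characters
    val fqdn = s"$service.my-namespace.svc.cluster.local" // 94 characters overall
    println(serviceNameFits(service))  // true
    println(hostnameLabelsFit(fqdn))   // true: only per-label length matters
  }
}
```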
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.submit.submitsteps._
import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep}

private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite {

@@ -50,6 +50,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep])
}
@@ -72,6 +73,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[InitContainerBootstrapStep])
@@ -93,6 +95,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[PythonStep])
@@ -114,6 +117,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[MountSmallLocalFilesStep])
@@ -137,6 +141,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS
validateStepTypes(
orchestrator,
classOf[BaseDriverConfigurationStep],
classOf[DriverAddressConfigurationStep],
classOf[DriverKubernetesCredentialsStep],
classOf[DependencyResolutionStep],
classOf[MountSecretsStep])
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps

import io.fabric8.kubernetes.api.model.Service
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfter
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.util.Clock

private[spark] class DriverAddressConfigurationStepSuite
extends SparkFunSuite with BeforeAndAfter {

private val SHORT_RESOURCE_NAME_PREFIX =
"a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH -
DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length)

private val LONG_RESOURCE_NAME_PREFIX =
"a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH -
DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length + 1)
private val DRIVER_LABELS = Map(
"label1key" -> "label1value",
"label2key" -> "label2value")

@Mock
private var clock: Clock = _

private var sparkConf: SparkConf = _

before {
MockitoAnnotations.initMocks(this)
sparkConf = new SparkConf(false)
}

test("Headless service has a port for the driver RPC and the block manager.") {
val configurationStep = new DriverAddressConfigurationStep(
SHORT_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf
.set("spark.driver.port", "9000")
.set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080),
clock)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
assert(resolvedDriverSpec.otherKubernetesResources.size === 1)
assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service])
val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
verifyService(
9000,
8080,
s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}",
driverService)
}

test("Hostname and ports are set according to the service name.") {
val configurationStep = new DriverAddressConfigurationStep(
SHORT_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf
.set("spark.driver.port", "9000")
.set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080)
.set(KUBERNETES_NAMESPACE, "my-namespace"),
clock)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX
val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local"
verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
}

test("Ports should resolve to defaults in SparkConf and in the service.") {
val configurationStep = new DriverAddressConfigurationStep(
SHORT_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf,
clock)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
verifyService(
DEFAULT_DRIVER_PORT,
DEFAULT_BLOCKMANAGER_PORT,
s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}",
resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service])
assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") ===
DEFAULT_DRIVER_PORT.toString)
assert(resolvedDriverSpec.driverSparkConf.get(
org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT)
}

test("Long prefixes should switch to using a generated name.") {
val configurationStep = new DriverAddressConfigurationStep(
LONG_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"),
clock)
when(clock.getTimeMillis()).thenReturn(10000)
val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone())
val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
val expectedServiceName = s"spark-10000${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}"
assert(driverService.getMetadata.getName === expectedServiceName)
val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local"
verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName)
}

test("Disallow bind address and driver host to be set explicitly.") {
val configurationStep = new DriverAddressConfigurationStep(
LONG_RESOURCE_NAME_PREFIX,
DRIVER_LABELS,
sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"),
clock)
try {
configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
fail("The driver bind address should not be allowed.")
} catch {
case e: Throwable =>
assert(e.getMessage ===
s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_BIND_ADDRESS_KEY} is" +
s" not supported in Kubernetes mode, as the driver's hostname will be managed via" +
s" a Kubernetes service.")
}
sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS)
sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host")
try {
configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf))
fail("The driver host address should not be allowed.")
} catch {
case e: Throwable =>
assert(e.getMessage ===
s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_HOST_KEY} is" +
s" not supported in Kubernetes mode, as the driver's hostname will be managed via" +
s" a Kubernetes service.")
}
}

private def verifyService(
driverPort: Int,
blockManagerPort: Int,
expectedServiceName: String,
service: Service): Unit = {
assert(service.getMetadata.getName === expectedServiceName)
assert(service.getSpec.getClusterIP === "None")
assert(service.getSpec.getSelector.asScala === DRIVER_LABELS)
assert(service.getSpec.getPorts.size() === 2)
val driverServicePorts = service.getSpec.getPorts.asScala
assert(driverServicePorts.head.getName === DRIVER_PORT_NAME)
assert(driverServicePorts.head.getPort.intValue() === driverPort)
assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort)
assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME)
assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
}

private def verifySparkConfHostNames(
driverSparkConf: SparkConf, expectedHostName: String): Unit = {
assert(driverSparkConf.get(
org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName)
assert(driverSparkConf.get(
org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) === expectedHostName)
}
}