From e086f4d9d861cfa46a5eda47412d53adc45c8fb9 Mon Sep 17 00:00:00 2001
From: sandflee
Date: Wed, 19 Jul 2017 06:43:10 +0800
Subject: [PATCH] Add node selectors for driver and executor pods (#355)

---
 docs/running-on-kubernetes.md                      | 10 ++++++++++
 .../deploy/kubernetes/ConfigurationUtils.scala     | 14 ++++++++++++++
 .../apache/spark/deploy/kubernetes/config.scala    |  2 ++
 .../submitsteps/BaseDriverConfigurationStep.scala  |  3 +++
 .../KubernetesClusterSchedulerBackend.scala        |  6 ++++++
 5 files changed, 35 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 2b4e9a6f96af1..5e23801e15b10 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -758,6 +758,16 @@ from the other deployment modes. See the [configuration page](configuration.html
       Specify the hard cpu limit for a single executor pod
     </td>
   </tr>
+  <tr>
+    <td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
+    <td>(none)</td>
+    <td>
+      Adds to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and the value as the
+      configuration's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to <code>myIdentifier</code>
+      will result in the driver pod and executors having a node selector with key <code>identifier</code> and value
+      <code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
+    </td>
+  </tr>
 </table>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index f461da4809b4d..1a008c236d00f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -65,4 +65,18 @@ object ConfigurationUtils extends Logging {
     }
     combined.toMap
   }
+
+  def parsePrefixedKeyValuePairs(
+      sparkConf: SparkConf,
+      prefix: String,
+      configType: String): Map[String, String] = {
+    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
+    fromPrefix.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for a given $configType key, got key $key with" +
+            s" values $values")
+    }
+    fromPrefix.toMap
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index e1c1ab9d459fc..c6772c1cb5ae4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -497,6 +497,8 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+
   private[spark] def resolveK8sMaster(rawMasterString: String): String = {
     if (!rawMasterString.startsWith("k8s://")) {
       throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
index 022b5fccdc5e1..b3f509b44054e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
@@ -73,6 +73,8 @@ private[spark] class BaseDriverConfigurationStep(
       s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
         s" Spark bookkeeping operations.")
     val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
+    val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
+      submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
     val driverCpuQuantity = new QuantityBuilder(false)
       .withAmount(driverCpuCores)
       .build()
@@ -117,6 +119,7 @@ private[spark] class BaseDriverConfigurationStep(
       .endMetadata()
       .withNewSpec()
         .withRestartPolicy("Never")
+        .withNodeSelector(nodeSelector.asJava)
       .endSpec()
      .build()
     val resolvedSparkConf = driverSpec.driverSparkConf.clone()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index e5f980ad1f366..6dbe918f966e4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -84,6 +84,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
       KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
       KUBERNETES_EXECUTOR_ANNOTATIONS,
       "executor annotation")
+  private val nodeSelector =
+    ConfigurationUtils.parsePrefixedKeyValuePairs(
+      conf,
+      KUBERNETES_NODE_SELECTOR_PREFIX,
+      "node-selector")
   private var shufflePodCache: Option[ShufflePodCache] = None
   private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
   private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -449,6 +454,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .endMetadata()
         .withNewSpec()
           .withHostname(hostname)
+          .withNodeSelector(nodeSelector.asJava)
         .endSpec()
         .build()
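
Usage note (illustrative, not part of the patch): each `spark.kubernetes.node.selector.<labelKey>` entry in the SparkConf becomes one `labelKey -> value` pair in the pods' `nodeSelector`. The sketch below mimics what `parsePrefixedKeyValuePairs` does for this prefix using only stock `SparkConf` APIs; the object name `NodeSelectorSketch` is hypothetical.

```scala
// Minimal sketch of the prefix parsing added by this patch;
// NodeSelectorSketch is a hypothetical name, not part of the change.
import org.apache.spark.SparkConf

object NodeSelectorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.kubernetes.node.selector.identifier", "myIdentifier")
      .set("spark.kubernetes.node.selector.disktype", "ssd")

    // getAllWithPrefix strips the prefix, yielding
    // ("identifier", "myIdentifier") and ("disktype", "ssd").
    val fromPrefix = conf.getAllWithPrefix("spark.kubernetes.node.selector.")

    // parsePrefixedKeyValuePairs additionally asserts that each key appears
    // exactly once before converting to a Map, mirrored here with require.
    fromPrefix.groupBy(_._1).foreach { case (key, values) =>
      require(values.length == 1, s"Duplicate node selector key: $key")
    }

    val nodeSelector: Map[String, String] = fromPrefix.toMap
    // The driver and executor pod builders pass this map to withNodeSelector,
    // so every pod of the application is scheduled with:
    //   nodeSelector: { identifier: myIdentifier, disktype: ssd }
    nodeSelector.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```

Because both `BaseDriverConfigurationStep` and `KubernetesClusterSchedulerBackend` read the same `KUBERNETES_NODE_SELECTOR_PREFIX`, the selector applies uniformly to the driver and all executor pods; per-role selectors are not supported by this change.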