diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 2b4e9a6f96af1..5e23801e15b10 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -758,6 +758,16 @@ from the other deployment modes. See the [configuration page](configuration.html
Specify the hard cpu limit for a single executor pod
+  <tr>
+    <td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
+    <td>(none)</td>
+    <td>
+      Adds to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and the value as the
+      configuration's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to <code>myIdentifier</code>
+      will result in the driver pod and executors having a node selector with key <code>identifier</code> and value
+      <code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
+    </td>
+  </tr>
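As a minimal sketch of the behaviour described above, these keys can also be supplied programmatically on a `SparkConf`; the label keys and values below (`identifier`, `diskType`) are purely illustrative and must match labels that actually exist on the cluster's nodes:

```scala
import org.apache.spark.SparkConf

// Illustrative example: attach two node selector labels that will be applied
// to both the driver pod and every executor pod.
val conf = new SparkConf()
  .set("spark.kubernetes.node.selector.identifier", "myIdentifier")
  .set("spark.kubernetes.node.selector.diskType", "ssd")
```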
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
index f461da4809b4d..1a008c236d00f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ConfigurationUtils.scala
@@ -65,4 +65,18 @@ object ConfigurationUtils extends Logging {
}
combined.toMap
}
+
+ def parsePrefixedKeyValuePairs(
+ sparkConf: SparkConf,
+ prefix: String,
+ configType: String): Map[String, String] = {
+ val fromPrefix = sparkConf.getAllWithPrefix(prefix)
+ fromPrefix.groupBy(_._1).foreach {
+ case (key, values) =>
+ require(values.size == 1,
+ s"Cannot have multiple values for a given $configType key, got key $key with" +
+ s" values $values")
+ }
+ fromPrefix.toMap
+ }
}
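A rough usage sketch of the new helper, assuming the illustrative configuration keys from the documentation example above; the expected result is shown in the trailing comment:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.ConfigurationUtils

// Sketch: collect every "spark.kubernetes.node.selector.*" entry into a plain
// map, with the prefix stripped from each key by getAllWithPrefix.
val conf = new SparkConf(false)
  .set("spark.kubernetes.node.selector.identifier", "myIdentifier")
  .set("spark.kubernetes.node.selector.diskType", "ssd")

val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.node.selector.", "node selector")
// nodeSelector == Map("identifier" -> "myIdentifier", "diskType" -> "ssd")
```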
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index e1c1ab9d459fc..c6772c1cb5ae4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -497,6 +497,8 @@ package object config extends Logging {
.stringConf
.createOptional
+ private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
index 022b5fccdc5e1..b3f509b44054e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/BaseDriverConfigurationStep.scala
@@ -73,6 +73,8 @@ private[spark] class BaseDriverConfigurationStep(
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
s" Spark bookkeeping operations.")
val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
+ val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
+ submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCpuCores)
.build()
@@ -117,6 +119,7 @@ private[spark] class BaseDriverConfigurationStep(
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
+ .withNodeSelector(nodeSelector.asJava)
.endSpec()
.build()
val resolvedSparkConf = driverSpec.driverSparkConf.clone()
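For context, a standalone sketch of what the `.withNodeSelector(...)` call contributes to the driver pod spec, using the fabric8 `PodBuilder` already on the classpath; the pod name and selector values are illustrative, and a real driver pod also carries containers, labels, and annotations:

```scala
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.PodBuilder

// Sketch: build a bare pod carrying only the node selector, mirroring the
// builder call added above.
val nodeSelector = Map("identifier" -> "myIdentifier")
val pod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-driver-example")   // illustrative name
    .endMetadata()
  .withNewSpec()
    .withRestartPolicy("Never")
    .withNodeSelector(nodeSelector.asJava)
    .endSpec()
  .build()
// pod.getSpec.getNodeSelector now contains {"identifier": "myIdentifier"}
```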
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index e5f980ad1f366..6dbe918f966e4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -84,6 +84,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
KUBERNETES_EXECUTOR_ANNOTATIONS,
"executor annotation")
+ private val nodeSelector =
+ ConfigurationUtils.parsePrefixedKeyValuePairs(
+ conf,
+ KUBERNETES_NODE_SELECTOR_PREFIX,
+      "node selector")
private var shufflePodCache: Option[ShufflePodCache] = None
private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -449,6 +454,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
.endMetadata()
.withNewSpec()
.withHostname(hostname)
+ .withNodeSelector(nodeSelector.asJava)
.endSpec()
.build()
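To make the scheduling effect concrete, here is a simplified model (not the real Kubernetes scheduler) of how a node selector constrains placement: a node is eligible for the driver or executor pod only if its labels contain every selector entry with exactly the same value.

```scala
// Simplified model of Kubernetes nodeSelector matching.
def nodeMatches(
    nodeLabels: Map[String, String],
    nodeSelector: Map[String, String]): Boolean = {
  nodeSelector.forall { case (key, value) => nodeLabels.get(key).contains(value) }
}

// With spark.kubernetes.node.selector.identifier=myIdentifier, pods land only
// on nodes labeled identifier=myIdentifier.
nodeMatches(Map("identifier" -> "myIdentifier", "zone" -> "us-east-1a"),
  Map("identifier" -> "myIdentifier"))                                   // true
nodeMatches(Map("zone" -> "us-east-1a"), Map("identifier" -> "myIdentifier")) // false
```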