Add node selectors for driver and executor pods (apache#355)
sandflee authored and foxish committed Jul 24, 2017
1 parent 2c00103 commit e086f4d
Showing 5 changed files with 35 additions and 0 deletions.
10 changes: 10 additions & 0 deletions docs/running-on-kubernetes.md
@@ -758,6 +758,16 @@ from the other deployment modes. See the [configuration page](configuration.html
    Specify the hard cpu limit for a single executor pod
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
  <td>(none)</td>
  <td>
    Adds the key <code>labelKey</code>, with the configuration's value, to the node selector of both the driver
    pod and the executor pods. For example, setting <code>spark.kubernetes.node.selector.identifier</code> to
    <code>myIdentifier</code> results in the driver pod and executor pods having a node selector with key
    <code>identifier</code> and value <code>myIdentifier</code>. Multiple node selector keys can be added by
    setting multiple configurations with this prefix.
  </td>
</tr>
</table>


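For illustration, a minimal sketch of setting these keys programmatically on a `SparkConf` (the `disktype`/`ssd` pair is a hypothetical label, not part of this change):

```scala
import org.apache.spark.SparkConf

// Both entries end up in the nodeSelector of the driver pod and of every
// executor pod: nodeSelector: { identifier: myIdentifier, disktype: ssd }
val conf = new SparkConf()
  .set("spark.kubernetes.node.selector.identifier", "myIdentifier")
  .set("spark.kubernetes.node.selector.disktype", "ssd")
```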
14 changes: 14 additions & 0 deletions ConfigurationUtils.scala
@@ -65,4 +65,18 @@ object ConfigurationUtils extends Logging {
    }
    combined.toMap
  }

  /**
   * Extracts all properties whose keys start with `prefix`, returning them with the
   * prefix stripped. Requires each stripped key to appear only once; `configType`
   * names the kind of configuration for the error message.
   */
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String,
      configType: String): Map[String, String] = {
    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
    fromPrefix.groupBy(_._1).foreach {
      case (key, values) =>
        require(values.size == 1,
          s"Cannot have multiple values for a given $configType key, got key $key with" +
            s" values ${values.map(_._2).mkString(", ")}")
    }
    fromPrefix.toMap
  }
}
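A hedged usage sketch of the new helper, assuming `ConfigurationUtils` as defined above is on the classpath (the `zone` key and its value are hypothetical):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
  .set("spark.kubernetes.node.selector.zone", "us-east-1a")

// The prefix is stripped from each matching key; a duplicate stripped key
// would fail the require inside the helper.
val selectors = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.node.selector.", "node selector")
assert(selectors == Map("zone" -> "us-east-1a"))
```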
2 changes: 2 additions & 0 deletions config.scala
@@ -497,6 +497,8 @@ package object config extends Logging {
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

  private[spark] def resolveK8sMaster(rawMasterString: String): String = {
    if (!rawMasterString.startsWith("k8s://")) {
      throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
3 changes: 3 additions & 0 deletions BaseDriverConfigurationStep.scala
@@ -73,6 +73,8 @@ private[spark] class BaseDriverConfigurationStep(
s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
s" Spark bookkeeping operations.")
val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
val driverCpuQuantity = new QuantityBuilder(false)
.withAmount(driverCpuCores)
.build()
@@ -117,6 +119,7 @@
        .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")
        .withNodeSelector(nodeSelector.asJava)
        .endSpec()
      .build()
    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
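As a self-contained sketch of what the builder chain above produces, using the fabric8 kubernetes-client already employed by this project (pod name and selector values are illustrative):

```scala
import io.fabric8.kubernetes.api.model.PodBuilder
import scala.collection.JavaConverters._

val nodeSelector = Map("identifier" -> "myIdentifier")

// withNodeSelector copies the map into spec.nodeSelector, so the Kubernetes
// scheduler only places this pod on nodes carrying all of these labels.
val driverPod = new PodBuilder()
  .withNewMetadata()
    .withName("spark-driver")
    .endMetadata()
  .withNewSpec()
    .withRestartPolicy("Never")
    .withNodeSelector(nodeSelector.asJava)
    .endSpec()
  .build()
```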
6 changes: 6 additions & 0 deletions KubernetesClusterSchedulerBackend.scala
@@ -84,6 +84,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
      KUBERNETES_EXECUTOR_ANNOTATIONS,
      "executor annotation")
  private val nodeSelector =
    ConfigurationUtils.parsePrefixedKeyValuePairs(
      conf,
      KUBERNETES_NODE_SELECTOR_PREFIX,
      "node selector")
  private var shufflePodCache: Option[ShufflePodCache] = None
  private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
  private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -449,6 +454,7 @@
        .endMetadata()
      .withNewSpec()
        .withHostname(hostname)
        .withNodeSelector(nodeSelector.asJava)
        .endSpec()
      .build()

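To sanity-check the result at runtime, the selector can be read back with the same fabric8 client; a sketch, assuming a default namespace and a hypothetical executor pod name:

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import scala.collection.JavaConverters._

val client = new DefaultKubernetesClient()
// Prints e.g. Map(identifier -> myIdentifier) once the executor is scheduled.
val pod = client.pods().inNamespace("default").withName("spark-executor-1").get()
println(pod.getSpec.getNodeSelector.asScala)
client.close()
```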
