Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 0612195

Browse files
liyinan926mccheah
authored andcommitted
Allow user-specified environment variables and secrets in the init-container (#564)
* Allow setting user-specified environments in the init-container * Use driver/executor env keys for the init-container * Mount user-specified driver/executor secrets * Addressed comments
1 parent 15a333c commit 0612195

File tree

9 files changed

+349
-66
lines changed

9 files changed

+349
-66
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
*/
1717
package org.apache.spark.deploy.k8s
1818

19-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}
19+
import scala.collection.JavaConverters._
2020

21+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder}
22+
23+
import org.apache.spark.{SparkConf, SparkException}
24+
import org.apache.spark.deploy.k8s.config._
2125
import org.apache.spark.deploy.k8s.constants._
2226

2327
/**
@@ -47,7 +51,9 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
4751
filesDownloadPath: String,
4852
downloadTimeoutMinutes: Long,
4953
initContainerConfigMapName: String,
50-
initContainerConfigMapKey: String)
54+
initContainerConfigMapKey: String,
55+
sparkRole: String,
56+
sparkConf: SparkConf)
5157
extends SparkPodInitContainerBootstrap {
5258

5359
override def bootstrapInitContainerAndVolumes(
@@ -62,17 +68,32 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
6268
.withMountPath(filesDownloadPath)
6369
.build())
6470

71+
val initContainerCustomEnvVarKeyPrefix = sparkRole match {
72+
case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY
73+
case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv."
74+
case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role")
75+
}
76+
val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(initContainerCustomEnvVarKeyPrefix)
77+
.toSeq
78+
.map(env =>
79+
new EnvVarBuilder()
80+
.withName(env._1)
81+
.withValue(env._2)
82+
.build())
83+
6584
val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer)
6685
.withName(s"spark-init")
6786
.withImage(initContainerImage)
6887
.withImagePullPolicy(dockerImagePullPolicy)
88+
.addAllToEnv(initContainerCustomEnvVars.asJava)
6989
.addNewVolumeMount()
7090
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
7191
.withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
7292
.endVolumeMount()
7393
.addToVolumeMounts(sharedVolumeMounts: _*)
7494
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
7595
.build()
96+
7697
val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod)
7798
.editSpec()
7899
.addNewVolume()
@@ -95,6 +116,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
95116
.endVolume()
96117
.endSpec()
97118
.build()
119+
98120
val mainContainerWithMountedFiles = new ContainerBuilder(
99121
podWithDetachedInitContainer.mainContainer)
100122
.addToVolumeMounts(sharedVolumeMounts: _*)
@@ -103,6 +125,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
103125
.withValue(filesDownloadPath)
104126
.endEnv()
105127
.build()
128+
106129
PodWithDetachedInitContainer(
107130
podWithBasicVolumes,
108131
initContainer,

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer
1818

1919
import org.apache.spark.SparkConf
20-
import org.apache.spark.deploy.k8s.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
20+
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
2121
import org.apache.spark.deploy.k8s.config._
2222
import org.apache.spark.deploy.k8s.constants._
23-
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl}
23+
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
2424
import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
2525
import org.apache.spark.util.Utils
2626

@@ -43,7 +43,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
4343
private val submittedResourcesSecretName = s"$kubernetesResourceNamePrefix-init-secret"
4444
private val resourceStagingServerUri = submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI)
4545
private val resourceStagingServerInternalUri =
46-
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
46+
submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
4747
private val initContainerImage = submissionSparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
4848
private val downloadTimeoutMinutes = submissionSparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
4949
private val maybeResourceStagingServerInternalTrustStore =
@@ -92,46 +92,62 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
9292

9393
def getAllConfigurationSteps(): Seq[InitContainerConfigurationStep] = {
9494
val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl(
95-
initContainerImage,
96-
dockerImagePullPolicy,
97-
jarsDownloadPath,
98-
filesDownloadPath,
99-
downloadTimeoutMinutes,
100-
initContainerConfigMapName,
101-
initContainerConfigMapKey)
95+
initContainerImage,
96+
dockerImagePullPolicy,
97+
jarsDownloadPath,
98+
filesDownloadPath,
99+
downloadTimeoutMinutes,
100+
initContainerConfigMapName,
101+
initContainerConfigMapKey,
102+
SPARK_POD_DRIVER_ROLE,
103+
submissionSparkConf)
102104
val baseInitContainerStep = new BaseInitContainerConfigurationStep(
103-
sparkJars,
104-
sparkFiles,
105-
jarsDownloadPath,
106-
filesDownloadPath,
107-
initContainerConfigMapName,
108-
initContainerConfigMapKey,
109-
initContainerBootstrap)
110-
val submittedResourcesInitContainerStep = resourceStagingServerUri.map {
111-
stagingServerUri =>
105+
sparkJars,
106+
sparkFiles,
107+
jarsDownloadPath,
108+
filesDownloadPath,
109+
initContainerConfigMapName,
110+
initContainerConfigMapKey,
111+
initContainerBootstrap)
112+
113+
val submittedResourcesInitContainerStep = resourceStagingServerUri.map { stagingServerUri =>
112114
val mountSecretPlugin = new InitContainerResourceStagingServerSecretPluginImpl(
113-
submittedResourcesSecretName,
114-
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
115+
submittedResourcesSecretName,
116+
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
115117
val submittedDependencyUploader = new SubmittedDependencyUploaderImpl(
116-
driverLabels,
117-
namespace,
118-
stagingServerUri,
119-
sparkJars,
120-
sparkFiles,
121-
new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions,
122-
RetrofitClientFactoryImpl)
118+
driverLabels,
119+
namespace,
120+
stagingServerUri,
121+
sparkJars,
122+
sparkFiles,
123+
new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions,
124+
RetrofitClientFactoryImpl)
123125
new SubmittedResourcesInitContainerConfigurationStep(
124-
submittedResourcesSecretName,
125-
resourceStagingServerInternalUri.getOrElse(stagingServerUri),
126-
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
127-
resourceStagingServerInternalSslEnabled,
128-
maybeResourceStagingServerInternalTrustStore,
129-
maybeResourceStagingServerInternalClientCert,
130-
maybeResourceStagingServerInternalTrustStorePassword,
131-
maybeResourceStagingServerInternalTrustStoreType,
132-
submittedDependencyUploader,
133-
mountSecretPlugin)
126+
submittedResourcesSecretName,
127+
resourceStagingServerInternalUri.getOrElse(stagingServerUri),
128+
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
129+
resourceStagingServerInternalSslEnabled,
130+
maybeResourceStagingServerInternalTrustStore,
131+
maybeResourceStagingServerInternalClientCert,
132+
maybeResourceStagingServerInternalTrustStorePassword,
133+
maybeResourceStagingServerInternalTrustStoreType,
134+
submittedDependencyUploader,
135+
mountSecretPlugin)
136+
}
137+
138+
val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(
139+
submissionSparkConf,
140+
KUBERNETES_DRIVER_SECRETS_PREFIX,
141+
"driver secrets")
142+
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
143+
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
144+
Some(new InitContainerMountSecretsStep(mountSecretsBootstrap))
145+
} else {
146+
None
134147
}
135-
Seq(baseInitContainerStep) ++ submittedResourcesInitContainerStep.toSeq
148+
149+
Seq(baseInitContainerStep) ++
150+
submittedResourcesInitContainerStep.toSeq ++
151+
mountSecretsStep.toSeq
136152
}
137153
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer
18+
19+
import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
20+
21+
/**
22+
* An init-container configuration step for mounting user-specified secrets onto user-specified
23+
* paths.
24+
*
25+
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
26+
*/
27+
private[spark] class InitContainerMountSecretsStep(
28+
mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
29+
30+
override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = {
31+
val (podWithSecretsMounted, initContainerWithSecretsMounted) =
32+
mountSecretsBootstrap.mountSecrets(
33+
initContainerSpec.podToInitialize,
34+
initContainerSpec.initContainer)
35+
initContainerSpec.copy(
36+
podToInitialize = podWithSecretsMounted,
37+
initContainer = initContainerWithSecretsMounted
38+
)
39+
}
40+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
*/
1717
package org.apache.spark.scheduler.cluster.k8s
1818

19-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
19+
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
2020
import scala.collection.JavaConverters._
2121

22-
import org.apache.spark.{SparkConf, SparkException}
22+
import org.apache.spark.SparkConf
2323
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
2424
import org.apache.spark.deploy.k8s.config._
2525
import org.apache.spark.deploy.k8s.constants._
@@ -44,6 +44,7 @@ private[spark] class ExecutorPodFactoryImpl(
4444
mountSecretsBootstrap: Option[MountSecretsBootstrap],
4545
mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap],
4646
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
47+
executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap],
4748
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
4849
executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider)
4950
extends ExecutorPodFactory {
@@ -82,9 +83,6 @@ private[spark] class ExecutorPodFactoryImpl(
8283
private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT)
8384
private val blockmanagerPort = sparkConf
8485
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
85-
private val kubernetesDriverPodName = sparkConf
86-
.get(KUBERNETES_DRIVER_POD_NAME)
87-
.getOrElse(throw new SparkException("Must specify the driver pod name"))
8886

8987
private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
9088

@@ -234,6 +232,7 @@ private[spark] class ExecutorPodFactoryImpl(
234232
bootstrap.mountSmallFilesSecret(
235233
withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)
236234
}.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer))
235+
237236
val (executorPodWithInitContainer, initBootstrappedExecutorContainer) =
238237
executorInitContainerBootstrap.map { bootstrap =>
239238
val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes(
@@ -247,8 +246,13 @@ private[spark] class ExecutorPodFactoryImpl(
247246
podWithDetachedInitContainer.initContainer)
248247
}.getOrElse(podWithDetachedInitContainer.initContainer)
249248

249+
val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) =
250+
executorInitContainerMountSecretsBootstrap.map { bootstrap =>
251+
bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer)
252+
}.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer)
253+
250254
val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(
251-
podWithDetachedInitContainer.pod, resolvedInitContainer)
255+
mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted)
252256

253257
val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin =>
254258
plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
2929
import org.apache.spark.network.netty.SparkTransportConf
3030
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
3131
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
32-
import org.apache.spark.util.{ThreadUtils, Utils}
32+
import org.apache.spark.util.ThreadUtils
3333

3434
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
3535

@@ -78,7 +78,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
7878
sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION),
7979
sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT),
8080
configMap,
81-
configMapKey)
81+
configMapKey,
82+
SPARK_POD_EXECUTOR_ROLE,
83+
sparkConf)
8284
}
8385

8486
val mountSmallFilesBootstrap = for {
@@ -95,6 +97,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
9597
} else {
9698
None
9799
}
100+
val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
101+
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
102+
} else {
103+
None
104+
}
98105

99106
if (maybeInitContainerConfigMap.isEmpty) {
100107
logWarning("The executor's init-container config map was not specified. Executors will" +
@@ -133,6 +140,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
133140
mountSecretBootstrap,
134141
mountSmallFilesBootstrap,
135142
executorInitContainerBootstrap,
143+
executorInitContainerMountSecretsBootstrap,
136144
executorInitContainerSecretVolumePlugin,
137145
executorLocalDirVolumeProvider)
138146
val allocatorExecutor = ThreadUtils

0 commit comments

Comments
 (0)