This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 246b885

ifilonenko authored and liyinan926 committed

Basic Secure HDFS Support [514] (#540)

* first stage of PR #514 of just logic
* fixing build and unit test issues
* fixed integration tests
* fixed issue with executorPodFactory unit tests
* first series of PR comments
* handle most PR comments
* third round of PR comments
* initial round of comments and initial unit tests for deploy
* handled most of the comments and added test cases for pods
* resolve conflicts
* merge conflicts
* adding thread sleeping for RSS issues as a test
* resolving comments and unit testing
* regarding comments on PR
1 parent 0612195 commit 246b885

34 files changed: +2347 −44 lines

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -573,7 +573,7 @@ object SparkSubmit extends CommandLineUtils {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
         if (!new File(args.keytab).exists()) {
```

docs/running-on-kubernetes.md

Lines changed: 55 additions & 1 deletion
```diff
@@ -752,6 +752,61 @@ from the other deployment modes. See the [configuration page](configuration.html
   </td>
 </tr>
 <tr>
+  <td><code>spark.kubernetes.kerberos.enabled</code></td>
+  <td>false</td>
+  <td>
+    Specify whether your job requires Kerberos authentication to access HDFS. By default, we
+    assume that you do not require secure HDFS access.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.keytab</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the location of the Kerberos keytab to be used to access secure HDFS. This is optional, as you
+    may instead log in by running <code>kinit</code> before running spark-submit, and the submission client
+    will look within your local TGT cache to resolve this.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.principal</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the Kerberos principal that you wish to use to access secure HDFS. This is optional, as you
+    may instead log in by running <code>kinit</code> before running spark-submit, and the submission client
+    will look within your local TGT cache to resolve this.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the principal that you wish to use to handle renewal of delegation tokens. This is optional, as
+    we set the principal to the job user's principal by default.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
+  <td>(none)</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the name of the secret where your existing delegation token data is stored. You must also specify the
+    item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where the data is stored in the secret.
+    This is optional, for the case where you want to use a pre-existing secret; otherwise a new secret will be
+    automatically created.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
+  <td>spark.kubernetes.kerberos.dt.label</td>
+  <td>
+    Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
+    the data item key within the pre-specified secret where your existing delegation token data is stored.
+    We use the default value <code>spark.kubernetes.kerberos.dt.label</code> should you not include it, but
+    you should always include this if you are providing a pre-existing secret that contains the delegation
+    token data.
+  </td>
+</tr>
+<tr>
   <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
   <td>(none)</td>
   <td>
@@ -791,4 +846,3 @@ from the other deployment modes. See the [configuration page](configuration.html
 Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that
 should be lifted in the future include:
 * Applications can only run in cluster mode.
-* Only Scala and Java applications can be run.
```
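The settings above are ordinary Spark configuration keys, so they can be exercised from code as well as from the command line. Below is a minimal sketch (not part of this commit) of a submission-side `SparkConf` using them; the principal, keytab path, and secret names are hypothetical placeholders.

```scala
import org.apache.spark.SparkConf

object KerberosConfSketch {
  // Option A: let the submission client log in from a keytab (hypothetical values).
  val keytabConf: SparkConf = new SparkConf()
    .set("spark.kubernetes.kerberos.enabled", "true")
    .set("spark.kubernetes.kerberos.principal", "hdfs-user@EXAMPLE.COM")
    .set("spark.kubernetes.kerberos.keytab", "/etc/security/keytabs/hdfs-user.keytab")

  // Option B: reuse a pre-created delegation-token secret instead of a keytab.
  val secretConf: SparkConf = new SparkConf()
    .set("spark.kubernetes.kerberos.enabled", "true")
    .set("spark.kubernetes.kerberos.tokensecret.name", "hdfs-dt-secret")    // hypothetical
    .set("spark.kubernetes.kerberos.tokensecret.itemkey", "hadoop-token-1") // hypothetical
}
```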
Lines changed: 82 additions & 0 deletions (new file defining `org.apache.spark.deploy.k8s.HadoopConfBootstrap`)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * set up the Hadoop Configuration for executors as well.
 */
private[spark] trait HadoopConfBootstrap {
  /**
   * Bootstraps a main container with the ConfigMaps containing Hadoop config files
   * mounted as volumes and an ENV variable pointing to the mounted file.
   */
  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
    hadoopConfConfigMapName: String,
    hadoopConfigFiles: Seq[File],
    hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer = {
    logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files")
    val keyPaths = hadoopConfigFiles.map { file =>
      val fileStringPath = file.toPath.getFileName.toString
      new KeyToPathBuilder()
        .withKey(fileStringPath)
        .withPath(fileStringPath)
        .build()
    }
    val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(hadoopConfConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    val hadoopSupportedContainer = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_CONF_DIR_PATH)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_CONF_DIR)
        .withValue(HADOOP_CONF_DIR_PATH)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = hadoopSupportedPod,
      mainContainer = hadoopSupportedContainer)
  }
}
```
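For orientation, here is a hedged sketch of how this bootstrap might be invoked, assuming `PodWithMainContainer` is the simple (pod, mainContainer) pair that the `.copy(...)` call above implies; the ConfigMap name and file paths are hypothetical.

```scala
package org.apache.spark.deploy.k8s

import java.io.File

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

object HadoopConfBootstrapSketch {
  // Hypothetical bare driver pod and container, before any Hadoop setup.
  val bare = PodWithMainContainer(
    new PodBuilder().withNewMetadata().withName("spark-driver").endMetadata().build(),
    new ContainerBuilder().withName("spark-kubernetes-driver").build())

  val bootstrap = new HadoopConfBootstrapImpl(
    hadoopConfConfigMapName = "hadoop-conf-files", // hypothetical ConfigMap name
    hadoopConfigFiles = Seq(
      new File("/opt/hadoop/conf/core-site.xml"),  // hypothetical HADOOP_CONF_DIR contents
      new File("/opt/hadoop/conf/hdfs-site.xml")),
    hadoopUGI = new HadoopUGIUtilImpl)

  // Result: the same pod with the ConfigMap volume added and HADOOP_CONF_DIR set.
  val bootstrapped: PodWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes(bare)
}
```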
Lines changed: 46 additions & 0 deletions (new file defining `org.apache.spark.deploy.k8s.HadoopConfSparkUserBootstrap`)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s.constants._

// This trait is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected;
// however, this step is not run if Kerberos is enabled, as Kerberos sets SPARK_USER.
private[spark] trait HadoopConfSparkUserBootstrap {
  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}

private[spark] class HadoopConfSparkUserBootstrapImpl(hadoopUGIUtil: HadoopUGIUtil)
  extends HadoopConfSparkUserBootstrap {

  override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer = {
    val envModifiedContainer = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(hadoopUGIUtil.getShortUserName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = originalPodWithMainContainer.pod,
      mainContainer = envModifiedContainer)
  }
}
```
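The comment above implies an either/or between this step and the Kerberos bootstrap. A small sketch of that selection logic follows, under the assumption that the caller already knows whether `spark.kubernetes.kerberos.enabled` was set; the orchestration code itself is not part of this excerpt.

```scala
package org.apache.spark.deploy.k8s

object SparkUserStepSelectionSketch {
  // If Kerberos is enabled, the Kerberos token bootstrap sets SPARK_USER itself;
  // otherwise fall back to the plain SPARK_USER bootstrap shown above.
  def applySparkUserStep(
      kerberosEnabled: Boolean,
      pod: PodWithMainContainer,
      ugiUtil: HadoopUGIUtil): PodWithMainContainer = {
    if (kerberosEnabled) {
      pod // the Kerberos token bootstrap will handle SPARK_USER
    } else {
      new HadoopConfSparkUserBootstrapImpl(ugiUtil).bootstrapMainContainerAndVolumes(pod)
    }
  }
}
```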
Lines changed: 107 additions & 0 deletions (new file defining `org.apache.spark.deploy.k8s.HadoopUGIUtil`)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.util.{Clock, SystemClock, Utils}

private[spark] trait HadoopUGIUtil {
  def getCurrentUser: UserGroupInformation
  def getShortUserName: String
  def getFileSystem(hadoopConf: Configuration): FileSystem
  def isSecurityEnabled: Boolean
  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation
  def dfsAddDelegationToken(
      fileSystem: FileSystem,
      hadoopConf: Configuration,
      renewer: String,
      creds: Credentials): Iterable[Token[_ <: TokenIdentifier]]
  def getCurrentTime: Long
  def getTokenRenewalInterval(
      renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
      hadoopConf: Configuration): Option[Long]
  def serialize(creds: Credentials): Array[Byte]
  def deserialize(tokenBytes: Array[Byte]): Credentials
}

private[spark] class HadoopUGIUtilImpl extends HadoopUGIUtil {

  private val clock: Clock = new SystemClock()

  def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
  def getShortUserName: String = getCurrentUser.getShortUserName
  def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

  def dfsAddDelegationToken(
      fileSystem: FileSystem,
      hadoopConf: Configuration,
      renewer: String,
      creds: Credentials): Iterable[Token[_ <: TokenIdentifier]] =
    fileSystem.addDelegationTokens(renewer, creds)

  def getCurrentTime: Long = clock.getTimeMillis()

  // Functions that should be in core after the rebase to 2.3
  @deprecated("Moved to core in 2.3", "2.3")
  def getTokenRenewalInterval(
      renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
      hadoopConf: Configuration): Option[Long] = {
    val renewIntervals = renewedTokens.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
      Try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
        val interval = newExpiration - identifier.getIssueDate
        interval
      }.toOption
    }
    renewIntervals.reduceLeftOption(_ min _)
  }

  @deprecated("Moved to core in 2.3", "2.3")
  def serialize(creds: Credentials): Array[Byte] = {
    Utils.tryWithResource(new ByteArrayOutputStream()) { byteStream =>
      Utils.tryWithResource(new DataOutputStream(byteStream)) { dataStream =>
        creds.writeTokenStorageToStream(dataStream)
      }
      byteStream.toByteArray
    }
  }

  @deprecated("Moved to core in 2.3", "2.3")
  def deserialize(tokenBytes: Array[Byte]): Credentials = {
    val creds = new Credentials()
    Utils.tryWithResource(new ByteArrayInputStream(tokenBytes)) { byteStream =>
      Utils.tryWithResource(new DataInputStream(byteStream)) { dataStream =>
        creds.readTokenStorageStream(dataStream)
      }
    }
    creds
  }
}
```
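`serialize` and `deserialize` are inverses: the byte array produced here is what ends up as the data item of the delegation-token secret. A minimal round-trip sketch, using an empty `Credentials` instance purely for illustration:

```scala
package org.apache.spark.deploy.k8s

import org.apache.hadoop.security.Credentials

object TokenRoundTripSketch {
  val util = new HadoopUGIUtilImpl

  // Empty Credentials for illustration; in the real flow these would carry
  // the HDFS delegation tokens obtained via dfsAddDelegationToken.
  val creds = new Credentials()
  val tokenBytes: Array[Byte] = util.serialize(creds)
  val restored: Credentials = util.deserialize(tokenBytes)
}
```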
Lines changed: 74 additions & 0 deletions (new file defining `org.apache.spark.deploy.k8s.KerberosTokenConfBootstrap`)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * mount the DT secret for executors as well.
 */
private[spark] trait KerberosTokenConfBootstrap {
  // Bootstraps a main container with the Secret mounted as a volume and an ENV variable
  // pointing to the mounted file containing the DT for secure HDFS interaction.
  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
    secretName: String,
    secretItemKey: String,
    userName: String) extends KerberosTokenConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo(s"Mounting HDFS DT from Secret $secretName for Secure HDFS")
    val secretMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    // TODO: ENV_HADOOP_TOKEN_FILE_LOCATION should point to the latest token data item key.
    val secretMountedContainer = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(userName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = secretMountedPod,
      mainContainer = secretMountedContainer)
  }
}
```
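And a matching sketch of how this bootstrap might be applied on the executor side, again assuming the same `PodWithMainContainer` shape; the secret name and user are hypothetical, while the item key mirrors the documented default of `spark.kubernetes.kerberos.tokensecret.itemkey`.

```scala
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

object KerberosTokenBootstrapSketch {
  val bootstrap = new KerberosTokenConfBootstrapImpl(
    secretName = "hdfs-dt-secret",                        // hypothetical secret name
    secretItemKey = "spark.kubernetes.kerberos.dt.label", // documented default item key
    userName = "hdfs-user")                               // hypothetical short user name

  // Hypothetical bare executor pod and container.
  val bare = PodWithMainContainer(
    new PodBuilder().withNewMetadata().withName("spark-exec-1").endMetadata().build(),
    new ContainerBuilder().withName("executor").build())

  // Result: secret volume mounted, HADOOP_TOKEN_FILE_LOCATION and SPARK_USER set.
  val withTokens: PodWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes(bare)
}
```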
