
Commit 60234a2

Spark on Kubernetes - basic submission client

1 parent: 7475a96

33 files changed: +2623 -12 lines

.travis.yml

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

assembly/pom.xml

Lines changed: 10 additions & 0 deletions

@@ -148,6 +148,16 @@
       </dependency>
     </dependencies>
   </profile>
+  <profile>
+    <id>kubernetes</id>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+    </dependencies>
+  </profile>
   <profile>
     <id>hive</id>
     <dependencies>

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 31 additions & 6 deletions

@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
   private val STANDALONE = 2
   private val MESOS = 4
   private val LOCAL = 8
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
+  private val KUBERNETES = 16
+  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

   // Deploy modes
   private val CLIENT = 1
@@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
         YARN
       case m if m.startsWith("spark") => STANDALONE
       case m if m.startsWith("mesos") => MESOS
+      case m if m.startsWith("k8s") => KUBERNETES
       case m if m.startsWith("local") => LOCAL
       case _ =>
         printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
@@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
        printErrorAndExit("Cluster deploy mode is currently not supported for R " +
          "applications on standalone clusters.")
+      case (KUBERNETES, CLIENT) =>
+        printErrorAndExit("Client mode is currently not supported for Kubernetes.")
+      case (KUBERNETES, _) if args.isPython =>
+        printErrorAndExit("Python applications are currently not supported for Kubernetes.")
+      case (KUBERNETES, _) if args.isR =>
+        printErrorAndExit("R applications are currently not supported for Kubernetes.")
      case (LOCAL, CLUSTER) =>
        printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
      case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -316,6 +324,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
+    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

    if (!isMesosCluster && !isStandAloneCluster) {
      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -555,20 +564,24 @@ object SparkSubmit extends CommandLineUtils with Logging {
      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

+      // Kubernetes only
+      OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
+        confKey = "spark.kubernetes.namespace"),
+
      // Other options
-      OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.executor.cores"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.executor.memory"),
-      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
+      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.cores.max"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
        confKey = "spark.files"),
      OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
      OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
-      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
+      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = "spark.driver.memory"),
-      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
+      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = "spark.driver.cores"),
      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
        confKey = "spark.driver.supervise"),
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
      }
    }

+    if (isKubernetesCluster) {
+      childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
+      childArgs ++= Array("--primary-java-resource", args.primaryResource)
+      childArgs ++= Array("--main-class", args.mainClass)
+      if (args.childArgs != null) {
+        args.childArgs.foreach { arg =>
+          childArgs += "--arg"
+          childArgs += arg
+        }
+      }
+    }
+
    // Load any properties specified through --conf and the default properties file
    for ((k, v) <- args.sparkProperties) {
      sparkConf.setIfMissing(k, v)
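
Aside: the cluster-manager constants above are single-bit flags, so one OptionAssigner rule can target several managers with a bitwise OR (e.g. STANDALONE | MESOS | YARN | KUBERNETES). A minimal standalone sketch of the membership test, using the same flag values as the patch (the applies helper is illustrative, not the actual SparkSubmit code):

  object ClusterManagerFlagsDemo {
    val YARN = 1
    val STANDALONE = 2
    val MESOS = 4
    val LOCAL = 8
    val KUBERNETES = 16
    val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

    // A rule whose mask has the current manager's bit set applies to that manager.
    def applies(clusterManager: Int, mask: Int): Boolean = (mask & clusterManager) != 0

    def main(args: Array[String]): Unit = {
      println(applies(KUBERNETES, STANDALONE | MESOS | YARN | KUBERNETES)) // true
      println(applies(KUBERNETES, STANDALONE | MESOS | YARN))              // false
      println(applies(KUBERNETES, ALL_CLUSTER_MGRS))                       // true
    }
  }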

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 0 deletions

@@ -81,6 +81,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var submissionToRequestStatusFor: String = null
   var useRest: Boolean = true // used internally

+  // Kubernetes only
+  var kubernetesNamespace: String = null
+
   /** Default properties present in the currently defined defaults file. */
   lazy val defaultSparkProperties: HashMap[String, String] = {
     val defaultProperties = new HashMap[String, String]()
@@ -199,6 +202,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
    keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
    principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

+    kubernetesNamespace = Option(kubernetesNamespace)
+      .orElse(sparkProperties.get("spark.kubernetes.namespace"))
+      .orNull
+
    // Try to set main class from JAR if no --class argument is given
    if (mainClass == null && !isPython && !isR && primaryResource != null) {
      val uri = new URI(primaryResource)
@@ -454,6 +461,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
      case KEYTAB =>
        keytab = value

+      case KUBERNETES_NAMESPACE =>
+        kubernetesNamespace = value
+
      case HELP =>
        printUsageAndExit(0)

@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |  the node running the Application Master via the Secure
        |  Distributed Cache, for renewing the login tickets and the
        |  delegation tokens periodically.
+        |
+        | Kubernetes only:
+        |  --kubernetes-namespace NS   The namespace in the Kubernetes cluster within which the
+        |                              application must be launched. The namespace must already
+        |                              exist in the cluster. (Default: default).
     """.stripMargin
   )
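
Aside: the argument-loading code resolves the namespace the same way as the YARN keytab and principal just above it: an explicit --kubernetes-namespace flag wins, then the spark.kubernetes.namespace property, then null. A standalone sketch of that Option(...).orElse(...).orNull precedence (the values here are demo data only):

  object NamespacePrecedenceDemo {
    def main(args: Array[String]): Unit = {
      val fromCli: String = null // as if no --kubernetes-namespace flag was given
      val sparkProperties = Map("spark.kubernetes.namespace" -> "team-a")

      val kubernetesNamespace = Option(fromCli)
        .orElse(sparkProperties.get("spark.kubernetes.namespace")) // property-file fallback
        .orNull                                                    // stays null if neither is set

      println(kubernetesNamespace) // prints team-a
    }
  }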

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 26 additions & 0 deletions

@@ -385,6 +385,32 @@ class SparkSubmitSuite
     conf.get("spark.ui.enabled") should be ("false")
   }

+  test("handles k8s cluster mode") {
+    val clArgs = Seq(
+      "--deploy-mode", "cluster",
+      "--master", "k8s://host:port",
+      "--executor-memory", "5g",
+      "--class", "org.SomeClass",
+      "--kubernetes-namespace", "foo",
+      "--driver-memory", "4g",
+      "--conf", "spark.kubernetes.driver.docker.image=bar",
+      "/home/thejar.jar",
+      "arg1")
+    val appArgs = new SparkSubmitArguments(clArgs)
+    val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)
+
+    val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
+    childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
+    childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
+    childArgsMap.get("--arg") should be (Some("arg1"))
+    mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
+    classpath should have length (0)
+    conf.get("spark.executor.memory") should be ("5g")
+    conf.get("spark.driver.memory") should be ("4g")
+    conf.get("spark.kubernetes.namespace") should be ("foo")
+    conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
+  }
+
   test("handles confs with flag equivalents") {
     val clArgs = Seq(
       "--deploy-mode", "cluster",

dev/sparktestsupport/modules.py

Lines changed: 8 additions & 0 deletions

@@ -531,6 +531,14 @@ def __hash__(self):
     sbt_test_goals=["mesos/test"]
 )

+kubernetes = Module(
+    name="kubernetes",
+    dependencies=[],
+    source_file_regexes=["resource-managers/kubernetes/core"],
+    build_profile_flags=["-Pkubernetes"],
+    sbt_test_goals=["kubernetes/test"]
+)
+
 # The root module is a dummy module which is used to run all of the tests.
 # No other modules should directly depend on this module.
 root = Module(

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 5 additions & 1 deletion

@@ -76,13 +76,16 @@ class SparkSubmitOptionParser {
   protected final String PRINCIPAL = "--principal";
   protected final String QUEUE = "--queue";

+  // Kubernetes-only options.
+  protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";
+
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
    * different aliases for the same option; the first element of each entry is the "official"
    * name of the option, passed to {@link #handle(String, String)}.
    * <p>
    * Options not listed here nor in the "switch" list below will result in a call to
-   * {@link $#handleUnknown(String)}.
+   * {@link #handleUnknown(String)}.
    * <p>
    * These two arrays are visible for tests.
    */
@@ -115,6 +118,7 @@ class SparkSubmitOptionParser {
     { REPOSITORIES },
     { STATUS },
     { TOTAL_EXECUTOR_CORES },
+    { KUBERNETES_NAMESPACE },
   };

   /**

pom.xml

Lines changed: 7 additions & 0 deletions

@@ -2662,6 +2662,13 @@
     </modules>
   </profile>

+  <profile>
+    <id>kubernetes</id>
+    <modules>
+      <module>resource-managers/kubernetes/core</module>
+    </modules>
+  </profile>
+
   <profile>
     <id>hive-thriftserver</id>
     <modules>

project/SparkBuild.scala

Lines changed: 4 additions & 4 deletions

@@ -53,11 +53,11 @@ object BuildCommons {
     "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

-  val optionallyEnabledProjects@Seq(mesos, yarn,
+  val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
     streamingFlumeSink, streamingFlume,
     streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
     dockerIntegrationTests, hadoopCloud) =
-    Seq("mesos", "yarn",
+    Seq("kubernetes", "mesos", "yarn",
      "streaming-flume-sink", "streaming-flume",
      "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
      "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
    publish := {},

    unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
    unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),

    unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
      ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
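
Aside: optionallyEnabledProjects uses Scala's pattern binder (@) to capture the whole sequence and destructure it positionally in one step, which is why "kubernetes" has to be added at the same position on both sides of the assignment. A toy illustration:

  object PatternBindDemo {
    def main(args: Array[String]): Unit = {
      // Binds the full Seq to allNames and each element to a name, by position.
      val allNames@Seq(kubernetes, mesos, yarn) = Seq("kubernetes", "mesos", "yarn")

      println(allNames)   // List(kubernetes, mesos, yarn)
      println(kubernetes) // kubernetes (first name binds to first element)
    }
  }
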
resource-managers/kubernetes/core/pom.xml (new file; path per the module declarations above)

Lines changed: 102 additions & 0 deletions

@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes</name>
+  <properties>
+    <sbt.project.name>kubernetes</sbt.project.name>
+    <kubernetes.client.version>3.0.0</kubernetes.client.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes.client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.dataformat</groupId>
+          <artifactId>jackson-dataformat-yaml</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- Required by kubernetes-client but we exclude it -->
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${fasterxml.jackson.version}</version>
+    </dependency>
+
+    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <!-- End of shaded deps. -->
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+
+</project>
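
Aside: the fabric8 kubernetes-client dependency declared above is the library the submission client uses to talk to the Kubernetes API server (the Client implementation itself lives in other files of this commit, not shown on this page). As an illustration only, a minimal sketch of creating a driver-style pod with the fabric8 3.x builder API; the pod name, namespace, and image below are made-up demo values, not what Spark actually generates:

  import io.fabric8.kubernetes.api.model.PodBuilder
  import io.fabric8.kubernetes.client.DefaultKubernetesClient

  object DriverPodSketch {
    def main(args: Array[String]): Unit = {
      // Reads connection details from ~/.kube/config or in-cluster service-account credentials.
      val client = new DefaultKubernetesClient()
      try {
        val driverPod = new PodBuilder()
          .withNewMetadata()
            .withName("spark-driver-demo")     // demo value
            .withNamespace("default")          // would come from spark.kubernetes.namespace
          .endMetadata()
          .withNewSpec()
            .addNewContainer()
              .withName("spark-driver")
              .withImage("spark-driver:demo")  // demo value
            .endContainer()
            .withRestartPolicy("Never")        // driver pods run to completion
          .endSpec()
          .build()
        client.pods().inNamespace("default").create(driverPod)
      } finally {
        client.close()
      }
    }
  }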
