diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 43db943a05fd..cf76c0e5b9f1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -339,8 +339,8 @@ private[spark] class SparkSubmit extends Logging {
     val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
     val isKubernetesClusterModeDriver = isKubernetesClient &&
       sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
-    // TODO: does client/cluster mode matter here?
-    val isArmada = clusterManager == ARMADA
+    val isArmadaCluster = clusterManager == ARMADA && deployMode == CLUSTER
+    // TODO: Support Armada in client deploy mode?
     val isCustomClasspathInClusterModeDisallowed =
       !sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
       args.proxyUser != null &&
@@ -706,7 +706,7 @@ private[spark] class SparkSubmit extends Logging {
        mergeFn = Some(mergeFileLists(_, _))),
 
      // Other options
-     OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
+     OptionAssigner(args.numExecutors, YARN | KUBERNETES | ARMADA, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_INSTANCES.key),
      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_CORES.key),
@@ -878,10 +878,28 @@ private[spark] class SparkSubmit extends Logging {
       }
     }
 
-    if (isArmada) {
-      // FIXME: Make sure we populate what we need here!
+    if (isArmadaCluster) {
       childMainClass = ARMADA_CLUSTER_SUBMIT_CLASS
-      childArgs ++= Array("--class", args.mainClass)
+      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
+        if (args.isPython) {
+          childArgs ++= Array("--primary-py-file", args.primaryResource)
+          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+        } else if (args.isR) {
+          childArgs ++= Array("--primary-r-file", args.primaryResource)
+          childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
+        }
+        else {
+          childArgs ++= Array("--primary-java-resource", args.primaryResource)
+          childArgs ++= Array("--main-class", args.mainClass)
+        }
+      } else {
+        childArgs ++= Array("--main-class", args.mainClass)
+      }
+      if (args.childArgs != null) {
+        args.childArgs.foreach { arg =>
+          childArgs += "--arg" += arg
+        }
+      }
     }
 
     // Load any properties specified through --conf and the default properties file
diff --git a/examples/submitSparkPi.sh b/examples/submitSparkPi.sh
new file mode 100755
index 000000000000..f4769a75feb6
--- /dev/null
+++ b/examples/submitSparkPi.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+# Run the SparkPi example app on Armada.
+bin/spark-submit --master armada://localhost:30002 --deploy-mode cluster --name spark-pi --class org.apache.spark.examples.SparkPi --conf spark.executor.instances=2 --conf spark.kubernetes.container.image=spark:testing local:///opt/spark/examples/jars/spark-examples.jar 100
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala
index bfc6387ada84..afac03b3f598 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/ArmadaClientApplication.scala
@@ -16,8 +16,9 @@
  */
 package org.apache.spark.deploy.armada.submit
 
-/*
 import scala.collection.mutable
+
+/*
 import scala.jdk.CollectionConverters._
 import scala.util.control.Breaks._
 import scala.util.control.NonFatal
@@ -30,6 +31,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
 */
 import _root_.io.armadaproject.armada.ArmadaClient
 import k8s.io.api.core.v1.generated.{Container, EnvVar, PodSpec, ResourceRequirements}
+import k8s.io.api.core.v1.generated.{EnvVarSource, ObjectFieldSelector}
 import k8s.io.apimachinery.pkg.api.resource.generated.Quantity
 
 import org.apache.spark.SparkConf
@@ -52,7 +54,6 @@ import org.apache.spark.util.Utils
  * @param mainClass the main class of the application to run
  * @param driverArgs arguments to the driver
  */
-/*
 private[spark] case class ClientArguments(
     mainAppResource: MainAppResource,
     mainClass: String,
@@ -94,7 +95,6 @@ private[spark] object ClientArguments {
       proxyUser)
   }
 }
-*/
 
 /**
  * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
@@ -242,14 +242,15 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
   override def start(args: Array[String], conf: SparkConf): Unit = {
     log("ArmadaClientApplication.start() called!")
-    run(conf)
+    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
+    run(parsedArguments, conf)
   }
 
-  private def run(sparkConf: SparkConf): Unit = {
+  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
     val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master"))
     log(s"host is $host, port is $port")
 
-    var armadaClient = new ArmadaClient(ArmadaClient.GetChannel(host, port))
-    if (armadaClient.SubmitHealth().isServing) {
+    val armadaClient = ArmadaClient(host, port)
+    if (armadaClient.submitHealth().isServing) {
       log("Submit health good!")
     } else {
       log("Could not contact Armada!")
@@ -258,7 +259,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
 
     // # FIXME: Need to check how this is launched whether to submit a job or
     // to turn into driver / cluster manager mode.
-    val jobId = submitDriverJob(armadaClient, sparkConf)
+    val jobId = submitDriverJob(armadaClient, clientArguments, sparkConf)
     log(s"Got job ID: $jobId")
     // For constructing the app ID, we can't use the Spark application name, as the app ID is going
     // to be added as a label to group resources belonging to the same application. Label values are
@@ -296,14 +297,26 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
     ()
   }
 
-  private def submitDriverJob(armadaClient: ArmadaClient, conf: SparkConf): String = {
+  private def submitDriverJob(armadaClient: ArmadaClient, clientArguments: ClientArguments,
+      conf: SparkConf): String = {
+    val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
+      .withApiVersion("v1").withFieldPath("status.podIP"))
     val envVars = Seq(
-      EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValue("0.0.0.0:1234")
+      new EnvVar().withName("SPARK_DRIVER_BIND_ADDRESS").withValueFrom(source)
     )
+
+    val primaryResource = clientArguments.mainAppResource match {
+      case JavaMainAppResource(Some(resource)) => Seq(resource)
+      case PythonMainAppResource(resource) => Seq(resource)
+      case RMainAppResource(resource) => Seq(resource)
+      case _ => Seq()
+    }
+
+    val javaOptions = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=0.0.0.0:5005"
     val driverContainer = Container()
       .withName("spark-driver")
       .withImagePullPolicy("IfNotPresent")
-      .withImage("spark:testing")
+      .withImage(conf.get("spark.kubernetes.container.image"))
      .withEnv(envVars)
      .withCommand(Seq("/opt/entrypoint.sh"))
      .withArgs(
@@ -311,11 +324,19 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
         "driver",
         "--verbose",
         "--class",
-        conf.get("spark.app.name"),
+        clientArguments.mainClass,
         "--master",
         "armada://armada-server.armada.svc.cluster.local:50051",
-        "submit"
-        )
+        "--conf",
+        s"spark.executor.instances=${conf.get("spark.executor.instances")}",
+        "--conf",
+        s"spark.kubernetes.container.image=${conf.get("spark.kubernetes.container.image")}",
+        "--conf",
+        "spark.driver.port=7078",
+        "--conf",
+        s"spark.driver.extraJavaOptions=$javaOptions"
+
+        ) ++ primaryResource ++ clientArguments.driverArgs
       )
      .withResources( // FIXME: What are reasonable requests/limits for spark drivers?
        ResourceRequirements(
@@ -338,16 +359,16 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
     val driverJob = api.submit
       .JobSubmitRequestItem()
       .withPriority(0)
-      .withNamespace("personal-anonymous")
+      .withNamespace("default")
       .withPodSpec(podSpec)
 
     // FIXME: Plumb config for queue, job-set-id
-    val jobSubmitResponse = armadaClient.SubmitJobs("test", "spark-test-1", Seq(driverJob))
+    val jobSubmitResponse = armadaClient.submitJobs("test", "driver", Seq(driverJob))
 
-    log(s"Job Submit Response $jobSubmitResponse")
     for (respItem <- jobSubmitResponse.jobResponseItems) {
-      log(s"JobID: ${respItem.jobId} Error: ${respItem.error} ")
+      val error = if (respItem.error == "") "None" else respItem.error
+      log(s"JobID: ${respItem.jobId} Error: ${error}")
     }
-    jobSubmitResponse.jobResponseItems(0).jobId
+    jobSubmitResponse.jobResponseItems.head.jobId
   }
 }
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala
new file mode 100644
index 000000000000..610b3c149bd1
--- /dev/null
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/deploy/armada/MainAppResource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.armada.submit
+
+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
+
+/**
+ * :: DeveloperApi ::
+ *
+ * All traits and classes in this file are used by K8s module and Spark K8s operator.
+ */
+
+@Stable
+@DeveloperApi
+@Since("2.3.0")
+sealed trait MainAppResource
+
+@Stable
+@DeveloperApi
+@Since("2.4.0")
+sealed trait NonJVMResource
+
+@Stable
+@DeveloperApi
+@Since("3.0.0")
+case class JavaMainAppResource(primaryResource: Option[String])
+  extends MainAppResource
+
+@Stable
+@DeveloperApi
+@Since("2.4.0")
+case class PythonMainAppResource(primaryResource: String)
+  extends MainAppResource with NonJVMResource
+
+@Stable
+@DeveloperApi
+@Since("2.4.0")
+case class RMainAppResource(primaryResource: String)
+  extends MainAppResource with NonJVMResource
diff --git a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
index 6d3f32ab364d..2eda76104175 100644
--- a/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
+++ b/resource-managers/armada/core/src/main/scala/io/armadaproject/spark/scheduler/cluster/armada/ArmadaClusterManagerBackend.scala
@@ -29,6 +29,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
 import org.apache.spark.scheduler.{ExecutorDecommission, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.getInitialTargetExecutorNumber
 
 
 // TODO: Implement for Armada
@@ -49,7 +50,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
   }
 
 
-  private def submitJob(): Unit = {
+  private def submitJob(executorId: Int): Unit = {
     val urlArray = masterURL.split(":")
 
     // Remove leading "/"'s
@@ -63,7 +64,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
     val source = EnvVarSource().withFieldRef(ObjectFieldSelector()
       .withApiVersion("v1").withFieldPath("status.podIP"))
     val envVars = Seq(
-      EnvVar().withName("SPARK_EXECUTOR_ID").withValue("1"),
+      EnvVar().withName("SPARK_EXECUTOR_ID").withValue(executorId.toString),
       EnvVar().withName("SPARK_RESOURCE_PROFILE_ID").withValue("0"),
       EnvVar().withName("SPARK_EXECUTOR_POD_NAME").withValue("test-pod-name"),
       EnvVar().withName("SPARK_APPLICATION_ID").withValue("test_spark_app_id"),
@@ -75,7 +76,7 @@ private[spark] class ArmadaClusterSchedulerBackend(
     val executorContainer = Container()
       .withName("spark-executor")
       .withImagePullPolicy("IfNotPresent")
-      .withImage("spark:testing")
+      .withImage(conf.get("spark.kubernetes.container.image"))
       .withEnv(envVars)
       .withCommand(Seq("/opt/entrypoint.sh"))
       .withArgs(
@@ -107,8 +108,8 @@ private[spark] class ArmadaClusterSchedulerBackend(
.withNamespace("default") .withPodSpec(podSpec) - val client = new ArmadaClient(ArmadaClient.GetChannel(host, port)) - val jobSubmitResponse = client.SubmitJobs("test", "executor", Seq(testJob)) + val client = ArmadaClient(host, port) + val jobSubmitResponse = client.submitJobs("test", "executor", Seq(testJob)) logInfo(s"Driver Job Submit Response") for (respItem <- jobSubmitResponse.jobResponseItems) { @@ -117,7 +118,8 @@ private[spark] class ArmadaClusterSchedulerBackend( } } override def start(): Unit = { - submitJob() + val numberOfExecutors = getInitialTargetExecutorNumber(conf) + 1 to numberOfExecutors foreach {j: Int => submitJob(j)} } override def stop(): Unit = {} @@ -143,7 +145,7 @@ private[spark] class ArmadaClusterSchedulerBackend( } private class ArmadaDriverEndpoint extends DriverEndpoint { - protected val execIDRequester = new HashMap[RpcAddress, String] + private val execIDRequester = new HashMap[RpcAddress, String] override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = super.receiveAndReply(context)