From 07a19118933096c8bdc970511a9f2cb85ce345c5 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 6 Sep 2017 17:14:13 -0700 Subject: [PATCH 1/5] Use a headless service to give a hostname to the driver. Required since SPARK-21642 was added upstream. --- docs/running-on-kubernetes.md | 2 + .../spark/deploy/kubernetes/constants.scala | 2 +- ...DriverConfigurationStepsOrchestrator.scala | 10 +- .../DriverAddressConfigurationStep.scala | 102 ++++++++++ ...rConfigurationStepsOrchestratorSuite.scala | 6 +- .../DriverAddressConfigurationStepSuite.scala | 181 ++++++++++++++++++ 6 files changed, 299 insertions(+), 4 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStepSuite.scala diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index b44857fffcdfa..af5433b7656ad 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -24,6 +24,8 @@ should give you a list of pods and configmaps (if any) respectively. * You must have a spark distribution with Kubernetes support. This may be obtained from the [release tarball](https://github.com/apache-spark-on-k8s/spark/releases) or by [building Spark with Kubernetes support](../resource-managers/kubernetes/README.md#building-spark-with-kubernetes-support). +* You must have [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in +your cluster. ## Driver & Executor Images diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 2c2ccf31b9dd9..0a2bc46249f3a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -44,11 +44,11 @@ package object constants { private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials" // Default and fixed ports - private[spark] val SUBMISSION_SERVER_PORT = 7077 private[spark] val DEFAULT_DRIVER_PORT = 7078 private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079 private[spark] val DEFAULT_UI_PORT = 4040 private[spark] val BLOCK_MANAGER_PORT_NAME = "blockmanager" + private[spark] val DRIVER_PORT_NAME = "driver-rpc-port" private[spark] val EXECUTOR_PORT_NAME = "executor" // Environment Variables diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala index b66da0b154698..a6fab39398b9d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,10 +20,10 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverAddressConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.util.Utils +import org.apache.spark.util.{SystemClock, Utils} /** * Constructs the complete list of driver configuration steps to run to deploy the Spark driver. @@ -92,6 +92,11 @@ private[spark] class DriverConfigurationStepsOrchestrator( mainClass, appArgs, submissionSparkConf) + val driverAddressStep = new DriverAddressConfigurationStep( + kubernetesResourceNamePrefix, + allDriverLabels, + submissionSparkConf, + new SystemClock) val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep( submissionSparkConf, kubernetesResourceNamePrefix) val pythonStep = mainAppResource match { @@ -160,6 +165,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( localFilesDownloadPath) Seq( initialSubmissionStep, + driverAddressStep, kubernetesCredentialsStep, dependencyResolutionStep) ++ submittedDependenciesBootstrapSteps ++ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStep.scala new file mode 100644 index 0000000000000..85412ad19742b --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStep.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import io.fabric8.kubernetes.api.model.ServiceBuilder +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. + */ +private[spark] class DriverAddressConfigurationStep( + kubernetesResourceNamePrefix: String, + driverLabels: Map[String, String], + submissionSparkConf: SparkConf, + clock: Clock) extends DriverConfigurationStep with Logging { + import DriverAddressConfigurationStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's hostname" + + s" will be managed via a Kubernetes service.") + require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" + + s" managed via a Kubernetes service.") + + val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" + val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName + } else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" + + s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" + + s" as the driver service's name.") + shorterServiceName + } + + val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) + val driverBlockManagerPort = submissionSparkConf.getInt( + org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) + val driverService = new ServiceBuilder() + .withNewMetadata() + .withName(resolvedServiceName) + .endMetadata() + .withNewSpec() + .withClusterIP("None") + .withSelector(driverLabels.asJava) + .addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() + .addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() + .endSpec() + .build() + + val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) + val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" + val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, driverHostname) + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) + .set("spark.driver.port", driverPort.toString) + .set( + org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, driverBlockManagerPort) + + driverSpec.copy( + driverSparkConf = resolvedSparkConf, + otherKubernetesResources = driverSpec.otherKubernetesResources ++ Seq(driverService)) + } +} + +private[spark] object DriverAddressConfigurationStep { + val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key + val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key + val DRIVER_SVC_POSTFIX = "-driver-svc" + val MAX_SERVICE_NAME_LENGTH = 63 +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala index c168e7b5407ba..7d90df2d96a65 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -47,6 +47,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], + classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep]) } @@ -69,6 +70,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], + classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[InitContainerBootstrapStep]) @@ -90,6 +92,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], + classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[PythonStep]) @@ -111,6 +114,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], + classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[MountSmallLocalFilesStep]) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStepSuite.scala new file mode 100644 index 0000000000000..08c939987603d --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/DriverAddressConfigurationStepSuite.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.submitsteps + +import io.fabric8.kubernetes.api.model.Service +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.util.Clock + +private[spark] class DriverAddressConfigurationStepSuite + extends SparkFunSuite with BeforeAndAfter { + + private val SHORT_RESOURCE_NAME_PREFIX = + "a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH - + DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length) + + private val LONG_RESOURCE_NAME_PREFIX = + "a" * (DriverAddressConfigurationStep.MAX_SERVICE_NAME_LENGTH - + DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX.length + 1) + private val DRIVER_LABELS = Map( + "label1key" -> "label1value", + "label2key" -> "label2value") + + @Mock + private var clock: Clock = _ + + private var sparkConf: SparkConf = _ + + before { + MockitoAnnotations.initMocks(this) + sparkConf = new SparkConf(false) + } + + test("Headless service has a port for the driver RPC and the block manager.") { + val configurationStep = new DriverAddressConfigurationStep( + SHORT_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf + .set("spark.driver.port", "9000") + .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080), + clock) + val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) + val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) + assert(resolvedDriverSpec.otherKubernetesResources.size === 1) + assert(resolvedDriverSpec.otherKubernetesResources.head.isInstanceOf[Service]) + val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service] + verifyService( + 9000, + 8080, + s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}", + driverService) + } + + test("Hostname and ports are set according to the service name.") { + val configurationStep = new DriverAddressConfigurationStep( + SHORT_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf + .set("spark.driver.port", "9000") + .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) + .set(KUBERNETES_NAMESPACE, "my-namespace"), + clock) + val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) + val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) + val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + + DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX + val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" + verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) + } + + test("Ports should resolve to defaults in SparkConf and in the service.") { + val configurationStep = new DriverAddressConfigurationStep( + SHORT_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf, + clock) + val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) + val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) + verifyService( + DEFAULT_DRIVER_PORT, + DEFAULT_BLOCKMANAGER_PORT, + s"$SHORT_RESOURCE_NAME_PREFIX${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}", + resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]) + assert(resolvedDriverSpec.driverSparkConf.get("spark.driver.port") === + DEFAULT_DRIVER_PORT.toString) + assert(resolvedDriverSpec.driverSparkConf.get( + org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT) === DEFAULT_BLOCKMANAGER_PORT) + } + + test("Long prefixes should switch to using a generated name.") { + val configurationStep = new DriverAddressConfigurationStep( + LONG_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), + clock) + when(clock.getTimeMillis()).thenReturn(10000) + val baseDriverSpec = KubernetesDriverSpec.initialSpec(sparkConf.clone()) + val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec) + val driverService = resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service] + val expectedServiceName = s"spark-10000${DriverAddressConfigurationStep.DRIVER_SVC_POSTFIX}" + assert(driverService.getMetadata.getName === expectedServiceName) + val expectedHostName = s"$expectedServiceName.my-namespace.svc.cluster.local" + verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, expectedHostName) + } + + test("Disallow bind address and driver host to be set explicitly.") { + val configurationStep = new DriverAddressConfigurationStep( + LONG_RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), + clock) + try { + configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf)) + fail("The driver bind address should not be allowed.") + } catch { + case e: Throwable => + assert(e.getMessage === + s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_BIND_ADDRESS_KEY} is" + + s" not supported in Kubernetes mode, as the driver's hostname will be managed via" + + s" a Kubernetes service.") + } + sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) + sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host") + try { + configurationStep.configureDriver(KubernetesDriverSpec.initialSpec(sparkConf)) + fail("The driver host address should not be allowed.") + } catch { + case e: Throwable => + assert(e.getMessage === + s"requirement failed: ${DriverAddressConfigurationStep.DRIVER_HOST_KEY} is" + + s" not supported in Kubernetes mode, as the driver's hostname will be managed via" + + s" a Kubernetes service.") + } + } + + private def verifyService( + driverPort: Int, + blockManagerPort: Int, + expectedServiceName: String, + service: Service): Unit = { + assert(service.getMetadata.getName === expectedServiceName) + assert(service.getSpec.getClusterIP === "None") + assert(service.getSpec.getSelector.asScala === DRIVER_LABELS) + assert(service.getSpec.getPorts.size() === 2) + val driverServicePorts = service.getSpec.getPorts.asScala + assert(driverServicePorts.head.getName === DRIVER_PORT_NAME) + assert(driverServicePorts.head.getPort.intValue() === driverPort) + assert(driverServicePorts.head.getTargetPort.getIntVal === driverPort) + assert(driverServicePorts(1).getName === BLOCK_MANAGER_PORT_NAME) + assert(driverServicePorts(1).getPort.intValue() === blockManagerPort) + assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort) + } + + private def verifySparkConfHostNames( + driverSparkConf: SparkConf, expectedHostName: String): Unit = { + assert(driverSparkConf.get( + org.apache.spark.internal.config.DRIVER_HOST_ADDRESS) === expectedHostName) + assert(driverSparkConf.get( + org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) === expectedHostName) + } +} From a1e5e843dbddb60ecd4c31d5e30b09f9afae679c Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 6 Sep 2017 17:25:17 -0700 Subject: [PATCH 2/5] Fix scalastyle. --- .../submit/DriverConfigurationStepsOrchestrator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala index a6fab39398b9d..76e97cb857ae5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverAddressConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.{SystemClock, Utils} From a3e6699f39067fe82cca1391d392369a1bffa422 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 7 Sep 2017 13:55:49 -0700 Subject: [PATCH 3/5] Add back import --- .../submit/DriverConfigurationStepsOrchestratorSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala index 831acff138035..8e7c4a3fa6269 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { From 98f8b61967e95e6e6c34d932b65fe227775389c7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 7 Sep 2017 15:08:41 -0700 Subject: [PATCH 4/5] Fix conflict properly. --- .../submit/DriverConfigurationStepsOrchestrator.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala index efbf0b4f64557..afc73e8f07601 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,13 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -<<<<<<< HEAD -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} -||||||| merged common ancestors -import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSmallLocalFilesStep, PythonStep} -======= -import org.apache.spark.deploy.kubernetes.submit.submitsteps._ ->>>>>>> origin/branch-2.2-kubernetes +import org.apache.spark.deploy.kubernetes.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverAddressConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep} import org.apache.spark.deploy.kubernetes.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util.{SystemClock, Utils} From 5d42d0ee436710687078e6bd3f588d7ab2300dee Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 7 Sep 2017 15:33:07 -0700 Subject: [PATCH 5/5] Fix orchestrator test. --- .../submit/DriverConfigurationStepsOrchestratorSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala index 8e7c4a3fa6269..fc9159d08fe23 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -141,6 +141,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS validateStepTypes( orchestrator, classOf[BaseDriverConfigurationStep], + classOf[DriverAddressConfigurationStep], classOf[DriverKubernetesCredentialsStep], classOf[DependencyResolutionStep], classOf[MountSecretsStep])