@@ -22,10 +22,10 @@ import io.fabric8.kubernetes.api.model.Service
2222import  io .fabric8 .kubernetes .client .KubernetesClient 
2323import  org .mockito .Mockito .mock 
2424
25- import  org .apache .spark .{SecurityManager , SparkConf }
25+ import  org .apache .spark .{SecurityManager , SparkConf ,  SparkIllegalArgumentException }
2626import  org .apache .spark .deploy .k8s ._ 
2727import  org .apache .spark .deploy .k8s .features .KubernetesExecutorCustomFeatureConfigStep 
28- import  org .apache .spark .internal .config .{ConfigEntry ,  SHUFFLE_SERVICE_PORT }
28+ import  org .apache .spark .internal .config .{BLOCK_MANAGER_PORT ,  ConfigEntry }
2929import  org .apache .spark .resource .ResourceProfile 
3030
3131class  KubernetesExecutorBuilderSuite  extends  PodBuilderSuite  {
@@ -81,29 +81,39 @@ class KubernetesExecutorBuilderSuite extends PodBuilderSuite {
8181  }
8282
8383  test(" SPARK-XXXXX: check executor kubernetes spec with service enabled" 
84-     Seq (None , Some (1234 )).foreach { somePort => 
85-       val  sparkConf  =  baseConf.clone.set(Config .KUBERNETES_EXECUTOR_ENABLE_SERVICE , true )
86-       somePort.foreach(sparkConf.set(SHUFFLE_SERVICE_PORT , _))
87-       val  conf  =  KubernetesTestConf .createExecutorConf(sparkConf =  sparkConf)
88-       val  secMgr  =  new  SecurityManager (sparkConf)
89-       val  client  =  mock(classOf [KubernetesClient ])
90-       val  profile  =  ResourceProfile .getOrCreateDefaultProfile(sparkConf)
91-       val  spec  =  new  KubernetesExecutorBuilder ().buildFromFeatures(conf, secMgr, client, profile)
92- 
93-       val  containerEnvs  =  spec.pod.container.getEnv.asScala
94-       assert(containerEnvs.exists(_.getName ===  " EXECUTOR_SERVICE_NAME" 
95-       val  containerEnv  =  containerEnvs.filter(_.getName ===  " EXECUTOR_SERVICE_NAME" 
96-       assert(containerEnv.getValue ===  " svc-appId-exec-1" 
97- 
98-       assert(spec.executorKubernetesResources.size ===  1 )
99-       val  resource  =  spec.executorKubernetesResources.head
100-       assert(resource.getKind ===  " Service" 
101-       val  service  =  resource.asInstanceOf [Service ]
102-       assert(service.getMetadata.getName ===  " svc-appId-exec-1" 
103-       assert(service.getSpec.getPorts.size() ===  1 )
104-       val  port  =  service.getSpec.getPorts.get(0 )
105-       assert(port.getName ===  " spark-shuffle-service" 
106-       assert(port.getPort ===  somePort.getOrElse(SHUFFLE_SERVICE_PORT .defaultValue.get))
84+     val  sparkConf  =  baseConf.clone
85+       .set(Config .KUBERNETES_EXECUTOR_ENABLE_SERVICE , true )
86+       .set(BLOCK_MANAGER_PORT , 1234 )
87+     val  conf  =  KubernetesTestConf .createExecutorConf(sparkConf =  sparkConf)
88+     val  secMgr  =  new  SecurityManager (sparkConf)
89+     val  client  =  mock(classOf [KubernetesClient ])
90+     val  profile  =  ResourceProfile .getOrCreateDefaultProfile(sparkConf)
91+     val  spec  =  new  KubernetesExecutorBuilder ().buildFromFeatures(conf, secMgr, client, profile)
92+ 
93+     val  containerEnvs  =  spec.pod.container.getEnv.asScala
94+     assert(containerEnvs.exists(_.getName ===  " EXECUTOR_SERVICE_NAME" 
95+     val  containerEnv  =  containerEnvs.filter(_.getName ===  " EXECUTOR_SERVICE_NAME" 
96+     assert(containerEnv.getValue ===  " svc-appId-exec-1" 
97+ 
98+     assert(spec.executorKubernetesResources.size ===  1 )
99+     val  resource  =  spec.executorKubernetesResources.head
100+     assert(resource.getKind ===  " Service" 
101+     val  service  =  resource.asInstanceOf [Service ]
102+     assert(service.getMetadata.getName ===  " svc-appId-exec-1" 
103+     assert(service.getSpec.getPorts.size() ===  1 )
104+     val  port  =  service.getSpec.getPorts.get(0 )
105+     assert(port.getName ===  " spark-block-manager" 
106+     assert(port.getPort ===  1234 )
107+   }
108+ 
109+   test(" SPARK-XXXXX: check executor kubernetes service requires block manager port" 
110+     val  sparkConf  =  baseConf.clone.set(Config .KUBERNETES_EXECUTOR_ENABLE_SERVICE , true )
111+     val  conf  =  KubernetesTestConf .createExecutorConf(sparkConf =  sparkConf)
112+     val  secMgr  =  new  SecurityManager (sparkConf)
113+     val  client  =  mock(classOf [KubernetesClient ])
114+     val  profile  =  ResourceProfile .getOrCreateDefaultProfile(sparkConf)
115+     assertThrows[SparkIllegalArgumentException ] {
116+       new  KubernetesExecutorBuilder ().buildFromFeatures(conf, secMgr, client, profile)
107117    }
108118  }
109119}
0 commit comments