@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.kubernetes
 import java.io.Closeable
 import java.net.InetAddress
 import java.util.Collections
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
 import io.fabric8.kubernetes.api.model._
@@ -29,25 +29,28 @@ import scala.collection.{concurrent, mutable}
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.Utils
 
 private[spark] class KubernetesClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
-    val sc: SparkContext,
+    rpcEnv: RpcEnv,
     executorPodFactory: ExecutorPodFactory,
     shuffleManager: Option[KubernetesExternalShuffleManager],
-    kubernetesClient: KubernetesClient)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+    kubernetesClient: KubernetesClient,
+    allocatorExecutor: ScheduledExecutorService,
+    requestExecutorsService: ExecutorService)
+  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   import KubernetesClusterSchedulerBackend._
 
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
   // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
   private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
@@ -57,21 +60,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
   // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
   private val executorPodsByIPs = new mutable.HashMap[String, Pod]
-  private val failedPods: concurrent.Map[String, ExecutorExited] = new
-    ConcurrentHashMap[String, ExecutorExited]().asScala
-  private val executorsToRemove = Collections.newSetFromMap[String](
-    new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
+  private val podsWithKnownExitReasons: concurrent.Map[String, ExecutorExited] =
+    new ConcurrentHashMap[String, ExecutorExited]().asScala
+  private val disconnectedPodsByExecutorIdPendingRemoval =
+    new ConcurrentHashMap[String, Pod]().asScala
 
   private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
 
   private val kubernetesDriverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
-  private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
-
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
+    requestExecutorsService)
 
   private val driverPod = try {
     kubernetesClient.pods().inNamespace(kubernetesNamespace).
@@ -93,9 +94,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
   protected var totalExpectedExecutors = new AtomicInteger(0)
 
   private val driverUrl = RpcEndpointAddress(
-    sc.getConf.get("spark.driver.host"),
-    sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
-    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+    conf.get("spark.driver.host"),
+    conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
   private val initialExecutors = getInitialTargetExecutorNumber()
 
@@ -109,21 +110,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     s"${KUBERNETES_ALLOCATION_BATCH_SIZE} " +
     s"is ${podAllocationSize}, should be a positive integer")
 
-  private val allocator = ThreadUtils
-    .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
+  private val allocatorRunnable = new Runnable {
 
-  private val allocatorRunnable: Runnable = new Runnable {
-
-    // Number of times we are allowed check for the loss reason for an executor before we give up
-    // and assume the executor failed for good, and attribute it to a framework fault.
-    private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
-    private val executorsToRecover = new mutable.HashSet[String]
     // Maintains a map of executor id to count of checks performed to learn the loss reason
     // for an executor.
-    private val executorReasonChecks = new mutable.HashMap[String, Int]
+    private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
 
     override def run(): Unit = {
-      removeFailedExecutors()
+      handleDisconnectedExecutors()
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
         if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
           logDebug("Waiting for pending executors before scaling")
@@ -132,7 +126,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         } else {
           val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-              totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
             val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
             runningExecutorsToPods.put(executorId, pod)
             runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
@@ -143,43 +137,47 @@ private[spark] class KubernetesClusterSchedulerBackend(
       }
     }
 
-    def removeFailedExecutors(): Unit = {
-      val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorsToPods.toMap
-      }
-      executorsToRemove.foreach { case (executorId) =>
-        localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
-          failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
-            logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
-            removeExecutor(executorId, executorExited)
-            if (!executorExited.exitCausedByApp) {
-              executorsToRecover.add(executorId)
-            }
-          }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
-        }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
-
-        executorsToRecover.foreach(executorId => {
-          executorsToRemove -= executorId
-          executorReasonChecks -= executorId
-          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-            runningExecutorsToPods.remove(executorId).map { pod: Pod =>
-              kubernetesClient.pods().delete(pod)
-              runningPodsToExecutors.remove(pod.getMetadata.getName)
-            }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+    def handleDisconnectedExecutors(): Unit = {
+      // For each disconnected executor, synchronize with the loss reasons that may have been found
+      // by the executor pod watcher. If the loss reason was discovered by the watcher,
+      // inform the parent class with removeExecutor.
+      val disconnectedPodsByExecutorIdPendingRemovalCopy =
+        Map.empty ++ disconnectedPodsByExecutorIdPendingRemoval
+      disconnectedPodsByExecutorIdPendingRemovalCopy.foreach { case (executorId, executorPod) =>
+        val knownExitReason = podsWithKnownExitReasons.remove(executorPod.getMetadata.getName)
+        knownExitReason.fold {
+          removeExecutorOrIncrementLossReasonCheckCount(executorId)
+        } { executorExited =>
+          logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+          removeExecutor(executorId, executorExited)
+          // We keep around executors that have exit conditions caused by the application. This
+          // allows them to be debugged later on. Otherwise, mark them as to be deleted from the
+          // the API server.
+          if (!executorExited.exitCausedByApp) {
+            deleteExecutorFromClusterAndDataStructures(executorId)
           }
-        })
-        executorsToRecover.clear()
+        }
       }
     }
 
     def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
-      val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
-      if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
-        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
-        executorsToRecover.add(executorId)
-        executorReasonChecks -= executorId
+      val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
+      if (reasonCheckCount >= MAX_EXECUTOR_LOST_REASON_CHECKS) {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
+        deleteExecutorFromClusterAndDataStructures(executorId)
       } else {
-        executorReasonChecks.put(executorId, reasonCheckCount + 1)
+        executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
+      }
+    }
+
+    def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
+      disconnectedPodsByExecutorIdPendingRemoval -= executorId
+      executorReasonCheckAttemptCounts -= executorId
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.remove(executorId).map { pod =>
+          kubernetesClient.pods().delete(pod)
+          runningPodsToExecutors.remove(pod.getMetadata.getName)
+        }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
       }
     }
   }
@@ -214,18 +212,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
       .withLabel(SPARK_APP_ID_LABEL, applicationId())
       .watch(new ExecutorPodsWatcher()))
 
-    allocator.scheduleWithFixedDelay(
-      allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS)
+    allocatorExecutor.scheduleWithFixedDelay(
+      allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
     shuffleManager.foreach(_.start(applicationId()))
 
-    if (!Utils.isDynamicAllocationEnabled(sc.conf)) {
+    if (!Utils.isDynamicAllocationEnabled(conf)) {
       doRequestTotalExecutors(initialExecutors)
     }
   }
 
   override def stop(): Unit = {
     // stop allocation of new resources and caches.
-    allocator.shutdown()
+    allocatorExecutor.shutdown()
     shuffleManager.foreach(_.stop())
 
     // send stop message to executors so they shut down cleanly
@@ -298,7 +296,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       executorId,
       applicationId(),
       driverUrl,
-      sc.conf.getExecutorEnv,
+      conf.getExecutorEnv,
       driverPod,
       nodeToLocalTaskCount)
     try {
@@ -318,11 +316,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
-        runningExecutorsToPods.remove(executor) match {
-          case Some(pod) =>
-            kubernetesClient.pods().delete(pod)
-            runningPodsToExecutors.remove(pod.getMetadata.getName)
-          case None => logWarning(s"Unable to remove pod for unknown executor $executor")
+        val maybeRemovedExecutor = runningExecutorsToPods.remove(executor)
+        maybeRemovedExecutor.foreach { executorPod =>
+          kubernetesClient.pods().delete(executorPod)
+          disconnectedPodsByExecutorIdPendingRemoval(executor) = executorPod
+          runningPodsToExecutors.remove(executorPod.getMetadata.getName)
+        }
+        if (maybeRemovedExecutor.isEmpty) {
+          logWarning(s"Unable to remove pod for unknown executor $executor")
         }
       }
     }
@@ -396,10 +397,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
 
     def handleErroredPod(pod: Pod): Unit = {
-      val alreadyReleased = isPodAlreadyReleased(pod)
       val containerExitStatus = getExecutorExitStatus(pod)
       // container was probably actively killed by the driver.
-      val exitReason = if (alreadyReleased) {
+      val exitReason = if (isPodAlreadyReleased(pod)) {
        ExecutorExited(containerExitStatus, exitCausedByApp = false,
          s"Container in pod " + pod.getMetadata.getName +
          " exited from explicit termination request.")
@@ -411,17 +411,23 @@ private[spark] class KubernetesClusterSchedulerBackend(
           // Here we can't be sure that that exit was caused by the application but this seems
           // to be the right default since we know the pod was not explicitly deleted by
           // the user.
-          "Pod exited with following container exit status code " + containerExitStatus
+          s"Pod ${pod.getMetadata.getName}'s executor container exited with exit status" +
+            s" code $containerExitStatus."
        }
        ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
      }
-      failedPods.put(pod.getMetadata.getName, exitReason)
+      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
    }
 
    def handleDeletedPod(pod: Pod): Unit = {
-      val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
-        "Pod " + pod.getMetadata.getName + " deleted or lost.")
-      failedPods.put(pod.getMetadata.getName, exitReason)
+      val exitMessage = if (isPodAlreadyReleased(pod)) {
+        s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
+      } else {
+        s"Pod ${pod.getMetadata.getName} deleted or lost."
+      }
+      val exitReason = ExecutorExited(
+        getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
+      podsWithKnownExitReasons.put(pod.getMetadata.getName, exitReason)
    }
  }
 
@@ -433,12 +439,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
       rpcEnv: RpcEnv,
       sparkProperties: Seq[(String, String)])
     extends DriverEndpoint(rpcEnv, sparkProperties) {
-    private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       addressToExecutorId.get(rpcAddress).foreach { executorId =>
         if (disableExecutor(executorId)) {
-          executorsToRemove.add(executorId)
+          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+            runningExecutorsToPods.get(executorId).foreach { pod =>
+              disconnectedPodsByExecutorIdPendingRemoval(executorId) = pod
+            }
+          }
        }
      }
    }
 
@@ -448,7 +457,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       new PartialFunction[Any, Unit]() {
         override def isDefinedAt(msg: Any): Boolean = {
           msg match {
-            case RetrieveSparkAppConfig(executorId) =>
+            case RetrieveSparkAppConfig(_) =>
               shuffleManager.isDefined
             case _ => false
           }
@@ -477,11 +486,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
 }
 
 private object KubernetesClusterSchedulerBackend {
-  private val DEFAULT_STATIC_PORT = 10000
-  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
   private val VMEM_EXCEEDED_EXIT_CODE = -103
   private val PMEM_EXCEEDED_EXIT_CODE = -104
   private val UNKNOWN_EXIT_CODE = -111
+  // Number of times we are allowed check for the loss reason for an executor before we give up
+  // and assume the executor failed for good, and attribute it to a framework fault.
+  val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
 
   def memLimitExceededLogMessage(diagnostics: String): String = {
     s"Pod/Container killed for exceeding memory limits. $diagnostics" +