Skip to content

Commit 2a5fd49

Browse files
committed
Added stageId <--> jobId mapping in DAGScheduler
...and made sure that DAGScheduler data structures are cleaned up on job completion. Initial effort and discussion at mesos/spark#842.
1 parent d48959f commit 2a5fd49

File tree

8 files changed

+271
-76
lines changed

8 files changed

+271
-76
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,12 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
244244
case Some(bytes) =>
245245
return bytes
246246
case None =>
247-
statuses = mapStatuses(shuffleId)
247+
statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
248248
epochGotten = epoch
249249
}
250250
}
251251
// If we got here, we failed to find the serialized locations in the cache, so we pulled
252-
// out a snapshot of the locations as "locs"; let's serialize and return that
252+
// out a snapshot of the locations as "statuses"; let's serialize and return that
253253
val bytes = MapOutputTracker.serializeMapStatuses(statuses)
254254
logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
255255
// Add them into the table only if the epoch hasn't changed while we were working
@@ -274,6 +274,10 @@ private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
274274
/** No-op override: the body contains no statements, so `newEpoch` is ignored. */
override def updateEpoch(newEpoch: Long) {
275275
// This might be called on the MapOutputTrackerMaster if we're running in local mode.
276276
}
277+
278+
/**
 * Reports whether this tracker holds an entry for the given shuffle.
 *
 * @param shuffleId the shuffle to look up
 * @return true when `cachedSerializedStatuses` has a value for `shuffleId` or
 *         `mapStatuses` contains `shuffleId` as a key; false otherwise
 */
def has(shuffleId: Int): Boolean = {
279+
cachedSerializedStatuses.get(shuffleId).isDefined || mapStatuses.contains(shuffleId)
280+
}
277281
}
278282

279283
private[spark] object MapOutputTracker {

core/src/main/scala/org/apache/spark/scheduler/ClusterScheduler.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ private[spark] class ClusterScheduler(
181181
backend.killTask(tid, execId)
182182
}
183183
}
184-
tsm.error("Stage %d was cancelled".format(stageId))
184+
logInfo("Stage %d was cancelled".format(stageId))
185+
tsm.removeAllRunningTasks()
186+
taskSetFinished(tsm)
185187
}
186188
}
187189

0 commit comments

Comments
 (0)