Skip to content

Commit aa99653

Browse files
committed
Make use of the isIdle() method on the TaskTracker
Previously we would check the number of running jobs; however, that sometimes returned incorrect values, especially when dealing with failed jobs on the cluster. As a result, some TaskTrackers never commit suicide.
1 parent c3f9540 commit aa99653

File tree

1 file changed

+17
-6
lines changed

1 file changed

+17
-6
lines changed

src/main/java/org/apache/hadoop/mapred/MesosExecutor.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.lang.reflect.Field;
1515
import java.lang.ReflectiveOperationException;
1616

17+
import java.util.concurrent.BlockingQueue;
1718
import java.util.concurrent.Executors;
1819
import java.util.concurrent.ScheduledExecutorService;
1920
import java.util.concurrent.TimeUnit;
@@ -235,12 +236,12 @@ public void run() {
235236
return;
236237
}
237238

238-
LOG.info("Checking to see if TaskTracker has no running jobs");
239-
int runningJobs = taskTracker.runningJobs.size();
239+
LOG.info("Checking to see if TaskTracker is idle");
240240

241-
// Check to see if the number of running jobs on the task tracker is zero
242-
if (runningJobs == 0) {
243-
LOG.warn("TaskTracker has zero jobs running, terminating");
241+
// If the task tracker is idle, all tasks have finished and task output
242+
// has been cleaned up.
243+
if (taskTracker.isIdle()) {
244+
LOG.warn("TaskTracker is idle, terminating");
244245

245246
try {
246247
taskTracker.shutdown();
@@ -251,7 +252,17 @@ public void run() {
251252
}
252253
}
253254
else {
254-
LOG.info("TaskTracker has " + runningJobs + " jobs running");
255+
try {
256+
Field field = taskTracker.getClass().getDeclaredField("tasksToCleanup");
257+
field.setAccessible(true);
258+
BlockingQueue<TaskTrackerAction> tasksToCleanup = ((BlockingQueue<TaskTrackerAction>) field.get(taskTracker));
259+
LOG.info("TaskTracker has " + taskTracker.tasks.size() +
260+
" running tasks and " + tasksToCleanup +
261+
" tasks to clean up.");
262+
} catch (ReflectiveOperationException e) {
263+
LOG.fatal("Failed to get task counts from TaskTracker", e);
264+
}
265+
255266
scheduleSuicideTimer();
256267
}
257268
}

0 commit comments

Comments
 (0)