1111
1212import  java .io .*;
1313
14+ import  java .lang .reflect .Field ;
15+ import  java .lang .ReflectiveOperationException ;
16+ 
17+ import  java .util .concurrent .BlockingQueue ;
18+ import  java .util .concurrent .Executors ;
19+ import  java .util .concurrent .ScheduledExecutorService ;
20+ import  java .util .concurrent .TimeUnit ;
21+ 
1422public  class  MesosExecutor  implements  Executor  {
1523  public  static  final  Log  LOG  = LogFactory .getLog (MesosExecutor .class );
1624  private  SlaveInfo  slaveInfo ;
1725  private  TaskTracker  taskTracker ;
1826
27+   protected  final  ScheduledExecutorService  timerScheduler  =
28+        Executors .newScheduledThreadPool (1 );
29+ 
1930  public  static  void  main (String [] args ) {
2031    MesosExecutorDriver  driver  = new  MesosExecutorDriver (new  MesosExecutor ());
2132    System .exit (driver .run () == Status .DRIVER_STOPPED  ? 0  : 1 );
@@ -37,10 +48,8 @@ private JobConf configure(final TaskInfo task) {
3748      conf .writeXml (writer );
3849      writer .flush ();
3950      String  xml  = writer .getBuffer ().toString ();
40-       String  xmlFormatted  =
41-           org .apache .mesos .hadoop .Utils .formatXml (xml );
4251      LOG .info ("XML Configuration received:\n "  +
43-           xmlFormatted );
52+                 org . apache . mesos . hadoop . Utils . formatXml ( xml ) );
4453    } catch  (Exception  e ) {
4554      LOG .warn ("Failed to output configuration as XML." , e );
4655    }
@@ -123,14 +132,22 @@ public void run() {
123132  }
124133
125134  @ Override 
126-   public  void  killTask (ExecutorDriver  driver , TaskID  taskId ) {
135+   public  void  killTask (final   ExecutorDriver  driver ,  final  TaskID  taskId ) {
127136    LOG .info ("Killing task : "  + taskId .getValue ());
128-     try  {
129-       taskTracker .shutdown ();
130-     } catch  (IOException  e ) {
131-       LOG .error ("Failed to shutdown TaskTracker" , e );
132-     } catch  (InterruptedException  e ) {
133-       LOG .error ("Failed to shutdown TaskTracker" , e );
137+     if  (taskTracker  != null ) {
138+       LOG .info ("Revoking task tracker map/reduce slots" );
139+       revokeSlots ();
140+ 
141+       // Send the TASK_FINISHED status 
142+       new  Thread ("TaskFinishedUpdate" ) {
143+         @ Override 
144+         public  void  run () {
145+           driver .sendStatusUpdate (TaskStatus .newBuilder ()
146+             .setTaskId (taskId )
147+             .setState (TaskState .TASK_FINISHED )
148+             .build ());
149+         }
150+       }.start ();
134151    }
135152  }
136153
@@ -159,4 +176,96 @@ public void error(ExecutorDriver d, String message) {
159176  public  void  shutdown (ExecutorDriver  d ) {
160177    LOG .info ("Executor asked to shutdown" );
161178  }
179+ 
180+   public  void  revokeSlots () {
181+     if  (taskTracker  == null ) {
182+       LOG .error ("Task tracker is not initialized" );
183+       return ;
184+     }
185+ 
186+     int  maxMapSlots  = 0 ;
187+     int  maxReduceSlots  = 0 ;
188+ 
189+     // TODO(tarnfeld): Sanity check that it's safe for us to change the slots. 
190+     // Be sure there's nothing running and nothing in the launcher queue. 
191+ 
192+     // If we expect to have no slots, let's go ahead and terminate the task launchers 
193+     if  (maxMapSlots  == 0 ) {
194+       try  {
195+         Field  launcherField  = taskTracker .getClass ().getDeclaredField ("mapLauncher" );
196+         launcherField .setAccessible (true );
197+ 
198+         // Kill the current map task launcher 
199+         TaskTracker .TaskLauncher  launcher  = ((TaskTracker .TaskLauncher ) launcherField .get (taskTracker ));
200+         launcher .notifySlots ();
201+         launcher .interrupt ();
202+       } catch  (ReflectiveOperationException  e ) {
203+         LOG .fatal ("Failed updating map slots due to error with reflection" , e );
204+       }
205+     }
206+ 
207+     if  (maxReduceSlots  == 0 ) {
208+       try  {
209+         Field  launcherField  = taskTracker .getClass ().getDeclaredField ("reduceLauncher" );
210+         launcherField .setAccessible (true );
211+ 
212+         // Kill the current reduce task launcher 
213+         TaskTracker .TaskLauncher  launcher  = ((TaskTracker .TaskLauncher ) launcherField .get (taskTracker ));
214+         launcher .notifySlots ();
215+         launcher .interrupt ();
216+       } catch  (ReflectiveOperationException  e ) {
217+         LOG .fatal ("Failed updating reduce slots due to error with reflection" , e );
218+       }
219+     }
220+ 
221+     // Configure the new slot counts on the task tracker 
222+     taskTracker .setMaxMapSlots (maxMapSlots );
223+     taskTracker .setMaxReduceSlots (maxReduceSlots );
224+ 
225+     // If we have zero slots left, commit suicide when no jobs are running 
226+     if  ((maxMapSlots  + maxReduceSlots ) == 0 ) {
227+       scheduleSuicideTimer ();
228+     }
229+   }
230+ 
231+   protected  void  scheduleSuicideTimer () {
232+     timerScheduler .schedule (new  Runnable () {
233+       @ Override 
234+       public  void  run () {
235+         if  (taskTracker  == null ) {
236+           return ;
237+         }
238+ 
239+         LOG .info ("Checking to see if TaskTracker is idle" );
240+ 
241+         // If the task tracker is idle, all tasks have finished and task output 
242+         // has been cleaned up. 
243+         if  (taskTracker .isIdle ()) {
244+           LOG .warn ("TaskTracker is idle, terminating" );
245+ 
246+           try  {
247+             taskTracker .shutdown ();
248+           } catch  (IOException  e ) {
249+             LOG .error ("Failed to shutdown TaskTracker" , e );
250+           } catch  (InterruptedException  e ) {
251+             LOG .error ("Failed to shutdown TaskTracker" , e );
252+           }
253+         }
254+         else  {
255+           try  {
256+             Field  field  = taskTracker .getClass ().getDeclaredField ("tasksToCleanup" );
257+             field .setAccessible (true );
258+             BlockingQueue <TaskTrackerAction > tasksToCleanup  = ((BlockingQueue <TaskTrackerAction >) field .get (taskTracker ));
259+             LOG .info ("TaskTracker has "  + taskTracker .tasks .size () +
260+                      " running tasks and "  + tasksToCleanup  +
261+                      " tasks to clean up." );
262+           } catch  (ReflectiveOperationException  e ) {
263+             LOG .fatal ("Failed to get task counts from TaskTracker" , e );
264+           }
265+ 
266+           scheduleSuicideTimer ();
267+         }
268+       }
269+     }, 1000 , TimeUnit .MILLISECONDS );
270+   }
162271}
0 commit comments