44import threading
55import time
66from subprocess import call
7- from typing import List
7+ from typing import Dict , List
88import azure .batch .models as batch_models
99import aztk .spark
1010from aztk import error
@@ -247,6 +247,13 @@ def print_jobs(jobs: List[aztk.spark.models.Job]):
247247 )
248248
249249
250+ def get_applications (client , job : aztk .spark .models .Job ):
251+ if job .applications :
252+ return {j .name : j for j in job .applications }
253+ else :
254+ return client .list_applications (job .id )
255+
256+
250257def print_job (client , job : aztk .spark .models .Job ):
251258 print_format = '{:<36}| {:<15}'
252259
@@ -267,11 +274,41 @@ def print_job(client, job: aztk.spark.models.Job):
267274 log .info (print_format .format ("Cluster" , "Provisioning" ))
268275 log .info ("" )
269276
270- if job .applications :
271- application_summary (job .applications )
277+ application_summary (get_applications (client , job ))
278+ log .info ("" )
279+
280+
281+ def print_job_verbose (client , job : aztk .spark .models .Job , print_logs : bool ):
282+ print_format = '{:<36}| {:<15}'
283+
284+ log .info ("" )
285+ log .info ("Job %s" , job .id )
286+ log .info ("------------------------------------------" )
287+ log .info ("State: %s" , job .state )
288+ log .info ("Transition Time: %s" , utc_to_local (job .state_transition_time ))
289+ log .info ("" )
290+
291+ if job .cluster :
292+ print_cluster_summary (job .cluster )
272293 else :
273- application_summary (client .list_applications (job .id ))
294+ if job .state == 'completed' :
295+ log .info ("Cluster %s" , "Job completed, cluster deallocated." )
296+ log .info ("" )
297+ else :
298+ log .info (print_format .format ("Cluster" , "Provisioning" ))
299+ log .info ("" )
300+
301+ applications = get_applications (client , job )
302+ application_summary (applications )
274303 log .info ("" )
304+ print_applications (applications )
305+ log .info ("" )
306+
307+ if print_logs :
308+ for name , app in applications .items ():
309+ if app and app .exit_code is not None :
310+ log .info ("Logs for %s:" , name )
311+ print (client .get_job_application_log (job .id , name ).log )
275312
276313
277314def node_state_count (cluster : aztk .spark .models .Cluster ):
@@ -296,22 +333,22 @@ def print_cluster_summary(cluster: aztk.spark.models.Cluster):
296333 log .info ("" )
297334
298335
299- def application_summary (applications ):
336+ def application_summary (applications : Dict [ str , aztk . spark . models . Application ] ):
300337 states = {"scheduling" : 0 }
301338 for state in batch_models .TaskState :
302339 states [state .name ] = 0
303340
304341 warn_scheduling = False
305342
306- for application in applications :
307- if type ( application ) == str :
343+ for name , application in applications . items () :
344+ if application is None :
308345 states ["scheduling" ] += 1
309346 warn_scheduling = True
310347 else :
311348 states [application .state ] += 1
312349
313350 print_format = '{:<17} {:<14}'
314- log .info ("Applications " )
351+ log .info ("Application States " )
315352 log .info ("-" * 42 )
316353 for state in states :
317354 if states [state ] > 0 :
@@ -320,15 +357,15 @@ def application_summary(applications):
320357 if warn_scheduling :
321358 log .warning ("\n No Spark applications will be scheduled until the master is selected." )
322359
323- def print_applications (applications ):
360+ def print_applications (applications : Dict [ str , aztk . spark . models . Application ] ):
324361 print_format = '{:<36}| {:<15}| {:<16} | {:^9} |'
325362 print_format_underline = '{:-<36}|{:-<16}|{:-<18}|{:-<11}|'
326363 log .info (print_format .format ("Applications" , "State" , "Transition Time" , "Exit Code" ))
327364 log .info (print_format_underline .format ('' , '' , '' , '' ))
328365
329366 warn_scheduling = False
330- for name in applications :
331- if applications [ name ] is None :
367+ for name , application in applications . items () :
368+ if application is None :
332369 log .info (
333370 print_format .format (
334371 name ,
@@ -339,10 +376,9 @@ def print_applications(applications):
339376 )
340377 warn_scheduling = True
341378 else :
342- application = applications [name ]
343379 log .info (
344380 print_format .format (
345- application . name ,
381+ name ,
346382 application .state ,
347383 utc_to_local (application .state_transition_time ),
348384 application .exit_code if application .exit_code is not None else "-"
0 commit comments