|
11 | 11 | from .modules.logging import log
|
12 | 12 |
|
13 | 13 |
|
14 |
| -worker_life = lifecycle.LifecycleManager() |
| 14 | +def start_worker(): |
| 15 | + ''' |
| 16 | + Starts the worker. |
| 17 | + ''' |
| 18 | + worker_life = lifecycle.LifecycleManager() |
| 19 | + |
| 20 | + if not worker_life.is_worker_zero: |
| 21 | + log("Not worker zero, starting TTL timer thread.") |
| 22 | + threading.Thread(target=worker_life.check_worker_ttl_thread).start() |
| 23 | + else: |
| 24 | + log("Worker zero, not starting TTL timer thread.") |
15 | 25 |
|
16 |
| -if not worker_life.is_worker_zero: |
17 |
| - log("Not worker zero, starting TTL timer thread.") |
18 |
| - threading.Thread(target=worker_life.check_worker_ttl_thread).start() |
19 |
| -else: |
20 |
| - log("Worker zero, not starting TTL timer thread.") |
| 26 | + while True: |
| 27 | + if os.environ.get('TEST_LOCAL', 'false') != 'true': |
| 28 | + next_job = job.get(worker_life.worker_id) |
| 29 | + else: |
| 30 | + next_job = job.get_local() |
21 | 31 |
|
| 32 | + if next_job is not None: |
| 33 | + worker_life.work_in_progress = True # Rests when "reset_worker_ttl" is called |
22 | 34 |
|
23 |
| -while True: |
24 |
| - if os.environ.get('TEST_LOCAL', 'false') != 'true': |
25 |
| - next_job = job.get(worker_life.worker_id) |
26 |
| - else: |
27 |
| - next_job = job.get_local() |
| 35 | + try: |
| 36 | + output_urls, job_duration_ms = job.run(next_job['id'], next_job['input']) |
| 37 | + job.post(worker_life.worker_id, next_job['id'], output_urls, job_duration_ms) |
| 38 | + except ValueError as err: |
| 39 | + job.error(worker_life.worker_id, next_job['id'], str(err)) |
28 | 40 |
|
29 |
| - if next_job is not None: |
30 |
| - worker_life.work_in_progress = True # Rests when "reset_worker_ttl" is called |
| 41 | + # -------------------------------- Job Cleanup ------------------------------- # |
| 42 | + shutil.rmtree("input_objects", ignore_errors=True) |
| 43 | + shutil.rmtree("output_objects", ignore_errors=True) |
| 44 | + os.remove('output.zip') |
31 | 45 |
|
32 |
| - try: |
33 |
| - output_urls, job_duration_ms = job.run(next_job['id'], next_job['input']) |
34 |
| - job.post(worker_life.worker_id, next_job['id'], output_urls, job_duration_ms) |
35 |
| - except ValueError as err: |
36 |
| - job.error(worker_life.worker_id, next_job['id'], str(err)) |
| 46 | + worker_life.reset_worker_ttl() |
37 | 47 |
|
38 |
| - # -------------------------------- Job Cleanup ------------------------------- # |
39 |
| - shutil.rmtree("input_objects", ignore_errors=True) |
40 |
| - shutil.rmtree("output_objects", ignore_errors=True) |
41 |
| - os.remove('output.zip') |
| 48 | + if os.environ.get('TEST_LOCAL', 'false') == 'true': |
| 49 | + log("Local testing complete, exiting.") |
| 50 | + break |
42 | 51 |
|
43 |
| - worker_life.reset_worker_ttl() |
44 | 52 |
|
45 |
| - if os.environ.get('TEST_LOCAL', 'false') == 'true': |
46 |
| - log("Local testing complete, exiting.") |
47 |
| - break |
| 53 | +if __name__ == '__main__': |
| 54 | + start_worker() |
0 commit comments