Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Moves schedule checking to own thread which sleeps most of the time
Browse files Browse the repository at this point in the history
  • Loading branch information
stumpylog committed Sep 12, 2022
1 parent 8b5289d commit d00cfa5
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 64 deletions.
27 changes: 24 additions & 3 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime
from multiprocessing import Event, Process, Value, current_process
from time import sleep
import threading

# External
import arrow
Expand Down Expand Up @@ -63,6 +64,19 @@ class WorkerStatus(enum.IntEnum):
TIMEOUT = 4
STARTING = 5

class ScheduleThread(threading.Thread):
def __init__(self, broker: Broker, event: threading.Event):
threading.Thread.__init__(self)
self.broker = broker
self.stopped = event

def run(self):
# Call once on thread start
scheduler(self.broker)
while not self.stopped.wait(60.0):
# Call thereafter every ~60s
scheduler(self.broker)


class Cluster:
def __init__(self, broker: Broker = None):
Expand Down Expand Up @@ -181,6 +195,8 @@ def __init__(
self.event_out = Event()
self.monitor = None
self.pusher = None
self.scheduler_stop = threading.Event()
self.scheduler = ScheduleThread(self.broker, self.scheduler_stop)
if start:
self.start()

Expand Down Expand Up @@ -265,6 +281,8 @@ def spawn_cluster(self):
# spawn auxiliary
self.monitor = self.spawn_monitor()
self.pusher = self.spawn_pusher()
if Conf.SCHEDULER:
self.scheduler.start()
# set worker cpu affinity if needed
if psutil and Conf.CPU_AFFINITY:
set_cpu_affinity(Conf.CPU_AFFINITY, [w.pid for w in self.pool])
Expand Down Expand Up @@ -297,9 +315,9 @@ def guard(self):
self.reincarnate(self.pusher)
# Call scheduler once a minute (or so)
counter += cycle
if counter >= 30 and Conf.SCHEDULER:
counter = 0
scheduler(broker=self.broker)
#if counter >= 30 and Conf.SCHEDULER:
#counter = 0
#scheduler(broker=self.broker)
# Save current status
Stat(self).save()
sleep(cycle)
Expand All @@ -315,6 +333,9 @@ def stop(self):
while self.pusher.is_alive():
sleep(0.1)
Stat(self).save()
if Conf.SCHEDULER:
self.scheduler_stop.set()
self.scheduler.join()
# Put poison pills in the queue
for __ in range(len(self.pool)):
self.task_queue.put("STOP")
Expand Down
Loading

0 comments on commit d00cfa5

Please sign in to comment.