Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2 from paperless-ngx/timeout-improvements
Browse files Browse the repository at this point in the history
Feature: Worker timeout improvements
  • Loading branch information
stumpylog authored Sep 3, 2022
2 parents 9838d0d + cf4f247 commit 21ef911
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 41 deletions.
78 changes: 48 additions & 30 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Standard
import ast
import enum
import inspect
import pydoc
import signal
Expand Down Expand Up @@ -46,6 +47,15 @@
from django_q.signals import post_execute, pre_execute
from django_q.signing import BadSignature, SignedPackage
from django_q.status import Stat, Status
from django_q.timeouts import JobTimeoutException, UnixSignalDeathPenalty


class WorkerStatus(enum.IntEnum):
IDLE = 1
BUSY = 2
RECYCLE = 3
TIMEOUT = 4
STARTING = 5


class Cluster:
Expand Down Expand Up @@ -192,7 +202,7 @@ def spawn_process(self, target, *args) -> Process:
p.daemon = True
if target == worker:
p.daemon = Conf.DAEMONIZE_WORKERS
p.timer = args[2]
p.status = args[2]
self.pool.append(p)
p.start()
return p
Expand All @@ -202,7 +212,7 @@ def spawn_pusher(self) -> Process:

def spawn_worker(self):
self.spawn_process(
worker, self.task_queue, self.result_queue, Value("f", -1), self.timeout
worker, self.task_queue, self.result_queue, Value('I', WorkerStatus.IDLE.value), self.timeout
)

def spawn_monitor(self) -> Process:
Expand All @@ -225,11 +235,11 @@ def reincarnate(self, process):
else:
self.pool.remove(process)
self.spawn_worker()
if process.timer.value == 0:
if process.status.value == WorkerStatus.TIMEOUT.value:
# only need to terminate on timeout, otherwise we risk destabilizing the queues
process.terminate()
logger.warning(_(f"reincarnated worker {process.name} after timeout"))
elif int(process.timer.value) == -2:
elif process.status.value == WorkerStatus.RECYCLE.value:
logger.info(_(f"recycled worker {process.name}"))
else:
logger.error(_(f"reincarnated worker {process.name} after death"))
Expand Down Expand Up @@ -267,14 +277,11 @@ def guard(self):
while not self.stop_event.is_set() or not counter:
# Check Workers
for p in self.pool:
with p.timer.get_lock():
with p.status.get_lock():
# Are you alive?
if not p.is_alive() or p.timer.value == 0:
if not p.is_alive() or p.status.value in {WorkerStatus.TIMEOUT.value, WorkerStatus.RECYCLE.value}:
self.reincarnate(p)
continue
# Decrement timer if work is being done
if p.timer.value > 0:
p.timer.value -= cycle
# Check Monitor
if not self.monitor.is_alive():
self.reincarnate(self.monitor)
Expand Down Expand Up @@ -401,24 +408,24 @@ def monitor(result_queue: Queue, broker: Broker = None):


def worker(
task_queue: Queue, result_queue: Queue, timer: Value, timeout: int = Conf.TIMEOUT
task_queue: Queue, result_queue: Queue, status: Value, timeout: int = Conf.TIMEOUT
):
"""
Takes a task from the task queue, tries to execute it and puts the result back in the result queue
:param timeout: number of seconds wait for a worker to finish.
:type task_queue: multiprocessing.Queue
:type result_queue: multiprocessing.Queue
:type timer: multiprocessing.Value
:type timer: multiprocessing.Value wrapping an unsigned int
"""
name = current_process().name
logger.info(_(f"{name} ready for work at {current_process().pid}"))
task_count = 0
if timeout is None:
timeout = -1
# Start reading the task queue
for task in iter(task_queue.get, "STOP"):
result = None
timer.value = -1 # Idle
timed_out = False
# Got a task package, but have not yet called the work
status.value = WorkerStatus.STARTING.value
task_count += 1
# Get the function from the task
logger.info(_(f'{name} processing [{task["name"]}]'))
Expand All @@ -427,31 +434,42 @@ def worker(
if not callable(task["func"]):
f = pydoc.locate(f)
close_old_django_connections()
timer_value = task.pop("timeout", timeout)
timeout = task.pop("timeout", timeout)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
timer.value = timer_value # Busy
status.value = WorkerStatus.BUSY.value
try:
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except Exception as e:
with UnixSignalDeathPenalty(timeout=timeout):
res = f(*task["args"], **task["kwargs"])
result = (res, True)
except (JobTimeoutException, Exception) as e:
result = (f"{e} : {traceback.format_exc()}", False)
if isinstance(e, JobTimeoutException):
timed_out = True
if error_reporter:
error_reporter.report()
if task.get("sync", False):
raise
with timer.get_lock():
# Process result
task["result"] = result[0]
task["success"] = result[1]
task["stopped"] = timezone.now()
result_queue.put(task)
timer.value = -1 # Idle
# Recycle
if task_count == Conf.RECYCLE or rss_check():
timer.value = -2 # Recycled
break
finally:
with status.get_lock():
# Process result
if result is None:
result = (None, False)
task["result"] = result[0]
task["success"] = result[1]
task["stopped"] = timezone.now()
result_queue.put(task)

if timed_out:
status.value = WorkerStatus.TIMEOUT.value
break
elif task_count == Conf.RECYCLE or rss_check():
status.value = WorkerStatus.RECYCLE.value
break
else:
status.value = WorkerStatus.IDLE.value

logger.info(_(f"{name} stopped doing work"))


Expand Down
3 changes: 2 additions & 1 deletion django_q/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,13 +756,14 @@ def fetch_group(self, failures=True, wait=0, count=None):
def _sync(pack):
"""Simulate a package travelling through the cluster."""
from django_q.cluster import monitor, worker
from django_q.cluster import WorkerStatus

task_queue = Queue()
result_queue = Queue()
task = SignedPackage.loads(pack)
task_queue.put(task)
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
result_queue.put("STOP")
monitor(result_queue)
task_queue.close()
Expand Down
16 changes: 8 additions & 8 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
sys.path.insert(0, myPath + "/../")

from django_q.brokers import Broker, get_broker
from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker
from django_q.cluster import Cluster, Sentinel, monitor, pusher, save_task, worker, WorkerStatus
from django_q.conf import Conf
from django_q.humanhash import DEFAULT_WORDLIST, uuid
from django_q.models import Success, Task
Expand Down Expand Up @@ -124,7 +124,7 @@ def test_cluster(broker):
assert queue_size(broker=broker) == 0
# Test work
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
assert task_queue.qsize() == 0
assert result_queue.qsize() == 1
# Test monitor
Expand Down Expand Up @@ -227,7 +227,7 @@ def test_enqueue(broker, admin_user):
assert fetch_group("test_j", count=2, wait=10) is None
# let a worker handle them
result_queue = Queue()
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
assert result_queue.qsize() == task_count
result_queue.put("STOP")
# store the results
Expand Down Expand Up @@ -437,7 +437,7 @@ def test_recycle(broker, monkeypatch):
pusher(task_queue, stop_event, broker=broker)
pusher(task_queue, stop_event, broker=broker)
# worker should exit on recycle
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
# check if the work has been done
assert result_queue.qsize() == 2
# save_limit test
Expand Down Expand Up @@ -472,7 +472,7 @@ def test_max_rss(broker, monkeypatch):
# push the task
pusher(task_queue, stop_event, broker=broker)
# worker should exit on recycle
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
# check if the work has been done
assert result_queue.qsize() == 1
# save_limit test
Expand Down Expand Up @@ -508,7 +508,7 @@ def test_bad_secret(broker, monkeypatch):
worker(
task_queue,
result_queue,
Value("f", -1),
Value("I", WorkerStatus.IDLE.value),
)
assert result_queue.qsize() == 0
broker.delete_queue()
Expand Down Expand Up @@ -693,7 +693,7 @@ def handler(sender, task, func, **kwargs):
event.set()
pusher(task_queue, event, broker=broker)
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
result_queue.put("STOP")
monitor(result_queue, broker)
broker.delete_queue()
Expand Down Expand Up @@ -722,7 +722,7 @@ def handler(sender, task, **kwargs):
event.set()
pusher(task_queue, event, broker=broker)
task_queue.put("STOP")
worker(task_queue, result_queue, Value("f", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
result_queue.put("STOP")
monitor(result_queue, broker)
broker.delete_queue()
Expand Down
4 changes: 2 additions & 2 deletions django_q/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from django.utils.timezone import is_naive

from django_q.brokers import Broker, get_broker
from django_q.cluster import monitor, pusher, scheduler, worker, localtime
from django_q.cluster import WorkerStatus, localtime, monitor, pusher, scheduler, worker
from django_q.conf import Conf
from django_q.queues import Queue
from django_q.tasks import Schedule, fetch
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_scheduler(broker, monkeypatch):
task_queue.put("STOP")
# let a worker handle them
result_queue = Queue()
worker(task_queue, result_queue, Value("b", -1))
worker(task_queue, result_queue, Value("I", WorkerStatus.IDLE.value))
assert result_queue.qsize() == 1
result_queue.put("STOP")
# store the results
Expand Down
71 changes: 71 additions & 0 deletions django_q/timeouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
Using signal, implements a alarm based callback after a certain amount of time.
Borrowed from rq: https://github.com/rq/rq/blob/master/rq/timeouts.py
"""
import signal


class JobTimeoutException(SystemExit):
"""Raised when a job takes longer to complete than the allowed maximum
timeout value. Inherits from SystemExit to prevent user code which catches
Exception from not timing out correctly.
"""
pass


class BaseDeathPenalty:
"""Base class to setup job timeouts."""

def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
# If signal.alarm timeout is 0, no alarm will be scheduled
# by signal.alarm
if timeout is None:
timeout = 0
self._timeout = timeout
self._exception = exception

def __enter__(self):
self.setup_death_penalty()

def __exit__(self, type, value, traceback):
# Always cancel immediately, since we're done
try:
self.cancel_death_penalty()
except JobTimeoutException:
# Weird case: we're done with the with body, but now the alarm is
# fired. We may safely ignore this situation and consider the
# body done.
pass

# __exit__ may return True to supress further exception handling. We
# don't want to suppress any exceptions here, since all errors should
# just pass through, BaseTimeoutException being handled normally to the
# invoking context.
return False

def setup_death_penalty(self):
raise NotImplementedError()

def cancel_death_penalty(self):
raise NotImplementedError()


class UnixSignalDeathPenalty(BaseDeathPenalty):

def handle_death_penalty(self, signum, frame):
raise self._exception('Task exceeded maximum timeout value '
'({0} seconds)'.format(self._timeout))

def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises
an exception after the timeout amount (expressed in seconds).
"""
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)

def cancel_death_penalty(self):
"""Removes the death penalty alarm and puts back the system into
default signal handling.
"""
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)

0 comments on commit 21ef911

Please sign in to comment.