diff --git a/CHANGES/+worker_backoff.feature b/CHANGES/+worker_backoff.feature new file mode 100644 index 0000000000..727e470749 --- /dev/null +++ b/CHANGES/+worker_backoff.feature @@ -0,0 +1 @@ +Added backing off on auxiliary workers if wrongly alarmed on pending tasks. diff --git a/pulpcore/tasking/entrypoint.py b/pulpcore/tasking/entrypoint.py index 5213de882b..18ec91e9a2 100644 --- a/pulpcore/tasking/entrypoint.py +++ b/pulpcore/tasking/entrypoint.py @@ -34,7 +34,6 @@ def worker( auxiliary, ): """A Pulp worker.""" - if reload: try: import hupper diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 6cce69c7ac..8e635e8c1e 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -2,6 +2,7 @@ import functools import logging +import math import os import random import select @@ -89,9 +90,11 @@ def __init__(self, auxiliary=False): self.wakeup_unblock = False self.wakeup_handle = False self.cancel_task = False + self.unblocked_count = 0 self.ignored_task_ids = [] self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL + self.false_alarms = 0 self.auxiliary = auxiliary self.task = None @@ -158,6 +161,14 @@ def _signal_handler(self, thesignal, frame): self.shutdown_requested = True def _pg_notify_handler(self, notification): + if notification.channel == "pulp_worker_broadcast": + key, value = notification.payload.split(":", maxsplit=1) + _logger.debug("broadcast message recieved: %s: %s", key, value) + if key == "unblocked_count": + self.unblocked_count = int(value) + self.wakeup_handle = self.unblocked_count > 0 + elif key == "metrics_heartbeat": + self.last_metric_heartbeat = datetime.fromisoformat(key) if notification.channel == "pulp_worker_wakeup": if notification.payload == TASK_WAKEUP_UNBLOCK: # Auxiliary workers don't do this. @@ -171,6 +182,7 @@ def _pg_notify_handler(self, notification): self.wakeup_handle = True elif notification.channel == "pulp_worker_metrics_heartbeat": + # TODO (in one of the next releases) Remove that superseeded channel. self.last_metric_heartbeat = datetime.fromisoformat(notification.payload) elif self.task and notification.channel == "pulp_worker_cancel": if notification.payload == str(self.task.pk): @@ -257,6 +269,7 @@ def record_unblocked_waiting_tasks_metric(self, now): ) self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'") + self.broadcast("metrics_heartbeat", now) def beat(self): now = timezone.now() @@ -278,6 +291,9 @@ def beat(self): if self.otel_enabled and now > self.last_metric_heartbeat + self.heartbeat_period: self.record_unblocked_waiting_tasks_metric(now) + def broadcast(self, key, value): + self.cursor.execute("SELECT pg_notify('pulp_worker_broadcast', %s)", (f"{key}:{value}",)) + def notify_workers(self, reason): self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,)) @@ -290,21 +306,13 @@ def cancel_abandoned_task(self, task, final_state, reason=None): # A task is considered abandoned when in running state, but no worker holds its lock domain = task.pulp_domain task.set_canceling() - if reason: - _logger.info( - "Cleaning up task %s in domain: %s and marking as %s. Reason: %s", - task.pk, - domain.name, - final_state, - reason, - ) - else: - _logger.info( - _("Cleaning up task %s in domain: %s and marking as %s."), - task.pk, - domain.name, - final_state, - ) + _logger.info( + "Cleaning up task %s in domain: %s and marking as %s. Reason: %s", + task.pk, + domain.name, + final_state, + reason or "unknown", + ) delete_incomplete_resources(task) task.set_canceled(final_state=final_state, reason=reason) if task.reserved_resources_record: @@ -345,14 +353,17 @@ def unblock_tasks(self): self.wakeup_unblock = False result = self._unblock_tasks() - if result is not None and ( - Task.objects.filter( - state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None + if result is not None: + unblocked_count = ( + Task.objects.filter( + state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None + ) + .exclude(unblocked_at=None) + .count() ) - .exclude(unblocked_at=None) - .exists() - ): - self.notify_workers(TASK_WAKEUP_HANDLE) + if unblocked_count > 0: + self.notify_workers(TASK_WAKEUP_HANDLE) + self.broadcast("unblocked_count", unblocked_count) return True return result @@ -369,6 +380,7 @@ def _unblock_tasks(self): .order_by("pulp_created") .select_related("pulp_domain") ): + _logger.debug("Considering task %s for unblocking.", task.pk) reserved_resources_record = task.reserved_resources_record or [] exclusive_resources = [ resource @@ -389,23 +401,26 @@ def _unblock_tasks(self): ) task.unblock() - elif ( - task.state == TASK_STATES.WAITING - and task.unblocked_at is None - # No exclusive resource taken? - and not any( - resource in taken_exclusive_resources or resource in taken_shared_resources - for resource in exclusive_resources - ) - # No shared resource exclusively taken? - and not any(resource in taken_exclusive_resources for resource in shared_resources) - ): - _logger.debug( - "Marking waiting task %s in domain: %s unblocked.", - task.pk, - task.pulp_domain.name, - ) - task.unblock() + elif task.state == TASK_STATES.WAITING and task.unblocked_at is None: + if ( + # No exclusive resource taken? + not any( + resource in taken_exclusive_resources or resource in taken_shared_resources + for resource in exclusive_resources + ) + # No shared resource exclusively taken? + and not any( + resource in taken_exclusive_resources for resource in shared_resources + ) + ): + _logger.debug( + "Marking waiting task %s in domain: %s unblocked.", + task.pk, + task.pulp_domain.name, + ) + task.unblock() + else: + _logger.debug("Task %s is still blocked.", task.pk) elif task.state == TASK_STATES.RUNNING and task.unblocked_at is None: # This should not happen in normal operation. # And it is only an issue if the worker running that task died, because it will @@ -426,8 +441,8 @@ def _unblock_tasks(self): def sleep(self): """Wait for signals on the wakeup channel while heart beating.""" - _logger.debug(_("Worker %s entering sleep state."), self.name) - while not self.shutdown_requested and not self.wakeup_handle: + _logger.debug("Worker %s entering sleep state.", self.name) + while not self.shutdown_requested: r, w, x = select.select( [self.sentinel, connection.connection], [], @@ -441,7 +456,14 @@ def sleep(self): self.unblock_tasks() if self.sentinel in r: os.read(self.sentinel, 256) - _logger.debug(_("Worker %s leaving sleep state."), self.name) + if self.wakeup_handle: + if not self.auxiliary or random.random() < math.exp( + self.unblocked_count - self.false_alarms + ): + _logger.debug("Worker %s leaving sleep state.", self.name) + break + else: + _logger.debug("Worker %s backing off", self.name) def supervise_task(self, task): """Call and supervise the task process while heart beating. @@ -586,8 +608,12 @@ def handle_unblocked_tasks(self): task = self.fetch_task() if task is None: # No task found + self.false_alarms += 1 + _logger.debug("False Alarms: %s", self.false_alarms) break try: + self.false_alarms //= 2 + _logger.debug("False Alarms: %s", self.false_alarms) if task.state == TASK_STATES.CANCELING: # No worker picked this task up before being canceled. # Or the worker disappeared before handling the canceling. @@ -615,6 +641,7 @@ def run(self, burst=False): signal.signal(signal.SIGHUP, self._signal_handler) # Subscribe to pgsql channels connection.connection.add_notify_handler(self._pg_notify_handler) + self.cursor.execute("LISTEN pulp_worker_broadcast") self.cursor.execute("LISTEN pulp_worker_cancel") self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat") if burst: @@ -638,4 +665,5 @@ def run(self, burst=False): self.cursor.execute("UNLISTEN pulp_worker_wakeup") self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat") self.cursor.execute("UNLISTEN pulp_worker_cancel") + self.cursor.execute("UNLISTEN pulp_worker_broadcast") self.shutdown()