Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/+worker_backoff.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a back-off for auxiliary workers that are woken up for pending tasks but find none to handle (false alarms).
1 change: 0 additions & 1 deletion pulpcore/tasking/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ def worker(
auxiliary,
):
"""A Pulp worker."""

if reload:
try:
import hupper
Expand Down
112 changes: 70 additions & 42 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import functools
import logging
import math
import os
import random
import select
Expand Down Expand Up @@ -89,9 +90,11 @@ def __init__(self, auxiliary=False):
self.wakeup_unblock = False
self.wakeup_handle = False
self.cancel_task = False
self.unblocked_count = 0

self.ignored_task_ids = []
self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL
self.false_alarms = 0

self.auxiliary = auxiliary
self.task = None
Expand Down Expand Up @@ -158,6 +161,14 @@ def _signal_handler(self, thesignal, frame):
self.shutdown_requested = True

def _pg_notify_handler(self, notification):
    """Handle a notification received on one of the subscribed PostgreSQL channels.

    Messages on the ``pulp_worker_broadcast`` channel carry a ``key:value``
    payload:

    * ``unblocked_count``: stores the integer count and requests a wakeup to
      handle tasks only when that count is positive.
    * ``metrics_heartbeat``: stores the ISO-formatted timestamp of the last
      metrics heartbeat.
    """
    if notification.channel == "pulp_worker_broadcast":
        # Split on the first colon only; the value (e.g. an ISO timestamp)
        # may itself contain colons.
        key, value = notification.payload.split(":", maxsplit=1)
        _logger.debug("broadcast message recieved: %s: %s", key, value)
        if key == "unblocked_count":
            self.unblocked_count = int(value)
            self.wakeup_handle = self.unblocked_count > 0
        elif key == "metrics_heartbeat":
            # The timestamp is carried in the value; the key is just the
            # literal "metrics_heartbeat" and is not a parsable date.
            self.last_metric_heartbeat = datetime.fromisoformat(value)
if notification.channel == "pulp_worker_wakeup":
if notification.payload == TASK_WAKEUP_UNBLOCK:
# Auxiliary workers don't do this.
Expand All @@ -171,6 +182,7 @@ def _pg_notify_handler(self, notification):
self.wakeup_handle = True

elif notification.channel == "pulp_worker_metrics_heartbeat":
# TODO (in one of the next releases) Remove that superseded channel.
self.last_metric_heartbeat = datetime.fromisoformat(notification.payload)
elif self.task and notification.channel == "pulp_worker_cancel":
if notification.payload == str(self.task.pk):
Expand Down Expand Up @@ -257,6 +269,7 @@ def record_unblocked_waiting_tasks_metric(self, now):
)

self.cursor.execute(f"NOTIFY pulp_worker_metrics_heartbeat, '{str(now)}'")
self.broadcast("metrics_heartbeat", now)

def beat(self):
now = timezone.now()
Expand All @@ -278,6 +291,9 @@ def beat(self):
if self.otel_enabled and now > self.last_metric_heartbeat + self.heartbeat_period:
self.record_unblocked_waiting_tasks_metric(now)

def broadcast(self, key, value):
    """Publish a ``key:value`` message on the ``pulp_worker_broadcast`` channel.

    The payload is ``key`` and the string form of ``value`` joined by a colon,
    passed to ``pg_notify`` as a query parameter.
    """
    payload = f"{key}:{value}"
    self.cursor.execute("SELECT pg_notify('pulp_worker_broadcast', %s)", (payload,))

def notify_workers(self, reason):
    """Send ``reason`` as the payload of a ``pulp_worker_wakeup`` notification."""
    params = (reason,)
    self.cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", params)

Expand All @@ -290,21 +306,13 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
# A task is considered abandoned when in running state, but no worker holds its lock
domain = task.pulp_domain
task.set_canceling()
if reason:
_logger.info(
"Cleaning up task %s in domain: %s and marking as %s. Reason: %s",
task.pk,
domain.name,
final_state,
reason,
)
else:
_logger.info(
_("Cleaning up task %s in domain: %s and marking as %s."),
task.pk,
domain.name,
final_state,
)
_logger.info(
"Cleaning up task %s in domain: %s and marking as %s. Reason: %s",
task.pk,
domain.name,
final_state,
reason or "unknown",
)
delete_incomplete_resources(task)
task.set_canceled(final_state=final_state, reason=reason)
if task.reserved_resources_record:
Expand Down Expand Up @@ -345,14 +353,17 @@ def unblock_tasks(self):

self.wakeup_unblock = False
result = self._unblock_tasks()
if result is not None and (
Task.objects.filter(
state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None
if result is not None:
unblocked_count = (
Task.objects.filter(
state__in=[TASK_STATES.WAITING, TASK_STATES.CANCELING], app_lock=None
)
.exclude(unblocked_at=None)
.count()
)
.exclude(unblocked_at=None)
.exists()
):
self.notify_workers(TASK_WAKEUP_HANDLE)
if unblocked_count > 0:
self.notify_workers(TASK_WAKEUP_HANDLE)
self.broadcast("unblocked_count", unblocked_count)
return True

return result
Expand All @@ -369,6 +380,7 @@ def _unblock_tasks(self):
.order_by("pulp_created")
.select_related("pulp_domain")
):
_logger.debug("Considering task %s for unblocking.", task.pk)
reserved_resources_record = task.reserved_resources_record or []
exclusive_resources = [
resource
Expand All @@ -389,23 +401,26 @@ def _unblock_tasks(self):
)
task.unblock()

elif (
task.state == TASK_STATES.WAITING
and task.unblocked_at is None
# No exclusive resource taken?
and not any(
resource in taken_exclusive_resources or resource in taken_shared_resources
for resource in exclusive_resources
)
# No shared resource exclusively taken?
and not any(resource in taken_exclusive_resources for resource in shared_resources)
):
_logger.debug(
"Marking waiting task %s in domain: %s unblocked.",
task.pk,
task.pulp_domain.name,
)
task.unblock()
elif task.state == TASK_STATES.WAITING and task.unblocked_at is None:
if (
# No exclusive resource taken?
not any(
resource in taken_exclusive_resources or resource in taken_shared_resources
for resource in exclusive_resources
)
# No shared resource exclusively taken?
and not any(
resource in taken_exclusive_resources for resource in shared_resources
)
):
_logger.debug(
"Marking waiting task %s in domain: %s unblocked.",
task.pk,
task.pulp_domain.name,
)
task.unblock()
else:
_logger.debug("Task %s is still blocked.", task.pk)
elif task.state == TASK_STATES.RUNNING and task.unblocked_at is None:
# This should not happen in normal operation.
# And it is only an issue if the worker running that task died, because it will
Expand All @@ -426,8 +441,8 @@ def _unblock_tasks(self):
def sleep(self):
"""Wait for signals on the wakeup channel while heart beating."""

_logger.debug(_("Worker %s entering sleep state."), self.name)
while not self.shutdown_requested and not self.wakeup_handle:
_logger.debug("Worker %s entering sleep state.", self.name)
while not self.shutdown_requested:
r, w, x = select.select(
[self.sentinel, connection.connection],
[],
Expand All @@ -441,7 +456,14 @@ def sleep(self):
self.unblock_tasks()
if self.sentinel in r:
os.read(self.sentinel, 256)
_logger.debug(_("Worker %s leaving sleep state."), self.name)
if self.wakeup_handle:
if not self.auxiliary or random.random() < math.exp(
self.unblocked_count - self.false_alarms
):
_logger.debug("Worker %s leaving sleep state.", self.name)
break
else:
_logger.debug("Worker %s backing off", self.name)

def supervise_task(self, task):
"""Call and supervise the task process while heart beating.
Expand Down Expand Up @@ -586,8 +608,12 @@ def handle_unblocked_tasks(self):
task = self.fetch_task()
if task is None:
# No task found
self.false_alarms += 1
_logger.debug("False Alarms: %s", self.false_alarms)
break
try:
self.false_alarms //= 2
_logger.debug("False Alarms: %s", self.false_alarms)
if task.state == TASK_STATES.CANCELING:
# No worker picked this task up before being canceled.
# Or the worker disappeared before handling the canceling.
Expand Down Expand Up @@ -615,6 +641,7 @@ def run(self, burst=False):
signal.signal(signal.SIGHUP, self._signal_handler)
# Subscribe to pgsql channels
connection.connection.add_notify_handler(self._pg_notify_handler)
self.cursor.execute("LISTEN pulp_worker_broadcast")
self.cursor.execute("LISTEN pulp_worker_cancel")
self.cursor.execute("LISTEN pulp_worker_metrics_heartbeat")
if burst:
Expand All @@ -638,4 +665,5 @@ def run(self, burst=False):
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
self.cursor.execute("UNLISTEN pulp_worker_cancel")
self.cursor.execute("UNLISTEN pulp_worker_broadcast")
self.shutdown()