Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):

The transitions to CANCELING (marked with *) are the only ones allowed to happen without
holding the task's advisory lock. Canceling is meant to be initiated asynchronously by a separate
process before signalling the worker via Postgres LISTEN.
process before signalling the worker via a pubsub notification (e.g., Postgres LISTEN).

Fields:

Expand Down
7 changes: 7 additions & 0 deletions pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
TASK_WAKEUP_UNBLOCK = "unblock"
TASK_WAKEUP_HANDLE = "handle"

#: All valid tasking pubsub channels.
TASK_PUBSUB = SimpleNamespace(
    WAKEUP_WORKER="pulp_worker_wakeup",  # payload: a short reason string
    CANCEL_TASK="pulp_worker_cancel",  # payload: str(pk) of the task to cancel
    WORKER_METRICS="pulp_worker_metrics_heartbeat",  # payload: str of a timestamp
)

#: All valid task states.
TASK_STATES = SimpleNamespace(
WAITING="waiting",
Expand Down
151 changes: 151 additions & 0 deletions pulpcore/tasking/pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from typing import NamedTuple
from pulpcore.constants import TASK_PUBSUB
import os
import logging
import select
from django.db import connection
from contextlib import suppress

logger = logging.getLogger(__name__)


class BasePubSubBackend:
    """Abstract interface for tasking pubsub backends.

    A backend delivers small text payloads on named channels. The classmethod
    helpers below wrap :meth:`publish` with the well-known tasking channels
    from ``TASK_PUBSUB``.
    """

    # --- convenience publishers ---

    @classmethod
    def wakeup_worker(cls, reason="unknown"):
        """Notify workers to wake up; *reason* is sent as the payload."""
        cls.publish(TASK_PUBSUB.WAKEUP_WORKER, reason)

    @classmethod
    def cancel_task(cls, task_pk):
        """Broadcast a cancel request for the task identified by *task_pk*."""
        cls.publish(TASK_PUBSUB.CANCEL_TASK, str(task_pk))

    @classmethod
    def record_worker_metrics(cls, now):
        """Broadcast a worker metrics heartbeat carrying the timestamp *now*."""
        cls.publish(TASK_PUBSUB.WORKER_METRICS, str(now))

    # --- abstract interface ---

    def subscribe(self, channel):
        """Start receiving messages published on *channel*."""
        raise NotImplementedError()

    def unsubscribe(self, channel):
        """Stop receiving messages published on *channel*."""
        raise NotImplementedError()

    def get_subscriptions(self):
        """Return the channels this backend is currently subscribed to."""
        raise NotImplementedError()

    @classmethod
    def publish(cls, channel, payload=None):
        """Send *payload* to all subscribers of *channel*."""
        raise NotImplementedError()

    def fileno(self):
        """Return a file descriptor suitable for use in a select loop."""
        raise NotImplementedError()

    def fetch(self):
        """Return newly received messages, polling the backend if required."""
        raise NotImplementedError()

    def close(self):
        """Release any resources held by the backend."""
        raise NotImplementedError()


class PubsubMessage(NamedTuple):
    """Immutable (channel, payload) pair received from a pubsub backend."""

    channel: str
    payload: str


def drain_non_blocking_fd(fd):
    """Discard all bytes currently readable from non-blocking descriptor *fd*.

    Stops on ``BlockingIOError`` (nothing pending) or on EOF. The original
    ``while True: os.read(...)`` spun forever once the write end of the pipe
    was closed, because ``os.read`` then returns ``b""`` instead of raising.
    """
    with suppress(BlockingIOError):
        # An empty read means EOF -- stop instead of busy-looping.
        while os.read(fd, 256):
            pass


class PostgresPubSub(BasePubSubBackend):
    """Pubsub backend implemented with Postgres LISTEN/NOTIFY."""

    # Process id captured at import time; used only to tag debug log lines.
    PID = os.getpid()

    def __init__(self):
        self._subscriptions = set()
        self.message_buffer = []
        # Run a trivial query so the Django connection is established before
        # we read its backend pid.
        with connection.cursor() as cursor:
            cursor.execute("select 1")
        self.backend_pid = connection.connection.info.backend_pid
        # Self-pipe sentinel: marked readable when a notification originates
        # from our own backend, because in that case the notify callback runs
        # synchronously and the connection socket never becomes readable.
        self.sentinel_r, self.sentinel_w = os.pipe()
        os.set_blocking(self.sentinel_r, False)
        os.set_blocking(self.sentinel_w, False)
        connection.connection.add_notify_handler(self._store_messages)

@classmethod
def _debug(cls, message):
    """Emit *message* at DEBUG level, tagged with this process's pid."""
    tagged = f"[{cls.PID}] {message}"
    logger.debug(tagged)

def _store_messages(self, notification):
    """Notify-handler callback: buffer *notification* as a PubsubMessage.

    When the notification originates from our own backend, the connection
    socket never becomes readable, so the sentinel pipe is tickled to make
    ``fileno``/select report readiness.
    """
    message = PubsubMessage(channel=notification.channel, payload=notification.payload)
    self.message_buffer.append(message)
    if notification.pid == self.backend_pid:
        os.write(self.sentinel_w, b"1")
    self._debug(f"Received message: {notification}")

@classmethod
def publish(cls, channel, payload=""):
    """Send *payload* on *channel* via Postgres NOTIFY.

    An empty payload uses plain ``NOTIFY``; otherwise ``pg_notify`` carries
    the payload as a parameter.
    """
    # NOTE(review): *channel* is interpolated as a SQL identifier and cannot
    # be parameterized; callers must only pass trusted channel names
    # (see TASK_PUBSUB) -- confirm.
    with connection.cursor() as cursor:
        if payload:
            cursor.execute("SELECT pg_notify(%s, %s)", (channel, str(payload)))
        else:
            cursor.execute(f"NOTIFY {channel}")
    cls._debug(f"Sent message: ({channel}, {str(payload)})")

def subscribe(self, channel):
    """Start listening on *channel*; its messages accumulate in the buffer."""
    with connection.cursor() as cursor:
        cursor.execute(f"LISTEN {channel}")
    # Record the subscription only after LISTEN succeeded, so a failed query
    # cannot leave _subscriptions claiming a channel we never listen on.
    self._subscriptions.add(channel)

def unsubscribe(self, channel):
    """Stop listening on *channel* and drop its buffered messages.

    Raises KeyError if *channel* was never subscribed (unchanged behavior).
    """
    self._subscriptions.remove(channel)
    # Purge buffered messages for this channel. The original loop used
    # range(0, len(...), -1), which is always empty, so stale messages for
    # an unsubscribed channel were never removed; popping by index while
    # scanning would also skip elements.
    self.message_buffer[:] = [
        message for message in self.message_buffer if message.channel != channel
    ]
    with connection.cursor() as cursor:
        cursor.execute(f"UNLISTEN {channel}")

def get_subscriptions(self):
    """Return a snapshot of the currently subscribed channels."""
    return set(self._subscriptions)

def fileno(self) -> int:
    """Return a file descriptor that is readable when messages are pending.

    When publisher and subscriber share the same connection, the notify
    callback fires synchronously and the connection socket never becomes
    readable -- a plain select on the connection would miss those messages.
    The sentinel pipe covers that case: if it already holds data, report it
    instead of the connection's socket.
    """
    # Zero-timeout select: just probe whether the sentinel has pending bytes.
    ready, _, _ = select.select([self.sentinel_r], [], [], 0)
    if self.sentinel_r in ready:
        return self.sentinel_r
    return connection.connection.fileno()

def fetch(self) -> list[PubsubMessage]:
    """Poll the connection, then return and clear all buffered messages."""
    # A trivial round-trip forces the driver to process pending
    # notifications, which invokes _store_messages via the notify handler.
    with connection.cursor() as cursor:
        cursor.execute("SELECT 1")
        # Fetch on the cursor itself: Django's cursor wrapper does not
        # guarantee that execute() returns the cursor across drivers, so
        # chaining ``execute(...).fetchone()`` may blow up with
        # AttributeError on drivers whose execute() returns None.
        cursor.fetchone()
    result = self.message_buffer.copy()
    self.message_buffer.clear()
    # Reset the self-pipe sentinel so fileno() stops reporting readiness.
    drain_non_blocking_fd(self.sentinel_r)
    self._debug(f"Fetched messages: {result}")
    return result

def close(self):
    """Tear down subscriptions, the notify handler, and the sentinel pipe."""
    # UNLISTEN first: while those queries run, notifications can still be
    # delivered and _store_messages writes to the sentinel pipe, so the pipe
    # and the handler must stay in place until no more callbacks can fire.
    # (The original closed the pipe fds before unsubscribing, risking a
    # write to a closed fd.)
    for channel in self.get_subscriptions():
        self.unsubscribe(channel)
    connection.connection.remove_notify_handler(self._store_messages)
    self.message_buffer.clear()
    drain_non_blocking_fd(self.sentinel_r)
    os.close(self.sentinel_r)
    os.close(self.sentinel_w)

def __enter__(self):
    """Support ``with PostgresPubSub() as pubsub:`` usage."""
    return self

def __exit__(self, exc_type, exc_value, traceback):
    """Release all pubsub resources when the context exits."""
    self.close()


#: The pubsub backend class used by the tasking system.
backend = PostgresPubSub
23 changes: 8 additions & 15 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
TASK_INCOMPLETE_STATES,
TASK_STATES,
IMMEDIATE_TIMEOUT,
TASK_WAKEUP_HANDLE,
TASK_WAKEUP_UNBLOCK,
)
from pulpcore.middleware import x_task_diagnostics_var
from pulpcore.tasking import pubsub
from pulpcore.tasking.kafka import send_task_notification

_logger = logging.getLogger(__name__)
Expand All @@ -50,12 +50,6 @@ def _validate_and_get_resources(resources):
return list(resource_set)


def wakeup_worker(reason):
# Notify workers
with connection.connection.cursor() as cursor:
cursor.execute("SELECT pg_notify('pulp_worker_wakeup', %s)", (reason,))


def execute_task(task):
# This extra stack is needed to isolate the current_task ContextVar
contextvars.copy_context().run(_execute_task, task)
Expand Down Expand Up @@ -257,7 +251,8 @@ def dispatch(
task.set_canceling()
task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.")
if send_wakeup_signal:
wakeup_worker(TASK_WAKEUP_UNBLOCK)
with pubsub.PostgresPubSub(connection) as pubsub_client:
pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK)
return task


Expand Down Expand Up @@ -297,7 +292,8 @@ async def adispatch(
task.set_canceling()
task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.")
if send_wakeup_signal:
await sync_to_async(wakeup_worker)(TASK_WAKEUP_UNBLOCK)
with pubsub.PostgresPubSub(connection) as pubsub_client:
pubsub_client.wakeup_worker(reason=TASK_WAKEUP_UNBLOCK)
return task


Expand Down Expand Up @@ -429,12 +425,9 @@ def cancel_task(task_id):

# This is the only valid transition without holding the task lock.
task.set_canceling()
# Notify the worker that might be running that task.
with connection.cursor() as cursor:
if task.app_lock is None:
wakeup_worker(TASK_WAKEUP_HANDLE)
else:
cursor.execute("SELECT pg_notify('pulp_worker_cancel', %s)", (str(task.pk),))
# Notify the worker that might be running that task and other workers to clean up
pubsub.backend.cancel_task(task_pk=task.pk)
pubsub.backend.wakeup_worker()
return task


Expand Down
Loading
Loading