From dfb443efa8a5d125171783355ba2f5bd6a291db9 Mon Sep 17 00:00:00 2001 From: Alex Morega Date: Mon, 23 Dec 2024 17:32:38 +0200 Subject: [PATCH 1/3] Delete and retry tasks through Django Admin --- django_tasks/backends/database/admin.py | 29 ++++++++++++++++++++---- django_tasks/backends/database/models.py | 13 ++++++++++- tests/tests/test_database_backend.py | 25 ++++++++++++++++++++ 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/django_tasks/backends/database/admin.py b/django_tasks/backends/database/admin.py index cacbcd6..3cb1d1e 100644 --- a/django_tasks/backends/database/admin.py +++ b/django_tasks/backends/database/admin.py @@ -1,11 +1,34 @@ from typing import List, Optional from django.contrib import admin +from django.db.models import QuerySet from django.http import HttpRequest +from django_tasks.task import ResultStatus + from .models import DBTaskResult +def reenqueue( + modeladmin: admin.ModelAdmin, + request: HttpRequest, + queryset: QuerySet[DBTaskResult], +) -> None: + tasks = queryset.update(status=ResultStatus.NEW) + modeladmin.message_user(request, f"Rescheduled {tasks} tasks.", "SUCCESS") + + +def duplicate( + modeladmin: admin.ModelAdmin, + request: HttpRequest, + queryset: QuerySet[DBTaskResult], +) -> None: + tasks = DBTaskResult.objects.bulk_create( + old_task.duplicate() for old_task in queryset + ) + modeladmin.message_user(request, f"Rescheduled {tasks} tasks.", "SUCCESS") + + @admin.register(DBTaskResult) class DBTaskResultAdmin(admin.ModelAdmin): list_display = ( @@ -20,17 +43,13 @@ class DBTaskResultAdmin(admin.ModelAdmin): ) list_filter = ("status", "priority", "queue_name") ordering = ["-enqueued_at"] + actions = [reenqueue, duplicate] def has_add_permission( self, request: HttpRequest, obj: Optional[DBTaskResult] = None ) -> bool: return False - def has_delete_permission( - self, request: HttpRequest, obj: Optional[DBTaskResult] = None - ) -> bool: - return False - def has_change_permission( self, request: HttpRequest, obj: Optional[DBTaskResult] = None ) -> bool: diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index e70c6af..9e1c881 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar +from typing import TYPE_CHECKING, Any, Generic, Optional, Self, TypeVar import django from django.core.exceptions import SuspiciousOperation @@ -167,6 +167,7 @@ def task_result(self) -> "TaskResult[T]": object.__setattr__(task_result, "_exception_class", exception_class) object.__setattr__(task_result, "_traceback", self.traceback or None) + object.__setattr__(task_result, "_return_value", self.return_value) return task_result @@ -226,3 +227,13 @@ def set_failed(self, exc: BaseException) -> None: "traceback", ] ) + + def duplicate(self) -> Self: + return type(self)( + args_kwargs=self.args_kwargs, + priority=self.priority, + task_path=self.task_path, + queue_name=self.queue_name, + backend_name=self.backend_name, + run_after=self.run_after, + ) diff --git a/tests/tests/test_database_backend.py b/tests/tests/test_database_backend.py index 25237cc..fe8bec2 100644 --- a/tests/tests/test_database_backend.py +++ b/tests/tests/test_database_backend.py @@ -26,6 +26,7 @@ from django_tasks import ResultStatus, Task, default_task_backend, tasks from django_tasks.backends.database import DatabaseBackend +from django_tasks.backends.database.backend import TaskResult from django_tasks.backends.database.management.commands.prune_db_task_results import ( logger as prune_db_tasks_logger, ) @@ -757,6 +758,8 @@ def test_worker_with_locked_rows(self) -> None: } ) class DatabaseTaskResultTestCase(TransactionTestCase): + run_worker = partial(call_command, "db_worker", verbosity=0, batch=True, interval=0) + def execute_in_new_connection(self, sql: Union[str, QuerySet]) -> Sequence: if isinstance(sql, QuerySet): sql = str(sql.query) @@ -953,6 +956,28 @@ def test_get_locked_with_locked_rows(self) -> None: finally: new_connection.close() + def test_duplicate(self) -> None: + result_1 = cast(TaskResult, test_tasks.calculate_meaning_of_life.enqueue()) + db_result_1 = result_1.db_result + db_result_2 = result_1.db_result.duplicate() + db_result_2.save() + result_2 = db_result_2.task_result + + assert db_result_1.pk != db_result_2.pk + + call_command( + "db_worker", + verbosity=0, + batch=True, + interval=0, + startup_delay=False, + ) + + result_1.refresh() + result_2.refresh() + + assert result_1.return_value == result_2.return_value + class ConnectionExclusiveTranscationTestCase(TestCase): def setUp(self) -> None: From 5f51e99e272ac4f8ea801fc33f82e467c1e5a501 Mon Sep 17 00:00:00 2001 From: Alex Morega Date: Mon, 23 Dec 2024 17:43:55 +0200 Subject: [PATCH 2/3] Fix import of `Self` --- django_tasks/backends/database/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/django_tasks/backends/database/models.py b/django_tasks/backends/database/models.py index 9e1c881..4017ddc 100644 --- a/django_tasks/backends/database/models.py +++ b/django_tasks/backends/database/models.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import TYPE_CHECKING, Any, Generic, Optional, Self, TypeVar +from typing import TYPE_CHECKING, Any, Generic, Optional, TypeVar import django from django.core.exceptions import SuspiciousOperation @@ -10,7 +10,7 @@ from django.utils import timezone from django.utils.module_loading import import_string from django.utils.translation import gettext_lazy as _ -from typing_extensions import ParamSpec +from typing_extensions import ParamSpec, Self from django_tasks.task import ( DEFAULT_PRIORITY, From 6ee00edd0bbdcb00cc2ff569f194f63cd8381ec9 Mon Sep 17 00:00:00 2001 From: Alex Morega Date: Mon, 23 Dec 2024 18:12:56 +0200 Subject: [PATCH 3/3] Remove errant `run_worker` --- tests/tests/test_database_backend.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/tests/test_database_backend.py b/tests/tests/test_database_backend.py index fe8bec2..9938ab3 100644 --- a/tests/tests/test_database_backend.py +++ b/tests/tests/test_database_backend.py @@ -758,8 +758,6 @@ def test_worker_with_locked_rows(self) -> None: } ) class DatabaseTaskResultTestCase(TransactionTestCase): - run_worker = partial(call_command, "db_worker", verbosity=0, batch=True, interval=0) - def execute_in_new_connection(self, sql: Union[str, QuerySet]) -> Sequence: if isinstance(sql, QuerySet): sql = str(sql.query)