Skip to content

Commit

Permalink
Added QStash scheduling for django-qstash tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
jmitchel3 committed Jan 2, 2025
1 parent ea70e8e commit 936103c
Show file tree
Hide file tree
Showing 35 changed files with 1,087 additions and 30 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ source = [
]
omit = [
"*/migrations/*",
"*/admin.py",
"tests/*",
]

[tool.coverage.paths]
Expand Down
4 changes: 3 additions & 1 deletion rav.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ scripts:
install:
- venv/bin/python3 -m pip install -e .
test:
- venv/bin/python3 -m pytest
- venv/bin/tox run -f py312
- venv/bin/coverage combine
- venv/bin/coverage report
sample_server:
- cd sample_project && ../venv/bin/python manage.py runserver 8133
sample_shell:
Expand Down
4 changes: 2 additions & 2 deletions sample_project/cfehome/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@

DEBUG = config("DJANGO_DEBUG", default=False, cast=bool)

SECRET_KEY = get_random_secret_key()

SECRET_KEY = config("DJANGO_SECRET_KEY", default=get_random_secret_key())
# required by qstash-py
QSTASH_TOKEN = config("QSTASH_TOKEN")
QSTASH_CURRENT_SIGNING_KEY = config("QSTASH_CURRENT_SIGNING_KEY")
Expand Down Expand Up @@ -44,6 +43,7 @@
"django.contrib.staticfiles",
"django_qstash",
"django_qstash.results",
"django_qstash.schedules",
]

MIDDLEWARE = [
Expand Down
2 changes: 1 addition & 1 deletion sample_project/cfehome/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""


@shared_task
@shared_task(name="Math adder")
def math_add_task(a, b, *args, **kwargs):
logger.info("Adding %s and %s", a, b)
with open("test.txt", "w") as f:
Expand Down
2 changes: 1 addition & 1 deletion sample_project/test.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
12 + 454 = 466
123 + 123 = 246
15 changes: 15 additions & 0 deletions src/django_qstash/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from django_qstash.settings import DJANGO_QSTASH_DOMAIN
from django_qstash.settings import DJANGO_QSTASH_WEBHOOK_PATH


def get_callback_url() -> str:
"""
Get the callback URL based on the settings.
"""
callback_domain = DJANGO_QSTASH_DOMAIN.rstrip("/")
if not callback_domain.startswith(("http://", "https://")):
callback_domain = f"https://{callback_domain}"
webhook_path = DJANGO_QSTASH_WEBHOOK_PATH.strip("/")
return f"{callback_domain}/{webhook_path}/"
7 changes: 7 additions & 0 deletions src/django_qstash/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from __future__ import annotations

from qstash import QStash

from django_qstash.settings import QSTASH_TOKEN

qstash_client = QStash(QSTASH_TOKEN)
Empty file.
38 changes: 38 additions & 0 deletions src/django_qstash/discovery/fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from django import forms

from django_qstash.discovery.utils import discover_tasks
from django_qstash.discovery.validators import task_exists_validator


class TaskChoiceField(forms.ChoiceField):
"""
A form field that provides choices from discovered QStash tasks
"""

def __init__(self, *args, **kwargs):
# Remove max_length if it's present since ChoiceField doesn't use it
kwargs.pop("max_length", None)

# Get tasks before calling parent to set choices
tasks = discover_tasks()

# Convert tasks to choices using (task_name, task_name) format
task_choices = [(task_value, task_label) for task_value, task_label in tasks]

kwargs["choices"] = task_choices
kwargs["validators"] = [task_exists_validator] + kwargs.get("validators", [])
super().__init__(*args, **kwargs)

def get_task(self):
"""
Returns the actual task dot notation path for the selected value
"""
if self.data:
tasks = discover_tasks()

for task_value, task_label in tasks:
if task_label == self.data:
return task_value
return None
25 changes: 25 additions & 0 deletions src/django_qstash/discovery/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

from django.db import models

from django_qstash.discovery.fields import TaskChoiceField


class TaskField(models.CharField):
"""
A model field for storing QStash task references
"""

def __init__(self, *args, **kwargs):
# Set a reasonable max_length for task names if not provided
if "max_length" not in kwargs:
kwargs["max_length"] = 255
super().__init__(*args, **kwargs)

def formfield(self, **kwargs):
# Use our custom form field
defaults = {
"form_class": TaskChoiceField,
}
defaults.update(kwargs)
return super().formfield(**defaults)
93 changes: 93 additions & 0 deletions src/django_qstash/discovery/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from __future__ import annotations

import logging
import os
import warnings
from functools import lru_cache
from importlib import import_module

from django.apps import apps
from django.conf import settings
from django.core.signals import request_started
from django.utils.module_loading import module_has_submodule

logger = logging.getLogger(__name__)

DJANGO_QSTASH_DISCOVER_INCLUDE_SETTINGS_DIR = getattr(
settings, "DJANGO_QSTASH_DISCOVER_INCLUDE_SETTINGS_DIR", True
)


@lru_cache(maxsize=None)
def discover_tasks() -> list[tuple[str, str]]:
"""
Automatically discover tasks in Django apps and return them as a list of tuples.
Each tuple contains (dot_notation_path, task_name).
If no custom task name is specified, both values will be the dot notation path.
Returns:
List of tuples: [(dot_notation_path, task_name), ...]
Example: [
('example_app.tasks.my_task', 'example_app.tasks.my_task'),
('other_app.tasks.custom_task', 'special_name')
]
"""
from django_qstash.tasks import QStashTask

discovered_tasks = []
packages = []

# Add Django apps that contain tasks.py
for app_config in apps.get_app_configs():
if module_has_submodule(app_config.module, "tasks"):
packages.append(app_config.name)

# Add the directory containing settings.py if it has a tasks.py module
if DJANGO_QSTASH_DISCOVER_INCLUDE_SETTINGS_DIR:
settings_module = os.environ.get("DJANGO_SETTINGS_MODULE", "")
if settings_module:
settings_package = settings_module.rsplit(".", 1)[0]
try:
settings_module_obj = import_module(settings_package)
if module_has_submodule(settings_module_obj, "tasks"):
packages.append(settings_package)
except ImportError:
warnings.warn(
f"Could not import settings package {settings_package} for task discovery",
RuntimeWarning,
stacklevel=2,
)

# Rest of the discovery logic
for package in packages:
try:
tasks_module = import_module(f"{package}.tasks")
# Find all attributes that are QstashTask instances
for attr_name in dir(tasks_module):
attr = getattr(tasks_module, attr_name)

if isinstance(attr, QStashTask):
value = f"{package}.tasks.{attr_name}"
if attr.name == attr_name:
label = value
else:
label = f"{attr.name} ({value})"
discovered_tasks.append((value, label))
except Exception as e:
warnings.warn(
f"Failed to import tasks from {package}: {str(e)}",
RuntimeWarning,
stacklevel=2,
)
return discovered_tasks


def clear_discover_tasks_cache(sender, **kwargs):
logger.info("Clearing Django QStash discovered tasks cache")
discover_tasks.cache_clear()


request_started.connect(
clear_discover_tasks_cache,
dispatch_uid="clear_django_qstash_discovered_tasks_cache",
)
24 changes: 24 additions & 0 deletions src/django_qstash/discovery/validators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

from django.core.exceptions import ValidationError

from django_qstash.discovery.utils import discover_tasks


def task_exists_validator(task_name):
"""
Validates that a task name exists in the discovered tasks
Args:
task_name: The name of the task to validate
Raises:
ValidationError: If the task cannot be found
"""
tasks = discover_tasks()
available_tasks = [task[0] for task in tasks]

if task_name not in available_tasks:
raise ValidationError(
f"Task '{task_name}' not found. Available tasks: {', '.join(available_tasks)}"
)
112 changes: 112 additions & 0 deletions src/django_qstash/management/commands/task_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import annotations

import json
import logging

from django.apps import apps
from django.core.management.base import BaseCommand
from django.db import models

from django_qstash.schedules.client import QStashScheduleClient

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Management command to list and sync QStash schedules."""

help = "List and sync schedules from QStash"

def add_arguments(self, parser) -> None:
parser.add_argument(
"--list",
action="store_true",
help="List schedules from QStash",
)
parser.add_argument(
"--sync",
action="store_true",
help="Sync schedules from QStash to local database",
)

def get_task_schedule_model(self) -> models.Model | None:
"""Get the TaskSchedule model if available."""
try:
return apps.get_model("django_qstash_schedules", "TaskSchedule")
except LookupError:
self.stdout.write(
self.style.ERROR(
"Django QStash Schedules not installed.\n"
"Add `django_qstash.schedules` to INSTALLED_APPS and run migrations."
)
)
return None

def sync_schedules(self, schedules: list) -> None:
"""Sync remote schedules to local database."""
TaskSchedule = self.get_task_schedule_model()
if not TaskSchedule:
return

for schedule in schedules:
try:
body = json.loads(schedule.body)
task_name = body.get("task_name", "Unnamed Task")
function = f"{body['module']}.{body['function']}"

obj, created = TaskSchedule.objects.update_or_create(
schedule_id=schedule.schedule_id,
defaults={
"name": task_name,
"task": function,
"cron": schedule.cron,
"args": body.get("args", []),
"kwargs": body.get("kwargs", {}),
},
)
status = "Created" if created else "Updated"
logger.info(
"%s schedule: %s (%s)", status, task_name, schedule.schedule_id
)
except Exception:
logger.exception("Failed to sync schedule %s", schedule.schedule_id)

def handle(self, *args, **options) -> None:
if not (options.get("sync") or options.get("list")):
self.stdout.write(
self.style.ERROR("Please specify either --list or --sync option")
)
return

try:
client = QStashScheduleClient()
destination = client._get_callback_url()
schedules = client.list_schedules()

self.stdout.write(
self.style.SUCCESS(
f"Found {len(schedules)} remote schedules based on destination: {destination}"
)
)

for schedule in schedules:
body = json.loads(schedule.body)
task_name = body.get("task_name", "Unnamed Task")
function = f"{body['module']}.{body['function']}"

self.stdout.write(
f"\nSchedule ID: {schedule.schedule_id}"
f"\n Task: {task_name} ({function})"
f"\n Cron: {schedule.cron}"
f"\n Destination: {schedule.destination}"
f"\n Retries: {schedule.retries}"
f"\n Status: {'Paused' if schedule.paused else 'Active'}"
)

if options.get("sync"):
user_input = input("Do you want to sync remote schedules? (y/n): ")
if user_input.lower() == "y":
self.sync_schedules(schedules)

except Exception as e:
self.stdout.write(self.style.ERROR(f"An error occurred: {str(e)}"))
Empty file.
Loading

0 comments on commit 936103c

Please sign in to comment.