diff --git a/label_studio/core/migration_helpers.py b/label_studio/core/migration_helpers.py index 20a79372a3d1..caa2667bf2a8 100644 --- a/label_studio/core/migration_helpers.py +++ b/label_studio/core/migration_helpers.py @@ -4,6 +4,7 @@ from core.redis import start_job_async_or_sync from django.conf import settings from django.db import connection +from rq import Retry logger = logging.getLogger(__name__) @@ -81,6 +82,7 @@ def forwards(apps, schema_editor): # noqa: ARG001 sql=sql_forwards, apply_on_sqlite=apply_on_sqlite, reverse=False, + retry=Retry(max=3, interval=[60, 300, 1800]), ) else: AsyncMigrationStatus = apps.get_model('core', 'AsyncMigrationStatus') @@ -96,6 +98,7 @@ def backwards(apps, schema_editor): # noqa: ARG001 sql=sql_backwards, apply_on_sqlite=apply_on_sqlite, reverse=True, + retry=Retry(max=3, interval=[60, 300, 1800]), ) return forwards, backwards diff --git a/label_studio/core/redis.py b/label_studio/core/redis.py index bede33066222..b140da7ab885 100644 --- a/label_studio/core/redis.py +++ b/label_studio/core/redis.py @@ -156,9 +156,11 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs): :param job: Job function :param args: Function arguments :param in_seconds: Job will be delayed for in_seconds + :param retry: RQ Retry object or int (max retries). Only used in async mode. :param kwargs: Function keywords arguments :return: Job or function result """ + from rq import Retry redis = redis_connected() and kwargs.get('redis', True) queue_name = kwargs.get('queue_name', 'default') @@ -173,6 +175,13 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs): job_timeout = kwargs['job_timeout'] del kwargs['job_timeout'] + retry = None + if 'retry' in kwargs: + retry = kwargs['retry'] + del kwargs['retry'] + if isinstance(retry, int): + retry = Retry(max=retry) + if redis: # Async execution with Redis - wrap job for context management try: @@ -202,6 +211,7 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs): **kwargs, job_timeout=job_timeout, failure_ttl=settings.RQ_FAILED_JOB_TTL, + retry=retry, ) return job else: