Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions label_studio/core/migration_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from core.redis import start_job_async_or_sync
from django.conf import settings
from django.db import connection
from rq import Retry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -81,6 +82,7 @@ def forwards(apps, schema_editor): # noqa: ARG001
sql=sql_forwards,
apply_on_sqlite=apply_on_sqlite,
reverse=False,
retry=Retry(max=3, interval=[60, 300, 1800]),
)
else:
AsyncMigrationStatus = apps.get_model('core', 'AsyncMigrationStatus')
Expand All @@ -96,6 +98,7 @@ def backwards(apps, schema_editor): # noqa: ARG001
sql=sql_backwards,
apply_on_sqlite=apply_on_sqlite,
reverse=True,
retry=Retry(max=3, interval=[60, 300, 1800]),
)

return forwards, backwards
10 changes: 10 additions & 0 deletions label_studio/core/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
:param job: Job function
:param args: Function arguments
:param in_seconds: Job will be delayed for in_seconds
:param retry: RQ Retry object or int (max retries). Only used in async mode.
:param kwargs: Function keywords arguments
:return: Job or function result
"""
from rq import Retry

redis = redis_connected() and kwargs.get('redis', True)
queue_name = kwargs.get('queue_name', 'default')
Expand All @@ -173,6 +175,13 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
job_timeout = kwargs['job_timeout']
del kwargs['job_timeout']

retry = None
if 'retry' in kwargs:
retry = kwargs['retry']
del kwargs['retry']
if isinstance(retry, int):
retry = Retry(max=retry)

if redis:
# Async execution with Redis - wrap job for context management
try:
Expand Down Expand Up @@ -202,6 +211,7 @@ def start_job_async_or_sync(job, *args, in_seconds=0, **kwargs):
**kwargs,
job_timeout=job_timeout,
failure_ttl=settings.RQ_FAILED_JOB_TTL,
retry=retry,
)
return job
else:
Expand Down
Loading