Skip to content

Commit

Permalink
Merge branch 'job-results' of 'https://github.com/jjmerchante/grimoir…
Browse files (browse the repository at this point in the history)
  • Loading branch information
sduenas authored Aug 6, 2024
2 parents ae02292 + 9f8c9fe commit 51f29eb
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 24 deletions.
2 changes: 1 addition & 1 deletion docker/worker-entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ set -e
export SORTINGHAT_CONFIG=sortinghat.config.settings

# Build the command to run
set - sortinghatw
set - sortinghatw "$@"

# Run the worker
exec "$@"
8 changes: 8 additions & 0 deletions releases/unreleased/job-results-in-sortinghat.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
title: Job results in SortingHat
category: fixed
author: Jose Javier Merchante <[email protected]>
issue: null
notes: >
  Fixed a bug that caused jobs to be missing in SortingHat.
  Job results will be kept in SortingHat for one week.
43 changes: 28 additions & 15 deletions sortinghat/core/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@


MAX_CHUNK_SIZE = 2000
DEFAULT_JOB_RESULT_TTL = 60 * 60 * 24 * 7 # seconds


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -104,21 +105,31 @@ def job_in_tenant(job, tenant):
logger.debug("Retrieving list of jobs ...")

queue = get_tenant_queue(tenant)
started_jobs = [find_job(id, tenant)
for id
in queue.started_job_registry.get_job_ids()]
deferred_jobs = [find_job(id, tenant)
for id
in queue.deferred_job_registry.get_job_ids()]
finished_jobs = [find_job(id, tenant)
for id
in queue.finished_job_registry.get_job_ids()]
failed_jobs = [find_job(id, tenant)
for id
in queue.failed_job_registry.get_job_ids()]
scheduled_jobs = [find_job(id, tenant)
for id
in queue.scheduled_job_registry.get_job_ids()]
started_jobs = django_rq.utils.get_jobs(
queue,
queue.started_job_registry.get_job_ids(),
queue.started_job_registry
)
deferred_jobs = django_rq.utils.get_jobs(
queue,
queue.deferred_job_registry.get_job_ids(),
queue.deferred_job_registry
)
finished_jobs = django_rq.utils.get_jobs(
queue,
queue.finished_job_registry.get_job_ids(),
queue.finished_job_registry
)
failed_jobs = django_rq.utils.get_jobs(
queue,
queue.failed_job_registry.get_job_ids(),
queue.failed_job_registry
)
scheduled_jobs = django_rq.utils.get_jobs(
queue,
queue.scheduled_job_registry.get_job_ids(),
queue.scheduled_job_registry
)
jobs = (queue.jobs + started_jobs + deferred_jobs + finished_jobs + failed_jobs + scheduled_jobs)
jobs = (job for job in jobs if job_in_tenant(job, tenant))

Expand Down Expand Up @@ -856,6 +867,8 @@ def schedule_task(ctx, fn, task, scheduled_datetime=None, **kwargs):
on_success=on_success_job,
on_failure=on_failed_job,
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL,
**kwargs)
task.scheduled_datetime = scheduled_datetime
task.job_id = job.id
Expand Down
30 changes: 22 additions & 8 deletions sortinghat/core/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@


DEFAULT_IMPORT_IDENTITIES_INTERVAL = 60 * 24 * 7 # minutes
DEFAULT_JOB_RESULT_TTL = 60 * 60 * 24 * 7 # seconds


@convert_django_field.register(JSONField)
Expand Down Expand Up @@ -1109,7 +1110,9 @@ def mutate(self, info, uuids=None, last_modified=MIN_PERIOD_DATE):

job = get_tenant_queue(tenant).enqueue(recommend_affiliations,
ctx, uuids, last_modified,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return RecommendAffiliations(
job_id=job.id
Expand Down Expand Up @@ -1154,7 +1157,9 @@ def mutate(self, info, criteria,
strict,
match_source,
last_modified,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return RecommendMatches(
job_id=job.id
Expand All @@ -1179,7 +1184,9 @@ def mutate(self, info, uuids=None, exclude=True, no_strict_matching=False):
job = get_tenant_queue(tenant).enqueue(recommend_gender,
ctx, uuids,
exclude, no_strict_matching,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return RecommendGender(
job_id=job.id
Expand All @@ -1203,7 +1210,9 @@ def mutate(self, info, uuids=None, last_modified=MIN_PERIOD_DATE):

job = get_tenant_queue(tenant).enqueue(affiliate, ctx, uuids,
last_modified,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return Affiliate(
job_id=job.id
Expand Down Expand Up @@ -1247,7 +1256,9 @@ def mutate(self, info, criteria,
strict,
match_source,
last_modified,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return Unify(
job_id=job.id
Expand All @@ -1271,7 +1282,9 @@ def mutate(self, info, uuids=None, exclude=True, no_strict_matching=False):

job = get_tenant_queue(tenant).enqueue(genderize, ctx, uuids,
exclude, no_strict_matching,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return Genderize(
job_id=job.id
Expand Down Expand Up @@ -1431,7 +1444,9 @@ def mutate(self, info, backend, url, params=None):

job = get_tenant_queue(tenant).enqueue(import_identities, ctx,
backend, url, params,
job_timeout=-1)
job_timeout=-1,
result_ttl=DEFAULT_JOB_RESULT_TTL,
failure_ttl=DEFAULT_JOB_RESULT_TTL)

return ImportIdentities(
job_id=job.id
Expand Down Expand Up @@ -1529,7 +1544,6 @@ def mutate(self, info, task_id, data):
new_dt = datetime.datetime.now(datetime.timezone.utc) \
+ datetime.timedelta(minutes=task.interval)
if task.scheduled_datetime and task.scheduled_datetime > new_dt:
find_job(task.job_id, tenant)
if task.job_id:
job = find_job(task.job_id, tenant)
if job:
Expand Down

0 comments on commit 51f29eb

Please sign in to comment.