Skip to content

Commit 33e3320

Browse files
committed
improve complete_job
1 parent c0e715b commit 33e3320

File tree

3 files changed

+107
-49
lines changed

3 files changed

+107
-49
lines changed

qiita_db/handlers/processing_job.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ def post(self, job_id):
146146
cmd, values_dict={'job_id': job_id,
147147
'payload': self.request.body.decode(
148148
'ascii')})
149-
job = qdb.processing_job.ProcessingJob.create(job.user, params)
149+
# complete_job jobs are unique, so it is fine to force their creation
150+
job = qdb.processing_job.ProcessingJob.create(
151+
job.user, params, force=True)
150152
job.submit()
151153

152154
self.finish()

qiita_db/processing_job.py

+48-48
Original file line numberDiff line numberDiff line change
@@ -581,56 +581,56 @@ def create(cls, user, parameters, force=False):
581581
"""
582582
TTRN = qdb.sql_connection.TRN
583583
with TTRN:
584-
command = parameters.command
584+
if not force:
585+
command = parameters.command
585586

586-
# check if a job with the same parameters already exists
587-
sql = """SELECT processing_job_id, email, processing_job_status,
588-
COUNT(aopj.artifact_id)
589-
FROM qiita.processing_job
590-
LEFT JOIN qiita.processing_job_status
591-
USING (processing_job_status_id)
592-
LEFT JOIN qiita.artifact_output_processing_job aopj
593-
USING (processing_job_id)
594-
WHERE command_id = %s AND processing_job_status IN (
595-
'success', 'waiting', 'running', 'in_construction') {0}
596-
GROUP BY processing_job_id, email,
597-
processing_job_status"""
598-
599-
# we need to use ILIKE because of booleans as they can be
600-
# false or False
601-
params = []
602-
for k, v in parameters.values.items():
603-
# this is necessary in case we have an Iterable as a value
604-
# but that is string
605-
if isinstance(v, Iterable) and not isinstance(v, str):
606-
for vv in v:
607-
params.extend([k, str(vv)])
587+
# check if a job with the same parameters already exists
588+
sql = """SELECT processing_job_id, processing_job_status
589+
FROM qiita.processing_job
590+
LEFT JOIN qiita.processing_job_status
591+
USING (processing_job_status_id)
592+
LEFT JOIN qiita.artifact_output_processing_job aopj
593+
USING (processing_job_id)
594+
WHERE command_id = %s AND processing_job_status IN (
595+
'success', 'waiting', 'running', 'in_construction')
596+
{0}"""
597+
598+
# ILIKE was used because booleans can be false or False; NOTE(review):
599+
# the condition below now uses '=' (case-sensitive) - confirm intended
600+
params = []
601+
for k, v in parameters.values.items():
602+
# this is necessary in case we have an Iterable as a value
603+
# but that is string
604+
if isinstance(v, Iterable) and not isinstance(v, str):
605+
for vv in v:
606+
params.extend([k, str(vv)])
607+
else:
608+
params.extend([k, str(v)])
609+
610+
if params:
611+
# divided by 2 as we have key-value pairs
612+
len_params = int(len(params)/2)
613+
sql = sql.format(' AND ' + ' AND '.join(
614+
["command_parameters->>%s = %s"] * len_params))
615+
params = [command.id] + params
616+
TTRN.add(sql, params)
608617
else:
609-
params.extend([k, str(v)])
610-
611-
if params:
612-
# divided by 2 as we have key-value pairs
613-
len_params = int(len(params)/2)
614-
sql = sql.format(' AND ' + ' AND '.join(
615-
["command_parameters->>%s ILIKE %s"] * len_params))
616-
params = [command.id] + params
617-
TTRN.add(sql, params)
618-
else:
619-
# the sql variable expects the list of parameters but if there
620-
# is no param we need to replace the {0} with an empty string
621-
TTRN.add(sql.format(""), [command.id])
622-
623-
# checking that if the job status is success, it has children
624-
# [2] status, [3] children count
625-
existing_jobs = [r for r in TTRN.execute_fetchindex()
626-
if r[2] != 'success' or r[3] > 0]
627-
if existing_jobs and not force:
628-
raise ValueError(
629-
'Cannot create job because the parameters are the same as '
630-
'jobs that are queued, running or already have '
631-
'succeeded:\n%s' % '\n'.join(
632-
["%s: %s" % (jid, status)
633-
for jid, _, status, _ in existing_jobs]))
618+
# the sql variable expects the list of parameters but if
619+
# there is no param we need to replace the {0} with an
620+
# empty string
621+
TTRN.add(sql.format(""), [command.id])
622+
623+
# checking that if the job status is success, it has children
624+
# [2] status, [3] children count - NOTE(review): SELECT above returns 2 columns
625+
existing_jobs = [r for r in TTRN.execute_fetchindex()
626+
if r[2] != 'success' or r[3] > 0]
627+
if existing_jobs:
628+
raise ValueError(
629+
'Cannot create job because the parameters are the '
630+
'same as jobs that are queued, running or already '
631+
'have succeeded:\n%s' % '\n'.join(
632+
["%s: %s" % (jid, status)
633+
for jid, _, status, _ in existing_jobs]))
634634

635635
sql = """INSERT INTO qiita.processing_job
636636
(email, command_id, command_parameters,

qiita_db/support_files/patches/93.sql

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
-- Oct 18, 2024
2+
-- ProcessingJob.create can take up to 52 seconds when creating a complete_job, mainly
3+
-- due to the number of jobs for this command and the use of json. The solution in the database
4+
-- is to convert the column to jsonb and index the job_id and payload values of command_parameters
5+
6+
-- ### These are the stats before the change for a single example
7+
-- GroupAggregate (cost=67081.81..67081.83 rows=1 width=77) (actual time=51859.962..51862.637 rows=1 loops=1)
8+
-- Group Key: processing_job.processing_job_id, processing_job_status.processing_job_status
9+
-- -> Sort (cost=67081.81..67081.81 rows=1 width=77) (actual time=51859.952..51862.627 rows=1 loops=1)
10+
-- Sort Key: processing_job.processing_job_id, processing_job_status.processing_job_status
11+
-- Sort Method: quicksort Memory: 25kB
12+
-- -> Nested Loop Left Join (cost=4241.74..67081.80 rows=1 width=77) (actual time=51859.926..51862.604 rows=1 loops=1)
13+
-- -> Nested Loop (cost=4237.30..67069.64 rows=1 width=69) (actual time=51859.889..51862.566 rows=1 loops=1)
14+
-- Join Filter: (processing_job.processing_job_status_id = processing_job_status.processing_job_status_id)
15+
-- Rows Removed by Join Filter: 1
16+
-- -> Gather (cost=4237.30..67068.50 rows=1 width=45) (actual time=51859.846..51862.522 rows=1 loops=1)
17+
-- Workers Planned: 2
18+
-- Workers Launched: 2
19+
-- -> Parallel Bitmap Heap Scan on processing_job (cost=3237.30..66068.40 rows=1 width=45) (actual time=51785.317..51785.446 rows=0 loops=3)
20+
-- Recheck Cond: (command_id = 83)
21+
-- Filter: (((command_parameters ->> 'job_id'::text) ~~* '3432a908-f7b8-4e36-89fc-88f3310b84d5'::text) AND ((command_parameters ->> '
22+
-- payload'::text) ~~* '{"success": true, "error": "", "artifacts": {"alpha_diversity": {"artifact_type": "alpha_vector", "filepaths": [["/qmounts/qiita_test_data/tes
23+
-- tlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity/alpha-diversity.tsv", "plain_text"], ["/qmounts/qiita_test_data/testloca
24+
-- l/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity.qza", "qza"]], "archive": {}}}}'::text))
25+
-- Rows Removed by Filter: 97315
26+
-- Heap Blocks: exact=20133
27+
-- -> Bitmap Index Scan on idx_processing_job_command_id (cost=0.00..3237.30 rows=294517 width=0) (actual time=41.569..41.569 rows=
28+
-- 293054 loops=1)
29+
-- Index Cond: (command_id = 83)
30+
-- -> Seq Scan on processing_job_status (cost=0.00..1.09 rows=4 width=40) (actual time=0.035..0.035 rows=2 loops=1)
31+
-- Filter: ((processing_job_status)::text = ANY ('{success,waiting,running,in_construction}'::text[]))
32+
-- Rows Removed by Filter: 1
33+
-- -> Bitmap Heap Scan on artifact_output_processing_job aopj (cost=4.43..12.14 rows=2 width=24) (actual time=0.031..0.031 rows=0 loops=1)
34+
-- Recheck Cond: (processing_job.processing_job_id = processing_job_id)
35+
-- -> Bitmap Index Scan on idx_artifact_output_processing_job_job (cost=0.00..4.43 rows=2 width=0) (actual time=0.026..0.026 rows=0 loops=1)
36+
-- Index Cond: (processing_job_id = processing_job.processing_job_id)
37+
-- Planning Time: 1.173 ms
38+
-- Execution Time: 51862.756 ms
39+
40+
-- Note: for this to work, an admin must first have created the extension:
41+
-- CREATE EXTENSION pg_trgm;
42+
43+
-- This alter table will take close to 11 min
44+
ALTER TABLE qiita.processing_job
45+
ALTER COLUMN command_parameters TYPE JSONB USING command_parameters::jsonb;
46+
47+
-- This indexing will take about 5 min
48+
CREATE INDEX processing_job_command_parameters_job_id ON qiita.processing_job
49+
USING GIN((command_parameters->>'job_id') gin_trgm_ops);
50+
51+
-- This indexing will take about an hour
52+
CREATE INDEX processing_job_command_parameters_payload ON qiita.processing_job
53+
USING GIN((command_parameters->>'payload') gin_trgm_ops);
54+
55+
-- After the changes
56+
-- 18710.404 ms

0 commit comments

Comments
 (0)