Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve complete job #3443

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Deployed on October 14th, 2024
* `Woltka v0.1.7, paired-end` superseded `Woltka v0.1.6` in `qp-woltka`; [more information](https://qiita.ucsd.edu/static/doc/html/processingdata/woltka_pairedend.html). Thank you to @qiyunzhu for the benchmarks!
* Other general fixes, like [#3424](https://github.com/qiita-spots/qiita/pull/3424), [#3425](https://github.com/qiita-spots/qiita/pull/3425), [#3439](https://github.com/qiita-spots/qiita/pull/3439), [#3440](https://github.com/qiita-spots/qiita/pull/3440).
* General SPP improvements, like: [NuQC modified to preserve metadata in fastq files](https://github.com/biocore/mg-scripts/pull/155), [use squeue instead of sacct](https://github.com/biocore/mg-scripts/pull/152), , [job aborts if Qiita study contains sample metadata columns reserved for prep-infos](https://github.com/biocore/mg-scripts/pull/151), [metapool generates OverrideCycles value](https://github.com/biocore/metagenomics_pooling_notebook/pull/225).
* We updated the available parameters for `Filter features against reference [filter_features]`, `Non V4 16S sequence assessment [non_v4_16s]` and all the phylogenetic analytical commands so they can use `Greengenes2 2024.09`.



Expand Down
4 changes: 3 additions & 1 deletion qiita_db/handlers/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def post(self, job_id):
cmd, values_dict={'job_id': job_id,
'payload': self.request.body.decode(
'ascii')})
job = qdb.processing_job.ProcessingJob.create(job.user, params)
# complete_job are unique so it is fine to force them to be created
job = qdb.processing_job.ProcessingJob.create(
job.user, params, force=True)
job.submit()

self.finish()
Expand Down
77 changes: 39 additions & 38 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,10 @@ def create(cls, user, parameters, force=False):
TTRN = qdb.sql_connection.TRN
with TTRN:
command = parameters.command

# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, email, processing_job_status,
COUNT(aopj.artifact_id)
if not force:
# check if a job with the same parameters already exists
sql = """SELECT processing_job_id, email,
processing_job_status, COUNT(aopj.artifact_id)
FROM qiita.processing_job
LEFT JOIN qiita.processing_job_status
USING (processing_job_status_id)
Expand All @@ -596,41 +596,42 @@ def create(cls, user, parameters, force=False):
GROUP BY processing_job_id, email,
processing_job_status"""

# we need to use ILIKE because of booleans as they can be
# false or False
params = []
for k, v in parameters.values.items():
# this is necessary in case we have an Iterable as a value
# but that is string
if isinstance(v, Iterable) and not isinstance(v, str):
for vv in v:
params.extend([k, str(vv)])
# we need to use ILIKE because of booleans as they can be
# false or False
params = []
for k, v in parameters.values.items():
# this is necessary in case we have an Iterable as a value
# but that is string
if isinstance(v, Iterable) and not isinstance(v, str):
for vv in v:
params.extend([k, str(vv)])
else:
params.extend([k, str(v)])

if params:
# divided by 2 as we have key-value pairs
len_params = int(len(params)/2)
sql = sql.format(' AND ' + ' AND '.join(
["command_parameters->>%s ILIKE %s"] * len_params))
params = [command.id] + params
TTRN.add(sql, params)
else:
params.extend([k, str(v)])

if params:
# divided by 2 as we have key-value pairs
len_params = int(len(params)/2)
sql = sql.format(' AND ' + ' AND '.join(
["command_parameters->>%s ILIKE %s"] * len_params))
params = [command.id] + params
TTRN.add(sql, params)
else:
# the sql variable expects the list of parameters but if there
# is no param we need to replace the {0} with an empty string
TTRN.add(sql.format(""), [command.id])

# checking that if the job status is success, it has children
# [2] status, [3] children count
existing_jobs = [r for r in TTRN.execute_fetchindex()
if r[2] != 'success' or r[3] > 0]
if existing_jobs and not force:
raise ValueError(
'Cannot create job because the parameters are the same as '
'jobs that are queued, running or already have '
'succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status, _ in existing_jobs]))
# the sql variable expects the list of parameters but if
# there is no param we need to replace the {0} with an
# empty string
TTRN.add(sql.format(""), [command.id])

# checking that if the job status is success, it has children
# [2] status, [3] children count
existing_jobs = [r for r in TTRN.execute_fetchindex()
if r[2] != 'success' or r[3] > 0]
if existing_jobs:
raise ValueError(
'Cannot create job because the parameters are the '
'same as jobs that are queued, running or already '
'have succeeded:\n%s' % '\n'.join(
["%s: %s" % (jid, status)
for jid, _, status, _ in existing_jobs]))

sql = """INSERT INTO qiita.processing_job
(email, command_id, command_parameters,
Expand Down
57 changes: 57 additions & 0 deletions qiita_db/support_files/patches/93.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-- Oct 18, 2024
-- ProcessingJob.create can take up to 52 seconds if creating a complete_job; mainly
-- due to the number of jobs of this command and using json. The solution in the database
-- is to convert to jsonb and index the values of the database

-- ### This are the stats before the change in a single example
-- GroupAggregate (cost=67081.81..67081.83 rows=1 width=77) (actual time=51859.962..51862.637 rows=1 loops=1)
-- Group Key: processing_job.processing_job_id, processing_job_status.processing_job_status
-- -> Sort (cost=67081.81..67081.81 rows=1 width=77) (actual time=51859.952..51862.627 rows=1 loops=1)
-- Sort Key: processing_job.processing_job_id, processing_job_status.processing_job_status
-- Sort Method: quicksort Memory: 25kB
-- -> Nested Loop Left Join (cost=4241.74..67081.80 rows=1 width=77) (actual time=51859.926..51862.604 rows=1 loops=1)
-- -> Nested Loop (cost=4237.30..67069.64 rows=1 width=69) (actual time=51859.889..51862.566 rows=1 loops=1)
-- Join Filter: (processing_job.processing_job_status_id = processing_job_status.processing_job_status_id)
-- Rows Removed by Join Filter: 1
-- -> Gather (cost=4237.30..67068.50 rows=1 width=45) (actual time=51859.846..51862.522 rows=1 loops=1)
-- Workers Planned: 2
-- Workers Launched: 2
-- -> Parallel Bitmap Heap Scan on processing_job (cost=3237.30..66068.40 rows=1 width=45) (actual time=51785.317..51785.446 rows=0 loops=3)
-- Recheck Cond: (command_id = 83)
-- Filter: (((command_parameters ->> 'job_id'::text) ~~* '3432a908-f7b8-4e36-89fc-88f3310b84d5'::text) AND ((command_parameters ->> '
-- payload'::text) ~~* '{"success": true, "error": "", "artifacts": {"alpha_diversity": {"artifact_type": "alpha_vector", "filepaths": [["/qmounts/qiita_test_data/tes
-- tlocal/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity/alpha-diversity.tsv", "plain_text"], ["/qmounts/qiita_test_data/testloca
-- l/working_dir/3432a908-f7b8-4e36-89fc-88f3310b84d5/alpha_phylogenetic/alpha_diversity.qza", "qza"]], "archive": {}}}}'::text))
-- Rows Removed by Filter: 97315
-- Heap Blocks: exact=20133
-- -> Bitmap Index Scan on idx_processing_job_command_id (cost=0.00..3237.30 rows=294517 width=0) (actual time=41.569..41.569 rows=
-- 293054 loops=1)
-- Index Cond: (command_id = 83)
-- -> Seq Scan on processing_job_status (cost=0.00..1.09 rows=4 width=40) (actual time=0.035..0.035 rows=2 loops=1)
-- Filter: ((processing_job_status)::text = ANY ('{success,waiting,running,in_construction}'::text[]))
-- Rows Removed by Filter: 1
-- -> Bitmap Heap Scan on artifact_output_processing_job aopj (cost=4.43..12.14 rows=2 width=24) (actual time=0.031..0.031 rows=0 loops=1)
-- Recheck Cond: (processing_job.processing_job_id = processing_job_id)
-- -> Bitmap Index Scan on idx_artifact_output_processing_job_job (cost=0.00..4.43 rows=2 width=0) (actual time=0.026..0.026 rows=0 loops=1)
-- Index Cond: (processing_job_id = processing_job.processing_job_id)
-- Planning Time: 1.173 ms
-- Execution Time: 51862.756 ms

-- Note: for this to work you need to have created as admin the extension
-- CREATE EXTENSION pg_trgm;
CREATE EXTENSION IF NOT EXISTS "pg_trgm" WITH SCHEMA public;

-- This alter table will take close to 11 min
ALTER TABLE qiita.processing_job
ALTER COLUMN command_parameters TYPE JSONB USING command_parameters::jsonb;

-- This indexing will take like 5 min
CREATE INDEX IF NOT EXISTS processing_job_command_parameters_job_id ON qiita.processing_job
USING GIN((command_parameters->>'job_id') gin_trgm_ops);

-- This indexing will take like an hour
CREATE INDEX IF NOT EXISTS processing_job_command_parameters_payload ON qiita.processing_job
USING GIN((command_parameters->>'payload') gin_trgm_ops);

-- After the changes
-- 18710.404 ms
Loading