DRY refactoring of qp-klp.
This code was originally reviewed and approved as part of the following PRs:
#60
#61

Code was refactored to be more easily extended to new Assay types.
Monolithic unit-tests that had to be run against Qiita-RC were removed.
The new unit-tests are more numerous and each exercises smaller, self-contained functionality.
Qiita is no longer needed for testing; a FakeQiita class now emulates Qiita API queries with canned responses.
Job submission and SLURM responses are likewise emulated with fake binaries.
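
A minimal sketch of what such a canned-response emulation can look like is shown below. Only the class name FakeQiita comes from this commit; the method names, the _server_url default, and the payloads are illustrative assumptions, not the actual test code.

# Hypothetical sketch of a canned-response Qiita stub; the real FakeQiita
# class in the test suite may expose different methods and payloads.
class FakeQiita:
    def __init__(self, canned_responses=None):
        # endpoint -> canned JSON payload returned instead of a live query
        self.canned_responses = canned_responses or {}
        # record of (endpoint, data) tuples so tests can assert on writes
        self.posted = []
        self._server_url = 'https://localhost:8383'

    def get(self, endpoint):
        return self.canned_responses[endpoint]

    def post(self, endpoint, data=None):
        self.posted.append((endpoint, data))
        return {}

The SLURM side can be emulated the same way: fake executables named sbatch and squeue placed ahead of the real ones on PATH that print fixed responses. The stub below is one possible shape, assuming only that callers parse sbatch's "Submitted batch job N" output line; it is not the fake binary shipped with these tests.

#!/usr/bin/env python
# fake 'sbatch': accept any arguments and report a successful submission,
# mimicking the single line of output that real sbatch prints.
import sys

if __name__ == '__main__':
    # a fixed job id is enough for tests that only parse this line
    print('Submitted batch job 9999999')
    sys.exit(0)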
charles-cowart committed Jun 7, 2023
1 parent bc1d524 commit a1a605c
Showing 25 changed files with 3,434 additions and 4,203 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/qiita-plugin-ci.yml
@@ -86,6 +86,7 @@ jobs:
pip --quiet install https://github.com/qiita-spots/qtp-job-output-folder/archive/refs/heads/main.zip
pip --quiet install .
pip --quiet install coveralls
export QP_KLP_CONFIG_FP=`pwd`/configuration.json
configure_qtp_job_output_folder --env-script "source /home/runner/.profile; conda activate klp" --server-cert $QIITA_SERVER_CERT
@@ -134,13 +135,10 @@ jobs:
export QIITA_CONFIG_FP=`pwd`/qiita-dev/qiita_core/support_files/config_test_local.cfg
export QP_KLP_CONFIG_FP=`pwd`/configuration.json
export PYTHONWARNINGS="ignore:Certificate for localhost has no \`subjectAltName\`"
nosetests --with-doctest --with-coverage -v --cover-package=qp_klp
- uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: codecov.yml

lint:
runs-on: ubuntu-latest
@@ -157,3 +155,4 @@ jobs:
run: |
pip install -q flake8
flake8 qp_klp setup.py scripts/*
1 change: 1 addition & 0 deletions .gitignore
@@ -128,5 +128,6 @@ dmypy.json
# Pyre type checker
.pyre/


# Plugin configuration file
configuration.json
135 changes: 135 additions & 0 deletions qp_klp/Amplicon.py
@@ -0,0 +1,135 @@
from os import listdir, makedirs
from os.path import join, isfile, basename
from shutil import copyfile
from qp_klp.Step import Step


class Amplicon(Step):
def __init__(self, pipeline, master_qiita_job_id,
status_update_callback=None):
super().__init__(pipeline,
master_qiita_job_id,
status_update_callback)

if pipeline.pipeline_type != Step.AMPLICON_TYPE:
raise ValueError("Cannot create an Amplicon run using a "
f"{pipeline.pipeline_type}-configured Pipeline.")

def convert_bcl_to_fastq(self):
# The 'bcl2fastq' key is a convention hard-coded into mg-scripts and
# qp-klp projects. By design and history, amplicon jobs use EMP primers
# and are demuxed downstream of qp-klp by Qiita. This necessitates the
# use of bcl2fastq and a 'dummy' sample-sheet to avoid the
# demultiplexing that otherwise occurs at this stage. The name and
# path of the executable, the resource requirements to instantiate a
# SLURM job with, etc. are stored in configuration['bcl2fastq'].
config = self.pipeline.configuration['bcl2fastq']
super()._convert_bcl_to_fastq(config, self.pipeline.sample_sheet)

def quality_control(self):
# Quality control for Amplicon runs occurs downstream.
# Do not perform QC at this time.

# Simulate QCJob's output directory for use as input into FastQCJob.
projects = self.pipeline.get_project_info()
projects = [x['project_name'] for x in projects]

for project_name in projects:
# copy the files from ConvertJob output to faked QCJob output
# folder: $WKDIR/$RUN_ID/QCJob/$PROJ_NAME/amplicon
output_folder = join(self.pipeline.output_path,
'QCJob',
project_name,
# for legacy purposes, output folders are
# either 'trimmed_sequences', 'amplicon', or
# 'filtered_sequences'. Hence, this folder
# is not defined using AMPLICON_TYPE as that
# value may or may not equal the needed value.
'amplicon')

makedirs(output_folder)

raw_fastq_files_path = join(self.pipeline.output_path,
'ConvertJob')

# get list of all raw output files to be copied.
job_output = [join(raw_fastq_files_path, x) for x in
listdir(raw_fastq_files_path)]
job_output = [x for x in job_output if isfile(x)]
job_output = [x for x in job_output if x.endswith('fastq.gz')]
# Undetermined files are very small and should be filtered from
# results.
job_output = [x for x in job_output if
not basename(x).startswith('Undetermined')]

# copy the file
for fastq_file in job_output:
new_path = join(output_folder, basename(fastq_file))
copyfile(fastq_file, new_path)

# FastQC expects the ConvertJob output to also be organized by
# project. Since this would entail running the same ConvertJob
# multiple times on the same input with just a name-change in
# the dummy sample-sheet, we instead perform ConvertJob once
# and copy the output from ConvertJob's output folder into
# ConvertJob's output folder/project1, project2 ... projectN.
output_folder = join(raw_fastq_files_path, project_name)
makedirs(output_folder)

job_output = [join(raw_fastq_files_path, x) for x in
listdir(raw_fastq_files_path)]
job_output = [x for x in job_output if isfile(x) and x.endswith(
'fastq.gz') and not basename(x).startswith('Undetermined')]

for raw_fastq_file in job_output:
new_path = join(output_folder, basename(raw_fastq_file))
copyfile(raw_fastq_file, new_path)

def generate_reports(self):
super()._generate_reports()
return None # amplicon doesn't need project names

def _get_data_type(self, prep_file_path):
metadata = Step.parse_prep_file(prep_file_path)
if 'target_gene' in metadata.columns:
# remove duplicate values, then convert back to a list so the
# single remaining value can be accessed by index.
tg = list(set(metadata['target_gene']))
if len(tg) != 1:
raise ValueError("More than one value for target_gene")

if tg[0] in Step.AMPLICON_SUB_TYPES:
return tg[0]

raise ValueError(f"'{tg[0]}' is not a valid type - valid data-"
f"types are {Step.AMPLICON_SUB_TYPES}")
else:
raise ValueError("'target_gene' column not present in "
"generated prep-files")

def generate_touched_studies(self, qclient):
results = {}
for study_id, pf_paths in self.prep_file_paths.items():
for pf_path in pf_paths:
results[pf_path] = self._get_data_type(pf_path)

super()._generate_touched_studies(qclient, results)

def generate_prep_file(self):
config = self.pipeline.configuration['seqpro']
seqpro_path = config['seqpro_path'].replace('seqpro', 'seqpro_mf')
project_names = [x['project_name'] for x in
self.pipeline.get_project_info()]

job = super()._generate_prep_file(config,
self.pipeline.mapping_file_path,
seqpro_path,
project_names)

self.prep_file_paths = job.prep_file_paths

def generate_commands(self, qclient):
super()._generate_commands()
self.cmds.append(f'cd {self.pipeline.output_path}; '
'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports')
self.write_commands_to_output_path()
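
The methods above are meant to be driven in a fixed order. A rough sketch of a caller is shown below; the Pipeline and qclient objects and the run_amplicon() wrapper are assumptions for illustration, not the plugin's actual entry point.

# Hypothetical driver showing the call order of the Amplicon step methods;
# the Pipeline and qclient objects are constructed elsewhere in the plugin.
from qp_klp.Amplicon import Amplicon

def run_amplicon(pipeline, qclient, master_qiita_job_id):
    step = Amplicon(pipeline, master_qiita_job_id)

    step.convert_bcl_to_fastq()             # ConvertJob (bcl2fastq, dummy sheet)
    step.quality_control()                  # fakes QCJob output for amplicon
    step.generate_reports()                 # FastQCJob
    step.generate_prep_file()               # seqpro_mf
    step.generate_touched_studies(qclient)  # per-prep data-types sent via qclient
    step.generate_commands(qclient)         # shell commands written to output_path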
172 changes: 172 additions & 0 deletions qp_klp/Metagenomic.py
@@ -0,0 +1,172 @@
from os import walk
from os.path import exists
from sequence_processing_pipeline.PipelineError import PipelineError
import pandas as pd
from qp_klp.Step import FailedSamplesRecord
from os.path import join, basename
from qp_klp.Step import Step


class Metagenomic(Step):
def __init__(self, pipeline, master_qiita_job_id,
status_update_callback=None):
super().__init__(pipeline,
master_qiita_job_id,
status_update_callback)

if pipeline.pipeline_type not in Step.META_TYPES:
raise ValueError("Cannot instantiate Metagenomic object from "
f"pipeline of type '{pipeline.pipeline_type}'")

# Note: FailedSamplesRecord is not used when processing amplicon as the
# samples are processed as a single fastq file and hence that info
# is not available.
self.fsr = FailedSamplesRecord(self.pipeline.output_path,
pipeline.sample_sheet.samples)

def convert_bcl_to_fastq(self):
# The 'bcl-convert' key is a convention hard-coded into mg-scripts and
# qp-klp projects. Currently meta*omic jobs use bcl-convert for its
# improved performance over bcl2fastq. The name and path of the
# executable, the resource requirements to instantiate a SLURM job
# with, etc. are stored in configuration['bcl-convert'].
config = self.pipeline.configuration['bcl-convert']
job = super()._convert_bcl_to_fastq(config,
self.pipeline.sample_sheet.path)
self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')

def quality_control(self):
config = self.pipeline.configuration['qc']
job = super()._quality_control(config, self.pipeline.sample_sheet.path)
self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')

def generate_reports(self):
job = super()._generate_reports()
self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'FastQCJob')

self.project_names = job.project_names

def generate_prep_file(self):
config = self.pipeline.configuration['seqpro']

if self.project_names is None:
raise ValueError("reports not yet generated")

job = super()._generate_prep_file(config,
self.pipeline.sample_sheet.path,
config['seqpro_path'],
self.project_names)

self.prep_file_paths = job.prep_file_paths

def generate_touched_studies(self, qclient):
results = {}

for study_id, pf_paths in self.prep_file_paths.items():
for pf_path in pf_paths:
# record the data-type as either metagenomic or
# metatranscriptomic, according to what's stored in the
# pipeline.
results[pf_path] = self.pipeline.pipeline_type

super()._generate_touched_studies(qclient, results)

def generate_commands(self, qclient):
super()._generate_commands()

out_dir = self.pipeline.output_path
output_path = self.pipeline.output_path

self.cmds.append(f'cd {self.pipeline.output_path}; '
'tar zcvf logs-QCJob.tgz QCJob/logs')

self.cmds.append(f'cd {self.pipeline.output_path}; '
'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports '
'ConvertJob/Logs')

self.write_commands_to_output_path()

if self.sifs:
# just use the filenames for tarballing the sifs.
# the sifs should all be stored in the {out_dir} by default.
tmp = [basename(x) for x in self.sifs]
# join the filenames into a single space-separated string for tar.
tmp = ' '.join(tmp)
self.cmds.append(f'cd {out_dir}; tar zcvf sample-files.tgz {tmp}')

csv_fps = []
for root, dirs, files in walk(join(output_path, 'PrepFiles')):
for csv_file in files:
csv_fps.append(join(root, csv_file))

touched_studies = []

for project, upload_dir, qiita_id in self.special_map:
# sif filenames are of the form: {run_id}_{project}_blanks.tsv
blanks_file = f'{self.pipeline.run_id}_{project}_blanks.tsv'
if self.sifs and [x for x in self.sifs if blanks_file in x]:
# move uncompressed sifs to upload_dir.
tmp = f'cd {out_dir}; mv {blanks_file} {upload_dir}'
self.cmds.append(tmp)

# record that something is being moved into a Qiita Study.
# this will allow us to notify the user which Studies to
# review upon completion.
touched_studies.append((qiita_id, project))

if self.pipeline.pipeline_type in Step.META_TYPES:
self.cmds.append(f'cd {out_dir}; tar zcvf reports-QCJob.tgz '
f'QCJob/{project}/fastp_reports_dir')

if exists(f'{out_dir}/QCJob/{project}/filtered_sequences'):
self.cmds.append(f'cd {out_dir}; mv '
f'QCJob/{project}/filtered_sequences/* '
f'{upload_dir}')
elif exists(f'{out_dir}/QCJob/{project}/trimmed_sequences'):
self.cmds.append(f'cd {out_dir}; mv '
f'QCJob/{project}/trimmed_sequences/* '
f'{upload_dir}')
elif exists(f'{out_dir}/QCJob/{project}/amplicon'):
self.cmds.append(f'cd {out_dir}; mv '
f'QCJob/{project}/amplicon/* '
f'{upload_dir}')
else:
raise PipelineError("QCJob output not in expected location")

for csv_file in csv_fps:
if project in csv_file:
tmp = f'cd {out_dir}; mv {csv_file} {upload_dir}'
self.cmds.append(tmp)
break

# create a set of unique study-ids that were touched by the Pipeline
# and return this information to the user.
touched_studies = sorted(list(set(touched_studies)))

data = []
for qiita_id, project in touched_studies:
for prep_id in self.touched_studies_prep_info[qiita_id]:
surl = f'{qclient._server_url}/study/description/{qiita_id}'
prep_url = (f'{qclient._server_url}/study/description/'
f'{qiita_id}?prep_id={prep_id}')
data.append({'Project': project, 'Qiita Study ID': qiita_id,
'Qiita Prep ID': prep_id, 'Qiita URL': surl,
'Prep URL': prep_url})

df = pd.DataFrame(data)

with open(join(out_dir, 'touched_studies.html'), 'w') as f:
f.write(df.to_html(border=2, index=False, justify="left",
render_links=True, escape=False))

# copy all tgz files, including sample-files.tgz, to final_results.
self.cmds.append(f'cd {out_dir}; mv *.tgz final_results')
self.cmds.append(f'cd {out_dir}; mv FastQCJob/multiqc final_results')

if exists(join(out_dir, 'touched_studies.html')):
tmp = f'cd {out_dir}; mv touched_studies.html final_results'
self.cmds.append(tmp)

if exists(join(out_dir, 'failed_samples.html')):
tmp = f'cd {out_dir}; mv failed_samples.html final_results'
self.cmds.append(tmp)
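
Both subclasses read their job settings from sections of the plugin configuration (configuration['bcl2fastq'], configuration['bcl-convert'], configuration['qc'] and configuration['seqpro']), and the CI workflow above exports QP_KLP_CONFIG_FP pointing at configuration.json. A minimal sketch of such a file for testing is shown below; only the four section names and the 'seqpro_path' key appear in this commit, so the flat layout, the 'executable_path' field, and all paths are illustrative placeholders.

import json

# Hypothetical minimal test configuration; only the four section names and
# the 'seqpro_path' key are taken from this commit. The flat layout, the
# 'executable_path' field, and all paths are illustrative placeholders.
config = {
    'bcl2fastq':   {'executable_path': '/usr/local/bin/bcl2fastq'},
    'bcl-convert': {'executable_path': '/usr/local/bin/bcl-convert'},
    'qc':          {},
    'seqpro':      {'seqpro_path': '/usr/local/bin/seqpro'},
}

with open('configuration.json', 'w') as f:
    json.dump(config, f, indent=2)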