DRY refactoring #60

Merged: 35 commits, May 23, 2023
Changes shown are from 26 of the 35 commits.

Commits:
0be276d  WIP DRY refactoring (charles-cowart, May 12, 2023)
d287bdb  Forgot to add new Step files to DRY refactor. (charles-cowart, May 16, 2023)
135780c  Laying out new test classes (charles-cowart, May 16, 2023)
d5196c2  Added support for base Step tests (charles-cowart, May 16, 2023)
773c9ba  Ensure output_dir is created (charles-cowart, May 16, 2023)
f199189  bugfix (charles-cowart, May 16, 2023)
6fd27d2  Temporarily set mg-scripts dependency to development version (charles-cowart, May 16, 2023)
2ba2bc6  .gitignore fix (charles-cowart, May 16, 2023)
b740b0d  First test w/pseudo job-submission added. (charles-cowart, May 16, 2023)
01d2f3d  CI Fixes (charles-cowart, May 16, 2023)
537cc7e  Fix for CI (charles-cowart, May 17, 2023)
6dffa20  debug CI. (charles-cowart, May 17, 2023)
1117f5a  debugging CI (charles-cowart, May 17, 2023)
65a04ca  ci debug (charles-cowart, May 17, 2023)
715716e  ci debug (charles-cowart, May 17, 2023)
b6fde09  ci debug (charles-cowart, May 17, 2023)
7f7a6ef  Added test for base QCJob (charles-cowart, May 17, 2023)
70301a6  Added MetagenomicStep tests (charles-cowart, May 17, 2023)
b889ec3  Updated testing infrastructure (charles-cowart, May 17, 2023)
fe186de  Added AmpliconStep tests (charles-cowart, May 17, 2023)
3df5f25  flake8 (charles-cowart, May 17, 2023)
547d3d1  setup now points to merged mg-scripts. (charles-cowart, May 18, 2023)
b2e48a6  Updated code to create fake files in a searchable path. (charles-cowart, May 18, 2023)
2f638b0  Bugfix (charles-cowart, May 18, 2023)
3e788ec  flake8 (charles-cowart, May 18, 2023)
5784752  Easy updates based on feedback (charles-cowart, May 18, 2023)
1c8f792  Removed configuration.json (charles-cowart, May 20, 2023)
d9a6efa  Merged klp_util.py into Step and klp.py based on feedback (charles-cowart, May 22, 2023)
0d82aee  Removed test_klp_util.py (charles-cowart, May 22, 2023)
8392d24  Updates based on testing (charles-cowart, May 22, 2023)
f3779cd  Updates based on feedback (charles-cowart, May 22, 2023)
3d5a223  Changes based on feedback (charles-cowart, May 22, 2023)
245effb  Add new file (charles-cowart, May 22, 2023)
1c1e762  Changes based on feedback (charles-cowart, May 23, 2023)
b2840f2  bugfix (charles-cowart, May 23, 2023)
1 change: 0 additions & 1 deletion .github/workflows/qiita-plugin-ci.yml
@@ -134,7 +134,6 @@ jobs:
export QIITA_CONFIG_FP=`pwd`/qiita-dev/qiita_core/support_files/config_test_local.cfg
export QP_KLP_CONFIG_FP=`pwd`/configuration.json
export PYTHONWARNINGS="ignore:Certificate for localhost has no \`subjectAltName\`"

nosetests --with-doctest --with-coverage -v --cover-package=qp_klp

- uses: codecov/codecov-action@v3
3 changes: 0 additions & 3 deletions .gitignore
@@ -127,6 +127,3 @@ dmypy.json

# Pyre type checker
.pyre/

# Plugin configuration file
configuration.json
107 changes: 107 additions & 0 deletions qp_klp/Amplicon.py
@@ -0,0 +1,107 @@
from os import listdir, makedirs
from os.path import join, isfile, basename
from shutil import copyfile
from qp_klp.Step import Step


class Amplicon(Step):
def __init__(self, pipeline, master_qiita_job_id, sn_tid_map_by_project,
status_update_callback=None):
super().__init__(pipeline,
master_qiita_job_id,
sn_tid_map_by_project,
status_update_callback)

if pipeline.pipeline_type != 'amplicon':
raise ValueError("Cannot instantiate Amplicon object from "
f"pipeline of type '{pipeline.pipeline_type}'")

def convert_bcl_to_fastq(self):
# The 'bcl2fastq' key is a convention hard-coded into mg-scripts and
# qp-klp projects. By design and history, amplicon jobs use EMP primers
# and are demuxed downstream of qp-klp by Qiita. This necessitates the
# use of bcl2fastq and a 'dummy' sample-sheet to avoid the
# demultiplexing that otherwise occurs at this stage. The name and
# path of the executable, the resource requirements to instantiate a
# SLURM job with, etc. are stored in configuration['bcl2fastq'].
config = self.pipeline.configuration['bcl2fastq']
super()._convert_bcl_to_fastq(config, self.pipeline.sample_sheet)

def quality_control(self):
# Quality control for Amplicon runs occurs downstream.
# Do not perform QC at this time.

# Simulate QCJob's output directory for use as input into FastQCJob.
projects = self.pipeline.get_project_info()
projects = [x['project_name'] for x in projects]

for project_name in projects:
# copy the files from ConvertJob output to faked QCJob output
# folder: $WKDIR/$RUN_ID/QCJob/$PROJ_NAME/amplicon
faked_output_folder = join(self.pipeline.output_path,
'QCJob',
project_name,
'amplicon')

makedirs(faked_output_folder)

raw_fastq_files_path = join(self.pipeline.output_path,
'ConvertJob')

# get list of all raw output files to be copied.
job_output = [join(raw_fastq_files_path, x) for x in
listdir(raw_fastq_files_path)]
job_output = [x for x in job_output if isfile(x)]
job_output = [x for x in job_output if x.endswith('fastq.gz')]
# Undetermined files are very small and should be filtered from
# results.
job_output = [x for x in job_output if
not basename(x).startswith('Undetermined')]

# copy the file
for fastq_file in job_output:
new_path = join(faked_output_folder, basename(fastq_file))
copyfile(fastq_file, new_path)

# FastQC expects the ConvertJob output to also be organized by
# project. Since this would entail running the same ConvertJob
# multiple times on the same input with just a name-change in
# the dummy sample-sheet, we instead perform ConvertJob once
# and copy the output from ConvertJob's output folder into
# ConvertJob's output folder/project1, project2 ... projectN.
faked_output_folder = join(raw_fastq_files_path, project_name)
makedirs(faked_output_folder)
job_output = [join(raw_fastq_files_path, x) for x in
listdir(raw_fastq_files_path)]
job_output = [x for x in job_output if isfile(x)]
job_output = [x for x in job_output if x.endswith('fastq.gz')]
job_output = [x for x in job_output if
not basename(x).startswith('Undetermined')]

for raw_fastq_file in job_output:
new_path = join(faked_output_folder, basename(raw_fastq_file))
copyfile(raw_fastq_file, new_path)
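        # For orientation, the on-disk layout this method fakes (a sketch;
        # the project and file names below are hypothetical):
        #
        #   <output_path>/
        #       ConvertJob/
        #           SampleA_R1.fastq.gz        raw bcl2fastq output
        #           ProjectX/                  faked per-project copies
        #               SampleA_R1.fastq.gz
        #       QCJob/
        #           ProjectX/
        #               amplicon/              faked QC output
        #                   SampleA_R1.fastq.gz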

    def generate_reports(self):
config = self.pipeline.configuration['fastqc']
super()._generate_reports(config, self.pipeline.mapping_file)

return None # amplicon doesn't need project names

def generate_prep_file(self):
config = self.pipeline.configuration['seqpro']

seqpro_path = config['seqpro_path'].replace('seqpro', 'seqpro_mf')
Inline review comment (Member):

    What's the plan to remove seqpro_mf?

Reply (charles-cowart, Contributor, Author):

    It's an easy enough change on this side of things, and a little bit of
    refactoring for the metapool package. I can make the change by the end
    of the week in between other things.
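A minimal sketch of what that removal might look like in Amplicon.generate_prep_file(), assuming a future seqpro that consumes mapping files directly (hypothetical code, not part of this PR):

    def generate_prep_file(self):
        config = self.pipeline.configuration['seqpro']
        # hypothetical: no 'seqpro' -> 'seqpro_mf' path substitution needed
        job = super()._generate_prep_file(config,
                                          self.pipeline.mapping_file,
                                          config['seqpro_path'],
                                          self.project_names)
        self.prep_file_paths = job.prep_file_paths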


job = super()._generate_prep_file(config,
self.pipeline.mapping_file,
seqpro_path,
self.project_names)

self.prep_file_paths = job.prep_file_paths

def generate_commands(self):
super()._generate_commands()
self.cmds.append(f'cd {self.pipeline.output_path}; '
'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports')
self.write_commands_to_output_path()
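Both convert_bcl_to_fastq() implementations read a per-converter block from the pipeline configuration: 'bcl2fastq' above, 'bcl-convert' in Metagenomic.py below. A sketch of what one such block might contain; the key names come from the code, but every field and value here is an assumption:

    configuration = {
        "bcl2fastq": {
            "executable_path": "/usr/local/bin/bcl2fastq",  # assumed
            "queue": "qiita",                               # assumed SLURM queue
            "nodes": 1,                                     # assumed
            "wallclock_time_in_hours": 36,                  # assumed
        }
    }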
164 changes: 164 additions & 0 deletions qp_klp/Metagenomic.py
@@ -0,0 +1,164 @@
from os import walk
from os.path import exists, join, basename
from qp_klp.klp_util import FailedSamplesRecord
from sequence_processing_pipeline.PipelineError import PipelineError
import pandas as pd
from qp_klp.Step import Step


class Metagenomic(Step):
def __init__(self, pipeline, master_qiita_job_id, sn_tid_map_by_project,
status_update_callback=None):
super().__init__(pipeline,
master_qiita_job_id,
sn_tid_map_by_project,
status_update_callback)

if pipeline.pipeline_type != 'metagenomic':
raise ValueError("Cannot instantiate Metagenomic object from "
f"pipeline of type '{pipeline.pipeline_type}'")

# Note: FailedSamplesRecord is not used when processing amplicon as the
# samples are processed as a single fastq file and hence that info
# is not available.
self.fsr = FailedSamplesRecord(self.pipeline.output_path,
pipeline.sample_sheet.samples)
self.project_names = None

def convert_bcl_to_fastq(self):
# The 'bcl-convert' key is a convention hard-coded into mg-scripts and
# qp-klp projects. Currently metagenomic jobs use bcl-convert for its
# improved performance over bcl2fastq. The name and path of the
# executable, the resource requirements to instantiate a SLURM job
        # with, etc. are stored in configuration['bcl-convert'].
config = self.pipeline.configuration['bcl-convert']
job = super()._convert_bcl_to_fastq(config,
self.pipeline.sample_sheet.path)
self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')
Inline review comment (Member):

    It looks like at the end of each method something similar to this line
    is called; should it be more general? I mean something like
    self.failed_samples_audit('ConvertJob'), where that calls something
    like self.write(job.audit(self.pipeline.get_sample_ids()), audit_name).

Reply (charles-cowart, Contributor, Author, May 23, 2023):

    I tried this out, but it seems like it will just create another
    function, something like the following:

        def _failed_samples_audit(self, job, step_name):
            self.fsr.write(job.audit(self.pipeline.get_sample_ids()),
                           step_name)

    It doesn't really do much. The meat of each audit is in each Job class,
    while the structure and writing of each failed samples record is in the
    FailedSamplesRecord class.
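Side by side, the two shapes under discussion (a sketch; the wrapper is hypothetical and was not adopted):

    # call site as written in this PR:
    self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'ConvertJob')

    # call site with the hypothetical wrapper:
    self._failed_samples_audit(job, 'ConvertJob')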


def quality_control(self):
config = self.pipeline.configuration['qc']
job = super()._quality_control(config, self.pipeline.sample_sheet.path)
self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'QCJob')

def generate_reports(self):
config = self.pipeline.configuration['fastqc']
job = super()._generate_reports(config,
self.pipeline.sample_sheet.path)
self.fsr.write(job.audit(self.pipeline.get_sample_ids()), 'FastQCJob')

self.project_names = job.project_names

def generate_prep_file(self):
config = self.pipeline.configuration['seqpro']

if self.project_names is None:
raise ValueError("reports not yet generated")

job = super()._generate_prep_file(config,
self.pipeline.sample_sheet,
config['seqpro_path'],
self.project_names)

self.prep_file_paths = job.prep_file_paths

def generate_commands(self, special_map, server_url,
touched_studies_prep_info):
super()._generate_commands()

        out_dir = self.pipeline.output_path

        self.cmds.append(f'cd {out_dir}; '
                         'tar zcvf logs-QCJob.tgz QCJob/logs')

        self.cmds.append(f'cd {out_dir}; '
                         'tar zcvf reports-ConvertJob.tgz ConvertJob/Reports '
                         'ConvertJob/Logs')

self.write_commands_to_output_path()

if self.sifs:
# just use the filenames for tarballing the sifs.
# the sifs should all be stored in the {out_dir} by default.
tmp = [basename(x) for x in self.sifs]
# convert sifs into a list of filenames.
tmp = ' '.join(tmp)
self.cmds.append(f'cd {out_dir}; tar zcvf sample-files.tgz {tmp}')

csv_fps = []
        for root, dirs, files in walk(join(out_dir, 'PrepFiles')):
for csv_file in files:
csv_fps.append(join(root, csv_file))

touched_studies = []

for project, upload_dir, qiita_id in special_map:
            # sif filenames are of the form: {run_id}_{project}_blanks.tsv
blanks_file = f'{self.pipeline.run_id}_{project}_blanks.tsv'
if self.sifs and [x for x in self.sifs if blanks_file in x]:
# move uncompressed sifs to upload_dir.
tmp = f'cd {out_dir}; mv {blanks_file} {upload_dir}'
self.cmds.append(tmp)

# record that something is being moved into a Qiita Study.
# this will allow us to notify the user which Studies to
# review upon completion.
touched_studies.append((qiita_id, project))

if self.pipeline.pipeline_type == 'metagenomic':
self.cmds.append(f'cd {out_dir}; tar zcvf reports-QCJob.tgz '
f'QCJob/{project}/fastp_reports_dir')

if exists(f'{out_dir}/QCJob/{project}/filtered_sequences'):
self.cmds.append(f'cd {out_dir}; mv '
f'QCJob/{project}/filtered_sequences/* '
f'{upload_dir}')
elif exists(f'{out_dir}/QCJob/{project}/trimmed_sequences'):
self.cmds.append(f'cd {out_dir}; mv '
f'QCJob/{project}/trimmed_sequences/* '
f'{upload_dir}')
elif exists(f'{out_dir}/QCJob/{project}/amplicon'):
self.cmds.append(f'cd {out_dir}; mv '
f'QCJob/{project}/amplicon/* '
f'{upload_dir}')
else:
raise PipelineError("QCJob output not in expected location")

for csv_file in csv_fps:
if project in csv_file:
tmp = f'cd {out_dir}; mv {csv_file} {upload_dir}'
self.cmds.append(tmp)
break

# create a set of unique study-ids that were touched by the Pipeline
# and return this information to the user.
touched_studies = sorted(list(set(touched_studies)))

data = []
for qiita_id, project in touched_studies:
for prep_id in touched_studies_prep_info[qiita_id]:
study_url = f'{server_url}/study/description/{qiita_id}'
prep_url = (f'{server_url}/study/description/'
f'{qiita_id}?prep_id={prep_id}')
data.append({'Project': project, 'Qiita Study ID': qiita_id,
'Qiita Prep ID': prep_id, 'Qiita URL': study_url,
'Prep URL': prep_url})

df = pd.DataFrame(data)

with open(join(out_dir, 'touched_studies.html'), 'w') as f:
f.write(df.to_html(border=2, index=False, justify="left",
render_links=True, escape=False))

# copy all tgz files, including sample-files.tgz, to final_results.
self.cmds.append(f'cd {out_dir}; mv *.tgz final_results')
self.cmds.append(f'cd {out_dir}; mv FastQCJob/multiqc final_results')

if exists(join(out_dir, 'touched_studies.html')):
tmp = f'cd {out_dir}; mv touched_studies.html final_results'
self.cmds.append(tmp)

if exists(join(out_dir, 'failed_samples.html')):
tmp = f'cd {out_dir}; mv failed_samples.html final_results'
self.cmds.append(tmp)
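For reference, the input shapes generate_commands() expects, as implied by the unpacking above (step is a Metagenomic instance; all concrete values below are hypothetical):

    # special_map: iterable of (project, upload_dir, qiita_id) tuples
    special_map = [('ProjectX', '/qiita/uploads/10001', '10001')]

    # touched_studies_prep_info: qiita_id -> list of prep ids
    touched_studies_prep_info = {'10001': ['20002']}

    step.generate_commands(special_map,
                           'https://qiita.example.org',
                           touched_studies_prep_info)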