Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to use Spades and multithreading for Bowtie2 and Spades #210

Merged
merged 3 commits into from
Feb 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 112 additions & 2 deletions ariba/assembly.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pymummer
import fermilite_ariba
from ariba import common, faidx, mapping, bam_parse, external_progs, ref_seq_chooser
import shlex

class Error (Exception): pass

Expand All @@ -29,6 +30,9 @@ def __init__(self,
nucmer_breaklen=200,
extern_progs=None,
clean=True,
spades_mode="wgs",
spades_options=None,
threads=1
):
self.reads1 = os.path.abspath(reads1)
self.reads2 = os.path.abspath(reads2)
Expand All @@ -50,6 +54,9 @@ def __init__(self,
self.nucmer_min_len = nucmer_min_len
self.nucmer_breaklen = nucmer_breaklen
self.clean = clean
self.spades_mode = spades_mode
self.spades_options = spades_options
self.threads = threads

if extern_progs is None:
self.extern_progs = external_progs.ExternalProgs()
Expand Down Expand Up @@ -94,6 +101,106 @@ def _assemble_with_fermilite(self):
self.assembled_ok = (got_from_fermilite == 0)
os.chdir(cwd)

@staticmethod
def _check_spades_log_file(logfile):
'''SPAdes can fail with a strange error. Stop everything if this happens'''
f = pyfastaq.utils.open_file_read(logfile)

for line in f:
if line.startswith('== Error == system call for:') and line.rstrip().endswith('finished abnormally, err code: -7'):
pyfastaq.utils.close(f)
print('Error running SPAdes. Cannot continue. This is the error from the log file', logfile, '...', file=sys.stderr)
print(line, file=sys.stderr)
raise Error('Fatal error ("err code: -7") running spades. Cannot continue')

pyfastaq.utils.close(f)
return True

def _assemble_with_spades(self):
cwd = os.getcwd()
self.assembled_ok = False
try:
try:
os.chdir(self.working_dir)
except:
raise Error('Error chdir ' + self.working_dir)
spades_exe = self.extern_progs.exe('spades')
if not spades_exe:
raise Error("Spades executable has not been found")
spades_options = self.spades_options
if spades_options is not None:
spades_options = shlex.split(self.spades_options)
if self.spades_mode == "rna":
spades_options = ["--rna"] + (["-k","127"] if spades_options is None else spades_options)
spades_out_seq_base = "transcripts.fasta"
elif self.spades_mode == "sc":
spades_options = ["--sc"] + (["-k", "33,55,77,99,127","--careful"] if spades_options is None else spades_options)
spades_out_seq_base = "contigs.fasta"
elif self.spades_mode == "wgs":
spades_options = ["-k", "33,55,77,99,127","--careful"] if spades_options is None else spades_options
spades_out_seq_base = "contigs.fasta"
else:
raise ValueError("Unknown spades_mode value: {}".format(self.spades_mode))
asm_cmd = [spades_exe, "-t", str(self.threads), "--pe1-1", self.reads1, "--pe1-2", self.reads2, "-o", self.assembler_dir] + \
spades_options
asm_ok,err = common.syscall(asm_cmd, verbose=True, verbose_filehandle=self.log_fh, shell=False, allow_fail=True)
if not asm_ok:
print('Assembly finished with errors. These are the errors:', file=self.log_fh)
print(err, file=self.log_fh)
print('\nEnd of spades errors\n', file=self.log_fh)
else:

spades_log = os.path.join(self.assembler_dir, 'spades.log')
if os.path.exists(spades_log):
self._check_spades_log_file(spades_log)

with open(spades_log) as f:
print('\n______________ SPAdes log ___________________\n', file=self.log_fh)
for line in f:
print(line.rstrip(), file=self.log_fh)
print('\n______________ End of SPAdes log _________________\n', file=self.log_fh)

spades_warnings = os.path.join(self.assembler_dir, 'warnings.log')
if os.path.exists(spades_warnings):
with open(spades_warnings) as f:
print('\n______________ SPAdes warnings ___________________\n', file=self.log_fh)
for line in f:
print(line.rstrip(), file=self.log_fh)
print('\n______________ End of SPAdes warnings _________________\n', file=self.log_fh)

## fermilight module generates contig names that look like `cluster_1.l15.c17.ctg.1` where 'cluster_1'==self.contig_name_prefix
## the whole structure of the contig name is expected in several places downstream where it is parsed into individual components.
## For example, it is parsed into to l and c parts in ref_seq_chooser (although the parts are not actually used).
## This is the code from fermilight module that generates the contig ID string:
## ofs << ">" << namePrefix << ".l" << overlap << ".c" << minCount << ".ctg." << i + 1 << '\n'
##
## We generate the same contig name structure here using dummy values for overlap and minCount, in order
## to avoid distrupting the downstream code.
## Note that the fermilight module generates multiple versions of the assembly on a grid of l and c values,
## and ref_seq_chooser then picks a single "best" (l,c) version based on coverage/identity of the nucmer
## alignment to the reference. Spades generates a single version of the assembly, so ref_seq_chooser
## can only pick that one version.

spades_out_seq = os.path.join(self.assembler_dir,spades_out_seq_base)
## No need really to use general-purpose pyfastaq.sequences.file_reader here and pay performance cost for
## its multi-format line tests since we are only replacing the IDs in a pre-defined format
if os.path.exists(spades_out_seq):
with open(spades_out_seq,"r") as inp, open(self.all_assembly_contigs_fa,"w") as out:
pref = self.contig_name_prefix
i_cont = 0
for line in inp:
if line.startswith(">"):
i_cont += 1
line = ">{}.l15.c17.ctg.{}\n".format(pref,i_cont)
out.write(line)
if i_cont > 0:
self.assembled_ok = True
if self.clean:
print('Deleting assembly directory', self.assembler_dir, file=self.log_fh)
shutil.rmtree(self.assembler_dir,ignore_errors=True)
finally:
os.chdir(cwd)


@staticmethod
def _fix_contig_orientation(contigs_fa, ref_fa, outfile, min_id=90, min_length=20, breaklen=200):
Expand Down Expand Up @@ -148,7 +255,10 @@ def _parse_bam(sequences, bam, min_scaff_depth, max_insert):


def run(self):
self._assemble_with_fermilite()
if self.assembler == 'fermilite':
self._assemble_with_fermilite()
elif self.assembler == "spades":
self._assemble_with_spades()
print('Finished running assemblies', flush=True, file=self.log_fh)
self.sequences = {}

Expand Down Expand Up @@ -208,7 +318,7 @@ def run(self):
self.reads2,
self.final_assembly_fa,
self.final_assembly_bam[:-4],
threads=1,
threads=self.threads,
sort=True,
bowtie2=self.extern_progs.exe('bowtie2'),
bowtie2_version=self.extern_progs.version('bowtie2'),
Expand Down
95 changes: 66 additions & 29 deletions ariba/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ def __init__(self,
max_allele_freq=0.90,
unique_threshold=0.03,
max_gene_nt_extend=30,
spades_other_options=None,
spades_mode="rna", #["rna","wgs"]
spades_options=None,
clean=True,
extern_progs=None,
random_seed=42,
threads_total=1
):
self.root_dir = os.path.abspath(root_dir)
self.read_store = read_store
Expand All @@ -71,7 +73,8 @@ def __init__(self,
self.sspace_k = sspace_k
self.sspace_sd = sspace_sd
self.reads_insert = reads_insert
self.spades_other_options = spades_other_options
self.spades_mode = spades_mode
self.spades_options = spades_options

self.reads_for_assembly1 = os.path.join(self.root_dir, 'reads_for_assembly_1.fq')
self.reads_for_assembly2 = os.path.join(self.root_dir, 'reads_for_assembly_2.fq')
Expand All @@ -96,6 +99,9 @@ def __init__(self,
self.status_flag = flag.Flag()
self.clean = clean

self.threads_total = threads_total
self.remaining_clusters = None

self.assembly_dir = os.path.join(self.root_dir, 'Assembly')
self.final_assembly_fa = os.path.join(self.root_dir, 'assembly.fa')
self.final_assembly_bam = os.path.join(self.root_dir, 'assembly.reads_mapped.bam')
Expand Down Expand Up @@ -138,13 +144,31 @@ def __init__(self,
for s in wanted_signals:
signal.signal(s, self._receive_signal)

def _update_threads(self):
"""Update available thread count post-construction.
To be called any number of times from run() method"""
if self.remaining_clusters is not None:
self.threads = max(1,self.threads_total//self.remaining_clusters.value)
#otherwise just keep the current (initial) value
print("{} detected {} threads available to it".format(self.name,self.threads), file = self.log_fh)

def _report_completion(self):
"""Update shared counters to signal that we are done with this cluster.
Call just before exiting run() method (in a finally clause)"""
rem_clust = self.remaining_clusters
if rem_clust is not None:
# -= is non-atomic, need to acquire a lock
with self.remaining_clusters_lock:
rem_clust.value -= 1
# we do not need this object anymore
self.remaining_clusters = None
print("{} reported completion".format(self.name), file=self.log_fh)

def _atexit(self):
if self.log_fh is not None:
pyfastaq.utils.close(self.log_fh)
self.log_fh = None


def _receive_signal(self, signum, stack):
print('Signal', signum, 'received in cluster', self.name + '... Stopping!', file=sys.stderr, flush=True)
if self.log_fh is not None:
Expand Down Expand Up @@ -190,7 +214,10 @@ def _set_up_input_files(self):
def _clean_file(self, filename):
if self.clean:
print('Deleting file', filename, file=self.log_fh)
os.unlink(filename)
try: #protect against OSError: [Errno 16] Device or resource busy: '.nfs0000000010f0f04f000003c9' and such
os.unlink(filename)
except:
pass


def _clean(self):
Expand Down Expand Up @@ -269,33 +296,39 @@ def _make_reads_for_assembly(number_of_wanted_reads, total_reads, reads_in1, rea
return total_reads


def run(self):
self._set_up_input_files()
def run(self,remaining_clusters=None,remaining_clusters_lock=None):
try:
self.remaining_clusters = remaining_clusters
self.remaining_clusters_lock = remaining_clusters_lock
self._update_threads()
self._set_up_input_files()

for fname in [self.all_reads1, self.all_reads2, self.references_fa]:
if not os.path.exists(fname):
raise Error('File ' + fname + ' not found. Cannot continue')
for fname in [self.all_reads1, self.all_reads2, self.references_fa]:
if not os.path.exists(fname):
raise Error('File ' + fname + ' not found. Cannot continue')

original_dir = os.getcwd()
os.chdir(self.root_dir)
original_dir = os.getcwd()
os.chdir(self.root_dir)

try:
self._run()
except Error as err:
os.chdir(original_dir)
print('Error running cluster! Error was:', err, sep='\n', file=self.log_fh)
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
raise Error('Error running cluster ' + self.name + '!')

try:
self._run()
except Error as err:
os.chdir(original_dir)
print('Error running cluster! Error was:', err, sep='\n', file=self.log_fh)
print('Finished', file=self.log_fh, flush=True)
print('{:_^79}'.format(' LOG FILE END ' + self.name + ' '), file=self.log_fh, flush=True)

# This stops multiprocessing complaining with the error:
# multiprocessing.pool.MaybeEncodingError: Error sending result: '[<ariba.cluster.Cluster object at 0x7ffa50f8bcd0>]'. Reason: 'TypeError("cannot serialize '_io.TextIOWrapper' object",)'
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
raise Error('Error running cluster ' + self.name + '!')

os.chdir(original_dir)
print('Finished', file=self.log_fh, flush=True)
print('{:_^79}'.format(' LOG FILE END ' + self.name + ' '), file=self.log_fh, flush=True)

# This stops multiprocessing complaining with the error:
# multiprocessing.pool.MaybeEncodingError: Error sending result: '[<ariba.cluster.Cluster object at 0x7ffa50f8bcd0>]'. Reason: 'TypeError("cannot serialize '_io.TextIOWrapper' object",)'
pyfastaq.utils.close(self.log_fh)
self.log_fh = None
finally:
self._report_completion()


def _run(self):
Expand All @@ -310,6 +343,7 @@ def _run(self):
print('\nUsing', made_reads, 'from a total of', self.total_reads, 'for assembly.', file=self.log_fh, flush=True)
print('Assembling reads:', file=self.log_fh, flush=True)

self._update_threads()
self.assembly = assembly.Assembly(
self.reads_for_assembly1,
self.reads_for_assembly2,
Expand All @@ -323,7 +357,10 @@ def _run(self):
contig_name_prefix=self.name,
assembler=self.assembler,
extern_progs=self.extern_progs,
clean=self.clean
clean=self.clean,
spades_mode=self.spades_mode,
spades_options=self.spades_options,
threads=self.threads
)

self.assembly.run()
Expand All @@ -332,7 +369,7 @@ def _run(self):
self._clean_file(self.reads_for_assembly2)
if self.clean:
print('Deleting Assembly directory', self.assembly_dir, file=self.log_fh, flush=True)
shutil.rmtree(self.assembly_dir)
shutil.rmtree(self.assembly_dir,ignore_errors=True)


if self.assembled_ok and self.assembly.ref_seq_name is not None:
Expand All @@ -342,13 +379,13 @@ def _run(self):
self.is_variant_only = '1' if is_variant_only else '0'

print('\nAssembly was successful\n\nMapping reads to assembly:', file=self.log_fh, flush=True)

self._update_threads()
mapping.run_bowtie2(
self.all_reads1,
self.all_reads2,
self.final_assembly_fa,
self.final_assembly_bam[:-4],
threads=1,
threads=self.threads,
sort=True,
bowtie2=self.extern_progs.exe('bowtie2'),
bowtie2_preset='very-sensitive-local',
Expand Down
Loading