From 523f053b851126553a56057b16696d659997c282 Mon Sep 17 00:00:00 2001
From: Luis Pedro Coelho
Date: Mon, 26 Feb 2024 13:19:40 +1000
Subject: [PATCH 1/2] TST Make test_bin more robust

Equivalent clusterings should be considered a pass
---
 test/test_bin.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/test/test_bin.py b/test/test_bin.py
index 900adee..f288f04 100644
--- a/test/test_bin.py
+++ b/test/test_bin.py
@@ -94,6 +94,14 @@ def test_cluster():
         res,
         [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
 
+def _renumber_cluster(xs):
+    ys = np.zeros_like(xs)
+    renumber = {}
+    for ix,x in enumerate(xs):
+        if x not in renumber:
+            renumber[x] = len(renumber)
+        ys[ix] = renumber[x]
+    return ys
 
 def test_recluster():
     contig_dict = {h:seq for h,seq in fasta_iter('test/bin_data/input.fasta')}
@@ -116,10 +124,12 @@ def test_recluster():
             random_seed=123)
 
     # Computed with a previous version
-    np.testing.assert_array_equal(
-        reclustered,
-        np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 29, 21, 22, 23, 24, 25, 26,
+    expected = np.array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 29, 21, 22, 23, 24, 25, 26,
         26, 28, 27, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 10, 11, 12, 13,
-        14, 15, 16, 17, 18, 19]))
+        14, 15, 16, 17, 18, 19])
+
+    np.testing.assert_array_equal(
+        _renumber_cluster(reclustered),
+        _renumber_cluster(expected))

From 02eb2c8072ecf5cf96bfa6a8b5380f0102e82746 Mon Sep 17 00:00:00 2001
From: Luis Pedro Coelho
Date: Mon, 26 Feb 2024 13:05:00 +1000
Subject: [PATCH 2/2] ENH Do not start more processes than we can use

With large machines, it is easy to request more processes than SemiBin can
actually meaningfully use (see
https://github.com/BigDataBiology/SemiBin/issues/155).
We thus limit parallelism to the maximum number of processes that we can
actually use (typically the number of input files).

/cc #155
---
 ChangeLog                  | 1 +
 SemiBin/cluster.py         | 4 ++--
 SemiBin/main.py            | 4 ++--
 SemiBin/naive_orffinder.py | 8 +++++++-
 4 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index a9db11d..b93a2e5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,6 +3,7 @@ Unreleased
 	* SemiBin1: Introduce separate SemiBin1 command
 	* internal: Code simplification and refactor
 	* deprecation: Deprecate --orf-finder=fraggenescan option
+	* SemiBin: do not use more processes than can be taken advantage of (#155)
 
 Version 2.0.2 Oct 31 2023 by BigDataBiology
 	* multi_easy_bin: Fix multi_easy_bin with --write-pre-recluster (#128)
diff --git a/SemiBin/cluster.py b/SemiBin/cluster.py
index 791fca2..dc9759a 100644
--- a/SemiBin/cluster.py
+++ b/SemiBin/cluster.py
@@ -17,8 +17,8 @@ def run_infomap(g, edge_weights, vertex_weights, num_process):
     '''Run infomap, using multiple processors (if available)'''
     if num_process == 1:
         return g.community_infomap(edge_weights=edge_weights, vertex_weights=vertex_weights, trials=NR_INFOMAP_TRIALS)
-    import multiprocessing
-    with multiprocessing.Pool(num_process) as p:
+    import multiprocessing as mp
+    with mp.Pool(min(num_process, NR_INFOMAP_TRIALS)) as p:
         rs = [p.apply_async(run_infomap1, (g, edge_weights, vertex_weights, 1))
                 for _ in range(NR_INFOMAP_TRIALS)]
         p.close()
diff --git a/SemiBin/main.py b/SemiBin/main.py
index 09b7a33..056451e 100644
--- a/SemiBin/main.py
+++ b/SemiBin/main.py
@@ -764,7 +764,7 @@ def generate_sequence_features_single(logger, contig_fasta,
 
     logger.info('Calculating coverage for every sample.')
 
-    with Pool(num_process if num_process != 0 else None) as pool:
+    with Pool(min(num_process, len(bams))) as pool:
         results = [
             pool.apply_async(
                 generate_cov,
@@ -874,7 +874,7 @@ def fasta_sample_iter(fn):
                 os.path.join(args.output, f'samples/{sample}.fa'),
                 args.ratio)
 
-    with Pool(args.num_process if args.num_process != 0 else None) as pool:
+    with Pool(min(args.num_process, len(args.bams))) as pool:
         results = [
             pool.apply_async(
                 generate_cov,
diff --git a/SemiBin/naive_orffinder.py b/SemiBin/naive_orffinder.py
index 2227ead..0f5791b 100644
--- a/SemiBin/naive_orffinder.py
+++ b/SemiBin/naive_orffinder.py
@@ -9,6 +9,12 @@
 STOP_CODONS = ['TAA', 'TAG', 'TGA']
 MIN_LEN = 90
 
+# See https://github.com/BigDataBiology/SemiBin/issues/155
+# A bit of testing showed that scaling tops out at 10~12 processes (on a 16-core
+# machine). This leaves a little leeway for a few more processes, but still
+# caps it to avoid #155
+MAX_PROCESS_ORFS = 24
+
 def reverse_complement(seq):
     return seq[::-1].translate(str.maketrans('ATGC', 'TACG'))
 
@@ -123,7 +129,7 @@ def run_naiveorf(fasta_path, num_process, output):
                         out.write(f'{translate(extract(seq, orf))}\n')
     else:
         ctx = mp.get_context('spawn')
-        with ctx.Pool(processes=num_process) as p:
+        with ctx.Pool(processes=min(num_process, MAX_PROCESS_ORFS)) as p:
             outs = p.imap(get_orfs,
                         fasta.fasta_iter(fasta_path),
                         chunksize=8)
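
[Editor's note, illustration only; not part of either patch] PATCH 1/2 makes the
test robust by comparing clusterings up to relabeling: two label vectors that
induce the same partition should renumber to the same canonical form. A minimal,
self-contained usage sketch of that idea (the helper mirrors the patch's
_renumber_cluster; the example arrays are made up):

    import numpy as np

    def _renumber_cluster(xs):
        # Relabel clusters in order of first appearance, so equivalent
        # clusterings (same partition, different label numbers) become identical
        ys = np.zeros_like(xs)
        renumber = {}
        for ix, x in enumerate(xs):
            if x not in renumber:
                renumber[x] = len(renumber)
            ys[ix] = renumber[x]
        return ys

    a = np.array([3, 3, 1, 1, 2])   # made-up labels
    b = np.array([0, 0, 7, 7, 5])   # same partition, different numbering
    # Both canonicalize to [0, 0, 1, 1, 2], so the comparison passes
    np.testing.assert_array_equal(_renumber_cluster(a), _renumber_cluster(b))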
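
[Editor's note, illustration only; not part of either patch] PATCH 2/2 applies
the same capping pattern at each Pool(...) call site: the pool size becomes the
smaller of the requested process count and the number of units of work that can
actually run in parallel. A standalone sketch of the pattern, where process_one
and the input list are hypothetical placeholders:

    import multiprocessing as mp

    def process_one(path):
        # Hypothetical per-input work; stands in for generate_cov / get_orfs
        return path.upper()

    def run_all(paths, num_process):
        # Cap the pool: workers beyond len(paths) would only sit idle, and on
        # very large machines they can trigger the issues described in #155
        n_workers = max(1, min(num_process, len(paths)))
        with mp.Pool(n_workers) as pool:
            return pool.map(process_one, paths)

    if __name__ == '__main__':
        # Requesting 64 processes for 3 inputs still starts only 3 workers
        print(run_all(['s1.bam', 's2.bam', 's3.bam'], num_process=64))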