Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow using multiple threads to (de)compress hts files #74

Merged
merged 6 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ test_*.*am
v*/
figures/*
build/*
dev/
16 changes: 8 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,14 @@ add_subdirectory(${CMAKE_CURRENT_BINARY_DIR}/googletest-src
${CMAKE_CURRENT_BINARY_DIR}/googletest-build
EXCLUDE_FROM_ALL)

# leviosam_test
add_executable(leviosam_test src/leviosam_test.cpp)
target_link_libraries(leviosam_test lvsam)
target_link_libraries(leviosam_test ${HTS_LIB})
target_link_libraries(leviosam_test ${SDSL_LIB})
target_link_libraries(leviosam_test gtest gtest_main)

add_test(NAME leviosam_test COMMAND leviosam_test
# leviosam_utils_test
add_executable(leviosam_utils_test src/leviosam_utils_test.cpp)
target_link_libraries(leviosam_utils_test lvsam)
target_link_libraries(leviosam_utils_test ${HTS_LIB})
target_link_libraries(leviosam_utils_test ${SDSL_LIB})
target_link_libraries(leviosam_utils_test gtest gtest_main)

add_test(NAME leviosam_utils_test COMMAND leviosam_utils_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/testdata
)

Expand Down
131 changes: 105 additions & 26 deletions src/leviosam.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <stdio.h>
#include <zlib.h>

#include <algorithm>
#include <ctime>
#include <vector>

Expand Down Expand Up @@ -80,6 +81,48 @@ bool add_split_rule(std::vector<std::pair<std::string, float>> &split_rules,
return true;
}

/** Updates allocated threads given input args.
*
* LevioSAM2 expects either (1) --threads (-t) or (2) --hts_threads and/or
* --lift_threads is set to a non-default value. If --threads and any of
* --hts_threads and --lift_threads is set, exits with an error message.
*
* If only --threads is set, dynamically assigns --hts_threads to
* max(1, --threads/4) and --lift_threads to be --threads - --hts_threads.
* Or updates --threads to be the sum of --hts_threads and --lift_threads.
*
* @param args leviosam2 arguments
* @return 0 if successful, 1 if not.
*/
uint8_t update_thread_allocation(lift_opts &args) {
if (args.lift_threads < 1) {
std::cerr << "[E::update_thread_allocation] --lift_threads must "
">= 1.\n";
return 1;
}
if (args.threads > DEFAULT_NUM_THREADS) {
if (args.lift_threads != DEFAULT_NUM_LIFT_THREADS ||
args.hts_threads != DEFAULT_NUM_HTS_THREADS) {
std::cerr << "[E::update_thread_allocation] There are two ways to "
"set the number of threads: (1) set --threads (-t). "
"With this approach, leviosam2 assigns `max(1, t/4)` "
"threads to (de)compress I/O files and the rest to "
"the lift core. (2) set --hts_threads and/or "
"--lift_threads separately.\n";
return 1;
}
args.hts_threads = std::max(1, args.threads / 4);
args.lift_threads = args.threads - args.hts_threads;
} else if (args.lift_threads != DEFAULT_NUM_LIFT_THREADS ||
args.hts_threads != DEFAULT_NUM_HTS_THREADS) {
args.threads = args.hts_threads + args.lift_threads;
}
std::cerr << "[I::update_thread_allocation] --theads=" << args.threads
<< ", --lift_threads=" << args.lift_threads
<< ", --hts_threads=" << args.hts_threads << "\n";
return 0;
}

NameMap parse_name_map(const char *fname) {
NameMap names;
FILE *fp = fopen(fname, "r");
Expand All @@ -106,8 +149,8 @@ void serialize_run(lift_opts args) {
exit(1);
}
if (args.length_map.size() == 0) {
std::cerr
<< "[E::serialize_run] No length map is found. Please set `-F`.\n";
std::cerr << "[E::serialize_run] No length map is found. Please "
"set `-F`.\n";
print_serialize_help_msg();
exit(1);
}
Expand All @@ -132,9 +175,8 @@ void serialize_run(lift_opts args) {
std::cerr << "[I::serialize_run] levioSAM ChainMap saved to "
<< fn_index << "\n";
} else {
std::cerr
<< "[E::serialize_run] Cannot build a levioSAM index. Please set "
"-v or -c properly\n";
std::cerr << "[E::serialize_run] Cannot build a levioSAM index. "
"Please set -v or -c properly\n";
print_serialize_help_msg();
exit(1);
}
Expand Down Expand Up @@ -177,7 +219,8 @@ void read_and_lift(T *lift_map, std::mutex *mutex_fread,
std::string null_dest_contig;
lift_map->lift_aln(aln_vec[i], hdr_source, hdr_dest,
null_dest_contig, args.keep_mapq);
// Skip update MD, NM tags and realignment if `md_flag` is not set
// Skip update MD, NM tags and realignment if `md_flag` is not
// set
if (!args.md_flag) {
// strip MD and NM tags if `md_flag` not set bc liftover
// invalidates them
Expand Down Expand Up @@ -360,6 +403,10 @@ void lift_run(lift_opts args) {
samFile *sam_fp = (args.sam_fname == "")
? sam_open("-", "r")
: sam_open(args.sam_fname.data(), "r");

// Gets extra threads to decompress HTS files when requested
if (args.hts_threads > 0) hts_set_threads(sam_fp, args.hts_threads);

if (!sam_fp) {
std::cerr << "[E::lift_run] Invalid alignment input\n";
exit(1);
Expand All @@ -377,6 +424,8 @@ void lift_run(lift_opts args) {
std::cerr << "[E::lift_run] Invalid alignment output\n";
exit(1);
}
// Gets extra threads to compress HTS files when requested
if (args.hts_threads > 0) hts_set_threads(out_sam_fp, args.hts_threads);

sam_hdr_t *hdr_orig = sam_hdr_read(sam_fp);
if (!hdr_orig) {
Expand Down Expand Up @@ -417,7 +466,7 @@ void lift_run(lift_opts args) {

std::vector<std::thread> threads;
std::mutex mutex_fread, mutex_fwrite;
for (int j = 0; j < args.threads; j++) {
for (int j = 0; j < args.lift_threads; j++) {
if (args.chain_fname == "" && args.chainmap_fname == "") {
threads.push_back(std::thread(read_and_lift<lift::LiftMap>,
&lift_map, &mutex_fread,
Expand All @@ -430,7 +479,7 @@ void lift_run(lift_opts args) {
hdr_orig, hdr, &ref_dict, &wd, args));
}
}
for (int j = 0; j < args.threads; j++) {
for (int j = 0; j < args.lift_threads; j++) {
if (threads[j].joinable()) threads[j].join();
}
threads.clear();
Expand Down Expand Up @@ -478,18 +527,39 @@ void print_lift_help_msg() {
"to be lifted. \n";
std::cerr << " Leave empty or set to \"-\" to read "
"from stdin.\n";
std::cerr << " -t INT Number of threads used. [1] \n";
std::cerr << " -m add MD and NM to output alignment records "
"(requires -f option)\n";
std::cerr << " -f path Path to the FASTA file of the target "
"reference. \n";
std::cerr << " -t INT Number of threads used.\n"
" If -t is not set, the value would "
"be the sum of\n"
" --hts_threads and --lift_threads. [1] \n";
std::cerr << " --lift_threads INT "
"Number of threads used for lifting reads. \n"
" " // align
"If -t is set, the value should be left unset.\n"
" " // align
"The value would be inferred as `t - max(1, t/4)`. [1]\n";
std::cerr << " --hts_threads INT "
"Number of threads used to compress/decompress HTS files.\n"
" " // align
"This can improve thread scaling.\n"
" " // align
"If -t is set, the value should be left unset.\n"
" " // align
"The value would be inferred as `max(1, t/4)`. [0]\n";
std::cerr << " -m "
"Add MD and NM to output alignment records (requires -f)\n";
std::cerr << " -f path "
"Path to the FASTA file of the target reference. \n";
std::cerr << " -x path Re-alignment preset. [] \n";
std::cerr << " -G INT Number of allowed CIGAR changes for one "
"alingment. [0]\n";
std::cerr << " -T INT Chunk size for each thread. [256] \n";
std::cerr << " Each thread queries <-T> reads, lifts, "
"and writes.\n";
std::cerr << " Setting a higher <-T> uses slightly more "
std::cerr << " -G INT "
"Number of allowed CIGAR changes (in base pairs) for one "
"alignment. [0]\n";
std::cerr << " -T INT "
"Chunk size for each thread. [256] \n"
" " // align
"Each thread queries <-T> reads, lifts, "
"and writes.\n"
" " // align
"Setting a larger -T uses slightly more "
"memory but might benefit thread scaling.\n";
std::cerr << "\n";
std::cerr << " Commit/defer rule options:\n";
Expand Down Expand Up @@ -591,10 +661,12 @@ int main(int argc, char **argv) {
{"vcf", required_argument, 0, 'v'},
{"verbose", required_argument, 0, 'V'},
{"realign_yaml", required_argument, 0, 'x'},
{"keep_mapq", no_argument, 0, OPT_KEEP_MAPQ},
{"lift_threads", required_argument, 0, OPT_LIFT_THREADS},
{"hts_threads", required_argument, 0, OPT_HTS_THREADS},
{"ultima_genomics", no_argument, 0, OPT_ULTIMA_GENOMICS},
{"help", no_argument, 0, OPT_HELP},
{"version", no_argument, 0, OPT_VERSION},
{"keep_mapq", no_argument, 0, OPT_KEEP_MAPQ}};
{"version", no_argument, 0, OPT_VERSION}};
int long_index = 0;
while ((c = getopt_long(argc, argv,
"hma:B:c:C:d:D:f:F:g:G:l:n:O:p:r:R:s:S:t:T:v:V:x:",
Expand Down Expand Up @@ -691,6 +763,12 @@ int main(int argc, char **argv) {
case OPT_KEEP_MAPQ:
args.keep_mapq = true;
break;
case OPT_LIFT_THREADS:
args.lift_threads = atoi(optarg);
break;
case OPT_HTS_THREADS:
args.hts_threads = atoi(optarg);
break;
default:
std::cerr << "[E::main] Invalid flag value " << c << "\n";
exit(1);
Expand Down Expand Up @@ -730,8 +808,8 @@ int main(int argc, char **argv) {
}
if (args.realign_yaml != "") {
if (args.ref_name == "") {
std::cerr
<< "[E::main] Option `-f` must be set when `-x` is not empty\n";
std::cerr << "[E::main] Option `-f` must be non-empty when `-x` "
"is not empty\n";
exit(1);
} else {
const char *fc = args.realign_yaml.c_str();
Expand All @@ -740,12 +818,13 @@ int main(int argc, char **argv) {
ryml::parse_in_arena(ryml::to_csubstr(yaml));
args.aln_opts.deserialize_realn(realign_tree);
args.aln_opts.print_parameters();
// if (args.verbose >= VERBOSE_INFO) {
// args.aln_opts.print_parameters();
// }
}
}

if (update_thread_allocation(args)) {
exit(ERROR_THREADS_ALLOCATION);
}

if (!strcmp(argv[optind], "lift")) {
lift_run(args);
} else if (!strcmp(argv[optind], "serialize") ||
Expand Down
15 changes: 13 additions & 2 deletions src/leviosam.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

#include "aln.hpp"
#include "bed.hpp"
#include "cigar.hpp"
#include "chain.hpp"
#include "cigar.hpp"
#include "leviosam_utils.hpp"
#include "version.hpp"
#include "yaml.hpp"
Expand All @@ -38,8 +38,16 @@
#define OPT_HELP 1000
#define OPT_VERSION 1001
#define OPT_KEEP_MAPQ 1002
#define OPT_LIFT_THREADS 1100
#define OPT_HTS_THREADS 1101
#define OPT_ULTIMA_GENOMICS 2001

#define ERROR_THREADS_ALLOCATION 21

const int DEFAULT_NUM_THREADS = 1;
const int DEFAULT_NUM_LIFT_THREADS = 1;
const int DEFAULT_NUM_HTS_THREADS = 0;

using NameMap = std::vector<std::pair<std::string, std::string>>;
using LengthMap = std::vector<std::pair<std::string, int32_t>>;
using BedMap =
Expand Down Expand Up @@ -68,7 +76,9 @@ struct lift_opts {
std::string split_mode = "";
std::vector<std::pair<std::string, float>> split_rules;
int allowed_cigar_changes = 0;
int threads = 1;
uint16_t threads = DEFAULT_NUM_THREADS;
uint16_t lift_threads = DEFAULT_NUM_LIFT_THREADS;
uint16_t hts_threads = DEFAULT_NUM_HTS_THREADS;
int chunk_size = 256;
int verbose = 0;
int md_flag = 0;
Expand Down Expand Up @@ -108,6 +118,7 @@ void print_serialize_help_msg();
bool check_split_rule(std::string rule);
bool add_split_rule(std::vector<std::pair<std::string, float>> &split_rules,
std::string s);
uint8_t update_thread_allocation(lift_opts &args);

namespace lift {
// Serialization
Expand Down
8 changes: 4 additions & 4 deletions src/leviosam_test.cpp → src/leviosam_utils_test.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/*
* leviosam_test.cpp
* leviosam_utils_test.cpp
*
* Test levioSAM functions
* Test levioSAM utility functions
*
* Authors: Taher Mun, Nae-Chyun Chen, Ben Langmead
* Dept. of Computer Science, Johns Hopkins University
*
* Distributed under the MIT license
* https://github.com/milkschen/leviosam2
*/
#include "leviosam.hpp"
#include "leviosam_utils.hpp"

#include <unistd.h>

Expand All @@ -19,7 +19,7 @@
#include "bed.hpp"
#include "gtest/gtest.h"
#include "htslib/sam.h"
#include "leviosam_utils.hpp"
#include "leviosam.hpp"

//
// bit_vector tests
Expand Down
2 changes: 1 addition & 1 deletion src/version.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#ifndef VERSION
#define VERSION "0.4.2-02b4ab7"
#define VERSION "0.4.2-30dcb4a"
#endif
Loading