diff --git a/.gitignore b/.gitignore index 986e9d3..8a1711c 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ test_*.*am v*/ figures/* build/* +dev/ \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 859f2d3..6f81970 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -229,14 +229,14 @@ add_subdirectory(${CMAKE_CURRENT_BINARY_DIR}/googletest-src ${CMAKE_CURRENT_BINARY_DIR}/googletest-build EXCLUDE_FROM_ALL) -# leviosam_test -add_executable(leviosam_test src/leviosam_test.cpp) -target_link_libraries(leviosam_test lvsam) -target_link_libraries(leviosam_test ${HTS_LIB}) -target_link_libraries(leviosam_test ${SDSL_LIB}) -target_link_libraries(leviosam_test gtest gtest_main) - -add_test(NAME leviosam_test COMMAND leviosam_test +# leviosam_utils_test +add_executable(leviosam_utils_test src/leviosam_utils_test.cpp) +target_link_libraries(leviosam_utils_test lvsam) +target_link_libraries(leviosam_utils_test ${HTS_LIB}) +target_link_libraries(leviosam_utils_test ${SDSL_LIB}) +target_link_libraries(leviosam_utils_test gtest gtest_main) + +add_test(NAME leviosam_utils_test COMMAND leviosam_utils_test WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/testdata ) diff --git a/src/leviosam.cpp b/src/leviosam.cpp index 43a03d9..be733d3 100644 --- a/src/leviosam.cpp +++ b/src/leviosam.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -80,6 +81,48 @@ bool add_split_rule(std::vector> &split_rules, return true; } +/** Updates allocated threads given input args. + * + * LevioSAM2 expects either (1) --threads (-t) or (2) --hts_threads and/or + * --lift_threads is set to a non-default value. If --threads and any of + * --hts_threads and --lift_threads is set, exits with an error message. + * + * If only --threads is set, dynamically assigns --hts_threads to + * max(1, --threads/4) and --lift_threads to be --threads - --hts_threads. + * Or updates --threads to be the sum of --hts_threads and --lift_threads. + * + * @param args leviosam2 arguments + * @return 0 if successful, 1 if not. + */ +uint8_t update_thread_allocation(lift_opts &args) { + if (args.lift_threads < 1) { + std::cerr << "[E::update_thread_allocation] --lift_threads must " + ">= 1.\n"; + return 1; + } + if (args.threads > DEFAULT_NUM_THREADS) { + if (args.lift_threads != DEFAULT_NUM_LIFT_THREADS || + args.hts_threads != DEFAULT_NUM_HTS_THREADS) { + std::cerr << "[E::update_thread_allocation] There are two ways to " + "set the number of threads: (1) set --threads (-t). " + "With this approach, leviosam2 assigns `max(1, t/4)` " + "threads to (de)compress I/O files and the rest to " + "the lift core. (2) set --hts_threads and/or " + "--lift_threads separately.\n"; + return 1; + } + args.hts_threads = std::max(1, args.threads / 4); + args.lift_threads = args.threads - args.hts_threads; + } else if (args.lift_threads != DEFAULT_NUM_LIFT_THREADS || + args.hts_threads != DEFAULT_NUM_HTS_THREADS) { + args.threads = args.hts_threads + args.lift_threads; + } + std::cerr << "[I::update_thread_allocation] --theads=" << args.threads + << ", --lift_threads=" << args.lift_threads + << ", --hts_threads=" << args.hts_threads << "\n"; + return 0; +} + NameMap parse_name_map(const char *fname) { NameMap names; FILE *fp = fopen(fname, "r"); @@ -106,8 +149,8 @@ void serialize_run(lift_opts args) { exit(1); } if (args.length_map.size() == 0) { - std::cerr - << "[E::serialize_run] No length map is found. Please set `-F`.\n"; + std::cerr << "[E::serialize_run] No length map is found. Please " + "set `-F`.\n"; print_serialize_help_msg(); exit(1); } @@ -132,9 +175,8 @@ void serialize_run(lift_opts args) { std::cerr << "[I::serialize_run] levioSAM ChainMap saved to " << fn_index << "\n"; } else { - std::cerr - << "[E::serialize_run] Cannot build a levioSAM index. Please set " - "-v or -c properly\n"; + std::cerr << "[E::serialize_run] Cannot build a levioSAM index. " + "Please set -v or -c properly\n"; print_serialize_help_msg(); exit(1); } @@ -177,7 +219,8 @@ void read_and_lift(T *lift_map, std::mutex *mutex_fread, std::string null_dest_contig; lift_map->lift_aln(aln_vec[i], hdr_source, hdr_dest, null_dest_contig, args.keep_mapq); - // Skip update MD, NM tags and realignment if `md_flag` is not set + // Skip update MD, NM tags and realignment if `md_flag` is not + // set if (!args.md_flag) { // strip MD and NM tags if `md_flag` not set bc liftover // invalidates them @@ -360,6 +403,10 @@ void lift_run(lift_opts args) { samFile *sam_fp = (args.sam_fname == "") ? sam_open("-", "r") : sam_open(args.sam_fname.data(), "r"); + + // Gets extra threads to decompress HTS files when requested + if (args.hts_threads > 0) hts_set_threads(sam_fp, args.hts_threads); + if (!sam_fp) { std::cerr << "[E::lift_run] Invalid alignment input\n"; exit(1); @@ -377,6 +424,8 @@ void lift_run(lift_opts args) { std::cerr << "[E::lift_run] Invalid alignment output\n"; exit(1); } + // Gets extra threads to compress HTS files when requested + if (args.hts_threads > 0) hts_set_threads(out_sam_fp, args.hts_threads); sam_hdr_t *hdr_orig = sam_hdr_read(sam_fp); if (!hdr_orig) { @@ -417,7 +466,7 @@ void lift_run(lift_opts args) { std::vector threads; std::mutex mutex_fread, mutex_fwrite; - for (int j = 0; j < args.threads; j++) { + for (int j = 0; j < args.lift_threads; j++) { if (args.chain_fname == "" && args.chainmap_fname == "") { threads.push_back(std::thread(read_and_lift, &lift_map, &mutex_fread, @@ -430,7 +479,7 @@ void lift_run(lift_opts args) { hdr_orig, hdr, &ref_dict, &wd, args)); } } - for (int j = 0; j < args.threads; j++) { + for (int j = 0; j < args.lift_threads; j++) { if (threads[j].joinable()) threads[j].join(); } threads.clear(); @@ -478,18 +527,39 @@ void print_lift_help_msg() { "to be lifted. \n"; std::cerr << " Leave empty or set to \"-\" to read " "from stdin.\n"; - std::cerr << " -t INT Number of threads used. [1] \n"; - std::cerr << " -m add MD and NM to output alignment records " - "(requires -f option)\n"; - std::cerr << " -f path Path to the FASTA file of the target " - "reference. \n"; + std::cerr << " -t INT Number of threads used.\n" + " If -t is not set, the value would " + "be the sum of\n" + " --hts_threads and --lift_threads. [1] \n"; + std::cerr << " --lift_threads INT " + "Number of threads used for lifting reads. \n" + " " // align + "If -t is set, the value should be left unset.\n" + " " // align + "The value would be inferred as `t - max(1, t/4)`. [1]\n"; + std::cerr << " --hts_threads INT " + "Number of threads used to compress/decompress HTS files.\n" + " " // align + "This can improve thread scaling.\n" + " " // align + "If -t is set, the value should be left unset.\n" + " " // align + "The value would be inferred as `max(1, t/4)`. [0]\n"; + std::cerr << " -m " + "Add MD and NM to output alignment records (requires -f)\n"; + std::cerr << " -f path " + "Path to the FASTA file of the target reference. \n"; std::cerr << " -x path Re-alignment preset. [] \n"; - std::cerr << " -G INT Number of allowed CIGAR changes for one " - "alingment. [0]\n"; - std::cerr << " -T INT Chunk size for each thread. [256] \n"; - std::cerr << " Each thread queries <-T> reads, lifts, " - "and writes.\n"; - std::cerr << " Setting a higher <-T> uses slightly more " + std::cerr << " -G INT " + "Number of allowed CIGAR changes (in base pairs) for one " + "alignment. [0]\n"; + std::cerr << " -T INT " + "Chunk size for each thread. [256] \n" + " " // align + "Each thread queries <-T> reads, lifts, " + "and writes.\n" + " " // align + "Setting a larger -T uses slightly more " "memory but might benefit thread scaling.\n"; std::cerr << "\n"; std::cerr << " Commit/defer rule options:\n"; @@ -591,10 +661,12 @@ int main(int argc, char **argv) { {"vcf", required_argument, 0, 'v'}, {"verbose", required_argument, 0, 'V'}, {"realign_yaml", required_argument, 0, 'x'}, + {"keep_mapq", no_argument, 0, OPT_KEEP_MAPQ}, + {"lift_threads", required_argument, 0, OPT_LIFT_THREADS}, + {"hts_threads", required_argument, 0, OPT_HTS_THREADS}, {"ultima_genomics", no_argument, 0, OPT_ULTIMA_GENOMICS}, {"help", no_argument, 0, OPT_HELP}, - {"version", no_argument, 0, OPT_VERSION}, - {"keep_mapq", no_argument, 0, OPT_KEEP_MAPQ}}; + {"version", no_argument, 0, OPT_VERSION}}; int long_index = 0; while ((c = getopt_long(argc, argv, "hma:B:c:C:d:D:f:F:g:G:l:n:O:p:r:R:s:S:t:T:v:V:x:", @@ -691,6 +763,12 @@ int main(int argc, char **argv) { case OPT_KEEP_MAPQ: args.keep_mapq = true; break; + case OPT_LIFT_THREADS: + args.lift_threads = atoi(optarg); + break; + case OPT_HTS_THREADS: + args.hts_threads = atoi(optarg); + break; default: std::cerr << "[E::main] Invalid flag value " << c << "\n"; exit(1); @@ -730,8 +808,8 @@ int main(int argc, char **argv) { } if (args.realign_yaml != "") { if (args.ref_name == "") { - std::cerr - << "[E::main] Option `-f` must be set when `-x` is not empty\n"; + std::cerr << "[E::main] Option `-f` must be non-empty when `-x` " + "is not empty\n"; exit(1); } else { const char *fc = args.realign_yaml.c_str(); @@ -740,12 +818,13 @@ int main(int argc, char **argv) { ryml::parse_in_arena(ryml::to_csubstr(yaml)); args.aln_opts.deserialize_realn(realign_tree); args.aln_opts.print_parameters(); - // if (args.verbose >= VERBOSE_INFO) { - // args.aln_opts.print_parameters(); - // } } } + if (update_thread_allocation(args)) { + exit(ERROR_THREADS_ALLOCATION); + } + if (!strcmp(argv[optind], "lift")) { lift_run(args); } else if (!strcmp(argv[optind], "serialize") || diff --git a/src/leviosam.hpp b/src/leviosam.hpp index 16295b3..23712b7 100644 --- a/src/leviosam.hpp +++ b/src/leviosam.hpp @@ -24,8 +24,8 @@ #include "aln.hpp" #include "bed.hpp" -#include "cigar.hpp" #include "chain.hpp" +#include "cigar.hpp" #include "leviosam_utils.hpp" #include "version.hpp" #include "yaml.hpp" @@ -38,8 +38,16 @@ #define OPT_HELP 1000 #define OPT_VERSION 1001 #define OPT_KEEP_MAPQ 1002 +#define OPT_LIFT_THREADS 1100 +#define OPT_HTS_THREADS 1101 #define OPT_ULTIMA_GENOMICS 2001 +#define ERROR_THREADS_ALLOCATION 21 + +const int DEFAULT_NUM_THREADS = 1; +const int DEFAULT_NUM_LIFT_THREADS = 1; +const int DEFAULT_NUM_HTS_THREADS = 0; + using NameMap = std::vector>; using LengthMap = std::vector>; using BedMap = @@ -68,7 +76,9 @@ struct lift_opts { std::string split_mode = ""; std::vector> split_rules; int allowed_cigar_changes = 0; - int threads = 1; + uint16_t threads = DEFAULT_NUM_THREADS; + uint16_t lift_threads = DEFAULT_NUM_LIFT_THREADS; + uint16_t hts_threads = DEFAULT_NUM_HTS_THREADS; int chunk_size = 256; int verbose = 0; int md_flag = 0; @@ -108,6 +118,7 @@ void print_serialize_help_msg(); bool check_split_rule(std::string rule); bool add_split_rule(std::vector> &split_rules, std::string s); +uint8_t update_thread_allocation(lift_opts &args); namespace lift { // Serialization diff --git a/src/leviosam_test.cpp b/src/leviosam_utils_test.cpp similarity index 99% rename from src/leviosam_test.cpp rename to src/leviosam_utils_test.cpp index 7482500..b2c5d5d 100644 --- a/src/leviosam_test.cpp +++ b/src/leviosam_utils_test.cpp @@ -1,7 +1,7 @@ /* - * leviosam_test.cpp + * leviosam_utils_test.cpp * - * Test levioSAM functions + * Test levioSAM utility functions * * Authors: Taher Mun, Nae-Chyun Chen, Ben Langmead * Dept. of Computer Science, Johns Hopkins University @@ -9,7 +9,7 @@ * Distributed under the MIT license * https://github.com/milkschen/leviosam2 */ -#include "leviosam.hpp" +#include "leviosam_utils.hpp" #include @@ -19,7 +19,7 @@ #include "bed.hpp" #include "gtest/gtest.h" #include "htslib/sam.h" -#include "leviosam_utils.hpp" +#include "leviosam.hpp" // // bit_vector tests diff --git a/src/version.hpp b/src/version.hpp index 0d8ab9d..2176556 100644 --- a/src/version.hpp +++ b/src/version.hpp @@ -1,3 +1,3 @@ #ifndef VERSION -#define VERSION "0.4.2-02b4ab7" +#define VERSION "0.4.2-30dcb4a" #endif \ No newline at end of file