diff --git a/DESCRIPTION b/DESCRIPTION index 03c3b4bef..7dfe9b62c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -107,6 +107,8 @@ Suggests: styler, visNetwork, webshot +Remotes: + mschubert/clustermq@develop VignetteBuilder: knitr Encoding: UTF-8 RoxygenNote: 6.1.0 diff --git a/NAMESPACE b/NAMESPACE index acb4e0712..3ab4c2713 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -26,6 +26,7 @@ export(check) export(check_plan) export(clean) export(cleaned_namespaces) +export(cmq_build) export(cmq_staged_build) export(code_to_plan) export(config) diff --git a/NEWS.md b/NEWS.md index 530020f1c..028026b6c 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,6 @@ # Version 5.4.0.9000 +- Add a proper [`clustermq`](https://github.com/mschubert/clustermq)-based parallel backend: `make(parallelism = "clustermq")`. - Smooth the edges in `vis_drake_graph()` and `render_drake_graph()`. - Make hover text slightly more readable in in `vis_drake_graph()` and `render_drake_graph()`. - Align hover text properly in `vis_drake_graph()` using the "title" node column. diff --git a/R/clustermq.R b/R/clustermq.R new file mode 100644 index 000000000..2759a1274 --- /dev/null +++ b/R/clustermq.R @@ -0,0 +1,145 @@ +run_clustermq <- function(config){ + assert_pkg("clustermq", version = "0.8.4.99") + config$queue <- new_priority_queue(config = config, jobs = 1) + if (!config$queue$empty()){ + config$workers <- clustermq::workers( + n_jobs = config$jobs, + template = config$template + ) + cmq_set_common_data(config) + config$counter <- new.env(parent = emptyenv()) + config$counter$remaining <- config$queue$size() + cmq_master(config) + } +} + +cmq_set_common_data <- function(config){ + export <- list() + if (identical(config$envir, globalenv())){ + export <- as.list(config$envir, all.names = TRUE) # nocov + } + export$config <- config + config$workers$set_common_data( + export = export, + fun = identity, + const = list(), + rettype = list(), + common_seed = config$seed, + token = "set_common_data_token" + ) +} + +cmq_master <- function(config){ + on.exit(config$workers$finalize()) + while (config$counter$remaining > 0){ + msg <- config$workers$receive_data() + cmq_conclude_build(msg = msg, config = config) + if (!identical(msg$token, "set_common_data_token")){ + config$workers$send_common_data() + } else if (!config$queue$empty()){ + cmq_send_target(config) + } else { + config$workers$send_shutdown_worker() + } + } + if (config$workers$cleanup()){ + on.exit() + } +} + +cmq_send_target <- function(config){ + target <- config$queue$pop0() + # Longer tests will catch this: + if (!length(target)){ + config$workers$send_wait() # nocov + return() # nocov + } + meta <- drake_meta(target = target, config = config) + # Target should not even be in the priority queue + # nocov start + if (!should_build_target(target = target, meta = meta, config = config)){ + console_skip(target = target, config = config) + cmq_conclude_target(target = target, config = config) + config$workers$send_wait() + return() + } + # nocov end + meta$start <- proc.time() + announce_build(target = target, meta = meta, config = config) + prune_envir(targets = target, config = config, jobs = 1) + deps <- cmq_deps_list(target = target, config = config) + config$workers$send_call( + expr = drake::cmq_build( + target = target, + meta = meta, + deps = deps, + config = config + ), + env = list(target = target, meta = meta, deps = deps) + ) +} + +cmq_deps_list <- function(target, config){ + deps <- dependencies(targets = target, config = config) %>% + intersect(config$plan$target) + lapply( + X = deps, + FUN = function(name){ + config$envir[[name]] + } + ) %>% + setNames(deps) +} + +#' @title Build a target using the clustermq backend +#' @description For internal use only +#' @export +#' @keywords internal +#' @inheritParams drake_build +#' @param target target name +#' @param meta list of metadata +#' @param deps named list of target dependencies +#' @param config a [drake_config()] list +cmq_build <- function(target, meta, deps, config){ + if (identical(config$garbage_collection, TRUE)){ + gc() + } + do_prework(config = config, verbose_packages = FALSE) + for (dep in names(deps)){ + config$envir[[dep]] <- deps[[dep]] + } + build <- just_build(target = target, meta = meta, config = config) + build$checksum <- mc_output_file_checksum(target, config) + build +} + +cmq_conclude_build <- function(msg, config){ + build <- msg$result + if (is.null(build)){ + return() + } + cmq_conclude_target(target = build$target, config = config) + set_attempt_flag(key = build$target, config = config) + mc_wait_outfile_checksum( + target = build$target, + checksum = build$checksum, + config = config + ) + conclude_build( + target = build$target, + value = build$value, + meta = build$meta, + config = config + ) +} + +cmq_conclude_target <- function(target, config){ + revdeps <- dependencies( + targets = target, + config = config, + reverse = TRUE + ) %>% + intersect(y = config$queue$list()) + config$queue$decrease_key(targets = revdeps) + config$counter$remaining <- config$counter$remaining - 1 +} diff --git a/R/mclapply.R b/R/mclapply.R index e22e42ffc..9e77743a5 100644 --- a/R/mclapply.R +++ b/R/mclapply.R @@ -4,7 +4,7 @@ run_mclapply <- function(config){ } assert_pkg("txtq") mc_init_worker_cache(config) - tmp <- mclapply( + parallel::mclapply( X = mc_worker_id(c(0, seq_len(config$jobs))), FUN = mc_process, mc.cores = config$jobs + 1, diff --git a/R/parallel.R b/R/parallel.R index eaab08e05..0f95448b2 100644 --- a/R/parallel.R +++ b/R/parallel.R @@ -16,7 +16,7 @@ lightly_parallelize <- function(X, FUN, jobs = 1, ...) { if (is.atomic(X)){ lightly_parallelize_atomic(X = X, FUN = FUN, jobs = jobs, ...) } else { - mclapply(X = X, FUN = FUN, mc.cores = jobs, ...) + safe_mclapply(X = X, FUN = FUN, mc.cores = jobs, ...) } } @@ -24,10 +24,20 @@ lightly_parallelize_atomic <- function(X, FUN, jobs = 1, ...){ jobs <- safe_jobs_imports(jobs) keys <- unique(X) index <- match(X, keys) - values <- mclapply(X = keys, FUN = FUN, mc.cores = jobs, ...) + values <- safe_mclapply(X = keys, FUN = FUN, mc.cores = jobs, ...) values[index] } +# Avoid SIGCHLD handler when mc.cores is 1. +# Could help avoid zeromq interrupted system call errors. +safe_mclapply <- function(X, FUN, mc.cores, ...){ + if (mc.cores > 1){ + parallel::mclapply(X = X, FUN = FUN, mc.cores = mc.cores, ...) + } else { + lapply(X = X, FUN = FUN, ...) + } +} + # x is parallelism or jobs imports_setting <- function(x){ if (length(x) < 2){ diff --git a/R/parallel_ui.R b/R/parallel_ui.R index b0b3ff2b9..985d712f6 100644 --- a/R/parallel_ui.R +++ b/R/parallel_ui.R @@ -105,6 +105,7 @@ parallelism_choices <- function(distributed_only = FALSE) { "parLapply_staged" ) distributed <- c( + "clustermq", "clustermq_staged", "future", "future_lapply", diff --git a/R/priority_queue.R b/R/priority_queue.R index d7e85e184..4427cf758 100644 --- a/R/priority_queue.R +++ b/R/priority_queue.R @@ -1,4 +1,4 @@ -new_priority_queue <- function(config){ +new_priority_queue <- function(config, jobs = config$jobs_imports){ config$graph <- config$schedule targets <- V(config$graph)$name if (!length(targets)){ @@ -9,7 +9,7 @@ new_priority_queue <- function(config){ FUN = function(target){ length(dependencies(targets = target, config = config)) }, - jobs = config$jobs_imports + jobs = jobs ) %>% unlist priorities <- rep(Inf, length(targets)) diff --git a/R/staged.R b/R/staged.R index 37931a8b0..f31b23479 100644 --- a/R/staged.R +++ b/R/staged.R @@ -1,4 +1,4 @@ -next_stage <- function(config, schedule) { +next_stage <- function(config, schedule, jobs) { targets <- character(0) old_leaves <- NULL meta_list <- list() @@ -14,7 +14,7 @@ next_stage <- function(config, schedule) { new_meta <- lightly_parallelize( X = new_leaves, FUN = drake_meta, - jobs = config$jobs_imports, + jobs = jobs, config = config ) names(new_meta) <- new_leaves @@ -27,7 +27,7 @@ next_stage <- function(config, schedule) { config = config ) }, - jobs = config$jobs_imports + jobs = jobs ) %>% unlist targets <- c(targets, new_leaves[do_build]) @@ -47,7 +47,11 @@ run_mclapply_staged <- function(config){ config$jobs <- safe_jobs(config$jobs) schedule <- config$schedule while (length(V(schedule)$name)){ - stage <- next_stage(config = config, schedule = schedule) + stage <- next_stage( + config = config, + schedule = schedule, + jobs = config$jobs + ) schedule <- stage$schedule if (!length(stage$targets)){ break @@ -55,7 +59,7 @@ run_mclapply_staged <- function(config){ set_attempt_flag(key = "_attempt", config = config) } prune_envir(targets = stage$targets, config = config, jobs = config$jobs) - tmp <- mclapply( + parallel::mclapply( X = stage$targets, FUN = function(target){ build_and_store( @@ -100,7 +104,11 @@ run_parLapply_staged <- function(config) { # nolint ) schedule <- config$schedule while (length(V(schedule)$name)){ - stage <- next_stage(config = config, schedule = schedule) + stage <- next_stage( + config = config, + schedule = schedule, + jobs = config$jobs + ) schedule <- stage$schedule if (!length(stage$targets)){ break @@ -146,7 +154,11 @@ run_future_lapply_staged <- function(config){ prepare_distributed(config = config) schedule <- config$schedule while (length(V(schedule)$name)){ - stage <- next_stage(config = config, schedule = schedule) + stage <- next_stage( + config = config, + schedule = schedule, + jobs = config$jobs_imports + ) schedule <- stage$schedule if (!length(stage$targets)){ # Keep in case outdated targets are ever back in the schedule. @@ -165,26 +177,33 @@ run_future_lapply_staged <- function(config){ } run_clustermq_staged <- function(config){ - assert_pkg("clustermq") + assert_pkg("clustermq", version = "0.8.4.99") schedule <- config$schedule - workers <- clustermq::workers( - n_jobs = config$jobs, - template = config$template - ) - on.exit(workers$finalize()) + workers <- NULL while (length(V(schedule)$name)){ - stage <- next_stage(config = config, schedule = schedule) + stage <- next_stage( + config = config, + schedule = schedule, + jobs = 1 # config$jobs # nolint + ) schedule <- stage$schedule if (!length(stage$targets)){ # Keep in case outdated targets are ever back in the schedule. break # nocov - } else if (any(stage$targets %in% config$plan$target)){ + } else if (is.null(workers)){ + workers <- clustermq::workers( + n_jobs = config$jobs, + template = config$template + ) + on.exit(workers$finalize()) + } + if (any(stage$targets %in% config$plan$target)){ set_attempt_flag(key = "_attempt", config = config) } prune_envir( targets = stage$targets, config = config, - jobs = config$jobs_imports + jobs = 1 # config$jobs_imports # nolint ) export <- list() if (identical(config$envir, globalenv())){ @@ -202,7 +221,7 @@ run_clustermq_staged <- function(config){ config = config ) }, - jobs = config$jobs_imports + jobs = 1 # config$jobs_imports # nolint ) builds <- clustermq::Q( stage$targets, @@ -234,13 +253,16 @@ run_clustermq_staged <- function(config){ config = config ) }, - jobs = config$jobs_imports + jobs = 1 # config$jobs_imports # nolint ) } + if (!is.null(workers) && workers$cleanup()){ + on.exit() + } invisible() } -#' @title Build a target using the clustermq backend +#' @title Build a target using the clustermq_staged backend #' @description For internal use only #' @export #' @keywords internal diff --git a/inst/testing/scenarios.csv b/inst/testing/scenarios.csv index f60d66a1f..84706c2e5 100644 --- a/inst/testing/scenarios.csv +++ b/inst/testing/scenarios.csv @@ -5,6 +5,7 @@ envir,parallelism,jobs,backend,caching "local","mclapply",1,, "local","mclapply",9,, "local","mclapply_staged",2,, +"local","clustermq",2,, "local","clustermq_staged",9,, "local","Makefile",9,, "local","future_lapply",2,"future::multisession", @@ -16,6 +17,7 @@ envir,parallelism,jobs,backend,caching "global","mclapply",1,, "global","mclapply",2,, "global","mclapply_staged",9,, +"global","clustermq",9,, "global","clustermq_staged",2,, "global","Makefile",2,, "global","future_lapply",1,"future::multisession", diff --git a/man/cmq_build.Rd b/man/cmq_build.Rd new file mode 100644 index 000000000..ce69c8bdf --- /dev/null +++ b/man/cmq_build.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/clustermq.R +\name{cmq_build} +\alias{cmq_build} +\title{Build a target using the clustermq backend} +\usage{ +cmq_build(target, meta, deps, config) +} +\arguments{ +\item{target}{target name} + +\item{meta}{list of metadata} + +\item{deps}{named list of target dependencies} + +\item{config}{a \code{\link[=drake_config]{drake_config()}} list} +} +\description{ +For internal use only +} +\keyword{internal} diff --git a/man/cmq_staged_build.Rd b/man/cmq_staged_build.Rd index 09cd93cba..a59de5df3 100644 --- a/man/cmq_staged_build.Rd +++ b/man/cmq_staged_build.Rd @@ -2,7 +2,7 @@ % Please edit documentation in R/staged.R \name{cmq_staged_build} \alias{cmq_staged_build} -\title{Build a target using the clustermq backend} +\title{Build a target using the clustermq_staged backend} \usage{ cmq_staged_build(target, meta_list, config) } diff --git a/tests/testthat/test-clustermq.R b/tests/testthat/test-clustermq.R index 8c9ca535c..c46e0eca6 100644 --- a/tests/testthat/test-clustermq.R +++ b/tests/testthat/test-clustermq.R @@ -1,6 +1,6 @@ drake_context("clustermq") -test_with_dir("clustermq_staged parallelism", { +test_with_dir("clustermq parallelism", { skip_on_cran() if ("package:clustermq" %in% search()){ eval(parse(text = "detach('package:clustermq', unload = TRUE)")) @@ -10,25 +10,29 @@ test_with_dir("clustermq_staged parallelism", { skip_on_os("windows") scenario <- get_testing_scenario() e <- eval(parse(text = scenario$envir)) - jobs <- scenario$jobs + jobs <- scenario$jobs # ignoring for now, using 2 jobs load_mtcars_example(envir = e) - make( - e$my_plan, - parallelism = "clustermq_staged", - jobs = jobs, - envir = e, - verbose = 4 - ) - config <- drake_config(e$my_plan, envir = e) - expect_equal(outdated(config), character(0)) - make( - e$my_plan, - parallelism = "clustermq_staged", - jobs = jobs, - envir = e, - verbose = 4 - ) - expect_equal(justbuilt(config), character(0)) + for (parallelism in c("clustermq", "clustermq_staged")){ + make( + e$my_plan, + parallelism = parallelism, + jobs = 2, + envir = e, + verbose = 4, + garbage_collection = TRUE + ) + config <- drake_config(e$my_plan, envir = e) + expect_equal(outdated(config), character(0)) + make( + e$my_plan, + parallelism = parallelism, + jobs = 2, + envir = e, + verbose = 4 + ) + expect_equal(justbuilt(config), character(0)) + clean(destroy = TRUE) + } if ("package:clustermq" %in% search()){ eval(parse(text = "detach('package:clustermq', unload = TRUE)")) }