diff --git a/NEWS.md b/NEWS.md index 434d3c2cc..eab2419dd 100644 --- a/NEWS.md +++ b/NEWS.md @@ -2,7 +2,7 @@ ## New features -- Enable the `caching` argument for `"clustermq"` parallelism. Now, `make(parallelism = "clustermq", caching = "master")` will do all the caching with the master process, and `make(parallelism = "clustermq", caching = "worker")` will do all the caching with the workers. +- Enable the `caching` argument for the `"clustermq"` and `"clustermq_staged"` parallel backends. Now, `make(parallelism = "clustermq", caching = "master")` will do all the caching with the master process, and `make(parallelism = "clustermq", caching = "worker")` will do all the caching with the workers. The same is true for `parallelism = "clustermq_staged"`. ## Bug fixes diff --git a/R/clustermq.R b/R/clustermq.R index aac73333b..d8ca451bc 100644 --- a/R/clustermq.R +++ b/R/clustermq.R @@ -21,6 +21,7 @@ cmq_set_common_data <- function(config){ if (identical(config$envir, globalenv())){ export <- as.list(config$envir, all.names = TRUE) # nocov } + config$cache$flush_cache() export$config <- config config$workers$set_common_data( export = export, @@ -134,7 +135,10 @@ cmq_conclude_build <- function(msg, config){ cmq_conclude_target(target = build$target, config = config) if (identical(config$caching, "worker")){ mc_wait_checksum( - target = build$target, checksum = build$checksum, config = config) + target = build$target, + checksum = build$checksum, + config = config + ) return() } set_attempt_flag(key = build$target, config = config) diff --git a/R/config.R b/R/config.R index 61eaa6e97..c8c03166a 100644 --- a/R/config.R +++ b/R/config.R @@ -305,7 +305,7 @@ #' use `clean(destroy = TRUE)`. #' #' @param caching character string, only applies to -#' `"clustermq"` and `"future"` parallelism. +#' `"clustermq"`, `"clustermq_staged"`, and `"future"` parallel backends. #' The `caching` argument can be either `"master"` or `"worker"`. #' - `"master"`: Targets are built by remote workers and sent back to #' the master process. Then, the master process saves them to the diff --git a/R/staged.R b/R/staged.R index f31b23479..75f816976 100644 --- a/R/staged.R +++ b/R/staged.R @@ -180,6 +180,7 @@ run_clustermq_staged <- function(config){ assert_pkg("clustermq", version = "0.8.4.99") schedule <- config$schedule workers <- NULL + config$cache$flush_cache() while (length(V(schedule)$name)){ stage <- next_stage( config = config, @@ -238,23 +239,37 @@ run_clustermq_staged <- function(config){ workers = workers, export = export ) - lightly_parallelize( - X = builds, - FUN = function(build){ - mc_wait_outfile_checksum( - target = build$target, - checksum = build$checksum, - config = config - ) - conclude_build( - target = build$target, - value = build$value, - meta = build$meta, - config = config - ) - }, - jobs = 1 # config$jobs_imports # nolint - ) + if (identical(config$caching, "master")){ + lightly_parallelize( + X = builds, + FUN = function(build){ + mc_wait_outfile_checksum( + target = build$target, + checksum = build$checksum, + config = config + ) + conclude_build( + target = build$target, + value = build$value, + meta = build$meta, + config = config + ) + }, + jobs = 1 # config$jobs_imports # nolint + ) + } else if (identical(config$caching, "worker")){ + lightly_parallelize( + X = builds, + FUN = function(build){ + mc_wait_checksum( + target = build$target, + checksum = build$checksum, + config = config + ) + }, + jobs = 1 # config$jobs_imports # nolint + ) + } } if (!is.null(workers) && workers$cleanup()){ on.exit() @@ -278,7 +293,16 @@ cmq_staged_build <- function(target, meta_list, config){ meta = meta_list[[target]], config = config ) - build$checksum <- mc_output_file_checksum(target, config) - build + if (identical(config$caching, "master")){ + build$checksum <- mc_output_file_checksum(target, config) + return(build) + } + conclude_build( + target = build$target, + value = build$value, + meta = build$meta, + config = config + ) + list(target = target, checksum = mc_get_checksum(target, config)) # nocov end } diff --git a/inst/testing/scenarios.csv b/inst/testing/scenarios.csv index 84706c2e5..2aa53a8ca 100644 --- a/inst/testing/scenarios.csv +++ b/inst/testing/scenarios.csv @@ -5,8 +5,8 @@ envir,parallelism,jobs,backend,caching "local","mclapply",1,, "local","mclapply",9,, "local","mclapply_staged",2,, -"local","clustermq",2,, -"local","clustermq_staged",9,, +"local","clustermq",2,,"master" +"local","clustermq_staged",9,,"worker" "local","Makefile",9,, "local","future_lapply",2,"future::multisession", "local","future_lapply_staged",1,"future::multisession", @@ -17,8 +17,8 @@ envir,parallelism,jobs,backend,caching "global","mclapply",1,, "global","mclapply",2,, "global","mclapply_staged",9,, -"global","clustermq",9,, -"global","clustermq_staged",2,, +"global","clustermq",9,,"worker" +"global","clustermq_staged",2,,"master" "global","Makefile",2,, "global","future_lapply",1,"future::multisession", "global","future_lapply_staged",2,"future::multisession", diff --git a/man/drake_config.Rd b/man/drake_config.Rd index a2b35a750..c7907869c 100644 --- a/man/drake_config.Rd +++ b/man/drake_config.Rd @@ -315,7 +315,7 @@ To reset the random number generator seed for a project, use \code{clean(destroy = TRUE)}.} \item{caching}{character string, only applies to -\code{"clustermq"} and \code{"future"} parallelism. +\code{"clustermq"}, \code{"clustermq_staged"}, and \code{"future"} parallel backends. The \code{caching} argument can be either \code{"master"} or \code{"worker"}. \itemize{ \item \code{"master"}: Targets are built by remote workers and sent back to diff --git a/man/make.Rd b/man/make.Rd index f43090319..141ae8362 100644 --- a/man/make.Rd +++ b/man/make.Rd @@ -322,7 +322,7 @@ To reset the random number generator seed for a project, use \code{clean(destroy = TRUE)}.} \item{caching}{character string, only applies to -\code{"clustermq"} and \code{"future"} parallelism. +\code{"clustermq"}, \code{"clustermq_staged"}, and \code{"future"} parallel backends. The \code{caching} argument can be either \code{"master"} or \code{"worker"}. \itemize{ \item \code{"master"}: Targets are built by remote workers and sent back to diff --git a/tests/testthat/test-clustermq.R b/tests/testthat/test-clustermq.R index 3c51b0b43..6eb911168 100644 --- a/tests/testthat/test-clustermq.R +++ b/tests/testthat/test-clustermq.R @@ -14,6 +14,8 @@ test_with_dir("clustermq parallelism", { load_mtcars_example(envir = e) for (parallelism in c("clustermq", "clustermq_staged")){ for (caching in c("master", "worker")){ + config <- drake_config(e$my_plan, envir = e) + expect_equal(length(outdated(config)), nrow(config$plan)) make( e$my_plan, parallelism = parallelism, @@ -23,7 +25,6 @@ test_with_dir("clustermq parallelism", { verbose = 4, garbage_collection = TRUE ) - config <- drake_config(e$my_plan, envir = e) expect_equal(outdated(config), character(0)) make( e$my_plan,