Skip to content

Commit

Permalink
Sketch #531
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Oct 6, 2018
1 parent 7e1da4c commit ab3222a
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 27 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Version 6.0.0.9000

## New features

- Enable the `caching` argument for `"clustermq"` parallelism. Now, `make(parallelism = "clustermq", caching = "master")` will do all the caching with the master process, and `make(parallelism = "clustermq", caching = "worker")` will do all the caching with the workers.

## Bug fixes

- Call `path.expand()` on the `file` argument to `render_drake_graph()` and `render_sankey_drake_graph()`. That way, tildes in file paths no longer interfere with the rendering of static image files. Compensates for https://github.com/wch/webshot.
Expand Down
1 change: 1 addition & 0 deletions R/build.R
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ build_check_store <- function(
if (flag_attempt && target %in% config$plan$target){
set_attempt_flag(key = target, config = config)
}
invisible()
}

build_and_store <- function(target, config, meta = NULL, announce = TRUE){
Expand Down
19 changes: 17 additions & 2 deletions R/clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,18 @@ cmq_build <- function(target, meta, deps, config){
config$envir[[dep]] <- deps[[dep]]
}
build <- just_build(target = target, meta = meta, config = config)
build$checksum <- mc_output_file_checksum(target, config)
build
if (identical(config$caching, "master")){
build$checksum <- mc_output_file_checksum(target, config)
return(build)
}
conclude_build(
target = build$target,
value = build$value,
meta = build$meta,
config = config
)
set_attempt_flag(key = build$target, config = config)
list(target = target, checksum = mc_get_checksum(target, config))
}

cmq_conclude_build <- function(msg, config){
Expand All @@ -122,6 +132,11 @@ cmq_conclude_build <- function(msg, config){
return()
}
cmq_conclude_target(target = build$target, config = config)
if (identical(config$caching, "worker")){
mc_wait_checksum(
target = build$target, checksum = build$checksum, config = config)
return()
}
set_attempt_flag(key = build$target, config = config)
mc_wait_outfile_checksum(
target = build$target,
Expand Down
5 changes: 3 additions & 2 deletions R/config.R
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@
#' To reset the random number generator seed for a project,
#' use `clean(destroy = TRUE)`.
#'
#' @param caching character string, only applies to `"future"` parallelism.
#' Can be either `"master"` or `"worker"`.
#' @param caching character string, only applies to
#' `"clustermq"` and `"future"` parallelism.
#' The `caching` argument can be either `"master"` or `"worker"`.
#' - `"master"`: Targets are built by remote workers and sent back to
#' the master process. Then, the master process saves them to the
#' cache (`config$cache`, usually a file system `storr`).
Expand Down
5 changes: 3 additions & 2 deletions man/drake_config.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions man/make.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 23 additions & 19 deletions tests/testthat/test-clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,29 @@ test_with_dir("clustermq parallelism", {
jobs <- scenario$jobs # ignoring for now, using 2 jobs
load_mtcars_example(envir = e)
for (parallelism in c("clustermq", "clustermq_staged")){
make(
e$my_plan,
parallelism = parallelism,
jobs = jobs,
envir = e,
verbose = 4,
garbage_collection = TRUE
)
config <- drake_config(e$my_plan, envir = e)
expect_equal(outdated(config), character(0))
make(
e$my_plan,
parallelism = parallelism,
jobs = jobs,
envir = e,
verbose = 4
)
expect_equal(justbuilt(config), character(0))
clean(destroy = TRUE)
for (caching in c("master", "worker")){
make(
e$my_plan,
parallelism = parallelism,
jobs = jobs,
caching = caching,
envir = e,
verbose = 4,
garbage_collection = TRUE
)
config <- drake_config(e$my_plan, envir = e)
expect_equal(outdated(config), character(0))
make(
e$my_plan,
parallelism = parallelism,
jobs = jobs,
caching = caching,
envir = e,
verbose = 4
)
expect_equal(justbuilt(config), character(0))
clean(destroy = TRUE)
}
}
if ("package:clustermq" %in% search()){
eval(parse(text = "detach('package:clustermq', unload = TRUE)"))
Expand Down

0 comments on commit ab3222a

Please sign in to comment.