Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper clustermq parallelism #501

Merged
merged 33 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ec01141
Sketch non-staged clustermq backend
wlandau-lilly Aug 10, 2018
068b428
Add gc()
wlandau-lilly Aug 10, 2018
0d07740
Do some more work on cmq backend
wlandau-lilly Aug 10, 2018
2759076
Expose interrupted system calls
wlandau-lilly Aug 10, 2018
b5a37e9
Work on cmq_master()
wlandau-lilly Aug 10, 2018
6647c1e
Rm with_preserve_seed
wlandau-lilly Aug 10, 2018
5aaa187
Check the correct output file checksum
wlandau-lilly Aug 11, 2018
0bd244e
Figure out common data later
wlandau-lilly Aug 11, 2018
02cc8b7
Improvements
wlandau-lilly Aug 11, 2018
9de92ea
Send NULL call when skipping a target
wlandau-lilly Aug 11, 2018
75a4dfa
Include clustermq parallelism in test-clustermq.R
wlandau-lilly Aug 11, 2018
8f84aa7
Update news
wlandau-lilly Aug 11, 2018
722fa8b
Add clustermq to remotes
wlandau-lilly Aug 11, 2018
6400868
Try to handle signal errors in cmq_get_msg()
wlandau-lilly Aug 12, 2018
26e871b
Just receive_data() normally
wlandau-lilly Aug 12, 2018
7228b3b
Avoid mclapply calls while clustermq is running
wlandau-lilly Aug 13, 2018
81943c1
Add a manual counter of targets that have not completed
wlandau-lilly Aug 13, 2018
026cfb3
Fix "Interrupted system call" errors on the cluster
wlandau-lilly Aug 13, 2018
4e806ce
Require specific clustermq commit hash
wlandau-lilly Aug 13, 2018
f660108
Remove redundant cleanup call
wlandau-lilly Aug 13, 2018
c471449
Add testing scenarios for persistent clustermq
wlandau-lilly Aug 14, 2018
7b037c7
Improve package version assertions [ci-skip]
wlandau-lilly Aug 18, 2018
898e626
Tweak a test
wlandau-lilly Aug 18, 2018
8ee84b0
Update for the new clustermq worker API
wlandau-lilly Aug 21, 2018
0489ccf
Check that all the results are in
wlandau-lilly Aug 22, 2018
e339d84
Fix the cmq_master() loop
wlandau-lilly Aug 22, 2018
6ad3ed1
Do not begin clustermq_staged workers until the last moment
wlandau-lilly Aug 22, 2018
c91d7ac
Require clustermq@wapi
wlandau-lilly Aug 22, 2018
622df37
Merge branch 'master' into clustermq
wlandau-lilly Aug 22, 2018
8b1a966
Exempt/cover lines
wlandau-lilly Aug 22, 2018
a2dcf02
Always call cleanup() then finalize() for cmq workers
wlandau-lilly Aug 23, 2018
dbfd6c3
Merge branch 'clustermq' of github.com:ropensci/drake into clustermq
wlandau-lilly Aug 23, 2018
9369d83
Fix #425
wlandau-lilly Aug 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ Suggests:
styler,
visNetwork,
webshot
Remotes:
mschubert/clustermq@develop
VignetteBuilder: knitr
Encoding: UTF-8
RoxygenNote: 6.1.0
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export(check)
export(check_plan)
export(clean)
export(cleaned_namespaces)
export(cmq_build)
export(cmq_staged_build)
export(code_to_plan)
export(config)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Version 5.4.0.9000

- Add a proper [`clustermq`](https://github.com/mschubert/clustermq)-based parallel backend: `make(parallelism = "clustermq")`.
- Smooth the edges in `vis_drake_graph()` and `render_drake_graph()`.
- Make hover text slightly more readable in `vis_drake_graph()` and `render_drake_graph()`.
- Align hover text properly in `vis_drake_graph()` using the "title" node column.
Expand Down
145 changes: 145 additions & 0 deletions R/clustermq.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Entry point of the non-staged clustermq backend.
# Builds a priority queue of targets and, only if that queue is non-empty,
# launches clustermq workers, registers the common data, and hands control
# to cmq_master().
run_clustermq <- function(config){
  assert_pkg("clustermq", version = "0.8.4.99")
  config$queue <- new_priority_queue(config = config, jobs = 1)
  # Nothing queued: do not launch any workers.
  if (config$queue$empty()){
    return(invisible())
  }
  config$workers <- clustermq::workers(
    n_jobs = config$jobs,
    template = config$template
  )
  # The common data is registered before the counter is attached to config.
  cmq_set_common_data(config)
  # Environment holding the number of targets still to be concluded.
  counter <- new.env(parent = emptyenv())
  counter$remaining <- config$queue$size()
  config$counter <- counter
  cmq_master(config)
}

# Register the data shared by all clustermq workers.
# The export list always carries `config`; when the workflow environment is
# the global environment, every object in it (hidden ones included) is
# exported as well. The token passed here is the one cmq_master() compares
# incoming messages against.
cmq_set_common_data <- function(config){
  uses_globalenv <- identical(config$envir, globalenv())
  export <- if (uses_globalenv){
    as.list(config$envir, all.names = TRUE) # nocov
  } else {
    list()
  }
  export$config <- config
  config$workers$set_common_data(
    export = export,
    fun = identity,
    const = list(),
    rettype = list(),
    common_seed = config$seed,
    token = "set_common_data_token"
  )
}

# Master loop of the non-staged clustermq backend.
# Repeatedly receives a message from a worker, processes any build result it
# carries, and replies with exactly one of: the common data, the next target,
# or a shutdown instruction. The loop runs until config$counter$remaining
# (decremented in cmq_conclude_target()) is no longer positive.
cmq_master <- function(config){
# Finalize the workers whenever this function exits,
# unless the handler is cleared at the bottom.
on.exit(config$workers$finalize())
while (config$counter$remaining > 0){
msg <- config$workers$receive_data()
# No-op when msg$result is NULL (see cmq_conclude_build()).
cmq_conclude_build(msg = msg, config = config)
# A message without the common-data token gets the common data in reply.
# NOTE(review): presumably such a worker has not received the common data
# yet -- confirm against the clustermq worker API.
if (!identical(msg$token, "set_common_data_token")){
config$workers$send_common_data()
} else if (!config$queue$empty()){
cmq_send_target(config)
} else {
# Queue is empty: nothing more to hand to this worker.
config$workers$send_shutdown_worker()
}
}
# If cleanup() returns TRUE, drop the finalize() handler registered above.
if (config$workers$cleanup()){
on.exit()
}
}

# Reply to a worker with the next unit of work.
# Pops a target from the priority queue and either tells the worker to wait
# (nothing popped, or the target does not need building) or sends a call to
# drake::cmq_build() for that target.
cmq_send_target <- function(config){
target <- config$queue$pop0()
# Longer tests will catch this:
# pop0() gave back a zero-length result, so there is no target to send.
if (!length(target)){
config$workers$send_wait() # nocov
return() # nocov
}
meta <- drake_meta(target = target, config = config)
# Target should not even be in the priority queue
# nocov start
if (!should_build_target(target = target, meta = meta, config = config)){
console_skip(target = target, config = config)
# Still update the queue keys and the remaining-targets counter.
cmq_conclude_target(target = target, config = config)
config$workers$send_wait()
return()
}
# nocov end
# Record the start time in the metadata before announcing the build.
meta$start <- proc.time()
announce_build(target = target, meta = meta, config = config)
prune_envir(targets = target, config = config, jobs = 1)
# Values of the target's dependencies that are themselves plan targets,
# read from config$envir (see cmq_deps_list()).
deps <- cmq_deps_list(target = target, config = config)
# `config` is not included in `env`; set_common_data() exports it instead.
# NOTE(review): `expr` is presumably captured unevaluated and evaluated on
# the worker with `env` supplying target/meta/deps -- confirm against the
# clustermq worker API.
config$workers$send_call(
expr = drake::cmq_build(
target = target,
meta = meta,
deps = deps,
config = config
),
env = list(target = target, meta = meta, deps = deps)
)
}

# Collect the values of a target's dependencies that are plan targets.
# Returns a list named by dependency, with each value looked up in
# config$envir.
cmq_deps_list <- function(target, config){
  all_deps <- dependencies(targets = target, config = config)
  deps <- intersect(all_deps, config$plan$target)
  values <- lapply(
    X = deps,
    FUN = function(name){
      config$envir[[name]]
    }
  )
  setNames(values, deps)
}

#' @title Build a target using the clustermq backend
#' @description For internal use only. Called on a clustermq worker:
#'   loads the supplied dependency values into `config$envir`,
#'   builds the target, and attaches an output file checksum to the result.
#' @export
#' @keywords internal
#' @inheritParams drake_build
#' @param target target name
#' @param meta list of metadata
#' @param deps named list of target dependencies
#' @param config a [drake_config()] list
#' @return The build object from `just_build()`
#'   with an added `checksum` element.
cmq_build <- function(target, meta, deps, config){
  if (identical(config$garbage_collection, TRUE)){
    gc()
  }
  do_prework(config = config, verbose_packages = FALSE)
  # Make each dependency available by name in the build environment.
  envir <- config$envir
  for (name in names(deps)){
    envir[[name]] <- deps[[name]]
  }
  result <- just_build(target = target, meta = meta, config = config)
  result$checksum <- mc_output_file_checksum(target, config)
  result
}

# Process the build result carried by a worker message.
# Messages with a NULL result are ignored. Otherwise the target is marked
# as concluded in the queue, the attempt flag is set, the master waits on
# the output file checksum, and the build is concluded.
cmq_conclude_build <- function(msg, config){
  result <- msg$result
  if (is.null(result)){
    return()
  }
  target <- result$target
  cmq_conclude_target(target = target, config = config)
  set_attempt_flag(key = target, config = config)
  mc_wait_outfile_checksum(
    target = target,
    checksum = result$checksum,
    config = config
  )
  conclude_build(
    target = target,
    value = result$value,
    meta = result$meta,
    config = config
  )
}

# Bookkeeping after a target is finished or skipped.
# Decreases the queue key of every reverse dependency still in the queue
# and decrements the count of remaining targets by one.
cmq_conclude_target <- function(target, config){
  downstream <- dependencies(
    targets = target,
    config = config,
    reverse = TRUE
  )
  queued <- config$queue$list()
  waiting <- intersect(downstream, y = queued)
  config$queue$decrease_key(targets = waiting)
  config$counter$remaining <- config$counter$remaining - 1
}
2 changes: 1 addition & 1 deletion R/mclapply.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ run_mclapply <- function(config){
}
assert_pkg("txtq")
mc_init_worker_cache(config)
tmp <- mclapply(
parallel::mclapply(
X = mc_worker_id(c(0, seq_len(config$jobs))),
FUN = mc_process,
mc.cores = config$jobs + 1,
Expand Down
14 changes: 12 additions & 2 deletions R/parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,28 @@ lightly_parallelize <- function(X, FUN, jobs = 1, ...) {
if (is.atomic(X)){
lightly_parallelize_atomic(X = X, FUN = FUN, jobs = jobs, ...)
} else {
mclapply(X = X, FUN = FUN, mc.cores = jobs, ...)
safe_mclapply(X = X, FUN = FUN, mc.cores = jobs, ...)
}
}

lightly_parallelize_atomic <- function(X, FUN, jobs = 1, ...){
jobs <- safe_jobs_imports(jobs)
keys <- unique(X)
index <- match(X, keys)
values <- mclapply(X = keys, FUN = FUN, mc.cores = jobs, ...)
values <- safe_mclapply(X = keys, FUN = FUN, mc.cores = jobs, ...)
values[index]
}

# Drop-in for mclapply() that falls back to plain lapply()
# unless more than one core is requested.
# Avoids the SIGCHLD handler when mc.cores is 1,
# which could help avoid zeromq interrupted system call errors.
safe_mclapply <- function(X, FUN, mc.cores, ...){
  if (mc.cores <= 1){
    return(lapply(X = X, FUN = FUN, ...))
  }
  parallel::mclapply(X = X, FUN = FUN, mc.cores = mc.cores, ...)
}

# x is parallelism or jobs
imports_setting <- function(x){
if (length(x) < 2){
Expand Down
1 change: 1 addition & 0 deletions R/parallel_ui.R
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ parallelism_choices <- function(distributed_only = FALSE) {
"parLapply_staged"
)
distributed <- c(
"clustermq",
"clustermq_staged",
"future",
"future_lapply",
Expand Down
4 changes: 2 additions & 2 deletions R/priority_queue.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
new_priority_queue <- function(config){
new_priority_queue <- function(config, jobs = config$jobs_imports){
config$graph <- config$schedule
targets <- V(config$graph)$name
if (!length(targets)){
Expand All @@ -9,7 +9,7 @@ new_priority_queue <- function(config){
FUN = function(target){
length(dependencies(targets = target, config = config))
},
jobs = config$jobs_imports
jobs = jobs
) %>%
unlist
priorities <- rep(Inf, length(targets))
Expand Down
60 changes: 41 additions & 19 deletions R/staged.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
next_stage <- function(config, schedule) {
next_stage <- function(config, schedule, jobs) {
targets <- character(0)
old_leaves <- NULL
meta_list <- list()
Expand All @@ -14,7 +14,7 @@ next_stage <- function(config, schedule) {
new_meta <- lightly_parallelize(
X = new_leaves,
FUN = drake_meta,
jobs = config$jobs_imports,
jobs = jobs,
config = config
)
names(new_meta) <- new_leaves
Expand All @@ -27,7 +27,7 @@ next_stage <- function(config, schedule) {
config = config
)
},
jobs = config$jobs_imports
jobs = jobs
) %>%
unlist
targets <- c(targets, new_leaves[do_build])
Expand All @@ -47,15 +47,19 @@ run_mclapply_staged <- function(config){
config$jobs <- safe_jobs(config$jobs)
schedule <- config$schedule
while (length(V(schedule)$name)){
stage <- next_stage(config = config, schedule = schedule)
stage <- next_stage(
config = config,
schedule = schedule,
jobs = config$jobs
)
schedule <- stage$schedule
if (!length(stage$targets)){
break
} else if (any(stage$targets %in% config$plan$target)){
set_attempt_flag(key = "_attempt", config = config)
}
prune_envir(targets = stage$targets, config = config, jobs = config$jobs)
tmp <- mclapply(
parallel::mclapply(
X = stage$targets,
FUN = function(target){
build_and_store(
Expand Down Expand Up @@ -100,7 +104,11 @@ run_parLapply_staged <- function(config) { # nolint
)
schedule <- config$schedule
while (length(V(schedule)$name)){
stage <- next_stage(config = config, schedule = schedule)
stage <- next_stage(
config = config,
schedule = schedule,
jobs = config$jobs
)
schedule <- stage$schedule
if (!length(stage$targets)){
break
Expand Down Expand Up @@ -146,7 +154,11 @@ run_future_lapply_staged <- function(config){
prepare_distributed(config = config)
schedule <- config$schedule
while (length(V(schedule)$name)){
stage <- next_stage(config = config, schedule = schedule)
stage <- next_stage(
config = config,
schedule = schedule,
jobs = config$jobs_imports
)
schedule <- stage$schedule
if (!length(stage$targets)){
# Keep in case outdated targets are ever back in the schedule.
Expand All @@ -165,26 +177,33 @@ run_future_lapply_staged <- function(config){
}

run_clustermq_staged <- function(config){
assert_pkg("clustermq")
assert_pkg("clustermq", version = "0.8.4.99")
schedule <- config$schedule
workers <- clustermq::workers(
n_jobs = config$jobs,
template = config$template
)
on.exit(workers$finalize())
workers <- NULL
while (length(V(schedule)$name)){
stage <- next_stage(config = config, schedule = schedule)
stage <- next_stage(
config = config,
schedule = schedule,
jobs = 1 # config$jobs # nolint
)
schedule <- stage$schedule
if (!length(stage$targets)){
# Keep in case outdated targets are ever back in the schedule.
break # nocov
} else if (any(stage$targets %in% config$plan$target)){
} else if (is.null(workers)){
workers <- clustermq::workers(
n_jobs = config$jobs,
template = config$template
)
on.exit(workers$finalize())
}
if (any(stage$targets %in% config$plan$target)){
set_attempt_flag(key = "_attempt", config = config)
}
prune_envir(
targets = stage$targets,
config = config,
jobs = config$jobs_imports
jobs = 1 # config$jobs_imports # nolint
)
export <- list()
if (identical(config$envir, globalenv())){
Expand All @@ -202,7 +221,7 @@ run_clustermq_staged <- function(config){
config = config
)
},
jobs = config$jobs_imports
jobs = 1 # config$jobs_imports # nolint
)
builds <- clustermq::Q(
stage$targets,
Expand Down Expand Up @@ -234,13 +253,16 @@ run_clustermq_staged <- function(config){
config = config
)
},
jobs = config$jobs_imports
jobs = 1 # config$jobs_imports # nolint
)
}
if (!is.null(workers) && workers$cleanup()){
on.exit()
}
invisible()
}

#' @title Build a target using the clustermq backend
#' @title Build a target using the clustermq_staged backend
#' @description For internal use only
#' @export
#' @keywords internal
Expand Down
2 changes: 2 additions & 0 deletions inst/testing/scenarios.csv
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ envir,parallelism,jobs,backend,caching
"local","mclapply",1,,
"local","mclapply",9,,
"local","mclapply_staged",2,,
"local","clustermq",2,,
"local","clustermq_staged",9,,
"local","Makefile",9,,
"local","future_lapply",2,"future::multisession",
Expand All @@ -16,6 +17,7 @@ envir,parallelism,jobs,backend,caching
"global","mclapply",1,,
"global","mclapply",2,,
"global","mclapply_staged",9,,
"global","clustermq",9,,
"global","clustermq_staged",2,,
"global","Makefile",2,,
"global","future_lapply",1,"future::multisession",
Expand Down
Loading