Skip to content

Commit

Permalink
Add a manual counter of targets that have not completed
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Aug 13, 2018
1 parent 7228b3b commit 81943c1
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions R/clustermq.R
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
run_clustermq <- function(config){
assert_pkgs("clustermq")
config$queue <- new_priority_queue(config = config, jobs = 1)
config$workers <- clustermq::workers(
n_jobs = config$jobs,
template = config$template
)
on.exit(config$workers$cleanup())
cmq_set_common_data(config)
config$queue <- new_priority_queue(config = config, jobs = 1)
config$counter <- new.env(parent = emptyenv())
config$counter$remaining <- config$queue$size()
cmq_master(config)
}

Expand All @@ -29,7 +31,7 @@ cmq_set_common_data <- function(config){
cmq_master <- function(config){
while (cmq_work_remains(config)){
msg <- config$workers$receive_data()
cmq_conclude_job(msg = msg, config = config)
cmq_conclude_build(msg = msg, config = config)
if (identical(msg$id, "WORKER_UP")){
config$workers$send_common_data()
} else if (identical(msg$id, "WORKER_READY")) {
Expand All @@ -47,7 +49,7 @@ cmq_master <- function(config){
}

cmq_work_remains <- function(config){
!config$queue$empty()
config$counter$remaining > 0
}

cmq_send_target <- function(config){
Expand All @@ -59,6 +61,7 @@ cmq_send_target <- function(config){
meta <- drake_meta(target = target, config = config)
if (!should_build_target(target = target, meta = meta, config = config)){
console_skip(target = target, config = config)
cmq_conclude_target(target = target, config = config)
config$workers$send_wait()
return()
}
Expand Down Expand Up @@ -111,7 +114,7 @@ cmq_build <- function(target, meta, deps, config){
build
}

cmq_conclude_job <- function(msg, config){
cmq_conclude_build <- function(msg, config){
build <- msg$result
if (is.null(build)){
return()
Expand All @@ -122,17 +125,22 @@ cmq_conclude_job <- function(msg, config){
meta = build$meta,
config = config
)
revdeps <- dependencies(
targets = build$target,
config = config,
reverse = TRUE
) %>%
intersect(y = config$queue$list())
config$queue$decrease_key(targets = revdeps)
cmq_conclude_target(target = build$target, config = config)
set_attempt_flag(key = build$target, config = config)
mc_wait_outfile_checksum(
target = build$target,
checksum = build$checksum,
config = config
)
}

cmq_conclude_target <- function(target, config){
config$counter$remaining <- config$counter$remaining - 1
revdeps <- dependencies(
targets = target,
config = config,
reverse = TRUE
) %>%
intersect(y = config$queue$list())
config$queue$decrease_key(targets = revdeps)
}

0 comments on commit 81943c1

Please sign in to comment.