Skip to content

Commit

Permalink
Fix "Interrupted system call" errors on the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Aug 13, 2018
1 parent 81943c1 commit 026cfb3
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions R/clustermq.R
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
run_clustermq <- function(config){
assert_pkgs("clustermq")
config$workers <- clustermq::workers(
n_jobs = config$jobs,
template = config$template
)
on.exit(config$workers$cleanup())
cmq_set_common_data(config)
config$queue <- new_priority_queue(config = config, jobs = 1)
config$counter <- new.env(parent = emptyenv())
config$counter$remaining <- config$queue$size()
cmq_master(config)
if (!config$queue$empty()){
config$workers <- clustermq::workers(
n_jobs = config$jobs,
template = config$template
)
on.exit(config$workers$cleanup())
cmq_set_common_data(config)
cmq_master(config)
}
}

cmq_set_common_data <- function(config){
Expand All @@ -29,7 +29,7 @@ cmq_set_common_data <- function(config){
}

cmq_master <- function(config){
while (cmq_work_remains(config)){
while (cmq_work_remains(config) || config$workers$workers_running > 0){
msg <- config$workers$receive_data()
cmq_conclude_build(msg = msg, config = config)
if (identical(msg$id, "WORKER_UP")){
Expand All @@ -41,15 +41,15 @@ cmq_master <- function(config){
config$workers$send_shutdown_worker()
}
} else if (identical(msg$id, "WORKER_DONE")) {
w$disconnect_worker()
config$workers$disconnect_worker(msg)
} else if (identical(msg$id, "WORKER_ERROR")) {
stop("clustermq worker error") # nocov
}
}
}

cmq_work_remains <- function(config){
config$counter$remaining > 0
!config$queue$empty()
}

cmq_send_target <- function(config){
Expand Down Expand Up @@ -119,23 +119,22 @@ cmq_conclude_build <- function(msg, config){
if (is.null(build)){
return()
}
conclude_build(
target = build$target,
value = build$value,
meta = build$meta,
config = config
)
cmq_conclude_target(target = build$target, config = config)
set_attempt_flag(key = build$target, config = config)
mc_wait_outfile_checksum(
target = build$target,
checksum = build$checksum,
config = config
)
conclude_build(
target = build$target,
value = build$value,
meta = build$meta,
config = config
)
}

cmq_conclude_target <- function(target, config){
config$counter$remaining <- config$counter$remaining - 1
revdeps <- dependencies(
targets = target,
config = config,
Expand Down

0 comments on commit 026cfb3

Please sign in to comment.