From 026cfb32bd940e13037634063bc94d43dda83737 Mon Sep 17 00:00:00 2001
From: wlandau-lilly
Date: Mon, 13 Aug 2018 08:56:09 -0400
Subject: [PATCH] Fix "Interrupted system call" errors on the cluster

---
 R/clustermq.R | 37 ++++++++++++++++++-------------------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/R/clustermq.R b/R/clustermq.R
index b2fcac046..0aee07832 100644
--- a/R/clustermq.R
+++ b/R/clustermq.R
@@ -1,15 +1,15 @@
 run_clustermq <- function(config){
   assert_pkgs("clustermq")
-  config$workers <- clustermq::workers(
-    n_jobs = config$jobs,
-    template = config$template
-  )
-  on.exit(config$workers$cleanup())
-  cmq_set_common_data(config)
   config$queue <- new_priority_queue(config = config, jobs = 1)
-  config$counter <- new.env(parent = emptyenv())
-  config$counter$remaining <- config$queue$size()
-  cmq_master(config)
+  if (!config$queue$empty()){
+    config$workers <- clustermq::workers(
+      n_jobs = config$jobs,
+      template = config$template
+    )
+    on.exit(config$workers$cleanup())
+    cmq_set_common_data(config)
+    cmq_master(config)
+  }
 }
 
 cmq_set_common_data <- function(config){
@@ -29,7 +29,7 @@ cmq_set_common_data <- function(config){
 }
 
 cmq_master <- function(config){
-  while (cmq_work_remains(config)){
+  while (cmq_work_remains(config) || config$workers$workers_running > 0){
     msg <- config$workers$receive_data()
     cmq_conclude_build(msg = msg, config = config)
     if (identical(msg$id, "WORKER_UP")){
@@ -41,7 +41,7 @@ cmq_master <- function(config){
         config$workers$send_shutdown_worker()
       }
     } else if (identical(msg$id, "WORKER_DONE")) {
-      w$disconnect_worker()
+      config$workers$disconnect_worker(msg)
     } else if (identical(msg$id, "WORKER_ERROR")) {
       stop("clustermq worker error") # nocov
     }
@@ -49,7 +49,7 @@ cmq_master <- function(config){
 }
 
 cmq_work_remains <- function(config){
-  config$counter$remaining > 0
+  !config$queue$empty()
 }
 
 cmq_send_target <- function(config){
@@ -119,12 +119,6 @@ cmq_conclude_build <- function(msg, config){
   if (is.null(build)){
     return()
   }
-  conclude_build(
-    target = build$target,
-    value = build$value,
-    meta = build$meta,
-    config = config
-  )
   cmq_conclude_target(target = build$target, config = config)
   set_attempt_flag(key = build$target, config = config)
   mc_wait_outfile_checksum(
@@ -132,10 +126,15 @@ cmq_conclude_build <- function(msg, config){
     checksum = build$checksum,
     config = config
   )
+  conclude_build(
+    target = build$target,
+    value = build$value,
+    meta = build$meta,
+    config = config
+  )
 }
 
 cmq_conclude_target <- function(target, config){
-  config$counter$remaining <- config$counter$remaining - 1
   revdeps <- dependencies(
     targets = target,
     config = config,