Skip to content

Commit

Permalink
Address #124
Browse files Browse the repository at this point in the history
build_distributed() now returns invisible()
  • Loading branch information
wlandau-lilly committed Oct 31, 2017
1 parent 2c0fcbd commit 1e5daed
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 26 deletions.
6 changes: 3 additions & 3 deletions R/Makefile.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ run_Makefile <- function( #nolint: we want Makefile capitalized.
run = TRUE,
debug = FALSE
){
config <- prepare_distributed(config = config)
prepare_distributed(config = config)
with_output_sink(
new = "Makefile",
code = {
Expand Down Expand Up @@ -95,8 +95,8 @@ mk <- function(
target = character(0),
cache_path = drake::default_cache_path()
){
config <- build_distributed(target = target, cache_path = cache_path)
config <- inventory(config)
build_distributed(target = target, cache_path = cache_path)
config <- recover_config(cache_path)
new_hash <- self_hash(target = target, config = config)
if (!identical(config$old_hash, new_hash)){
file <- time_stamp_file(target = target, config = config)
Expand Down
3 changes: 2 additions & 1 deletion R/console.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ console_up_to_date <- function(config){
if (config$imports_only){
return(invisible())
}
if (config$verbose && !length(config$attempted_targets)){
any_attempted <- length(config$cache$list(namespace = "target_attempts"))
if (config$verbose && !any_attempted){
color("All targets are already up to date.\n", colors["target"]) %>%
cat
}
Expand Down
6 changes: 6 additions & 0 deletions R/current.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,9 @@ file_current <- function(target, hashes, config){
}
identical(config$cache$get(target)$value, hashes$file)
}

log_target_attempts <- Vectorize(function(targets, config){
config$cache$set(key = targets, value = targets,
namespace = "target_attempts")
},
"targets")
27 changes: 16 additions & 11 deletions R/distributed.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ prepare_distributed <- function(config){
file = globalenv_file(this_cache_path)
)
}
config$attempted_targets <- outdated(
target_attempts <- outdated(
plan = config$plan,
targets = config$targets,
envir = config$envir,
Expand All @@ -18,19 +18,13 @@ prepare_distributed <- function(config){
packages = config$packages,
prework = config$prework
)
log_target_attempts(targets = target_attempts, config = config)
config$cache$set("config", config, namespace = "distributed")
invisible(config)
invisible()
}

build_distributed <- function(target, cache_path){
cache <- this_cache(cache_path)
config <- cache$get("config", namespace = "distributed")
if (identical(globalenv(), config$envir)){
dir <- cache_path
file <- globalenv_file(dir)
load(file = file, envir = config$envir)
}
config <- inventory(config)
config <- recover_config(cache_path = cache_path)
do_prework(config = config, verbose_packages = FALSE)
prune_envir(targets = target, config = config)
hash_list <- hash_list(targets = target, config = config)
Expand All @@ -47,5 +41,16 @@ build_distributed <- function(target, cache_path){
config = config
)
}
invisible(config)
invisible()
}

recover_config <- function(cache_path){
cache <- this_cache(cache_path)
config <- cache$get("config", namespace = "distributed")
if (identical(globalenv(), config$envir)){
dir <- cache_path
file <- globalenv_file(dir)
load(file = file, envir = config$envir)
}
inventory(config)
}
7 changes: 4 additions & 3 deletions R/make.R
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,9 @@ make <- function(
)
check_config(config = config)
store_config(config = config)
store_session(config = config)
initialize_session(config = config)
config <- imports_only_preprocessing(config = config)
config <- run_parallel_backend(config = config)
run_parallel_backend(config = config)
config$parallelism <- parallelism
console_up_to_date(config = config)
return(invisible(config))
Expand Down Expand Up @@ -278,7 +278,8 @@ next_targets <- function(graph_remaining_targets){
names()
}

store_session <- function(config){
initialize_session <- function(config){
config$cache$clear(namespace = "target_attempts")
config$cache$set(
key = "sessionInfo",
value = sessionInfo(),
Expand Down
3 changes: 1 addition & 2 deletions R/parLapply.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ run_parLapply <- function(config) { # nolint
})
clusterCall(cl = config$cluster, fun = do_prework, config = config,
verbose_packages = FALSE)
config <- run_parallel(
run_parallel(
config = config,
worker = worker_parLapply # nolint
)
invisible(config)
}

worker_parLapply <- function(targets, hash_list, config) { # nolint
Expand Down
8 changes: 3 additions & 5 deletions R/parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ run_parallel <- function(config, worker) {
while (length(V(config$graph_remaining_targets))){
config <- parallel_stage(worker = worker, config = config)
}
config
invisible()
}

parallel_stage <- function(worker, config) {
Expand All @@ -24,10 +24,8 @@ parallel_stage <- function(worker, config) {
build_these <- Filter(candidates,
f = function(target)
should_build(target = target, hash_list = hash_list, config = config))
config$attempted_targets <- c(
config$attempted_targets,
intersect(build_these, config$plan$target)
)
target_attempts <- intersect(build_these, config$plan$target)
log_target_attempts(targets = target_attempts, config = config)
hash_list <- hash_list[build_these]
if (length(build_these)){
worker(targets = build_these, hash_list = hash_list,
Expand Down
3 changes: 2 additions & 1 deletion R/timestamp.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ time_stamps <- function(config){
dir_empty(stamp_dir)
write_time_stamp_template(cache_path)
targets <- intersect(V(config$graph)$name, config$plan$target)
stamp_these <- setdiff(targets, config$attempted_targets)
target_attempts <- config$cache$list(namespace = "target_attempts")
stamp_these <- setdiff(targets, target_attempts)
lightly_parallelize(
stamp_these, write_time_stamp, jobs = config$jobs, config = config)
return(invisible())
Expand Down

0 comments on commit 1e5daed

Please sign in to comment.