Skip to content

Commit

Permalink
Avoid duplicated branch aggregation: just send the branches over the …
Browse files Browse the repository at this point in the history
…network.
  • Loading branch information
wlandau committed Nov 10, 2024
1 parent b0f63f9 commit 425a7f4
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
The methodology in this package
borrows from GNU 'Make' (2015, ISBN:978-9881443519)
and 'drake' (2018, <doi:10.21105/joss.00550>).
Version: 1.8.0.9012
Version: 1.8.0.9013
License: MIT + file LICENSE
URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
BugReports: https://github.com/ropensci/targets/issues
Expand Down
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# targets 1.8.0.9012 (development)
# targets 1.8.0.9013 (development)

* Un-break workflows that use `format = "file_fast"` (#1339, @koefoeden).
* Fix deadlock in `error = "trim"` (#1340, @koefoeden).
Expand All @@ -18,6 +18,7 @@
* Omit whole pattern targets from branch subpipelines when possible. Should reduce memory consumption in some cases.
* Omit whole stem targets from branch subpipelines when `retrieval` is `"main"` and only a bud is actually used. The same cannot be done with branches because each branch may need to be (un)marshaled individually.
* Compress branches into references when `retrieval` is `"worker"` and the whole pattern is part of the subpipeline.
* Avoid duplicated branch aggregation: just send the branches over the network.

# targets 1.8.0

Expand Down
11 changes: 2 additions & 9 deletions R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ target_prepare.tar_builder <- function(
progress = scheduler$progress,
pending = pending
)
builder_ensure_deps(target, pipeline, "main")
target_ensure_deps_main(target, pipeline)
builder_update_subpipeline(target, pipeline)
}

Expand Down Expand Up @@ -138,7 +138,7 @@ target_run.tar_builder <- function(target, envir, path_store) {
}
on.exit(builder_unset_tar_runtime(), add = TRUE)
on.exit(target$subpipeline <- NULL, add = TRUE)
builder_ensure_deps(target, target$subpipeline, "worker")
target_ensure_deps_worker(target, target$subpipeline)
frames <- frames_produce(envir, target, target$subpipeline)
builder_set_tar_runtime(target, frames)
store_update_stage_early(
Expand Down Expand Up @@ -304,13 +304,6 @@ target_validate.tar_builder <- function(target) {
}
}

builder_ensure_deps <- function(target, pipeline, retrieval) {
if (!identical(target$settings$retrieval, retrieval)) {
return()
}
target_ensure_deps(target, pipeline)
}

builder_update_subpipeline <- function(target, pipeline) {
target$subpipeline <- pipeline_produce_subpipeline(
pipeline,
Expand Down
16 changes: 15 additions & 1 deletion R/class_target.R
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,27 @@ target_ensure_dep <- function(target, dep, pipeline) {
)
}

target_ensure_deps <- function(target, pipeline) {
target_ensure_deps_worker <- function(target, pipeline) {
map(
target_deps_shallow(target, pipeline),
~target_ensure_dep(target, pipeline_get_target(pipeline, .x), pipeline)
)
}

target_ensure_deps_main <- function(target, pipeline) {
for (name in target_deps_shallow(target, pipeline)) {
dep <- pipeline_get_target(pipeline, name)
if (inherits(dep, "tar_pattern")) {
map(
target_get_children(dep),
~target_ensure_dep(target, pipeline_get_target(pipeline, .x), pipeline)
)
} else {
target_ensure_dep(target, dep, pipeline)
}
}
}

target_value_null <- function(target) {
value_init(
object = NULL,
Expand Down
2 changes: 1 addition & 1 deletion R/class_workspace.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ workspace_read <- function(name, path_store) {
}

workspace_populate <- function(workspace) {
target_ensure_deps(workspace$target, workspace$subpipeline)
target_ensure_deps_worker(workspace$target, workspace$subpipeline)
}

workspace_assign <- function(workspace, envir) {
Expand Down

0 comments on commit 425a7f4

Please sign in to comment.