recombining dynamic targets #1076

Closed
3 tasks done
brendanf opened this issue Nov 19, 2019 · 6 comments

Comments

@brendanf
Contributor

brendanf commented Nov 19, 2019

Prework

  • Read and abide by drake's code of conduct.
  • Search for duplicates among the existing issues, both open and closed.
  • If you think your issue has a quick and definite solution, consider posting to Stack Overflow under the drake-r-package tag. (If you anticipate extended follow-up and discussion, you are already in the right place!)

Description

I'm working on converting a plan to dynamic targets, but I'm having trouble figuring out how to gather up a set of dynamic targets that represent tabular data and then re-split the combined rows along a different variable (here, type).
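Outside of drake, the gather-then-regroup step is easy to state in base R. The sketch below only illustrates the intended data flow and is not a drake plan: it stands in for the three sub-targets of observations with plain data frames (values copied from the reprex output in this thread), stacks them with rbind(), and re-splits the rows by type.

```r
# Stand-ins for the three sub-targets of `observations`
# (values copied from the reprex output in this thread).
observations <- list(
  data.frame(type = c("c", "c", "b", "b", "b"),
             size = c(0.8831873, 0.2749970, 0.3361448, 0.2513900, 0.4255147),
             stringsAsFactors = FALSE),
  data.frame(type = c("c", "c", "c", "b", "a"),
             size = c(0.48001512, 0.42080385, 0.97729546, 0.74695374, 0.09598578),
             stringsAsFactors = FALSE),
  data.frame(type = c("c", "b", "c", "b", "b"),
             size = c(0.03932213, 0.54789832, 0.26274679, 0.86169360, 0.40500204),
             stringsAsFactors = FALSE)
)

# Gather: stack the per-dataset tables into one 15-row table.
all_data <- do.call(rbind, observations)

# Regroup: one data frame per value of `type`, pooled across all datasets.
by_type <- split(all_data, all_data$type)
```

The question in this issue is how to get drake to express these two steps when observations is dynamic.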

Reproducible example

library(drake)
clean(dataset, observations, all_data, types, means)
create_tabular_data <- function(x, n) {
  tibble::tibble(
    type = sample(letters[1:3], n, replace = TRUE),
    size = runif(n)
  )
}

plan <- drake_plan(
  dataset = 1:3,
  observations = target(
    create_tabular_data(dataset, 5),
    dynamic = map(dataset)
  ),
  all_data = target(
    dplyr::bind_rows(observations),
    dynamic = group(observations)
  ),
  types = target(
    all_data$type,
    dynamic = map(all_data)
  ),
  means = target(
    mean(all_data$size),
    dynamic = group(all_data, .by = types, .trace = types)
  )
)

make(plan)
#> target dataset
#> dynamic observations
#> subtarget observations_0b3474bd
#> subtarget observations_b2a5c9b8
#> subtarget observations_71f311ad
#> aggregate observations
#> dynamic all_data
#> subtarget all_data_1e10cf35
#> aggregate all_data
#> dynamic types
#> subtarget types_dabc0280
#> aggregate types
#> dynamic means
#> subtarget means_8b226f20
#> Warning: target means_8b226f20 warnings:
#>   argument is not numeric or logical: returning NA
#> aggregate means
readd(observations)
#> [[1]]
#> # A tibble: 5 x 2
#>   type   size
#>   <chr> <dbl>
#> 1 c     0.883
#> 2 c     0.275
#> 3 b     0.336
#> 4 b     0.251
#> 5 b     0.426
#> 
#> [[2]]
#> # A tibble: 5 x 2
#>   type    size
#>   <chr>  <dbl>
#> 1 c     0.480 
#> 2 c     0.421 
#> 3 c     0.977 
#> 4 b     0.747 
#> 5 a     0.0960
#> 
#> [[3]]
#> # A tibble: 5 x 2
#>   type    size
#>   <chr>  <dbl>
#> 1 c     0.0393
#> 2 b     0.548 
#> 3 c     0.263 
#> 4 b     0.862 
#> 5 b     0.405
readd(all_data)
#> [[1]]
#> # A tibble: 15 x 2
#>    type    size
#>    <chr>  <dbl>
#>  1 c     0.883 
#>  2 c     0.275 
#>  3 b     0.336 
#>  4 b     0.251 
#>  5 b     0.426 
#>  6 c     0.480 
#>  7 c     0.421 
#>  8 c     0.977 
#>  9 b     0.747 
#> 10 a     0.0960
#> 11 c     0.0393
#> 12 b     0.548 
#> 13 c     0.263 
#> 14 b     0.862 
#> 15 b     0.405
readd(types)
#> [[1]]
#>  [1] "c" "c" "b" "b" "b" "c" "c" "c" "b" "a" "c" "b" "c" "b" "b"
readd(means)
#> [[1]]
#> [1] NA
read_trace("types", "means")
#> [1] "49186fa708605656"

Created on 2019-11-19 by the reprex package (v0.3.0)
(edited to fix typos)

Desired result

I'm looking for means to be a list of three numeric values, representing the mean size of each type across all datasets, and for read_trace("types", "means") to tell me which type corresponds to each mean.
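To pin down the expected numbers, here is a base-R sketch (no drake involved) that computes one mean per type from the combined 15 rows. The type and size vectors are copied from the reprex output in this thread; the names of means play the role that the trace is meant to play.

```r
# Combined `type` and `size` columns from the reprex output (15 rows).
type <- c("c", "c", "b", "b", "b", "c", "c", "c", "b", "a",
          "c", "b", "c", "b", "b")
size <- c(0.8831873, 0.2749970, 0.3361448, 0.2513900, 0.4255147,
          0.48001512, 0.42080385, 0.97729546, 0.74695374, 0.09598578,
          0.03932213, 0.54789832, 0.26274679, 0.86169360, 0.40500204)

# One mean per type; the names stand in for the trace.
means <- vapply(split(size, type), mean, numeric(1))

# Pair each mean with its label, like the `result` target later in this thread.
result <- data.frame(type = names(means), mean = unname(means),
                     stringsAsFactors = FALSE)
```

These values agree with what the workaround further down reports: about 0.0960 for a, 0.511 for b, and 0.477 for c.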

Session info

32b0695

@brendanf
Contributor Author

Here is an example that works, by using readd in the plan to combine the dynamic targets into a static target. Is this the recommended way to go about it?

library(drake)
clean(dataset, observations, all_data, types, means, result)
create_tabular_data <- function(x, n) {
  tibble::tibble(
    type = sample(letters[1:3], n, replace = TRUE),
    size = runif(n)
  )
}

plan <- drake_plan(
  dataset = 1:3,
  observations = target(
    create_tabular_data(dataset, 5),
    dynamic = map(dataset)
  ),
  all_data = target(
    dplyr::bind_rows(readd(observations))
  ),
  types = target(
    all_data$type
  ),
  means = target(
    mean(all_data$size),
    dynamic = group(all_data, .by = types, .trace = types)
  ),
  result = tibble::tibble(
    type = get_trace("types", means),
    mean = unlist(readd(means))
  )
)

make(plan)
#> target dataset
#> dynamic observations
#> subtarget observations_0b3474bd
#> subtarget observations_b2a5c9b8
#> subtarget observations_71f311ad
#> aggregate observations
#> target all_data
#> target types
#> dynamic means
#> subtarget means_f5bade49
#> subtarget means_b365074d
#> subtarget means_c4734dfa
#> aggregate means
#> target result
readd(observations)
#> [[1]]
#> # A tibble: 5 x 2
#>   type   size
#>   <chr> <dbl>
#> 1 c     0.883
#> 2 c     0.275
#> 3 b     0.336
#> 4 b     0.251
#> 5 b     0.426
#> 
#> [[2]]
#> # A tibble: 5 x 2
#>   type    size
#>   <chr>  <dbl>
#> 1 c     0.480 
#> 2 c     0.421 
#> 3 c     0.977 
#> 4 b     0.747 
#> 5 a     0.0960
#> 
#> [[3]]
#> # A tibble: 5 x 2
#>   type    size
#>   <chr>  <dbl>
#> 1 c     0.0393
#> 2 b     0.548 
#> 3 c     0.263 
#> 4 b     0.862 
#> 5 b     0.405
readd(all_data)
#> # A tibble: 15 x 2
#>    type    size
#>    <chr>  <dbl>
#>  1 c     0.883 
#>  2 c     0.275 
#>  3 b     0.336 
#>  4 b     0.251 
#>  5 b     0.426 
#>  6 c     0.480 
#>  7 c     0.421 
#>  8 c     0.977 
#>  9 b     0.747 
#> 10 a     0.0960
#> 11 c     0.0393
#> 12 b     0.548 
#> 13 c     0.263 
#> 14 b     0.862 
#> 15 b     0.405
readd(types)
#>  [1] "c" "c" "b" "b" "b" "c" "c" "c" "b" "a" "c" "b" "c" "b" "b"
readd(means)
#> [[1]]
#> [1] 0.4769097
#> 
#> [[2]]
#> [1] 0.5106568
#> 
#> [[3]]
#> [1] 0.09598578
read_trace("types", "means")
#> [1] "c" "b" "a"
readd(result)
#> # A tibble: 3 x 2
#>   type    mean
#>   <chr>  <dbl>
#> 1 c     0.477 
#> 2 b     0.511 
#> 3 a     0.0960

Created on 2019-11-19 by the reprex package (v0.3.0)

@wlandau
Member

wlandau commented Nov 19, 2019

@brendanf, I am really glad you are continuing to test drive dynamic branching before it goes out in a formal release. Much appreciated. I might not be able to get to this issue today, but I will investigate ASAP.

@brendanf
Contributor Author

@wlandau No problem. I really appreciate the work you've done on this package, and dynamic targets are already speeding up my plan a lot.

Here's a version of this plan which recombines the dynamic target into a disk.frame using a variation of the approach I suggested in #1070. (This wouldn't be a good use of disk.frame for genuinely big data, because it immediately loads the whole thing into memory, but it works for illustration.)
Oddly, the first make() always fails (at the result target), but the second one succeeds.

library(drake)
library(magrittr)
clean(dataset, observations, all_data, result)

combine_dynamic_diskframe <- function(dd, dir = drake_tempfile()) {
  dir.create(dir)
  cache <- drake_cache()
  for (i in seq_along(dd)) {
    shardfile <- file.path(cache$path_return, dd[i])
    file.link(shardfile, file.path(dir, paste0(i, ".fst")))
  }
  disk.frame::disk.frame(dir)
}

create_tabular_data <- function(x, n) {
  data.frame(
    type = sample(letters[1:3], n, replace = TRUE),
    size = runif(n),
    stringsAsFactors = FALSE
  )
}

plan <- drake_plan(
  dataset = 1:3,
  observations = target(
    create_tabular_data(dataset, 5),
    dynamic = map(dataset),
    format = "fst"
  ),

  all_data = target(
    combine_dynamic_diskframe(observations),
    format = "diskframe"
  ),

  result = target(
    all_data %>%
      as.data.frame() %>%
      dplyr::group_by(type) %>%
      dplyr::summarize(mean = mean(size))
  )
)

make(plan)
#> target dataset
#> dynamic observations
#> subtarget observations_0b3474bd
#> subtarget observations_b2a5c9b8
#> subtarget observations_71f311ad
#> aggregate observations
#> target all_data
#> target result
#> fail result
#> Error: Target `result` failed. Call `diagnose(result)` for details. Error message:
#>   [ENOENT] Failed to search directory '/tmp/Rtmpbh73wg/reprex3de968fdccb8/.drake/drake/tmp/file3ab83f0bbcf3': no such file or directory
make(plan)
#> target result
readd(observations)
#> [[1]]
#>   type      size
#> 1    c 0.8831873
#> 2    c 0.2749970
#> 3    b 0.3361448
#> 4    b 0.2513900
#> 5    b 0.4255147
#> 
#> [[2]]
#>   type       size
#> 1    c 0.48001512
#> 2    c 0.42080385
#> 3    c 0.97729546
#> 4    b 0.74695374
#> 5    a 0.09598578
#> 
#> [[3]]
#>   type       size
#> 1    c 0.03932213
#> 2    b 0.54789832
#> 3    c 0.26274679
#> 4    b 0.86169360
#> 5    b 0.40500204
readd(all_data)
#> path: "/tmp/Rtmpbh73wg/reprex3de968fdccb8/.drake/drake/return/0d3ca9cb7879085d"
#> nchunks: 3
#> nrow (at source): 15
#> ncol (at source): 2
#> nrow (post operations): ???
#> ncol (post operations): ???
readd(result)
#> # A tibble: 3 x 2
#>   type    mean
#>   <chr>  <dbl>
#> 1 a     0.0960
#> 2 b     0.511 
#> 3 c     0.477

Created on 2019-11-20 by the reprex package (v0.3.0)

@brendanf
Contributor Author

Actually, it looks like it fails if run during the same make as the targets which are being combined. That's interesting. I've tried debugging through combine_dynamic_diskframe (and into disk.frame::disk.frame) and all the files seem to be present and readable on the first make; the error comes in the return step.

library(drake)
library(magrittr)
clean(dataset, observations, all_data, result)

combine_dynamic_diskframe <- function(dd, dir = drake_tempfile()) {
  dir.create(dir)
  cache <- drake_cache()
  for (i in seq_along(dd)) {
    shardfile <- file.path(cache$path_return, dd[i])
    file.link(shardfile, file.path(dir, paste0(i, ".fst")))
  }
  disk.frame::disk.frame(dir)
}

create_tabular_data <- function(x, n) {
  data.frame(
    type = sample(letters[1:3], n, replace = TRUE),
    size = runif(n),
    stringsAsFactors = FALSE
  )
}

plan <- drake_plan(
  dataset = 1:3,
  observations = target(
    create_tabular_data(dataset, 5),
    dynamic = map(dataset),
    format = "fst"
  ),

  all_data = target(
    combine_dynamic_diskframe(observations),
    format = "diskframe"
  ),

  result = target(
    all_data %>%
      as.data.frame() %>%
      dplyr::group_by(type) %>%
      dplyr::summarize(mean = mean(size))
  )
)

make(plan, targets = "all_data")
#> target dataset
#> dynamic observations
#> subtarget observations_0b3474bd
#> subtarget observations_b2a5c9b8
#> subtarget observations_71f311ad
#> aggregate observations
#> target all_data
make(plan)
#> target result
readd(observations)
#> [[1]]
#>   type      size
#> 1    c 0.8831873
#> 2    c 0.2749970
#> 3    b 0.3361448
#> 4    b 0.2513900
#> 5    b 0.4255147
#> 
#> [[2]]
#>   type       size
#> 1    c 0.48001512
#> 2    c 0.42080385
#> 3    c 0.97729546
#> 4    b 0.74695374
#> 5    a 0.09598578
#> 
#> [[3]]
#>   type       size
#> 1    c 0.03932213
#> 2    b 0.54789832
#> 3    c 0.26274679
#> 4    b 0.86169360
#> 5    b 0.40500204
readd(all_data)
#> path: "/tmp/Rtmpbh73wg/reprex3de93f7463d7/.drake/drake/return/0d3ca9cb7879085d"
#> nchunks: 3
#> nrow (at source): 15
#> ncol (at source): 2
#> nrow (post operations): ???
#> ncol (post operations): ???
readd(result)
#> # A tibble: 3 x 2
#>   type    mean
#>   <chr>  <dbl>
#> 1 a     0.0960
#> 2 b     0.511 
#> 3 c     0.477

Created on 2019-11-20 by the reprex package (v0.3.0)

@brendanf
Contributor Author

Actually, I was misreading the error: it occurs when the disk.frame is read during the same make() in which it is created (i.e., when result is built in the same call to make() as all_data).

This works if I don't declare the format as "diskframe". (But then the disk.frame stays in the cache's temporary directory, as the .drake/drake/tmp path printed by readd(all_data) shows.)

library(drake)
library(magrittr)
clean(dataset, observations, all_data, result)

combine_dynamic_diskframe <- function(dd, dir = drake_tempfile()) {
  dir.create(dir)
  cache <- drake_cache()
  for (i in seq_along(dd)) {
    shardfile <- file.path(cache$path_return, dd[i])
    stopifnot(file.exists(shardfile))
    file.link(shardfile, file.path(dir, paste0(i, ".fst")))
  }
  disk.frame::disk.frame(dir)
}

create_tabular_data <- function(x, n) {
  data.frame(
    type = sample(letters[1:3], n, replace = TRUE),
    size = runif(n),
    stringsAsFactors = FALSE
  )
}

plan <- drake_plan(
  dataset = 1:3,
  observations = target(
    create_tabular_data(dataset, 5),
    dynamic = map(dataset),
    format = "fst"
  ),
  
  all_data = target(
    combine_dynamic_diskframe(observations)
  ),
  
  result = target(
    all_data %>%
      as.data.frame() %>%
      dplyr::group_by(type) %>%
      dplyr::summarize(mean = mean(size))
  )
)

make(plan)
#> In drake, consider r_make() instead of make(). r_make() runs make() in a fresh R session for enhanced robustness and reproducibility.
#> target dataset
#> dynamic observations
#> subtarget observations_0b3474bd
#> subtarget observations_b2a5c9b8
#> subtarget observations_71f311ad
#> aggregate observations
#> target all_data
#> target result
readd(observations)
#> [[1]]
#>   type      size
#> 1    c 0.8831873
#> 2    c 0.2749970
#> 3    b 0.3361448
#> 4    b 0.2513900
#> 5    b 0.4255147
#> 
#> [[2]]
#>   type       size
#> 1    c 0.48001512
#> 2    c 0.42080385
#> 3    c 0.97729546
#> 4    b 0.74695374
#> 5    a 0.09598578
#> 
#> [[3]]
#>   type       size
#> 1    c 0.03932213
#> 2    b 0.54789832
#> 3    c 0.26274679
#> 4    b 0.86169360
#> 5    b 0.40500204
readd(all_data)
#> path: "/tmp/Rtmpbh73wg/reprex3de920306085/.drake/drake/tmp/file46972343fe2f"
#> nchunks: 3
#> nrow (at source): 15
#> ncol (at source): 2
#> nrow (post operations): ???
#> ncol (post operations): ???
readd(result)
#> # A tibble: 3 x 2
#>   type    mean
#>   <chr>  <dbl>
#> 1 a     0.0960
#> 2 b     0.511 
#> 3 c     0.477

Created on 2019-11-20 by the reprex package (v0.3.0)

I'll open a separate issue for this, since it seems to affect disk.frame targets in general.

@wlandau
Member

wlandau commented Nov 20, 2019

@brendanf, I agree with your workaround in #1076 (comment). For the current API, I think it is the best option.

I think I need to elaborate on the semantics of dynamic branching. Only the sub-targets have the actual data. The dynamic parent target itself is a vector of hashes (plus traces if applicable) and it serves as an abstract representation of the sub-targets.

library(dplyr)
library(drake)

create_tabular_data <- function(n) {
  tibble::tibble(
    type = sample(letters[seq_len(3)], n, replace = TRUE),
    size = runif(n)
  )
}

plan <- drake_plan(
  reps = seq_len(3),
  observations = target(
    create_tabular_data(5),
    dynamic = map(reps)
  ),
  all_data = bind_rows(readd(observations)),
  types = all_data$type,
  means = target(
    mean(all_data$size),
    dynamic = group(all_data, .by = types, .trace = types)
  )
)

make(plan, verbose = 0L)

cache <- drake_cache()

cache$get("observations")
#> [1] "2dc83190f9df8b2b" "4d7921d6877c3de4" "77f55ff82ce21d0d"
#> attr(,"class")
#> [1] "drake_dynamic"

cache$get("means")
#> [1] "970765199d7cda31" "a0c1e09c0ce8f240" "8a61862e4b16cdc7"
#> attr(,"dynamic_trace")
#> attr(,"dynamic_trace")$types
#> [1] "c" "a" "b"
#> 
#> attr(,"class")
#> [1] "drake_dynamic"

Created on 2019-11-20 by the reprex package (v0.3.0)
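To make the hash-versus-data distinction concrete, here is a toy model in base R. It is not drake's internal code: the store list, the short keys h1 to h3, and resolve_parent() are hypothetical. Only the shape of parent (a character vector of hashes with class "drake_dynamic" and a "dynamic_trace" attribute) mirrors the cache$get("means") output above, and the numbers are the means from the workaround earlier in the thread.

```r
# Hypothetical stand-in for the cache: sub-target hash -> stored value.
store <- list(h1 = 0.4769097, h2 = 0.5106568, h3 = 0.09598578)

# A parent shaped like cache$get("means"): hashes plus a trace attribute.
parent <- structure(
  c("h1", "h2", "h3"),
  dynamic_trace = list(types = c("c", "b", "a")),
  class = "drake_dynamic"
)

# Hypothetical helper: look up each sub-target's value by its hash
# and label it with the matching value of the chosen trace.
resolve_parent <- function(parent, store, trace) {
  hashes <- as.character(unclass(parent))  # drop class and trace attributes
  values <- store[hashes]
  names(values) <- attr(parent, "dynamic_trace")[[trace]]
  values
}

means <- resolve_parent(parent, store, "types")
```

Nothing in parent itself holds a mean; code that only sees the parent needs a lookup step like this one, which readd(means) takes care of in the workaround.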

The example in #1076 (comment) tries to map over a dynamic parent (all_data) to get types, which, as you saw, does not produce the desired result. What you really want is to dynamically map over the first sub-target of all_data. The solution in #1076 (comment) promotes the sub-target to a regular target, which makes further dynamic branching possible.
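One plausible reading of the NA and the "argument is not numeric or logical" warning in the first reprex is sketched below. It assumes (not confirmed in this thread) that each sub-target of means received all_data as a list of grouped sub-target values rather than as a single data frame. Under that assumption, all_data$size is NULL, and mean(NULL) returns NA with exactly that warning. The data frame contents are illustrative.

```r
# Assumption: inside a `means` sub-target, `all_data` is a list holding the
# grouped sub-targets of `all_data` (here, one illustrative data frame).
all_data <- list(
  data.frame(type = c("c", "b", "a"), size = c(0.5, 0.75, 0.25),
             stringsAsFactors = FALSE)
)

# `$` on an unnamed list finds no element called "size", so this is NULL.
size_from_list <- all_data$size

# mean(NULL) warns and returns NA, matching the reprex output.
msg <- tryCatch(mean(size_from_list), warning = conditionMessage)
m <- suppressWarnings(mean(size_from_list))

# Reaching into the sub-target first gives the intended numeric mean.
m_first <- mean(all_data[[1]]$size)
```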

So again, your workaround is correct, and the manual should probably explain this in more detail.

I will try to address the disk.frame issue in #1077.
