
new transform function split to chunk a data.frame #833

Closed · kendonB opened this issue Apr 11, 2019 · 17 comments

kendonB (Contributor) commented Apr 11, 2019


Description

I often find myself chunking up a data.frame in order to run number-crunching code on the chunks, like this:

library(drake)
drake_plan(
  chunks = parallel::splitIndices(nrow(iris), ncl = 3),
  iris_chunked = target(iris[chunks[[param]],],
                        transform = map(param = !!(1:3), .id = FALSE))
)
#> # A tibble: 4 x 2
#>   target         command                                    
#>   <chr>          <expr>                                     
#> 1 chunks         parallel::splitIndices(nrow(iris), ncl = 3)
#> 2 iris_chunked   iris[chunks[[1L]], ]                       
#> 3 iris_chunked_2 iris[chunks[[2L]], ]                       
#> 4 iris_chunked_3 iris[chunks[[3L]], ]

Created on 2019-04-12 by the reprex package (v0.2.1)

A possible interface could be:

drake_plan(
  iris_chunked = target(
    iris,
    transform = split(iris, n = 3, .id = FALSE)
  )
)

Precalculating the chunks variable is necessary because splitIndices() can be costly for large nx and ncl. Using [ rather than dplyr::slice() is also necessary, since slice() is currently very slow. You could also potentially add a groups argument to the split() call.
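
For context, here is a rough benchmark of [ against dplyr::slice() on an inflated copy of iris (a sketch; timings depend heavily on the machine and the dplyr version):

library(dplyr)
library(microbenchmark)
big <- iris[rep(seq_len(nrow(iris)), 1e4), ]  # ~1.5 million rows
idx <- seq_len(1e5)
microbenchmark(
  bracket = big[idx, ],
  slice   = slice(big, idx),
  times   = 10
)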

wlandau (Member) commented Apr 12, 2019

Such a good idea! However, at this stage of development, I am resistant to building it into drake itself. When @AlexAxthelm proposed #77 nearly 2 years ago, it was difficult to get the wildcard interface to split data frames. Now that we have the DSL, your workaround with splitIndices() and map() makes it seamless by comparison. It is only slightly longer to write than your proposed split() transform, so it is not clear that we gain all that much. And because this is a very specific kind of processing on data frames and 2D arrays only, we pay a high complexity penalty. I am more cautious about these penalties now. drake is larger and more complicated than I ever intended it to be.

That said, this is a fantastic technique. Perhaps it belongs in the manual, re #80. Maybe in a new example chapter?

wlandau (Member) commented May 20, 2019

Reopening: if we handle this in a more general way, we may be able to get most of what #685 is trying to solve.

EDIT, 2019-05-20: a digression on indices

@kendonB, I like your original suggestion of focusing on the indices. Transforming the indices instead of the data might help us avoid loading the entire dataset into memory. That alone is worth a special case! I have thought about something like this:

drake_plan(
  x = target(
    read_csv(file_in("large.csv"), skip = i, n_max = 1e2),
    transform = split(i, .size = 1e6, .splits = 1e4)
  )
)

But it requires prior knowledge that large.csv has a million rows. So I think the slicing needs to happen at runtime so we can adjust the slice sizes accordingly.
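
One way to defer the row count to runtime with the existing map() transform (a sketch: n_rows() is a hypothetical row-counting helper, and header handling is glossed over with col_names = FALSE):

drake_plan(
  n = n_rows(file_in("large.csv")),  # n_rows() is hypothetical
  x = target(
    readr::read_csv(
      file_in("large.csv"),
      skip = ceiling(n / 4) * (i - 1) + 1,  # + 1 skips the header line
      n_max = ceiling(n / 4),
      col_names = FALSE  # sketch-level simplification: column names are lost
    ),
    transform = map(i = !!(1:4))
  )
)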

wlandau reopened this May 20, 2019

wlandau (Member) commented May 20, 2019

@kendonB, your sketch from #833 (comment) gets us almost there. Given the demand for this use case, I think we could add a special split() transformation. A couple more things we would need:

  1. The number of dimensions of the thing we're splitting (default: 1).
  2. The dimension we are splitting over (default: 1).

Sketch:

drake_plan(
  iris_chunked = target(
    iris,
    transform = split(iris, .splits = 3, .dim = 2, .margin = 1)
  )
)
#> # A tibble: 3 x 2
#>   target         command                                    
#>   <chr>          <expr>                                     
#> 1 iris_chunked_1 drake_split(iris, .splits = 3, .dim = 2, .margin = 1, .index = 1)
#> 2 iris_chunked_2 drake_split(iris, .splits = 3, .dim = 2, .margin = 1, .index = 2)
#> 3 iris_chunked_3 drake_split(iris, .splits = 3, .dim = 2, .margin = 1, .index = 3)

where drake_split(iris, .splits = 3, .dim = 2, .margin = 1, .index = 2) evaluates to iris[51:100, ] at runtime. .margin = 2 would split over the columns instead of the rows (drake_split(iris, .splits = 3, .dim = 2, .margin = 2, .index = 2) evaluates to iris[, 3:4]).
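
For concreteness, a minimal sketch of what drake_split() might do in the 2-D case (hypothetical code, not an actual drake function; .dim is carried in the signature, but only 2-D data is handled here):

drake_split <- function(data, .splits, .dim = 2, .margin = 1, .index = 1) {
  # Split the extent along .margin into .splits contiguous chunks,
  # then return chunk number .index.
  idx <- parallel::splitIndices(dim(data)[.margin], .splits)[[.index]]
  if (.margin == 1) data[idx, , drop = FALSE] else data[, idx, drop = FALSE]
}
drake_split(iris, .splits = 3, .dim = 2, .margin = 1, .index = 2)  # iris[51:100, ]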

wlandau (Member) commented May 20, 2019

You know what? It would save a lot of hard work in the implementation and documentation to just leverage map(), and it is not that much more burdensome on the user. Simplicity in the implementation is important at this stage of drake's development.

drake_plan(
  iris_chunked = target(
    drake_split(iris, splits = 3, margin = 1, index = i),
    transform = map(i = c(1, 2, 3))
  )
)
#> # A tibble: 3 x 2
#>   target         command                                    
#>   <chr>          <expr>                                     
#> 1 iris_chunked_1 drake_split(iris, splits = 3, margin = 1, index = 1)
#> 2 iris_chunked_2 drake_split(iris, splits = 3, margin = 1, index = 2)
#> 3 iris_chunked_3 drake_split(iris, splits = 3, margin = 1, index = 3)

wlandau (Member) commented May 20, 2019

But we could probably use a better name than drake_split()... 🤔

wlandau (Member) commented May 20, 2019

In fact, I like using existing transforms a lot more because it ends up being more flexible. One example that split() alone would not be able to accomplish:

drake_plan(
  iris_chunked = target(
    fn(drake_split(iris, splits = 3, margin = 1, index = i)),
    transform = cross(i = c(1, 2, 3), fn = c(analysis1, analysis2), .id = FALSE)
  )
)
#> # A tibble: 6 x 2
#>   target         command                                    
#>   <chr>          <expr>                                     
#> 1 iris_chunked   analysis1(drake_split(iris, splits = 3, margin = 1, index = 1))
#> 2 iris_chunked_2 analysis1(drake_split(iris, splits = 3, margin = 1, index = 2))
#> 3 iris_chunked_3 analysis1(drake_split(iris, splits = 3, margin = 1, index = 3))
#> 4 iris_chunked_4 analysis2(drake_split(iris, splits = 3, margin = 1, index = 1))
#> 5 iris_chunked_5 analysis2(drake_split(iris, splits = 3, margin = 1, index = 2))
#> 6 iris_chunked_6 analysis2(drake_split(iris, splits = 3, margin = 1, index = 3))

wlandau (Member) commented May 20, 2019

A couple more thoughts:

  • drake_slice() is a slightly better name. Unfortunately, plain slice() is likely to conflict with other packages.
  • Do we really need to precompute the indices of all the targets? A naive use of base R seems to be faster than splitIndices(). (@kendonB, what am I missing here?) Plus, if we distribute the burden over multiple targets, we have parallel computing on our side.
library(parallel)
# Naive base-R alternative to splitIndices(). Note: this version assumes
# splits divides n evenly; any remainder elements are dropped (see below).
base_indices <- function(n, splits) {
  out <- list()
  delta <- floor(n / splits)
  for (i in seq_len(splits)) {
    out[[i]] <- seq.int(from = 1 + delta * (i - 1), to = delta * i, by = 1)
  }
  out
}
microbenchmark::microbenchmark(
  x = splitIndices(1e7, 1e4),
  y = base_indices(1e7, 1e4)
)
#> Unit: milliseconds
#> expr        min         lq       mean     median         uq       max neval
#>    x 1704.36510 1828.32656 1892.58508 1869.34639 1940.80053 2159.7202   100
#>    y   20.16306   42.41099   53.84792   45.20207   48.65589  177.5154   100

wlandau mentioned this issue May 20, 2019

kendonB (Contributor, Author) commented May 20, 2019

Does your solution handle the last chunk being smaller than the rest? I'm on my phone, so I can't check right now.

wlandau (Member) commented May 20, 2019

Yes, drake distributes the remainder over the first few chunks.

From drake/R/api-slice.R, lines 69–73 at commit 6e33d92:

inc <- as.integer(length / splits)
mod <- length %% splits
n <- inc + as.integer(index <= mod)
from <- 1L + inc * (index - 1L) + min(index - 1L, mod)
seq(from = from, length.out = n)

We have a unit test to confirm that the chunk sizes differ by no more than one element.

lengths <- vapply(s, length, FUN.VALUE = integer(1))
diff <- max(lengths) - min(lengths)
expect_true(diff <= 1L)
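
To see the arithmetic, here is a standalone re-implementation of the snippet above on a small case: length = 10 and splits = 3 should give chunks of sizes 4, 3, and 3.

slice_indices <- function(length, splits, index) {
  inc <- as.integer(length / splits)   # base chunk size
  mod <- length %% splits              # leftover elements
  n <- inc + as.integer(index <= mod)  # the first `mod` chunks get one extra
  from <- 1L + inc * (index - 1L) + min(index - 1L, mod)
  seq(from = from, length.out = n)
}
lapply(1:3, function(i) slice_indices(10L, 3L, i))
#> [[1]]
#> [1] 1 2 3 4
#>
#> [[2]]
#> [1] 5 6 7
#>
#> [[3]]
#> [1]  8  9 10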

kendonB (Contributor, Author) commented May 20, 2019

Neat. I think this is worth an email to r-devel: 1) there may be some pitfall we're missing, and 2) if not, your approach could significantly improve splitIndices().

wlandau (Member) commented May 21, 2019

Reopening to investigate #876 (comment).

wlandau (Member) commented May 22, 2019

From #876 (comment), vroom will not save us from loading a lot of data into memory. Despite this, I am reconsidering #876 (comment). Reasons:

  1. Splitting is a super important use case, so much so that tools like Spark and MapReduce use it as a simplifying assumption.
  2. Splitting and combining are inverses, so adding a split() transformation would carry the DSL to its logical conclusion (see the sketch after this list).
  3. Implementation is much simpler than the other transforms because split() is really just a special case of map(). All we need to do is convert each split() to the correct map() and we are done.
  4. We can probably use it on file_in() files after all. If we use split() to split up an index set instead of the data itself, we can pass the min and max to read_csv() etc. and have each target read only the requested rows.
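
On point 2, the aggregation half already exists as the combine() transform, so split() and combine() would bracket the DSL. A sketch using the drake_slice() helper discussed above (analyze() is a placeholder, and bind_rows() as the aggregator is an assumption):

drake_plan(
  iris_chunked = target(
    analyze(drake_slice(iris, slices = 3, index = i)),
    transform = map(i = !!(1:3))
  ),
  results = target(
    dplyr::bind_rows(iris_chunked),
    transform = combine(iris_chunked)
  )
)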

wlandau (Member) commented May 22, 2019

To elaborate: the DSL should turn this:

drake_plan(
  iris_chunked = target(
    iris %>%
      analyze(),
    transform = split(iris, slices = 3, margin = 1, drop = FALSE)
  )
)

into this:

drake_plan(
  iris_chunked = target(
    drake_slice(iris, slices = 3, margin = 1, drop = FALSE, index = iris_chunked_index) %>%
      analyze(),
    transform = map(iris, iris_chunked_index = c(1, 2, 3))
  )
)

To avoid opening Pandora's Box, let's assume one dataset per split() call.

wlandau mentioned this issue May 22, 2019

wlandau (Member) commented May 23, 2019

A useful note: split() can help avoid loading all of a huge data file into memory.

library(drake)
plan <- drake_plan(
  all_rows = file_in("huge.csv") %>%
    number_of_rows() %>%
    seq_len(),
  rows = target(
    all_rows,
    transform = split(all_rows, slices = 3)
  ),
  analysis = target(
    read_rows( # custom function
      file = file_in("huge.csv"),
      rows = rows
    ) %>%
      analyze_data(),
    transform = map(rows, .id = rows_index) # an internal trick
  )
)

drake_plan_source(plan)
#> drake_plan(
#>   all_rows = file_in("huge.csv") %>%
#>     number_of_rows() %>%
#>     seq_len(),
#>   rows_1 = drake_slice(data = all_rows, slices = 3, index = 1),
#>   rows_2 = drake_slice(data = all_rows, slices = 3, index = 2),
#>   rows_3 = drake_slice(data = all_rows, slices = 3, index = 3),
#>   analysis_1 = read_rows(file = file_in("huge.csv"), rows = rows_1) %>% analyze_data(),
#>   analysis_2 = read_rows(file = file_in("huge.csv"), rows = rows_2) %>% analyze_data(),
#>   analysis_3 = read_rows(file = file_in("huge.csv"), rows = rows_3) %>% analyze_data()
#> )

Created on 2019-05-22 by the reprex package (v0.3.0)
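
For completeness, the custom helpers in that plan could be written along these lines (sketches only, not part of drake; they assume a headered CSV and a contiguous rows vector from drake_slice()):

number_of_rows <- function(file) {
  # Count data rows without parsing the file into a data frame.
  length(count.fields(file, sep = ",")) - 1L  # subtract the header line
}

read_rows <- function(file, rows) {
  # skip = min(rows) skips the header plus every row before the block.
  readr::read_csv(file, skip = min(rows), n_max = length(rows),
                  col_names = FALSE)
}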

wlandau (Member) commented Mar 3, 2020

split() and drake_slice() are awkward because they are trying to do something dynamic when the interface is static. So should we deprecate them in favor of dynamic map() and group()?

kendonB (Contributor, Author) commented Jun 22, 2020

@wlandau I was looking for a way to chunk up a data frame into 10 roughly even chunks and found there was no way to do it directly with dynamic branching. The only way I can see is to use group() and define a grouping variable yourself (sketched below). One nice interface might just be group(data, slices = 10).
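
A sketch of that workaround (untested; my_data() and crunch() are placeholders, and the grouping vector has to be its own target so group() can reference it in .by):

plan <- drake_plan(
  data = my_data(),
  chunk = rep(seq_len(10), each = ceiling(nrow(data) / 10),
              length.out = nrow(data)),  # 10 roughly even contiguous groups
  chunked = target(
    crunch(data),
    dynamic = group(data, .by = chunk)
  )
)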

wlandau (Member) commented Jun 22, 2020

I agree, it would be a bit more convenient for some users. But for dynamic branching in drake, I believe the technical debt would far outweigh the benefits, especially since the group() transformation seems to come up far less often than I originally envisioned.
