lz4 and zstd compression via fst #111

Open · wlandau wants to merge 15 commits into master

Conversation

@wlandau (Contributor) commented Jun 18, 2019

No description provided.

@wlandau (Contributor Author) commented Jun 18, 2019

Suggested by @MarcusKlik. This PR implements optional fst compression for RDS storrs. When we save serialized data, we

  1. Use fst::compress_fst() to compress the data, and
  2. Save to a file using saveRDS(compress = FALSE).

When we read the data back in, we call fst::decompress_fst() before returning the value.
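
In other words, the flow is roughly this (hypothetical helper names, not the actual driver code in R/driver_rds.R):

save_compressed <- function(value, path, compressor = "ZSTD", compression = 0) {
  raw_value <- serialize(value, connection = NULL)      # value -> raw vector
  compressed <- fst::compress_fst(raw_value, compressor, compression)
  saveRDS(compressed, file = path, compress = FALSE)    # store the compressed raw vector as-is
}

read_compressed <- function(path) {
  compressed <- readRDS(path)                           # compressed raw vector written above
  unserialize(fst::decompress_fst(compressed))          # decompress, then the extra deserialization step
}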

We can select fst compression for new storrs with storr_rds(compress = "fst"). The compress argument now understands the values "fst", "gzfile", and "none", as well as the original logical values. The default for new storrs is "gzfile". Should it be? Personally, I would rather it be "fst", but then we would have to move the fst package from Suggests to Imports.

I also updated the tests to check back-compatibility and compress = "fst".

@codecov-io commented Jun 18, 2019

Codecov Report

Merging #111 into master will increase coverage by <.01%.
The diff coverage is 100%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #111      +/-   ##
==========================================
+ Coverage   99.91%   99.91%   +<.01%     
==========================================
  Files          16       16              
  Lines        1203     1224      +21     
==========================================
+ Hits         1202     1223      +21     
  Misses          1        1
Impacted Files Coverage Δ
R/utils.R 100% <ø> (ø) ⬆️
R/hash.R 100% <100%> (ø) ⬆️
R/driver_rds.R 100% <100%> (ø) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 0c64f1e...deea50d.

@wlandau (Contributor Author) commented Jun 18, 2019

Speeds are comparable for small data. Large data benchmarks are forthcoming.

library(storr)
library(microbenchmark)
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")
 microbenchmark(
    none = st_none$set(key = "x", value = runif(1)),
    gzfile = st_gzfile$set(key = "x", value = runif(1)),
    fst = st_fst$set(key = "x", value = runif(1))
 )
#> Unit: milliseconds
#>    expr      min       lq     mean   median       uq       max neval
#>    none 5.863562 6.246155 6.503483 6.442023 6.751117  7.786466   100
#>  gzfile 4.528847 4.978007 5.209034 5.138960 5.376522  6.907493   100
#>     fst 5.827846 6.205787 6.563974 6.392180 6.712995 13.425614   100

Created on 2019-06-18 by the reprex package (v0.3.0)

@wlandau (Contributor Author) commented Jun 18, 2019

Benchmarks on 800 MB data, where storr uses writeBin():

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
data <- runif(1e8)
object_size(data)
#> 800 MB
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")
 microbenchmark(
    none = st_none$set(key = "x", value = data),
    gzfile = st_gzfile$set(key = "x", value = data),
    fst = st_fst$set(key = "x", value = data),
    times = 1
 )
#> Unit: seconds
#>    expr       min        lq      mean    median        uq       max neval
#>    none  6.375112  6.375112  6.375112  6.375112  6.375112  6.375112     1
#>  gzfile 89.511704 89.511704 89.511704 89.511704 89.511704 89.511704     1
#>     fst  6.636744  6.636744  6.636744  6.636744  6.636744  6.636744     1

and for large enough data to repack:

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
data <- runif(3e8)
object_size(data)
#> 2.4 GB
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")
 microbenchmark(
    none = st_none$set(key = "x", value = data, use_cache = FALSE),
    gzfile = st_gzfile$set(key = "x", value = data, use_cache = FALSE),
    fst = st_fst$set(key = "x", value = data, use_cache = FALSE),
    times = 1
 )
#> Repacking large object
#> Repacking large object
#> Unit: seconds
#>    expr       min        lq      mean    median        uq       max neval
#>    none  27.21743  27.21743  27.21743  27.21743  27.21743  27.21743     1
#>  gzfile 276.25813 276.25813 276.25813 276.25813 276.25813 276.25813     1
#>     fst  36.22005  36.22005  36.22005  36.22005  36.22005  36.22005     1

Created on 2019-06-18 by the reprex package (v0.3.0)

@wlandau (Contributor Author) commented Jun 18, 2019

A minor snag: we need an extra deserialization step if we compress with fst. Not terrible, but worth noting.

library(storr)
library(microbenchmark)

st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")

st_none$set(key = "x", value = 1, use_cache = FALSE)
st_gzfile$set(key = "x", value = 1, use_cache = FALSE)
st_fst$set(key = "x", value = 1, use_cache = FALSE)

microbenchmark(
    none = st_none$get(key = "x", use_cache = FALSE),
    gzfile = st_gzfile$get(key = "x", use_cache = FALSE),
    fst = st_fst$get(key = "x", use_cache = FALSE)
)
#> Unit: microseconds
#>    expr     min       lq     mean   median       uq     max neval
#>    none 205.729 207.9270 219.5831 211.3970 224.3855 415.171   100
#>  gzfile 207.871 210.9915 225.9019 217.7565 233.5205 274.534   100
#>     fst 227.325 229.5640 240.8948 232.3175 244.8615 374.784   100

Created on 2019-06-18 by the reprex package (v0.3.0)

@wlandau (Contributor Author) commented Jun 18, 2019

For 800 MB, reading the data is slower than with no compression, but it is still faster than with the default compression.

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
    
data <- runif(1e8)
object_size(data)
#> 800 MB
    
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")

st_none$set(key = "x", value = data, use_cache = FALSE)
st_gzfile$set(key = "x", value = data, use_cache = FALSE)
st_fst$set(key = "x", value = data, use_cache = FALSE)

microbenchmark(
    none = st_none$get(key = "x", use_cache = FALSE),
    gzfile = st_gzfile$get(key = "x", use_cache = FALSE),
    fst = st_fst$get(key = "x", use_cache = FALSE),
    times = 1
)
#> Unit: seconds
#>    expr      min       lq     mean   median       uq      max neval
#>    none 2.425930 2.425930 2.425930 2.425930 2.425930 2.425930     1
#>  gzfile 4.908687 4.908687 4.908687 4.908687 4.908687 4.908687     1
#>     fst 3.112166 3.112166 3.112166 3.112166 3.112166 3.112166     1

Created on 2019-06-18 by the reprex package (v0.3.0)

For larger data:

library(storr)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
    
data <- runif(3e8)
object_size(data)
#> 2.4 GB
    
st_none <- storr_rds(tempfile(), compress = "none")
st_gzfile <- storr_rds(tempfile(), compress = "gzfile")
st_fst <- storr_rds(tempfile(), compress = "fst")

st_none$set(key = "x", value = data, use_cache = FALSE)
#> Repacking large object
st_gzfile$set(key = "x", value = data, use_cache = FALSE)
#> Repacking large object
st_fst$set(key = "x", value = data, use_cache = FALSE)

microbenchmark(
    none = st_none$get(key = "x", use_cache = FALSE),
    gzfile = st_gzfile$get(key = "x", use_cache = FALSE),
    fst = st_fst$get(key = "x", use_cache = FALSE),
    times = 1
)
#> Unit: seconds
#>    expr       min        lq      mean    median        uq       max neval
#>    none  7.194649  7.194649  7.194649  7.194649  7.194649  7.194649     1
#>  gzfile 14.287884 14.287884 14.287884 14.287884 14.287884 14.287884     1
#>     fst  8.197173  8.197173  8.197173  8.197173  8.197173  8.197173     1

@wlandau (Contributor Author) commented Jun 19, 2019

We might still want to keep #109, either instead of or in addition to #111: #109 (comment).

@MarcusKlik commented

Hi @wlandau, I see you have added an option compress = "fst". The method fst::compress_fst() allows selection of either the LZ4 compressor (built for speed) or the ZSTD compressor (slower but with better compression), plus a compression setting between 0 and 100.

Would it be useful to distinguish between the two (very different) compressors?
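
For example (illustrative settings only):

raw_data <- serialize(runif(1e6), connection = NULL)
fast  <- fst::compress_fst(raw_data, compressor = "LZ4",  compression = 0)   # built for speed
small <- fst::compress_fst(raw_data, compressor = "ZSTD", compression = 50)  # better compression, slower
c(lz4 = length(fast), zstd = length(small))  # compare compressed sizes in bytes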

@wlandau (Contributor Author) commented Jun 20, 2019

Yes, I agree. In 486b1ea, I exposed both the algorithm and the compression factor to storr_rds(). Both seem to fall within the scope of storr.
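
For example, mirroring the positional calls used in the benchmarks further down:

library(storr)
lz4_fast  <- storr_rds(tempfile(), "lz4", 0)    # LZ4 algorithm, compression level 0
zstd_high <- storr_rds(tempfile(), "zstd", 75)  # ZSTD algorithm, compression level 75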

@wlandau (Contributor Author) commented Jun 20, 2019

Assuming a compression factor of 0, I expect drake users to prefer LZ4. Finding the right compression factor is an intense (but interesting) optimization problem that I plan to come back to later. (I want to use an optimization method that a colleague and I are working on right now, and we need to wrap it up first.)

library(gt)
library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
library(storr)
library(tidyverse)

none <- storr_rds(tempfile(), "none")
gzip <- storr_rds(tempfile(), "gzip")
lz4 <- storr_rds(tempfile(), "lz4")
zstd <- storr_rds(tempfile("zstd"))

results <- NULL
for (scale in seq_len(8)) {
    data <- runif(10 ^ scale)
    set <- microbenchmark(
        none = none$set(key = "x", value = data, use_cache = FALSE),
        gzip = gzip$set(key = "x", value = data, use_cache = FALSE),
        lz4 = lz4$set(key = "x", value = data, use_cache = FALSE),
        zstd = zstd$set(key = "x", value = data, use_cache = FALSE),
        times = floor(100 ^ (1 / scale))
    ) %>%
        mutate(op = "set")
  get <- microbenchmark(
    none = none$get(key = "x", use_cache = FALSE),
    gzip = gzip$get(key = "x", use_cache = FALSE),
    lz4 = lz4$get(key = "x", use_cache = FALSE),
    zstd = zstd$get(key = "x", use_cache = FALSE),
    times = floor(100 ^ (1 / scale))
  ) %>%
    mutate(op = "get")
    new_results <- bind_rows(get, set) %>%
        mutate(size = as.numeric(object_size(data)))
    results <- bind_rows(new_results, results)
}
results <- as_tibble(results) %>%
    mutate(
        algo = as.character(expr),
        time = time / 1e9
    ) %>%
    select(-expr) %>%
    group_by(algo, op, size) %>%
    summarize(time = mean(time), reps = n())

ggplot(results) +
    geom_line(aes(x = size, y = time, group = algo, color = algo)) +
    facet_grid(op ~ ., scales = "free_y") +
    xlab("Bytes") +
    ylab("Mean seconds") +
    labs(color = "storr operation") +
    theme_gray(16)

ggplot(results) +
    geom_line(aes(x = size, y = time, group = algo, color = algo)) +
    facet_grid(op ~ ., scales = "free_y") +
    xlab("Log bytes") +
    ylab("Log mean seconds") +
    scale_x_log10() +
    scale_y_log10() +
    labs(color = "storr operation") +
    theme_gray(16)

results %>%
   spread("algo", "time") %>%
    arrange(op, size) %>%
    select(reps, size, none, gzip, lz4, zstd) %>%
    gt() %>%
      fmt_number(columns = vars(none, gzip, lz4, zstd), decimals = 3) %>%
      fmt_scientific(column = vars(size))
#> Adding missing grouping variables: `op`

Mean seconds per operation (size in bytes):

op   reps  size         none    gzip     lz4    zstd
get   100  1.76 × 10²   0.000   0.000   0.000   0.000
get    10  8.48 × 10²   0.000   0.000   0.000   0.000
get     4  8.05 × 10³   0.000   0.000   0.000   0.000
get     3  8.00 × 10⁴   0.001   0.001   0.001   0.001
get     2  8.00 × 10⁵   0.004   0.008   0.005   0.006
get     2  8.00 × 10⁶   0.020   0.051   0.031   0.046
get     1  8.00 × 10⁷   0.180   0.443   0.309   0.441
get     1  8.00 × 10⁸   2.061   4.546   2.848   4.605
set   100  1.76 × 10²   0.004   0.004   0.004   0.004
set    10  8.48 × 10²   0.005   0.005   0.005   0.004
set     4  8.05 × 10³   0.006   0.006   0.007   0.005
set     3  8.00 × 10⁴   0.006   0.008   0.006   0.009
set     2  8.00 × 10⁵   0.013   0.055   0.014   0.054
set     2  8.00 × 10⁶   0.075   0.456   0.061   0.456
set     1  8.00 × 10⁷   0.722   8.892   0.793   8.798
set     1  8.00 × 10⁸   6.322  87.454   6.509  86.731

Created on 2019-06-20 by the reprex package (v0.3.0)

@wlandau changed the title from "fst compression" to "lz4 and zstd compression via fst" on Jul 4, 2019
@wlandau (Contributor Author) commented Jul 6, 2019

One problem with this approach is that we have 3 copies of the data in memory at the same time.

  1. The original data.
  2. The serialized raw vector.
  3. The compressed serialized raw vector.

@MarcusKlik, does there exist a way to compress and write to a connection simultaneously, as with saveRDS(compress = TRUE) or gzfile()? If not, would you be open to adding that capability to compress_fst() itself?
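
For reference, the single-pass behavior I mean, using base R's gzfile():

path <- tempfile(fileext = ".rds")
con <- gzfile(path, open = "wb")
saveRDS(runif(1e6), con)  # serialize + gzip-compress + write in one pass, no intermediate raw copy
close(con)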

@MarcusKlik commented

Hi @wlandau, in general, if you want to compress and store in a single step, you will need some existing function that uses R's C API to accomplish that or write one yourself. Any step that calls two distinct methods (from R) for compressing and storing will need two buffers...

In write_fst(), the input data is split into blocks that are processed one at a time. Therefore, only a limited amount of buffer memory is required to compress and store that block and the user won't need much additional memory to store existing vectors (it looks like saveRDS is doing something similar).

Method compress_fst is also using blocks to compress, but those are much larger than the blocks in write_fst(). Perhaps an extra argument (connection) could be provided to that method to output the results of those (larger) buffers to a connection instead of the main memory. So much like R's serialize() method?

@wlandau (Contributor Author) commented Jul 9, 2019

Perhaps an extra argument (connection) could be provided to that method to output the results of those (larger) buffers to a connection instead of the main memory. So much like R's serialize() method?

This is exactly what I had in mind. How much of a difference in memory usage do you think it will make? How much does the compression level affect block size? It looks like blocks can be as large as 0.8 * INT_MAX, which on my machine is about 1.7 GB. Do blocks get that large in practice for large data?

@MarcusKlik commented Jul 9, 2019

Hi @wlandau, yes indeed, the block size is set using this code and will be between 16 kB and 0.8 * INT_MAX. The maximum only occurs when the original raw vector is longer than 48 times that block size (about 82 GB).
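
The arithmetic behind those numbers, assuming a 32-bit INT_MAX:

max_block <- 0.8 * (2^31 - 1)
max_block / 1e9        #> ~1.72, i.e. about 1.7 GB per block at most
48 * max_block / 1e9   #> ~82.5, i.e. about 82 GB of raw data before the maximum block size applies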

The thing is that because compress_fst() uses multiple threads, the output buffer needs to be allocated for each thread. So, in practice, an amount of memory is allocated that can be several times the blocksize.

To avoid large allocations, perhaps yet another argument would be useful to let the user set the maximum block size, e.g. max_buffer = 1e8. Or maybe a factor of the original vector length: max_buffer_ratio = 0.5.

With the two arguments combined (connection and max_buffer_ratio) you would be able to set the algorithm to use very little overhead in terms of RAM usage, what do you think?

@wlandau (Contributor Author) commented Jul 10, 2019

The thing is that because compress_fst() uses multiple threads, the output buffer needs to be allocated for each thread. So, in practice, an amount of memory is allocated that can be several times the blocksize.

I think I should document this tradeoff. Commits forthcoming.

With the two arguments combined (connection and max_buffer_ratio) you would be able to set the algorithm to use very little overhead in terms of RAM usage, what do you think?

I love that idea. I think it would require some work to understand the tradeoff between max_buffer_ratio and speed, but I think it could still help a lot. I am curious to know what @richfitz thinks (though right now he is in Toulouse for useR 2019).

@MarcusKlik commented Jul 10, 2019

ah, lucky guy :-).

In the zstd format specification, it is stated that the internally used buffer can be up to 128 kB (but not larger). Having blocks that are orders of magnitude larger probably doesn't help compression much (the block is already far bigger than the sliding window anyway), and I suspect any block size beyond a few tens of MB gains little. Switching to a new block does lose the compressor's history of previous elements, however, and that switch incurs a cost in speed.

So, it might be a good idea to significantly reduce the block sizes used in compress_fst(). It will lower the RAM usage and if the blocks are still large enough, we will have the maximum possible compression anyway. Also, the threads will be balanced better when the compression is more difficult for some parts of the data.

Thanks for discussing your use case!

@wlandau (Contributor Author) commented Jul 12, 2019

Excellent! Thank you for being open to these improvements.

@wlandau (Contributor Author) commented Jul 19, 2019

When I discussed this with @richfitz earlier today, he suggested that:

  1. We wait for "Optional connection argument to compress_fst" (fstpackage/fst#204) and "Optionally reduce block and buffer sizes" (fstpackage/fst#205), and
  2. We decouple the compression options (algorithm, compression level, buffer size, etc.) from the driver API.

@richfitz (Owner) commented

Thanks @wlandau - I'll take a shot at an abstraction layer for this once I see what the connection interface to the fst compression looks like. I think once that's done we can come up with something here that gets the performance improvements you're wanting/seeing without creating an interface that's too unwieldy.

@wlandau (Contributor Author) commented Jul 20, 2019

Awesome! Thank you for the support.

@wlandau (Contributor Author) commented Aug 4, 2019

Just realized I had a typo in the code for the zstd benchmarks from #111 (comment). Updated results, now including the compression level:

library(microbenchmark)
library(pryr)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
library(storr)
library(tidyverse)

none_0 <- storr_rds(tempfile())
lz4_0 <- storr_rds(tempfile(), "lz4", 0)
lz4_25 <- storr_rds(tempfile(), "lz4", 25)
lz4_50 <- storr_rds(tempfile(), "lz4", 50)
lz4_75 <- storr_rds(tempfile(), "lz4", 75)
lz4_100 <- storr_rds(tempfile(), "lz4", 100)
zstd_0 <- storr_rds(tempfile(), "zstd", 0)
zstd_25 <- storr_rds(tempfile(), "zstd", 25)
zstd_50 <- storr_rds(tempfile(), "zstd", 50)
zstd_75 <- storr_rds(tempfile(), "zstd", 75)
zstd_100 <- storr_rds(tempfile(), "zstd", 100)

results <- NULL
for (scale in seq_len(8)) {
  data <- runif(10 ^ scale)
  set <- microbenchmark(
    none_0 = none_0$set(key = "x", value = data, use_cache = FALSE),
    lz4_0 = lz4_0$set(key = "x", value = data, use_cache = FALSE),
    lz4_25 = lz4_25$set(key = "x", value = data, use_cache = FALSE),
    lz4_50 = lz4_50$set(key = "x", value = data, use_cache = FALSE),
    lz4_75 = lz4_75$set(key = "x", value = data, use_cache = FALSE),
    lz4_100 = lz4_100$set(key = "x", value = data, use_cache = FALSE),
    zstd_0 = zstd_0$set(key = "x", value = data, use_cache = FALSE),
    zstd_25 = zstd_25$set(key = "x", value = data, use_cache = FALSE),
    zstd_50 = zstd_50$set(key = "x", value = data, use_cache = FALSE),
    zstd_75 = zstd_75$set(key = "x", value = data, use_cache = FALSE),
    zstd_100 = zstd_100$set(key = "x", value = data, use_cache = FALSE),
    times = floor(100 ^ (1 / scale))
  ) %>%
    mutate(op = "set")
  get <- microbenchmark(
    none_0 = none_0$get(key = "x", use_cache = FALSE),
    lz4_0 = lz4_0$get(key = "x", use_cache = FALSE),
    lz4_25 = lz4_25$get(key = "x", use_cache = FALSE),
    lz4_50 = lz4_50$get(key = "x", use_cache = FALSE),
    lz4_75 = lz4_75$get(key = "x", use_cache = FALSE),
    lz4_100 = lz4_100$get(key = "x", use_cache = FALSE),
    zstd_0 = zstd_0$get(key = "x", use_cache = FALSE),
    zstd_25 = zstd_25$get(key = "x", use_cache = FALSE),
    zstd_50 = zstd_50$get(key = "x", use_cache = FALSE),
    zstd_75 = zstd_75$get(key = "x", use_cache = FALSE),
    zstd_100 = zstd_100$get(key = "x", use_cache = FALSE),
    times = floor(100 ^ (1 / scale))
  ) %>%
    mutate(op = "get")
  new_results <- bind_rows(get, set) %>%
    mutate(size = as.numeric(object_size(data)))
  results <- bind_rows(new_results, results)
}
results <- as_tibble(results) %>%
  mutate(
    algo = ordered(
      gsub("_.*$", "", as.character(expr)),
      levels = c("lz4", "zstd", "none")
    ),
    compression = ordered(
      as.integer(gsub("^.*_", "", as.character(expr)))
    ),
    time = time / 1e9
  ) %>%
  select(-expr) %>%
  group_by(algo, compression, op, size) %>%
  summarize(time = mean(time), reps = n())

ggplot(results) +
  geom_line(
    aes(
      x = size,
      y = time,
      group = compression,
      linetype = compression,
      color = compression
    )
  ) +
  facet_grid(op ~ algo, scales = "free_y", labeller = label_both) +
  xlab("Bytes") +
  ylab("Mean seconds") +
  theme_gray(16) +
  scale_color_brewer(palette = "Set1")

Created on 2019-08-03 by the reprex package (v0.3.0)

@wlandau (Contributor Author) commented Aug 4, 2019

Judging from ropensci/drake#907 (comment), this PR does not quite achieve the efficiency of write_fst(). I still think #111 has value for the general case, but drake would also benefit from a decorated storr: ropensci/drake#971 (comment)
