
Add rechunking for Xarray datasets #52

Merged: 4 commits into pangeo-data:master on Oct 6, 2020

Conversation

eric-czech
Contributor

This is an attempt at #45.

I'm not sure what the best way to go about this is, but I thought I would get something working and then get thoughts from you guys on where to go next. Notes:

  • The signature I'm exposing here is rechunk_dataset(source: Dataset, encoding: Mapping, max_mem, target_store, temp_store, executor). I'm using encoding to indicate the target chunking, along with any other compressor/filter options, for some consistency with Dataset.to_zarr (a usage sketch follows this list). I think it would probably be best if Dataset/DataArray were other possible options in the main rechunk function, with the same target_chunks parameter and any other options in {target|temp}_options. For the sake of discussion I thought it was easier to review quickly if all the new code was in one place; I'll happily combine the functions if this is on the right track.
  • I'm probably missing some corner cases, but this seemed like a reasonable lift of the necessary functionality from the zarr backend. I tried to ensure that the condition validated at xarray/backends/zarr.py#L135 is also checked here, though I'm not actually sure it matters in this case. @rabernat / @TomAugspurger, do you happen to know whether I need to worry about this? I.e., if the input dataset contains dask arrays backed by a zarr store and the target chunking would create new dask chunks that overlap multiple old zarr chunks, should rechunker throw an error like xarray does?
  • I'm encoding attributes using encode_zarr_attr_value. Should all of the rechunker functions be using this or something like it too?
  • I think there are several more tests that would be useful -- the test in this PR is just a start.
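
For reference, a minimal sketch of how the interim signature above might be called. The dataset, variable names, and store paths are purely illustrative, and the per-variable encoding dict is an assumption that simply mirrors the chunk/compressor options Dataset.to_zarr accepts:

import numpy as np
import xarray as xr
import zarr
from rechunker.api import rechunk_dataset  # interim function proposed in this PR

# Toy dataset with a single dask-backed variable
ds = xr.Dataset({"a": (("x", "y"), np.ones((100, 50), dtype="f4"))}).chunk(25)

rechunked = rechunk_dataset(
    source=ds,
    # target chunking plus any compressor/filter options, per variable
    encoding={"a": {"chunks": (10, 10), "compressor": zarr.Blosc(cname="zstd")}},
    max_mem="50MB",
    target_store="/tmp/target.zarr",  # illustrative local paths
    temp_store="/tmp/temp.zarr",
    executor="dask",
)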

@codecov

codecov bot commented Sep 22, 2020

Codecov Report

Merging #52 into master will increase coverage by 2.75%.
The diff coverage is 100.00%.


@@            Coverage Diff             @@
##           master      #52      +/-   ##
==========================================
+ Coverage   95.00%   97.75%   +2.75%     
==========================================
  Files          10       10              
  Lines         400      445      +45     
  Branches       78       88      +10     
==========================================
+ Hits          380      435      +55     
+ Misses         10        5       -5     
+ Partials       10        5       -5     
Impacted Files Coverage Δ
rechunker/api.py 100.00% <100.00%> (+7.46%) ⬆️


@tomwhite
Collaborator

This looks great @eric-czech, thanks for working on it.

I think it would probably be best if Dataset/DataArray were other possible options in the main rechunk function

+1

rechunker/api.py Outdated

copy_specs = []
for variable in source:
    array = source[variable].copy()
Collaborator

Xarray's backend encoding functions are designed to work on xarray.Variable objects, so if you're going to use those I would recommend accessing source.variables[variable] instead.

Collaborator

The other part that is missing here is that you are only writing the data variables, not the coordinates. Iterating over source.variables should solve that problem, too.
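
A rough sketch of the loop with both suggestions applied (names are illustrative, not the exact diff in the PR):

# Iterate over source.variables so that coordinates are copied as well as data
# variables, and so that each entry is an xarray.Variable, which is what
# xarray's backend encoding helpers expect.
copy_specs = []
for name, variable in source.variables.items():
    variable = variable.copy()
    # ... build and append the copy spec for this variable, as before ...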

@shoyer
Collaborator

shoyer commented Sep 26, 2020

  • I'm encoding attributes using encode_zarr_attr_value. Should all of the rechunker functions be using this or something like it too?

I suspect this is only really relevant if the source is from xarray:

  • If the source is from zarr, attributes are already encoded properly
  • If the source is from dask, attributes don't exist
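
For context, a small sketch of what that attribute encoding does (assuming xarray's internal helper keeps its current behaviour; the attrs here are made up):

import numpy as np
from xarray.backends.zarr import encode_zarr_attr_value

# NumPy scalars/arrays aren't JSON-serialisable as-is, so they get converted
# to plain Python values before being written into zarr's .zattrs.
attrs = {"scale_factor": np.float32(0.1), "valid_range": np.array([0, 100])}
encoded = {k: encode_zarr_attr_value(v) for k, v in attrs.items()}
# encoded["scale_factor"] is now a plain float, encoded["valid_range"] a list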

@eric-czech
Contributor Author

eric-czech commented Sep 30, 2020

Ok I re-worked this one a good bit (apologies for the big changes since first review). Some notes on the latest commit at fc1b17a:

  • Added Datasets as a possible source in api.rechunk (there is no rechunk_dataset now)
  • Changed {temp|target}_options to be specific to individual variables and have a structure similar to target_chunks
  • Defaulted the temporary store location to somewhere in the system temp dir for Zarr groups and Xarray datasets
    • It didn't make much sense IMO for temp_store to be optional in the API without a default being set for collections of arrays. Otherwise, when it's not set, an assertion error is thrown if any one array is rechunked to a different size.
  • I skipped trying to support DataArrays because of pydata/xarray#2660 (DataArrays to/from Zarr Arrays)
  • The happy path for these changes looks like this:
import zarr
import xarray as xr
import numpy as np
from rechunker.api import rechunk

shape = (100, 50)
ds = xr.Dataset(
    dict(
        a=(("x", "y"), np.ones(shape, dtype='f4')),
        b=(("x"), np.ones(shape[0])),
        c=(("y"), np.ones(shape[1]))
    ),
    coords=dict(
        cx=(("x"), np.ones(shape[0])),
        cy=(("y"), np.ones(shape[1]))
    )
).chunk(chunks=25)

rechunked = rechunk(
    ds,
    target_chunks=dict(a=(10, 10), b=(10,), c=(10,)),
    max_mem='50MB',
    target_store="/tmp/store.zarr",
    target_options=dict(
        a=dict(
            compressor=zarr.Blosc(cname="zstd"),
            dtype="int16",
            scale_factor=0.1,
            _FillValue=-9999,
        )
    )
)
print(rechunked)
<Rechunked>
* Source      : <xarray.Dataset>
Dimensions:  (x: 100, y: 50)
Coordinates:
    cx       (x) float64 dask.array<chunksize=(25,), meta=np.ndarray>
    cy       (y) float64 dask.array<chunksize=(25,), meta=np.ndarray>
Dimensions without coordinates: x, y
Data variables:
    a        (x, y) float32 dask.array<chunksize=(25, 25), meta=np.ndarray>
    b        (x) float64 dask.array<chunksize=(25,), meta=np.ndarray>
    c        (y) float64 dask.array<chunksize=(25,), meta=np.ndarray>

* Intermediate: <zarr.hierarchy.Group '/'>

* Target      : <zarr.hierarchy.Group '/'>

@shoyer left a comment

  • Defaulted the temporary store location to somewhere in the system temp dir for Zarr groups and Xarray datasets

    • It didn't make much sense IMO for temp_store to be optional in the API without a default being set for collections of arrays. Otherwise, when it's not set, an assertion error is thrown if any one array is rechunked to a different size.

I would rather require an explicit temp directory for now. My concern is that using a local directory as a default is likely to result in unexpected errors when scaling up rechunker for "production" use cases that run on multiple machines. Perhaps there is some way we might ask Executors to provide a location for temporary storage.

Note that in the future there will be Executors that don't require temporary arrays on disk (e.g., see Beam in #36).

@pytest.mark.parametrize("target_chunks", [(20, 10)])
@pytest.mark.parametrize("max_mem", ["10MB"])
@pytest.mark.parametrize("pass_temp", [True, False])
@pytest.mark.parametrize("executor", ["dask", api._get_executor("dask")])
Collaborator

It would be nice to also test non-dask executors on xarray.Dataset objects, e.g., Beam. I assume this would work?

Contributor Author

Hm, it doesn't, but I can't tell whether it should. I'm converting all the arrays in the dataset to dask and sending them through the dask.array.Array code path, which is apparently also broken for all other executors. This is the trace I get, in one form or another, from the different executors:

Traceback (most recent call last):
  File "/Users/eczech/repos/pydata/rechunker/tests/test_rechunk.py", line 100, in test_rechunk_dataset
    rechunked.execute()
  File "/Users/eczech/repos/pydata/rechunker/rechunker/api.py", line 77, in execute
    self._executor.execute_plan(self._plan, **kwargs)
  File "/Users/eczech/repos/pydata/rechunker/rechunker/executors/python.py", line 31, in execute_plan
    plan()
  File "/Users/eczech/repos/pydata/rechunker/rechunker/executors/python.py", line 44, in _execute_all
    task()
  File "/Users/eczech/repos/pydata/rechunker/rechunker/executors/python.py", line 39, in _direct_array_copy
    target[key] = source[key]
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/zarr/core.py", line 1115, in __setitem__
    self.set_basic_selection(selection, value, fields=fields)
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/zarr/core.py", line 1210, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/zarr/core.py", line 1501, in _set_basic_selection_nd
    self._set_selection(indexer, value, fields=fields)
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/zarr/core.py", line 1550, in _set_selection
    self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/zarr/core.py", line 1665, in _chunk_setitem
    fields=fields)
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/zarr/core.py", line 1687, in _chunk_setitem_nosync
    chunk = value.astype(self._dtype, order=self._order, copy=False)
  File "/Users/eczech/.conda/envs/rechunker-dev/lib/python3.7/site-packages/dask/array/core.py", line 1843, in astype
    "arguments: {0!s}".format(list(extra))
TypeError: astype does not take the following keyword arguments: ['order']

Two potential solutions I see are:

  1. Coerce all dataset variables to zarr when not using the dask executor
  2. Make the other executors work on dask arrays

The first seems like a bad idea, and I'm not sure how to do the second.

Collaborator

I suspect it would work to just change target[key] = source[key] to target[key] = np.asarray(source[key]) inside _direct_array_copy? Or perhaps this fix could be done upstream, inside __setitem__ on Zarr arrays?

Anyways, this can definitely be saved for later!
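
A minimal sketch of that suggestion, assuming _direct_array_copy keeps roughly its current shape (the signature here is illustrative, not the exact one in rechunker/executors/python.py):

import numpy as np

def _direct_array_copy(source, target, key):
    # Materialise the (possibly dask-backed) selection as a NumPy array first;
    # zarr's __setitem__ calls value.astype(..., order=...), which dask arrays
    # don't accept, hence the TypeError above.
    target[key] = np.asarray(source[key])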

@rabernat
Member

rabernat commented Oct 1, 2020

Thanks for all the hard work happening here!

I would rather require an explicit temp directory for now. My concern is that using a local directory as a default is likely to result in unexpected errors when scaling up rechunker for "production" use cases that run on multiple machines.

👍 to this. Our main use of rechunker is using dask in the cloud with object store, where there is no shared local filesystem. I'd like to avoid any default assumptions about the nature of the storage.

Going forward, maybe we could consider adding some sort of config system for rechunker, which would allow you to specify your preferred way of creating temporary storage.

@eric-czech
Contributor Author

Our main use of rechunker is using dask in the cloud with object store, where there is no shared local filesystem

Ah of course, makes sense.

In 67ee2aa, I removed the default temp store, added a better error when it's not present, and added a NotImplementedError when the source is Xarray and the executor is anything but dask. I think I should probably do the same for when the source is da.Array. Does this sound right to you both?
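
A rough sketch of the guards described above (the function name and error messages are illustrative, not the exact code in 67ee2aa):

import xarray as xr

def _validate_source_and_stores(source, temp_store, executor_name):
    # Require an explicit temporary store rather than silently defaulting to a
    # local path that may not exist on all workers.
    if temp_store is None:
        raise ValueError("temp_store must be provided when rechunking this source")
    # Only the dask executor currently supports xarray.Dataset sources.
    if isinstance(source, xr.Dataset) and executor_name != "dask":
        raise NotImplementedError(
            "Only the dask executor is currently supported for xarray.Dataset sources"
        )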

@shoyer
Collaborator

shoyer commented Oct 1, 2020

In 67ee2aa, I removed the default temp store, added a better error when it's not present, and added a NotImplementedError when the source is Xarray and the executor is anything but dask. I think I should probably do the same for when the source is da.Array. Does this sound right to you both?

This sounds fine to me for now.

Long term, I do think it could make sense to pass an xarray.Dataset backed by multi-threaded dask into alternative executors, such as Beam. But this certainly isn't urgent.

@eric-czech
Contributor Author

In 67ee2aa, I removed the default temp store, added a better error when it's not present, and added a NotImplementedError when the source is Xarray and the executor is anything but dask. I think I should probably do the same for when the source is da.Array. Does this sound right to you both?

This sounds fine to me for now.

Ok, 8502a33 adds a similar error for dask array sources.

Long term, I do think it could make sense to pass an xarray.Dataset backed by multi-threaded dask into alternative executors, such as Beam. But this certainly isn't urgent.

I see, maybe the error in #52 (comment) is actually pretty superficial? I can't tell whether or not that's hinting at a fundamental limitation.

@eric-czech
Contributor Author

Is there anything else you guys think I should address on this one?

@eric-czech
Contributor Author

Hey @shoyer sorry to keep bugging you about this one, but is there anything else you'd like me to change?

@rabernat
Member

rabernat commented Oct 6, 2020

Hi @eric-czech. Thanks for your work on this! And thanks for your patience.

I'm fine with merging now. I assume issues will come up as people try it out, and we can iterate as needed.

@rabernat merged commit 4b0da52 into pangeo-data:master on Oct 6, 2020
@eric-czech
Contributor Author

Thanks @rabernat! And for your suggestions @shoyer.

@tomwhite mentioned this pull request Oct 23, 2020