Add rechunking for Xarray datasets #52

Merged: 4 commits, Oct 6, 2020
Changes from 2 commits
169 changes: 135 additions & 34 deletions rechunker/api.py
@@ -6,9 +6,18 @@
import zarr
import dask
import dask.array
import xarray
import tempfile

from rechunker.algorithm import rechunking_plan
from rechunker.types import ArrayProxy, CopySpec, Executor
from xarray.backends.zarr import (
encode_zarr_attr_value,
encode_zarr_variable,
extract_zarr_variable_encoding,
DIMENSION_KEY,
)
from xarray.conventions import encode_dataset_coordinates


class Rechunked:
@@ -69,18 +78,13 @@ def execute(self, **kwargs):
return self._target

def __repr__(self):
entries = []
entries.append(f"\n* Source : {repr(self._source)}")
if self._intermediate is not None:
intermediate = f"\n* Intermediate: {repr(self._intermediate)}"
else:
intermediate = ""

return textwrap.dedent(
f"""\
<Rechunked>
* Source : {repr(self._source)}{{}}
* Target : {repr(self._target)}
"""
).format(intermediate)
entries.append(f"\n* Intermediate: {repr(self._intermediate)}")
entries.append(f"\n* Target : {repr(self._target)}")
entries = "\n".join(entries)
return f"<Rechunked>{entries}\n"

def _repr_html_(self):
entries = {}
@@ -137,6 +141,10 @@ def _get_dims_from_zarr_array(z_array):
return z_array.attrs["_ARRAY_DIMENSIONS"]


def _encode_zarr_attributes(attrs):
return {k: encode_zarr_attr_value(v) for k, v in attrs.items()}


def _zarr_empty(shape, store_or_group, chunks, dtype, name=None, **kwargs):
# wrapper that maybe creates the array within a group
if name is not None:
@@ -150,6 +158,27 @@ def _zarr_empty(shape, store_or_group, chunks, dtype, name=None, **kwargs):
)


ZARR_OPTIONS = [
"compressor",
"filters",
"order",
"cache_metadata",
"cache_attrs",
"overwrite",
]


def _validate_options(options):
if not options:
return
for o in options:
if o not in ZARR_OPTIONS:
raise ValueError(
f"Zarr options must not include {o} (got {o}={options[o]}). "
f"Only the following options are supported: {ZARR_OPTIONS}."
)


def _get_executor(name: str) -> Executor:
# converts a string name into an Executor instance
# imports are conditional to avoid hard dependencies
@@ -206,7 +235,7 @@ def rechunk(
attributes (see Xarray :ref:`xarray:zarr_encoding`.)
A value of ``None`` means that the array will
be copied with no change to its chunk structure.
- For a group, a dict is required. The keys correspond to array names.
- For a group of arrays, a dict is required. The keys correspond to array names.
The values are ``target_chunks`` arguments for the array. For example,
``{'foo': (20, 10), 'bar': {'x': 3, 'y': 5}, 'baz': None}``.
*All arrays you want to rechunk must be explicitly named.* Arrays
@@ -219,16 +248,25 @@
The location in which to store the final, rechunked result.
Will be passed directly to :py:meth:`zarr.creation.create`
target_options: Dict, optional
Additional keyword arguments used to create target arrays.
See :py:meth:`zarr.creation.create` for arguments available.
Must not include any of [``shape``, ``chunks``, ``dtype``, ``store``].
Additional keyword arguments used to control array storage.
If the source is :py:class:`xarray.Dataset`, then these options will be used
to encode variables in the same manner as the ``encoding`` parameter in
:py:meth:`xarray.Dataset.to_zarr`. Otherwise, these options will be passed
to :py:meth:`zarr.creation.create`. The structure depends on ``source``.

- For a single array source, this should be a single dict such
as ``{'compressor': zarr.Blosc(), 'order': 'F'}``.
- For a group of arrays, a nested dict is required with values
like the above keyed by array name. For example,
``{'foo': {'compressor': zarr.Blosc(), 'order': 'F'}, 'bar': {'compressor': None}}``.

temp_store : str, MutableMapping, or zarr.Store object, optional
Location of temporary store for intermediate data. Can be deleted
once rechunking is complete.
temp_options: Dict, optional
Additional keyword arguments used to create intermediate arrays.
See :py:meth:`zarr.creation.create` for arguments available.
Must not include any of [``shape``, ``chunks``, ``dtype``, ``store``].
Options with the same semantics as ``target_options``, but applied to ``temp_store``
rather than ``target_store``. Defaults to ``target_options`` and has no effect when
the source is an xarray.Dataset.
executor: str or rechunker.types.Executor
Implementation of the execution engine for copying between zarr arrays.
Supplying a custom Executor is currently even more experimental than the
@@ -263,7 +301,79 @@ def _setup_rechunk(
temp_store=None,
temp_options=None,
):
if isinstance(source, zarr.hierarchy.Group):
if temp_options is None:
temp_options = target_options
target_options = target_options or {}
temp_options = temp_options or {}

if isinstance(source, xarray.Dataset):
if not isinstance(target_chunks, dict):
raise ValueError(
"You must specify ``target-chunks`` as a dict when rechunking a dataset."
)

variables, attrs = encode_dataset_coordinates(source)
attrs = _encode_zarr_attributes(attrs)

if temp_store:
temp_group = zarr.group(temp_store)
else:
temp_group = zarr.group(tempfile.mkdtemp(".zarr", "temp_store_"))
target_group = zarr.group(target_store)
target_group.attrs.update(attrs)

copy_specs = []
for name, variable in variables.items():
# This isn't strictly necessary because a shallow copy
# also occurs in `encode_dataset_coordinates` but do it
# anyways in case the coord encoding function changes
variable = variable.copy()

# Update the array encoding with provided options and apply it;
# note that at this point the `options` may contain any valid property
# applicable for the `encoding` parameter in Dataset.to_zarr other than "chunks"
options = target_options.get(name, {})
if "chunks" in options:
raise ValueError(
f"Chunks must be provided in ``target_chunks`` rather than options (variable={name})"
)
variable.encoding.update(options)
variable = encode_zarr_variable(variable)

# Extract the array encoding to get a default chunking, a step
# which will also ensure that the target chunking is compatible
# with the current chunking (only necessary for on-disk arrays)
variable_encoding = extract_zarr_variable_encoding(
variable, raise_on_invalid=False, name=name
)
variable_chunks = target_chunks.get(name, variable_encoding["chunks"])

# Restrict options to only those that are specific to zarr and
# not managed internally
options = {k: v for k, v in options.items() if k in ZARR_OPTIONS}
_validate_options(options)

# Extract array attributes along with reserved property for
# xarray dimension names
variable_attrs = _encode_zarr_attributes(variable.attrs)
variable_attrs[DIMENSION_KEY] = encode_zarr_attr_value(variable.dims)

copy_spec = _setup_array_rechunk(
dask.array.asarray(variable),
variable_chunks,
max_mem,
target_group,
target_options=options,
temp_store_or_group=temp_group,
temp_options=options,
name=name,
)
copy_spec.write.array.attrs.update(variable_attrs) # type: ignore
copy_specs.append(copy_spec)

return copy_specs, temp_group, target_group

elif isinstance(source, zarr.hierarchy.Group):
if not isinstance(target_chunks, dict):
raise ValueError(
"You must specify ``target-chunks`` as a dict when rechunking a group."
@@ -272,7 +382,7 @@
if temp_store:
temp_group = zarr.group(temp_store)
else:
temp_group = None
temp_group = zarr.group(tempfile.mkdtemp(".zarr", "temp_store_"))
target_group = zarr.group(target_store)
target_group.attrs.update(source.attrs)

@@ -283,9 +393,9 @@
array_target_chunks,
max_mem,
target_group,
target_options=target_options,
target_options=target_options.get(array_name),
temp_store_or_group=temp_group,
temp_options=temp_options,
temp_options=temp_options.get(array_name),
name=array_name,
)
copy_specs.append(copy_spec)
@@ -308,18 +418,9 @@ def _setup_rechunk(
return [copy_spec], intermediate, target

else:
raise ValueError("Source must be a Zarr Array or Group, or a Dask Array.")


def _validate_options(options):
if not options:
return
for k in ["shape", "chunks", "dtype", "store", "name"]:
if k in options:
raise ValueError(
f"Optional array arguments must not include {k} (provided {k}={options[k]}). "
"Values for this property are managed internally."
)
raise ValueError(
f"Source must be a Zarr Array, Zarr Group, Dask Array or Xarray Dataset (not {type(source)})."
)


def _setup_array_rechunk(
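Not part of the diff itself: a minimal usage sketch of the new Dataset path described in the updated rechunk docstring, assuming a toy in-memory dataset, a "100MB" memory budget, and local store paths "target.zarr" / "temp.zarr". Per-variable target_options are restricted to the ZARR_OPTIONS whitelist added above (e.g. compressor).

import numpy as np
import xarray as xr
import zarr
from rechunker import rechunk

# Hypothetical dataset with one data variable and two coordinates.
ds = xr.Dataset(
    {"foo": (("x", "y"), np.random.rand(100, 100))},
    coords={"x": np.arange(100), "y": np.arange(100)},
)

# target_chunks and target_options are keyed by variable name; variables that
# are omitted (here the "x" and "y" coordinates) keep their encoded chunking.
plan = rechunk(
    ds,
    target_chunks={"foo": (20, 50)},
    max_mem="100MB",
    target_store="target.zarr",  # assumed local path
    target_options={"foo": {"compressor": zarr.Blosc(cname="zstd")}},
    temp_store="temp.zarr",  # assumed local path
)
plan.execute()  # copies the data and returns the target zarr group

For a plain zarr group the same per-array structure applies, except that the options are passed straight to zarr.creation.create rather than through xarray's encoding machinery.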
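The new Dataset branch derives a default chunking from each variable's encoding when target_chunks omits it. A small sketch of that step, assuming the installed xarray exposes encode_zarr_variable and extract_zarr_variable_encoding with the signatures used in the diff; the toy variable and the name "foo" are made up.

import numpy as np
import xarray as xr
from xarray.backends.zarr import encode_zarr_variable, extract_zarr_variable_encoding

# A numpy-backed variable whose encoding already declares a chunking of (5,).
var = xr.Variable(("x",), np.arange(10), encoding={"chunks": (5,)})

# Same two calls as in _setup_rechunk: apply zarr encoding conventions, then
# extract the encoding dict, which carries a "chunks" entry.
var = encode_zarr_variable(var)
encoding = extract_zarr_variable_encoding(var, raise_on_invalid=False, name="foo")
print(encoding["chunks"])  # (5,) -- the fallback used when target_chunks has no "foo"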
5 changes: 1 addition & 4 deletions setup.py
@@ -7,10 +7,7 @@
long_description = f.read()


install_requires = [
"dask[array]",
"zarr",
]
install_requires = ["dask[array]", "zarr", "xarray"]
doc_requires = [
"sphinx",
"sphinxcontrib-srclinks",