Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zarr consolidated #2559

Merged
merged 16 commits into from
Dec 4, 2018
Merged
2 changes: 1 addition & 1 deletion ci/requirements-py36-zarr-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ dependencies:
- pip:
- coveralls
- pytest-cov
- git+https://github.com/alimanfoo/zarr.git
- git+https://github.com/zarr-developers/zarr.git
27 changes: 26 additions & 1 deletion doc/io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,31 @@ For example:
Not all native zarr compression and filtering options have been tested with
xarray.

Consolidated Metadata
~~~~~~~~~~~~~~~~~~~~~

Xarray needs to read all of the zarr metadata when it opens a dataset.
In some storage mediums, such as with cloud object storage (e.g. amazon S3),
this can introduce significant overhead, because two separate HTTP calls to the
object store must be made for each variable in the dataset.
With version 2.3, zarr will support a feature called *consolidated metadata*,
which allows all metadata for the entire dataset to be stored with a single
key (by default called ``.zmetadata``). This can drastically speed up
opening the store. (For more information on this feature, consult the
`zarr docs <https://zarr.readthedocs.io/en/latest/tutorial.html#consolidating-metadata>`_.)

If you have zarr version 2.3 or greater, xarray can write and read stores
with consolidated metadata. To write consolidated metadata, pass the
``consolidated=True`` option to the
:py:attr:`Dataset.to_zarr <xarray.Dataset.to_zarr>` method::

ds.to_zarr('foo.zarr', consolidated=True)

To read a consolidated store, pass the ``consolidated=True`` option to
:py:func:`~xarray.open_zarr`::

ds = xr.open_zarr('foo.zarr', consolidated=True)
jhamman marked this conversation as resolved.
Show resolved Hide resolved

.. _io.cfgrib:

GRIB format via cfgrib
Expand Down Expand Up @@ -678,7 +703,7 @@ Formats supported by PseudoNetCDF
---------------------------------

xarray can also read CAMx, BPCH, ARL PACKED BIT, and many other file
formats supported by PseudoNetCDF_, if PseudoNetCDF is installed.
formats supported by PseudoNetCDF_, if PseudoNetCDF is installed.
PseudoNetCDF can also provide Climate Forecasting Conventions to
CMAQ files. In addition, PseudoNetCDF can automatically register custom
readers that subclass PseudoNetCDF.PseudoNetCDFFile. PseudoNetCDF can
Expand Down
14 changes: 8 additions & 6 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Breaking changes
Enhancements
~~~~~~~~~~~~

- Ability to read and write consolidated metadata in zarr stores.
By `Ryan Abernathey <https://github.com/rabernat>`_.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you reference the issue this is attached to: (:issue:`2558`).

- :py:class:`CFTimeIndex` uses slicing for string indexing when possible (like
:py:class:`pandas.DatetimeIndex`), which avoids unnecessary copies.
By `Stephan Hoyer <https://github.com/shoyer>`_
Expand All @@ -56,15 +58,15 @@ Breaking changes
- ``Dataset.T`` has been removed as a shortcut for :py:meth:`Dataset.transpose`.
Call :py:meth:`Dataset.transpose` directly instead.
- Iterating over a ``Dataset`` now includes only data variables, not coordinates.
Similarily, calling ``len`` and ``bool`` on a ``Dataset`` now
Similarily, calling ``len`` and ``bool`` on a ``Dataset`` now
includes only data variables.
- ``DataArray.__contains__`` (used by Python's ``in`` operator) now checks
array data, not coordinates.
array data, not coordinates.
- The old resample syntax from before xarray 0.10, e.g.,
``data.resample('1D', dim='time', how='mean')``, is no longer supported will
raise an error in most cases. You need to use the new resample syntax
instead, e.g., ``data.resample(time='1D').mean()`` or
``data.resample({'time': '1D'}).mean()``.
``data.resample({'time': '1D'}).mean()``.


- New deprecations (behavior will be changed in xarray 0.12):
Expand Down Expand Up @@ -101,13 +103,13 @@ Breaking changes
than by default trying to coerce them into ``np.datetime64[ns]`` objects.
A :py:class:`~xarray.CFTimeIndex` will be used for indexing along time
coordinates in these cases.
- A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added
- A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added
to aid in converting from a :py:class:`~xarray.CFTimeIndex` to a
:py:class:`pandas.DatetimeIndex` for the remaining use-cases where
using a :py:class:`~xarray.CFTimeIndex` is still a limitation (e.g. for
resample or plotting).
- Setting the ``enable_cftimeindex`` option is now a no-op and emits a
``FutureWarning``.
``FutureWarning``.

Enhancements
~~~~~~~~~~~~
Expand Down Expand Up @@ -194,7 +196,7 @@ Bug fixes
the dates must be encoded using cftime rather than NumPy (:issue:`2272`).
By `Spencer Clark <https://github.com/spencerkclark>`_.

- Chunked datasets can now roundtrip to Zarr storage continually
- Chunked datasets can now roundtrip to Zarr storage continually
with `to_zarr` and ``open_zarr`` (:issue:`2300`).
By `Lily Wang <https://github.com/lilyminium>`_.

Expand Down
20 changes: 12 additions & 8 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,


def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
encoding=None, compute=True):
encoding=None, compute=True, consolidated=False):
"""This function creates an appropriate datastore for writing a dataset to
a zarr ztore

Expand All @@ -876,16 +876,20 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
_validate_dataset_names(dataset)
_validate_attrs(dataset)

store = backends.ZarrStore.open_group(store=store, mode=mode,
synchronizer=synchronizer,
group=group)
zstore = backends.ZarrStore.open_group(store=store, mode=mode,
synchronizer=synchronizer,
group=group,
consolidate_on_close=consolidated)

writer = ArrayWriter()
# TODO: figure out how to properly handle unlimited_dims
dump_to_store(dataset, store, writer, encoding=encoding)
dump_to_store(dataset, zstore, writer, encoding=encoding)
writes = writer.sync(compute=compute)

if not compute:
if compute:
_finalize_store(writes, zstore)
else:
import dask
return dask.delayed(_finalize_store)(writes, store)
return store
return dask.delayed(_finalize_store)(writes, zstore)

return zstore
37 changes: 29 additions & 8 deletions xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ class ZarrStore(AbstractWritableDataStore):
"""

@classmethod
def open_group(cls, store, mode='r', synchronizer=None, group=None):
def open_group(cls, store, mode='r', synchronizer=None, group=None,
consolidated=False, consolidate_on_close=False):
import zarr
min_zarr = '2.2'

Expand All @@ -234,15 +235,27 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None):
"installation "
"http://zarr.readthedocs.io/en/stable/"
"#installation" % min_zarr)
zarr_group = zarr.open_group(store=store, mode=mode,
synchronizer=synchronizer, path=group)
return cls(zarr_group)

def __init__(self, zarr_group):
if consolidated or consolidate_on_close:
if LooseVersion(zarr.__version__) <= '2.2.1.dev2': # pragma: no cover
raise NotImplementedError("Zarr version 2.2.1.dev2 or greater "
"is required by for consolidated "
"metadata.")

open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
if consolidated:
# TODO: an option to pass the metadata_key keyword
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to consider this TODO here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anything to do here now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we feel that it's important to expose this functionality from within xarray? I don't.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't.
I think it's ok for xarray to have an opinion on what the special key is called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose we just leave these TODO's here as is. If anyone ever needs this feature from the xarray side, this will help guide them on how to implement it.

zarr_group = zarr.open_consolidated(store, **open_kwargs)
else:
zarr_group = zarr.open_group(store, **open_kwargs)
return cls(zarr_group, consolidate_on_close)

def __init__(self, zarr_group, consolidate_on_close=False):
self.ds = zarr_group
self._read_only = self.ds.read_only
self._synchronizer = self.ds.synchronizer
self._group = self.ds.path
self._consolidate_on_close = consolidate_on_close

def open_store_variable(self, name, zarr_array):
data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self))
Expand Down Expand Up @@ -333,11 +346,16 @@ def store(self, variables, attributes, *args, **kwargs):
def sync(self):
pass

def close(self):
if self._consolidate_on_close:
import zarr
zarr.consolidate_metadata(self.ds.store)


def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
decode_cf=True, mask_and_scale=True, decode_times=True,
concat_characters=True, decode_coords=True,
drop_variables=None):
drop_variables=None, consolidated=False):
"""Load and decode a dataset from a Zarr store.

.. note:: Experimental
Expand Down Expand Up @@ -383,10 +401,13 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
decode_coords : bool, optional
If True, decode the 'coordinates' attribute to identify coordinates in
the resulting dataset.
drop_variables: string or iterable, optional
drop_variables : string or iterable, optional
A variable or list of variables to exclude from being parsed from the
dataset. This may be useful to drop variables with problems or
inconsistent values.
consolidated : bool, optional
Whether to open the store using zarr's consolidated metadata
capability. Only works for stores that have already been consolidated.

Returns
-------
Expand Down Expand Up @@ -423,7 +444,7 @@ def maybe_decode_store(store, lock=False):
mode = 'r'
zarr_store = ZarrStore.open_group(store, mode=mode,
synchronizer=synchronizer,
group=group)
group=group, consolidated=consolidated)
ds = maybe_decode_store(zarr_store)

# auto chunking needs to be here and not in ZarrStore because variable
Expand Down
20 changes: 14 additions & 6 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
compute=compute)

def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
encoding=None, compute=True):
encoding=None, compute=True, consolidated=False):
"""Write dataset contents to a zarr group.

.. note:: Experimental
Expand All @@ -1244,9 +1244,16 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
Nested dictionary with variable names as keys and dictionaries of
variable specific encodings as values, e.g.,
``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}``
compute: boolean
If true compute immediately, otherwise return a
compute: bool, optional
If True compute immediately, otherwise return a
``dask.delayed.Delayed`` object that can be computed later.
consolidated: bool, optional
If True, apply zarr's `consolidate_metadata` function to the store
after writing.

References
----------
https://zarr.readthedocs.io/
"""
if encoding is None:
encoding = {}
Expand All @@ -1256,7 +1263,8 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
"and 'w-'.")
from ..backends.api import to_zarr
return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
group=group, encoding=encoding, compute=compute)
group=group, encoding=encoding, compute=compute,
consolidated=consolidated)

def __unicode__(self):
return formatting.dataset_repr(self)
Expand Down Expand Up @@ -1380,7 +1388,7 @@ def _validate_indexers(self, indexers):
""" Here we make sure
+ indexer has a valid keys
+ indexer is in a valid data type
+ string indexers are cast to the appropriate date type if the
+ string indexers are cast to the appropriate date type if the
associated index is a DatetimeIndex or CFTimeIndex
"""
from .dataarray import DataArray
Expand Down Expand Up @@ -1963,7 +1971,7 @@ def _validate_interp_indexer(x, new_x):
'Instead got\n{}'.format(new_x))
else:
return (x, new_x)

variables = OrderedDict()
for name, var in iteritems(obj._variables):
if name not in indexers:
Expand Down
9 changes: 9 additions & 0 deletions xarray/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,15 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={},
allow_cleanup_failure=False):
pytest.skip("zarr backend does not support appending")

def test_roundtrip_consolidated(self):
zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
expected = create_test_data()
with self.roundtrip(expected,
save_kwargs={'consolidated': True},
open_kwargs={'consolidated': True}) as actual:
self.check_dtypes_roundtripped(expected, actual)
assert_identical(expected, actual)

def test_auto_chunk(self):
original = create_test_data().chunk()

Expand Down
20 changes: 16 additions & 4 deletions xarray/tests/test_distributed.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" isort:skip_file """
from __future__ import absolute_import, division, print_function
from distutils.version import LooseVersion
import os
import sys
import pickle
Expand Down Expand Up @@ -118,15 +119,26 @@ def test_dask_distributed_read_netcdf_integration_test(


@requires_zarr
def test_dask_distributed_zarr_integration_test(loop):
@pytest.mark.parametrize('consolidated', [True, False])
@pytest.mark.parametrize('compute', [True, False])
def test_dask_distributed_zarr_integration_test(loop, consolidated, compute):
if consolidated:
zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
write_kwargs = dict(consolidated=True)
read_kwargs = dict(consolidated=True)
else:
write_kwargs = read_kwargs = {}
chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5}
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop) as c:
original = create_test_data().chunk(chunks)
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS,
suffix='.zarr') as filename:
original.to_zarr(filename)
with xr.open_zarr(filename) as restored:
suffix='.zarrc') as filename:
maybe_futures = original.to_zarr(filename, compute=compute,
**write_kwargs)
if not compute:
maybe_futures.compute()
with xr.open_zarr(filename, **read_kwargs) as restored:
assert isinstance(restored.var1.data, da.Array)
computed = restored.compute()
assert_allclose(original, computed)
Expand Down