diff --git a/ci/requirements-py36-zarr-dev.yml b/ci/requirements-py36-zarr-dev.yml
index 7fbce63aa81..6ed466ba5cb 100644
--- a/ci/requirements-py36-zarr-dev.yml
+++ b/ci/requirements-py36-zarr-dev.yml
@@ -18,4 +18,4 @@ dependencies:
   - pip:
     - coveralls
     - pytest-cov
-    - git+https://github.com/alimanfoo/zarr.git
+    - git+https://github.com/zarr-developers/zarr.git
diff --git a/doc/io.rst b/doc/io.rst
index e841e665308..682fbf5202e 100644
--- a/doc/io.rst
+++ b/doc/io.rst
@@ -635,6 +635,35 @@ For example:
 Not all native zarr compression and filtering options have been tested with
 xarray.
 
+Consolidated Metadata
+~~~~~~~~~~~~~~~~~~~~~
+
+Xarray needs to read all of the zarr metadata when it opens a dataset.
+In some storage media, such as cloud object storage (e.g. Amazon S3),
+this can introduce significant overhead, because two separate HTTP calls to the
+object store must be made for each variable in the dataset.
+With version 2.3, zarr will support a feature called *consolidated metadata*,
+which allows all metadata for the entire dataset to be stored with a single
+key (by default called ``.zmetadata``). This can drastically speed up
+opening the store. (For more information on this feature, consult the
+`zarr docs <https://zarr.readthedocs.io/>`_.)
+
+If you have zarr version 2.3 or greater, xarray can write and read stores
+with consolidated metadata. To write consolidated metadata, pass the
+``consolidated=True`` option to the
+:py:attr:`Dataset.to_zarr <xarray.Dataset.to_zarr>` method::
+
+    ds.to_zarr('foo.zarr', consolidated=True)
+
+To read a consolidated store, pass the ``consolidated=True`` option to
+:py:func:`~xarray.open_zarr`::
+
+    ds = xr.open_zarr('foo.zarr', consolidated=True)
+
+Xarray can't perform consolidation on pre-existing zarr datasets. This should
+be done directly from zarr, as described in the
+`zarr docs <https://zarr.readthedocs.io/>`_.
+
 .. _io.cfgrib:
 
 GRIB format via cfgrib
@@ -678,7 +707,7 @@ Formats supported by PseudoNetCDF
 ---------------------------------
 
 xarray can also read CAMx, BPCH, ARL PACKED BIT, and many other file
-formats supported by PseudoNetCDF_, if PseudoNetCDF is installed. 
+formats supported by PseudoNetCDF_, if PseudoNetCDF is installed.
 PseudoNetCDF can also provide Climate Forecasting Conventions to
 CMAQ files. In addition, PseudoNetCDF can automatically register custom
 readers that subclass PseudoNetCDF.PseudoNetCDFFile. PseudoNetCDF can
diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index b1d5b92da4d..040f72acb56 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -36,6 +36,8 @@ Breaking changes
 Enhancements
 ~~~~~~~~~~~~
 
+- Ability to read and write consolidated metadata in zarr stores (:issue:`2558`).
+  By `Ryan Abernathey <https://github.com/rabernat>`_.
 - :py:class:`CFTimeIndex` uses slicing for string indexing when possible (like
   :py:class:`pandas.DatetimeIndex`), which avoids unnecessary copies.
   By `Stephan Hoyer <https://github.com/shoyer>`_
@@ -56,15 +58,15 @@ Breaking changes
 - ``Dataset.T`` has been removed as a shortcut for :py:meth:`Dataset.transpose`.
   Call :py:meth:`Dataset.transpose` directly instead.
 - Iterating over a ``Dataset`` now includes only data variables, not coordinates.
-  Similarily, calling ``len`` and ``bool`` on a ``Dataset`` now 
+  Similarly, calling ``len`` and ``bool`` on a ``Dataset`` now
   includes only data variables.
 - ``DataArray.__contains__`` (used by Python's ``in`` operator) now checks
-  array data, not coordinates. 
+  array data, not coordinates.
 - The old resample syntax from before xarray 0.10, e.g.,
   ``data.resample('1D', dim='time', how='mean')``, is no longer supported will
   raise an error in most cases. You need to use the new resample syntax
   instead, e.g., ``data.resample(time='1D').mean()`` or
-  ``data.resample({'time': '1D'}).mean()``. 
+  ``data.resample({'time': '1D'}).mean()``.
 
 - New deprecations (behavior will be changed in xarray 0.12):
 
@@ -101,13 +103,13 @@ Breaking changes
     than by default trying to coerce them into ``np.datetime64[ns]`` objects.
     A :py:class:`~xarray.CFTimeIndex` will be used for indexing along time
     coordinates in these cases.
-  - A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added 
+  - A new method :py:meth:`~xarray.CFTimeIndex.to_datetimeindex` has been added
     to aid in converting from a :py:class:`~xarray.CFTimeIndex` to a
     :py:class:`pandas.DatetimeIndex` for the remaining use-cases where using a
     :py:class:`~xarray.CFTimeIndex` is still a limitation (e.g. for resample or
     plotting).
   - Setting the ``enable_cftimeindex`` option is now a no-op and emits a
-    ``FutureWarning``. 
+    ``FutureWarning``.
 
 Enhancements
 ~~~~~~~~~~~~
@@ -194,7 +196,7 @@ Bug fixes
   the dates must be encoded using cftime rather than NumPy (:issue:`2272`).
   By `Spencer Clark <https://github.com/spencerkclark>`_.
 
-- Chunked datasets can now roundtrip to Zarr storage continually 
+- Chunked datasets can now roundtrip to Zarr storage continually
   with `to_zarr` and ``open_zarr`` (:issue:`2300`).
   By `Lily Wang `_.
 
diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index c1ace7774f9..f2b6bc196a0 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -861,7 +861,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
 
 
 def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
-            encoding=None, compute=True):
+            encoding=None, compute=True, consolidated=False):
     """This function creates an appropriate datastore for writing a dataset
     to a zarr ztore
 
@@ -876,16 +876,20 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
     _validate_dataset_names(dataset)
     _validate_attrs(dataset)
 
-    store = backends.ZarrStore.open_group(store=store, mode=mode,
-                                          synchronizer=synchronizer,
-                                          group=group)
+    zstore = backends.ZarrStore.open_group(store=store, mode=mode,
+                                           synchronizer=synchronizer,
+                                           group=group,
+                                           consolidate_on_close=consolidated)
 
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
-    dump_to_store(dataset, store, writer, encoding=encoding)
+    dump_to_store(dataset, zstore, writer, encoding=encoding)
     writes = writer.sync(compute=compute)
-    if not compute:
+
+    if compute:
+        _finalize_store(writes, zstore)
+    else:
         import dask
-        return dask.delayed(_finalize_store)(writes, store)
-    return store
+        return dask.delayed(_finalize_store)(writes, zstore)
+
+    return zstore
 
diff --git a/xarray/backends/zarr.py b/xarray/backends/zarr.py
index 06fe7f04e4f..05e445a1e88 100644
--- a/xarray/backends/zarr.py
+++ b/xarray/backends/zarr.py
@@ -224,7 +224,8 @@ class ZarrStore(AbstractWritableDataStore):
     """
 
     @classmethod
-    def open_group(cls, store, mode='r', synchronizer=None, group=None):
+    def open_group(cls, store, mode='r', synchronizer=None, group=None,
+                   consolidated=False, consolidate_on_close=False):
         import zarr
         min_zarr = '2.2'
 
@@ -234,15 +235,27 @@ def open_group(cls, store, mode='r', synchronizer=None, group=None):
                                       "installation "
                                       "http://zarr.readthedocs.io/en/stable/"
                                       "#installation" % min_zarr)
-        zarr_group = zarr.open_group(store=store, mode=mode,
-                                     synchronizer=synchronizer, path=group)
-        return cls(zarr_group)
 
-    def __init__(self, zarr_group):
+        if consolidated or consolidate_on_close:
+            if LooseVersion(zarr.__version__) <= '2.2.1.dev2':  # pragma: no cover
+                raise NotImplementedError("Zarr version 2.2.1.dev2 or greater "
+                                          "is required for consolidated "
+                                          "metadata.")
+
+        open_kwargs = dict(mode=mode, synchronizer=synchronizer, path=group)
+        if consolidated:
+            # TODO: an option to pass the metadata_key keyword
+            zarr_group = zarr.open_consolidated(store, **open_kwargs)
+        else:
+            zarr_group = zarr.open_group(store, **open_kwargs)
+        return cls(zarr_group, consolidate_on_close)
+
+    def __init__(self, zarr_group, consolidate_on_close=False):
         self.ds = zarr_group
         self._read_only = self.ds.read_only
         self._synchronizer = self.ds.synchronizer
         self._group = self.ds.path
+        self._consolidate_on_close = consolidate_on_close
 
     def open_store_variable(self, name, zarr_array):
         data = indexing.LazilyOuterIndexedArray(ZarrArrayWrapper(name, self))
@@ -333,11 +346,16 @@ def store(self, variables, attributes, *args, **kwargs):
     def sync(self):
         pass
 
+    def close(self):
+        if self._consolidate_on_close:
+            import zarr
+            zarr.consolidate_metadata(self.ds.store)
+
 
 def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
               decode_cf=True, mask_and_scale=True, decode_times=True,
               concat_characters=True, decode_coords=True,
-              drop_variables=None):
+              drop_variables=None, consolidated=False):
     """Load and decode a dataset from a Zarr store.
 
     .. note:: Experimental
@@ -383,10 +401,13 @@ def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
     decode_coords : bool, optional
         If True, decode the 'coordinates' attribute to identify coordinates in
         the resulting dataset.
-    drop_variables: string or iterable, optional
+    drop_variables : string or iterable, optional
        A variable or list of variables to exclude from being parsed from the
        dataset. This may be useful to drop variables with problems or
        inconsistent values.
+    consolidated : bool, optional
+        Whether to open the store using zarr's consolidated metadata
+        capability. Only works for stores that have already been consolidated.
 
     Returns
     -------
@@ -423,7 +444,7 @@ def maybe_decode_store(store, lock=False):
     mode = 'r'
     zarr_store = ZarrStore.open_group(store, mode=mode,
                                       synchronizer=synchronizer,
-                                      group=group)
+                                      group=group, consolidated=consolidated)
     ds = maybe_decode_store(zarr_store)
 
     # auto chunking needs to be here and not in ZarrStore because variable
diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py
index 4f9c61b3269..c289703875d 100644
--- a/xarray/core/dataset.py
+++ b/xarray/core/dataset.py
@@ -1222,7 +1222,7 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
                          compute=compute)
 
     def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
-                encoding=None, compute=True):
+                encoding=None, compute=True, consolidated=False):
         """Write dataset contents to a zarr group.
 
         .. note:: Experimental
@@ -1244,9 +1244,16 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
             Nested dictionary with variable names as keys and dictionaries of
             variable specific encodings as values, e.g.,
             ``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}``
-        compute: boolean
-            If true compute immediately, otherwise return a
+        compute: bool, optional
+            If True compute immediately, otherwise return a
             ``dask.delayed.Delayed`` object that can be computed later.
+        consolidated: bool, optional
+            If True, apply zarr's `consolidate_metadata` function to the store
+            after writing.
+
+        References
+        ----------
+        https://zarr.readthedocs.io/
         """
         if encoding is None:
             encoding = {}
@@ -1256,7 +1263,8 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
                              "and 'w-'.")
         from ..backends.api import to_zarr
         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-                       group=group, encoding=encoding, compute=compute)
+                       group=group, encoding=encoding, compute=compute,
+                       consolidated=consolidated)
 
     def __unicode__(self):
         return formatting.dataset_repr(self)
@@ -1380,7 +1388,7 @@ def _validate_indexers(self, indexers):
         """ Here we make sure
         + indexer has a valid keys
         + indexer is in a valid data type
-        + string indexers are cast to the appropriate date type if the 
+        + string indexers are cast to the appropriate date type if the
           associated index is a DatetimeIndex or CFTimeIndex
         """
         from .dataarray import DataArray
@@ -1963,7 +1971,7 @@ def _validate_interp_indexer(x, new_x):
                                 'Instead got\n{}'.format(new_x))
             else:
                 return (x, new_x)
-    
+
         variables = OrderedDict()
         for name, var in iteritems(obj._variables):
             if name not in indexers:
diff --git a/xarray/tests/test_backends.py b/xarray/tests/test_backends.py
index fb9c43c0165..2361b8c2236 100644
--- a/xarray/tests/test_backends.py
+++ b/xarray/tests/test_backends.py
@@ -1320,6 +1320,15 @@ def roundtrip_append(self, data, save_kwargs={}, open_kwargs={},
                           allow_cleanup_failure=False):
         pytest.skip("zarr backend does not support appending")
 
+    def test_roundtrip_consolidated(self):
+        zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
+        expected = create_test_data()
+        with self.roundtrip(expected,
+                            save_kwargs={'consolidated': True},
+                            open_kwargs={'consolidated': True}) as actual:
+            self.check_dtypes_roundtripped(expected, actual)
+            assert_identical(expected, actual)
+
     def test_auto_chunk(self):
         original = create_test_data().chunk()
 
diff --git a/xarray/tests/test_distributed.py b/xarray/tests/test_distributed.py
index 1837a0fe4ef..bd62b8d906d 100644
--- a/xarray/tests/test_distributed.py
+++ b/xarray/tests/test_distributed.py
@@ -1,5 +1,6 @@
 """ isort:skip_file """
 from __future__ import absolute_import, division, print_function
+from distutils.version import LooseVersion
 import os
 import sys
 import pickle
@@ -118,15 +119,26 @@ def test_dask_distributed_read_netcdf_integration_test(
 
 
 @requires_zarr
-def test_dask_distributed_zarr_integration_test(loop):
+@pytest.mark.parametrize('consolidated', [True, False])
+@pytest.mark.parametrize('compute', [True, False])
+def test_dask_distributed_zarr_integration_test(loop, consolidated, compute):
+    if consolidated:
+        zarr = pytest.importorskip('zarr', minversion="2.2.1.dev2")
+        write_kwargs = dict(consolidated=True)
+        read_kwargs = dict(consolidated=True)
+    else:
+        write_kwargs = read_kwargs = {}
     chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5}
     with cluster() as (s, [a, b]):
         with Client(s['address'], loop=loop) as c:
             original = create_test_data().chunk(chunks)
             with create_tmp_file(allow_cleanup_failure=ON_WINDOWS,
-                                 suffix='.zarr') as filename:
-                original.to_zarr(filename)
-                with xr.open_zarr(filename) as restored:
+                                 suffix='.zarrc') as filename:
+                maybe_futures = original.to_zarr(filename, compute=compute,
+                                                 **write_kwargs)
+                if not compute:
+                    maybe_futures.compute()
+                with xr.open_zarr(filename, **read_kwargs) as restored:
                     assert isinstance(restored.var1.data, da.Array)
                     computed = restored.compute()
                     assert_allclose(original, computed)
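A minimal usage sketch of the ``consolidated`` keyword introduced by this patch, assuming an xarray build that contains the change and a zarr release newer than 2.2.1.dev2 (2.3 or a recent dev build); the dataset contents and the ``foo.zarr`` path are illustrative only::

    import numpy as np
    import xarray as xr

    # any small dataset will do for the illustration
    ds = xr.Dataset(
        {'temperature': (('time', 'x'), np.random.rand(4, 3))},
        coords={'time': np.arange(4), 'x': np.arange(3)})

    # write the store; all variable and attribute metadata is consolidated
    # into a single '.zmetadata' key when the store is closed
    ds.to_zarr('foo.zarr', mode='w', consolidated=True)

    # reading with consolidated=True fetches the metadata with one call to
    # the store instead of one request per .zarray/.zattrs key
    ds_back = xr.open_zarr('foo.zarr', consolidated=True)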
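As the new documentation above notes, consolidating a store that already exists is done with zarr itself rather than through xarray. A sketch of that path, again assuming zarr 2.3 or newer and a local directory store named ``foo.zarr``::

    import zarr

    # write the '.zmetadata' key for a store that was created without
    # consolidated=True; afterwards it can be opened with
    # xr.open_zarr('foo.zarr', consolidated=True)
    zarr.consolidate_metadata('foo.zarr')

    # zarr can also open the consolidated store directly
    group = zarr.open_consolidated('foo.zarr', mode='r')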
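The distributed test above also exercises how ``consolidated=True`` combines with ``compute=False``: the call returns a ``dask.delayed`` object, and the chunked array data is written and the metadata consolidated only when that object is computed. A sketch of the pattern, assuming dask is installed and using a made-up ``bar.zarr`` path::

    import numpy as np
    import xarray as xr

    ds = xr.Dataset({'a': ('x', np.arange(10))}).chunk({'x': 5})
    delayed = ds.to_zarr('bar.zarr', mode='w', compute=False,
                         consolidated=True)

    # nothing has been consolidated yet; computing the delayed object
    # (locally or on a dask.distributed cluster) performs the deferred
    # writes and then the final consolidate_metadata call
    delayed.compute()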