Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate dimension coordinates #210

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import functools
import itertools
import logging
import os
import warnings
Expand All @@ -26,6 +27,7 @@
# use this filename to store global recipe metadata in the metadata_cache
# it will be written once (by prepare_target) and read many times (by store_chunk)
_GLOBAL_METADATA_KEY = "pangeo-forge-recipe-metadata.json"
_ARRAY_DIMENSIONS = "_ARRAY_DIMENSIONS"
MAX_MEMORY = (
int(os.getenv("PANGEO_FORGE_MAX_MEMORY")) # type: ignore
if os.getenv("PANGEO_FORGE_MAX_MEMORY")
Expand Down Expand Up @@ -621,9 +623,41 @@ def store_chunk(
zarr_array[zarr_region] = data


def finalize_target(target: CacheFSSpecTarget, consolidate_zarr: bool) -> None:
def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]:
    """Return every dimension name referenced by any variable in *group*.

    Each zarr array written by xarray carries an ``_ARRAY_DIMENSIONS``
    attribute listing its dimension names; the union of those lists (without
    duplicates, in no guaranteed order) is returned.
    """
    seen: set = set()
    for var in group:
        # Variables lacking the attribute contribute nothing.
        seen.update(group[var].attrs.get(_ARRAY_DIMENSIONS, []))
    return list(seen)


def finalize_target(
target: CacheFSSpecTarget,
consolidate_zarr: bool,
consolidate_dimension_coordinates: bool = True,
) -> None:
if target is None:
raise ValueError("target has not been set.")

if consolidate_dimension_coordinates:
logger.info("Consolidating dimension coordinate arrays")
target_mapper = target.get_mapper()
group = zarr.open(target_mapper, mode="a")
dims = _gather_coordinate_dimensions(group)
for dim in dims:
arr = group[dim]
attrs = dict(arr.attrs)
new = group.array(
dim,
arr[:],
chunks=arr.shape,
dtype=arr.dtype,
compressor=arr.compressor,
fill_value=arr.fill_value,
order=arr.order,
filters=arr.filters,
overwrite=True,
)
new.attrs.update(attrs)

if consolidate_zarr:
logger.info("Consolidating Zarr metadata")
target_mapper = target.get_mapper()
Expand Down Expand Up @@ -661,6 +695,9 @@ class XarrayZarrRecipe(BaseRecipe, FilePatternRecipeMixin):
``xr.open_dataset``. This is required for engines that can't open
file-like objects (e.g. pynio).
:param consolidate_zarr: Whether to consolidate the resulting Zarr dataset.
:param consolidate_dimension_coordinates: Whether to rewrite coordinate variables as a
single chunk. We recommend consolidating coordinate variables to avoid
many small read requests to get the coordinates in xarray.
:param xarray_open_kwargs: Extra options for opening the inputs with Xarray.
:param xarray_concat_kwargs: Extra options to pass to Xarray when concatenating
the inputs to form a chunk.
Expand All @@ -685,6 +722,7 @@ class XarrayZarrRecipe(BaseRecipe, FilePatternRecipeMixin):
cache_inputs: Optional[bool] = None
copy_input_to_local_file: bool = False
consolidate_zarr: bool = True
consolidate_dimension_coordinates: bool = True
xarray_open_kwargs: dict = field(default_factory=dict)
xarray_concat_kwargs: dict = field(default_factory=dict)
delete_input_encoding: bool = True
Expand Down Expand Up @@ -851,7 +889,10 @@ def store_chunk(self) -> Callable[[Hashable], None]:
@property
def finalize_target(self) -> Callable[[], None]:
    """Return a zero-argument callable that finalizes the target store.

    Binds the recipe's configuration (target store, whether to consolidate
    Zarr metadata, and whether to rewrite dimension coordinates as single
    chunks) onto the module-level ``finalize_target`` function.
    """
    # NOTE(review): the scraped diff showed both the pre- and post-change
    # call bodies; this is the post-change version, which forwards the new
    # consolidate_dimension_coordinates option.
    return functools.partial(
        finalize_target,
        target=self.target,
        consolidate_zarr=self.consolidate_zarr,
        consolidate_dimension_coordinates=self.consolidate_dimension_coordinates,
    )

def iter_inputs(self) -> Iterator[InputKey]:
Expand Down
18 changes: 18 additions & 0 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import pytest
import xarray as xr
import zarr

# need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
from pytest_lazyfixture import lazy_fixture
Expand Down Expand Up @@ -274,6 +275,10 @@ def do_actual_chunks_test(
assert all([item == chunk_len for item in ds_actual.chunks[other_dim][:-1]])

ds_actual.load()
store = zarr.open_consolidated(target.get_mapper())
for dim in ds_actual.dims:
assert store[dim].chunks == ds_actual[dim].shape

xr.testing.assert_identical(ds_actual, ds_expected)


Expand Down Expand Up @@ -340,6 +345,19 @@ def test_chunks_distributed_locking(
)


def test_no_consolidate_dimension_coordinates(netCDFtoZarr_recipe):
    """With consolidate_dimension_coordinates=False, coordinate arrays keep
    their per-input chunking instead of being rewritten as one chunk.

    (The scraped page had a review-comment widget spliced into the function
    body; this is the clean function with that chrome removed.)
    """
    RecipeClass, file_pattern, kwargs, ds_expected, target = netCDFtoZarr_recipe

    rec = RecipeClass(file_pattern, **kwargs)
    rec.consolidate_dimension_coordinates = False
    rec.to_function()()
    ds_actual = xr.open_zarr(target.get_mapper()).load()
    xr.testing.assert_identical(ds_actual, ds_expected)

    # The time coordinate should still be chunked per input file, not as a
    # single consolidated chunk.
    store = zarr.open_consolidated(target.get_mapper())
    assert store["time"].chunks == (file_pattern.nitems_per_input["time"],)


def test_lock_timeout(netCDFtoZarr_recipe_sequential_only, execute_recipe_no_dask):
RecipeClass, file_pattern, kwargs, ds_expected, target = netCDFtoZarr_recipe_sequential_only

Expand Down