
Consolidate dimension coordinates #210

Merged
27 changes: 25 additions & 2 deletions pangeo_forge_recipes/recipes/xarray_zarr.py
@@ -621,9 +621,25 @@ def store_chunk(
zarr_array[zarr_region] = data


def finalize_target(target: CacheFSSpecTarget, consolidate_zarr: bool) -> None:
def finalize_target(
target: CacheFSSpecTarget,
consolidate_zarr: bool,
consolidate_dimension_coordinates: bool = True,
) -> None:
if target is None:
raise ValueError("target has not been set.")

if consolidate_dimension_coordinates:
logger.info("Consolidating dimension coordinate arrays")
target_mapper = target.get_mapper()
ds = xr.open_zarr(target_mapper) # Probably a better way to get the dimension coords?
Contributor:

You could make a set out of the _ARRAY_DIMENSIONS attribute on each array in the group.

Contributor Author:

Thanks. Done in 1c36e21.

group = zarr.open(target_mapper)
for dim in ds.dims:
attrs = dict(group[dim].attrs)
data = group[dim][:]
Contributor Author:

This assumes the coordinate fits in memory on a single machine. I don't know if we're assuming that anywhere else (probably in the tests), but it's probably a safe assumption for now.

Contributor:

I think it's a safe assumption, since the dimension coordinates are guaranteed to be 1D. We could note this caveat in the docs on consolidate_dimension_coordinates.

It also assumes that dim in group. But that's not always the case. Xarray Datasets can have dimensions with no corresponding coordinate, which would give a KeyError here.

group[dim] = data
Contributor:

AFAICT this will not actually change the chunking. Since the array already exists, this statement will stripe the data over the existing chunks.

group[dim].attrs.update(attrs)
Contributor:

Why are you updating the attrs here? It doesn't look like they could have changed at all.


if consolidate_zarr:
logger.info("Consolidating Zarr metadata")
target_mapper = target.get_mapper()
@@ -661,6 +677,9 @@ class XarrayZarrRecipe(BaseRecipe, FilePatternRecipeMixin):
``xr.open_dataset``. This is required for engines that can't open
file-like objects (e.g. pynio).
:param consolidate_zarr: Whether to consolidate the resulting Zarr dataset.
:param consolidate_dimension_coordinates: Whether to rewrite coordinate variables as a
single chunk. We recommend consolidating coordinate variables to avoid
many small read requests to get the coordinates in xarray.
:param xarray_open_kwargs: Extra options for opening the inputs with Xarray.
:param xarray_concat_kwargs: Extra options to pass to Xarray when concatenating
the inputs to form a chunk.
@@ -685,6 +704,7 @@ class XarrayZarrRecipe(BaseRecipe, FilePatternRecipeMixin):
cache_inputs: Optional[bool] = None
copy_input_to_local_file: bool = False
consolidate_zarr: bool = True
consolidate_dimension_coordinates: bool = True
xarray_open_kwargs: dict = field(default_factory=dict)
xarray_concat_kwargs: dict = field(default_factory=dict)
delete_input_encoding: bool = True
@@ -851,7 +871,10 @@ def store_chunk(self) -> Callable[[Hashable], None]:
@property
def finalize_target(self) -> Callable[[], None]:
return functools.partial(
finalize_target, target=self.target, consolidate_zarr=self.consolidate_zarr
finalize_target,
target=self.target,
consolidate_zarr=self.consolidate_zarr,
consolidate_dimension_coordinates=self.consolidate_dimension_coordinates,
)

def iter_inputs(self) -> Iterator[InputKey]:
5 changes: 5 additions & 0 deletions tests/recipe_tests/test_XarrayZarrRecipe.py
@@ -4,6 +4,7 @@

import pytest
import xarray as xr
import zarr

# need to import this way (rather than use pytest.lazy_fixture) to make it work with dask
from pytest_lazyfixture import lazy_fixture
@@ -274,6 +275,10 @@ def do_actual_chunks_test(
assert all([item == chunk_len for item in ds_actual.chunks[other_dim][:-1]])

ds_actual.load()
store = zarr.open_consolidated(target.get_mapper())
for dim in ds_actual.dims:
assert store[dim].chunks == ds_actual[dim].shape

xr.testing.assert_identical(ds_actual, ds_expected)

