Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add options for zarr array definition #48

Merged
merged 3 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rechunker/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def rechunking_plan(
Original chunk shape (must be in form (5, 10, 20), no irregular chunks)
target_chunks : Tuple
Target chunk shape (must be in form (5, 10, 20), no irregular chunks)
itemsize: int
Number of bytes used to represent a single array element
max_mem : Int
Maximum permissible chunk memory size, measured in units of itemsize
consolidate_reads: bool, optional
Expand Down
69 changes: 62 additions & 7 deletions rechunker/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,17 @@ def _get_dims_from_zarr_array(z_array):
return z_array.attrs["_ARRAY_DIMENSIONS"]


def _zarr_empty(shape, store_or_group, chunks, dtype, name=None):
def _zarr_empty(shape, store_or_group, chunks, dtype, name=None, **kwargs):
# wrapper that maybe creates the array within a group
if name is not None:
assert isinstance(store_or_group, zarr.hierarchy.Group)
return store_or_group.empty(name, shape=shape, chunks=chunks, dtype=dtype)
return store_or_group.empty(
name, shape=shape, chunks=chunks, dtype=dtype, **kwargs
)
else:
return zarr.empty(shape, chunks=chunks, dtype=dtype, store=store_or_group)
return zarr.empty(
shape, chunks=chunks, dtype=dtype, store=store_or_group, **kwargs
)


def _get_executor(name: str) -> Executor:
Expand Down Expand Up @@ -178,7 +182,9 @@ def rechunk(
target_chunks,
max_mem,
target_store,
target_options=None,
temp_store=None,
temp_options=None,
executor: Union[str, Executor] = "dask",
) -> Rechunked:
"""
Expand Down Expand Up @@ -212,9 +218,17 @@ def rechunk(
target_store : str, MutableMapping, or zarr.Store object
The location in which to store the final, rechunked result.
Will be passed directly to :py:meth:`zarr.creation.create`
target_options: Dict, optional
Additional keyword arguments used to create target arrays.
See :py:meth:`zarr.creation.create` for arguments available.
Must not include any of [``shape``, ``chunks``, ``dtype``, ``store``, ``name``].
temp_store : str, MutableMapping, or zarr.Store object, optional
Location of temporary store for intermediate data. Can be deleted
once rechunking is complete.
temp_options: Dict, optional
Additional keyword arguments used to create intermediate arrays.
See :py:meth:`zarr.creation.create` for arguments available.
Must not include any of [``shape``, ``chunks``, ``dtype``, ``store``, ``name``].
executor: str or rechunker.types.Executor
Implementation of the execution engine for copying between zarr arrays.
Supplying a custom Executor is currently even more experimental than the
Expand All @@ -228,14 +242,26 @@ def rechunk(
if isinstance(executor, str):
executor = _get_executor(executor)
copy_spec, intermediate, target = _setup_rechunk(
source, target_chunks, max_mem, target_store, temp_store
source=source,
target_chunks=target_chunks,
max_mem=max_mem,
target_store=target_store,
target_options=target_options,
temp_store=temp_store,
temp_options=temp_options,
)
plan = executor.prepare_plan(copy_spec)
return Rechunked(executor, plan, source, intermediate, target)


def _setup_rechunk(
source, target_chunks, max_mem, target_store, temp_store=None,
source,
target_chunks,
max_mem,
target_store,
target_options=None,
temp_store=None,
temp_options=None,
):
if isinstance(source, zarr.hierarchy.Group):
if not isinstance(target_chunks, dict):
Expand All @@ -257,7 +283,9 @@ def _setup_rechunk(
array_target_chunks,
max_mem,
target_group,
target_options=target_options,
temp_store_or_group=temp_group,
temp_options=temp_options,
name=array_name,
)
copy_specs.append(copy_spec)
Expand All @@ -271,7 +299,9 @@ def _setup_rechunk(
target_chunks,
max_mem,
target_store,
target_options=target_options,
temp_store_or_group=temp_store,
temp_options=temp_options,
)
intermediate = copy_spec.intermediate.array
target = copy_spec.write.array
Expand All @@ -281,14 +311,29 @@ def _setup_rechunk(
raise ValueError("Source must be a Zarr Array or Group, or a Dask Array.")


def _validate_options(options):
if not options:
return
for k in ["shape", "chunks", "dtype", "store", "name"]:
if k in options:
raise ValueError(
f"Optional array arguments must not include {k} (provided {k}={options[k]}). "
"Values for this property are managed internally."
)


def _setup_array_rechunk(
source_array,
target_chunks,
max_mem,
target_store_or_group,
target_options=None,
temp_store_or_group=None,
temp_options=None,
name=None,
) -> CopySpec:
_validate_options(target_options)
_validate_options(temp_options)
shape = source_array.shape
source_chunks = (
source_array.chunksize
Expand Down Expand Up @@ -333,7 +378,12 @@ def _setup_array_rechunk(
write_chunks = tuple(int(x) for x in write_chunks)

target_array = _zarr_empty(
shape, target_store_or_group, target_chunks, dtype, name=name
shape,
target_store_or_group,
target_chunks,
dtype,
name=name,
**(target_options or {}),
)
try:
target_array.attrs.update(source_array.attrs)
Expand All @@ -346,7 +396,12 @@ def _setup_array_rechunk(
# do intermediate store
assert temp_store_or_group is not None
int_array = _zarr_empty(
shape, temp_store_or_group, int_chunks, dtype, name=name
shape,
temp_store_or_group,
int_chunks,
dtype,
name=name,
**(temp_options or {}),
)

read_proxy = ArrayProxy(source_array, read_chunks)
Expand Down
77 changes: 68 additions & 9 deletions tests/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import importlib
import pytest

from pathlib import Path
import zarr
import dask.array as dsa
import dask
Expand Down Expand Up @@ -186,25 +187,30 @@ def test_rechunk_group(tmp_path, executor):


@pytest.fixture(params=["Array", "Group"])
def rechunked(tmp_path, request):
def rechunked_fn(tmp_path, request):
if request.param == "Group":
store_source = str(tmp_path / "source.zarr")
group = zarr.group(store_source)
group.attrs["foo"] = "bar"
# 800 byte chunks
a = group.ones("a", shape=(5, 10, 20), chunks=(1, 10, 20), dtype="f4")
a.attrs["foo"] = "bar"
b = group.ones("b", shape=(20,), chunks=(10,), dtype="f4")
b = group.ones("b", shape=(8000,), chunks=(100,), dtype="f4")
b.attrs["foo"] = "bar"

target_store = str(tmp_path / "target.zarr")
temp_store = str(tmp_path / "temp.zarr")

max_mem = 1600 # should force a two-step plan for a
target_chunks = {"a": (5, 10, 4), "b": (20,)}
max_mem = 16000 # should force a two-step plan for b
target_chunks = {"a": (5, 10, 4), "b": (4000,)}

rechunked = api.rechunk(
group, target_chunks, max_mem, target_store, temp_store=temp_store
rechunked_fn = partial(
api.rechunk,
group,
target_chunks,
max_mem,
target_store,
temp_store=temp_store,
)
else:
shape = (8000, 8000)
Expand All @@ -227,10 +233,20 @@ def rechunked(tmp_path, request):
target_store = str(tmp_path / "target.zarr")
temp_store = str(tmp_path / "temp.zarr")

rechunked = api.rechunk(
source_array, target_chunks, max_mem, target_store, temp_store=temp_store
rechunked_fn = partial(
api.rechunk,
source_array,
target_chunks,
max_mem,
target_store,
temp_store=temp_store,
)
return rechunked
return rechunked_fn


@pytest.fixture()
def rechunked(rechunked_fn):
    """Return the result of calling ``rechunked_fn`` with no extra arguments."""
    result = rechunked_fn()
    return result


def test_repr(rechunked):
Expand All @@ -241,6 +257,49 @@ def test_repr(rechunked):
assert all(thing in repr_str for thing in ["Source", "Intermediate", "Target"])


def test_rechunk_option_overwrite(rechunked_fn):
    """Check that ``overwrite=True`` options allow rechunking a second time.

    The first run executes normally, an identical second run is expected
    to raise ``ValueError``, and a run that passes ``overwrite=True`` for
    both the temp and target options is expected to execute.
    """
    initial = rechunked_fn()
    initial.execute()

    # TODO: make this match more reliable based on outcome of
    # https://github.com/zarr-developers/zarr-python/issues/605
    with pytest.raises(ValueError, match=r"path .* contains an array"):
        rechunked_fn().execute()

    rechunked_fn(
        temp_options={"overwrite": True},
        target_options={"overwrite": True},
    ).execute()


def test_rechunk_option_compression(rechunked_fn):
    """Check that a compressor passed via the options shrinks the target.

    Rechunks once with ``compressor=None`` and once with a Blosc/zstd
    compressor, then compares the summed ``st_size`` of every path found
    under the target store's path.
    """

    def _target_size(compressor):
        # Same compressor for intermediate and target arrays; overwrite so
        # the helper can be called repeatedly against the same stores.
        options = dict(overwrite=True, compressor=compressor)
        result = rechunked_fn(
            temp_options=dict(options),
            target_options=dict(options),
        )
        result.execute()
        target_root = Path(result._target.store.path)
        total = 0
        for entry in target_root.rglob("*"):
            total += entry.stat().st_size
        return total

    size_uncompressed = _target_size(None)
    blosc = zarr.Blosc(cname="zstd", clevel=9, shuffle=zarr.Blosc.SHUFFLE)
    size_compressed = _target_size(blosc)
    assert size_compressed < size_uncompressed


def test_rechunk_reserved_option(rechunked_fn):
    """Check that reserved keys are rejected in both option dicts.

    Each of ``shape``, ``chunks``, ``dtype``, ``store`` and ``name`` is
    passed first through ``temp_options`` and then through
    ``target_options``; a ``ValueError`` naming the key is expected
    each time.
    """
    reserved = ("shape", "chunks", "dtype", "store", "name")
    for o in reserved:
        expected = f"Optional array arguments must not include {o}"
        for option_kwarg in ("temp_options", "target_options"):
            with pytest.raises(ValueError, match=expected):
                rechunked_fn(**{option_kwarg: {o: True}})


def test_repr_html(rechunked):
rechunked._repr_html_() # no exceptions

Expand Down