Skip to content

Commit

Permalink
Allow Zarr compression to be set for intermediate files (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite authored Sep 13, 2024
1 parent 88ae813 commit a70d4ce
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def blockwise(
target_store=target_store,
target_path=target_path,
storage_options=spec.storage_options,
compressor=spec.zarr_compressor,
shape=shape,
dtype=dtype,
chunks=_chunks,
Expand Down Expand Up @@ -367,6 +368,7 @@ def general_blockwise(
target_store=target_store,
target_path=target_path,
storage_options=spec.storage_options,
compressor=spec.zarr_compressor,
shape=shape,
dtype=dtype,
chunks=chunks,
Expand Down
4 changes: 4 additions & 0 deletions cubed/primitive/blockwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def blockwise(
target_store: T_Store,
target_path: Optional[str] = None,
storage_options: Optional[Dict[str, Any]] = None,
compressor: Union[dict, str, None] = "default",
shape: T_Shape,
dtype: T_DType,
chunks: T_Chunks,
Expand Down Expand Up @@ -215,6 +216,7 @@ def blockwise(
target_store=target_store,
target_path=target_path,
storage_options=storage_options,
compressor=compressor,
shape=shape,
dtype=dtype,
chunks=chunks,
Expand All @@ -236,6 +238,7 @@ def general_blockwise(
target_store: T_Store,
target_path: Optional[str] = None,
storage_options: Optional[Dict[str, Any]] = None,
compressor: Union[dict, str, None] = "default",
shape: T_Shape,
dtype: T_DType,
chunks: T_Chunks,
Expand Down Expand Up @@ -297,6 +300,7 @@ def general_blockwise(
chunks=chunksize,
path=target_path,
storage_options=storage_options,
compressor=compressor,
)

func_kwargs = extra_func_kwargs or {}
Expand Down
2 changes: 1 addition & 1 deletion cubed/runtime/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
try:
import memray
except ImportError:
memray = None
memray = None # type: ignore

sym_counter = 0

Expand Down
17 changes: 16 additions & 1 deletion cubed/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(
executor_name: Optional[str] = None,
executor_options: Optional[dict] = None,
storage_options: Union[dict, None] = None,
zarr_compressor: Union[dict, str, None] = "default",
):
"""
Specify resources available to run a computation.
Expand All @@ -42,6 +43,13 @@ def __init__(
The default executor for running computations.
storage_options : dict, optional
Storage options to be passed to fsspec.
zarr_compressor : dict or str, optional
The compressor used by Zarr for intermediate data.
If not specified, or set to ``"default"``, Zarr will use the default Blosc compressor.
If set to ``None``, compression is disabled, which can be a good option when using local storage.
Use a dictionary to configure arbitrary compression using Numcodecs. The following example specifies
Blosc compression: ``zarr_compressor={"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}``.
"""

self._work_dir = work_dir
Expand All @@ -61,6 +69,7 @@ def __init__(
self._executor = None

self._storage_options = storage_options
self._zarr_compressor = zarr_compressor

@property
def work_dir(self) -> Optional[str]:
Expand Down Expand Up @@ -97,10 +106,15 @@ def storage_options(self) -> Optional[dict]:
"""Storage options to be passed to fsspec."""
return self._storage_options

@property
def zarr_compressor(self) -> Union[dict, str, None]:
"""The compressor used by Zarr for intermediate data."""
return self._zarr_compressor

def __repr__(self) -> str:
return (
f"cubed.Spec(work_dir={self._work_dir}, allowed_mem={self._allowed_mem}, "
f"reserved_mem={self._reserved_mem}, executor={self._executor}, storage_options={self._storage_options})"
f"reserved_mem={self._reserved_mem}, executor={self._executor}, storage_options={self._storage_options}, zarr_compressor={self._zarr_compressor})"
)

def __eq__(self, other):
Expand All @@ -111,6 +125,7 @@ def __eq__(self, other):
and self.reserved_mem == other.reserved_mem
and self.executor == other.executor
and self.storage_options == other.storage_options
and self.zarr_compressor == other.zarr_compressor
)
else:
return False
Expand Down
11 changes: 7 additions & 4 deletions cubed/storage/backends/tensorstore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import math
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union

import numpy as np
import tensorstore
Expand Down Expand Up @@ -73,7 +73,7 @@ def encode_dtype(d):
return d.descr


def get_metadata(dtype, chunks):
def get_metadata(dtype, chunks, compressor):
metadata = {}
if dtype is not None:
dtype = np.dtype(dtype)
Expand All @@ -82,6 +82,8 @@ def get_metadata(dtype, chunks):
if isinstance(chunks, int):
chunks = (chunks,)
metadata["chunks"] = chunks
if compressor != "default":
metadata["compressor"] = compressor
return metadata


Expand All @@ -93,6 +95,7 @@ def open_tensorstore_array(
dtype: Optional[T_DType] = None,
chunks: Optional[T_RegularChunks] = None,
path: Optional[str] = None,
compressor: Union[dict, str, None] = "default",
**kwargs,
):
store = str(store) # TODO: check if Path or str
Expand Down Expand Up @@ -121,7 +124,7 @@ def open_tensorstore_array(
raise ValueError(f"Mode not supported: {mode}")

if dtype is None or not hasattr(dtype, "fields") or dtype.fields is None:
metadata = get_metadata(dtype, chunks)
metadata = get_metadata(dtype, chunks, compressor)
if metadata:
spec["metadata"] = metadata

Expand All @@ -140,7 +143,7 @@ def open_tensorstore_array(
spec["path"] = field_path

field_dtype, _ = dtype.fields[field]
metadata = get_metadata(field_dtype, chunks)
metadata = get_metadata(field_dtype, chunks, compressor)
if metadata:
spec["metadata"] = metadata

Expand Down
8 changes: 7 additions & 1 deletion cubed/storage/backends/zarr_python.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from typing import Optional, Union

import zarr
from numcodecs.registry import get_codec

from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store

Expand All @@ -13,14 +14,19 @@ def open_zarr_array(
dtype: Optional[T_DType] = None,
chunks: Optional[T_RegularChunks] = None,
path: Optional[str] = None,
compressor: Union[dict, str, None] = "default",
**kwargs,
):
if isinstance(compressor, dict):
compressor = get_codec(compressor)

return zarr.open_array(
store,
mode=mode,
shape=shape,
dtype=dtype,
chunks=chunks,
path=path,
compressor=compressor,
**kwargs,
)
31 changes: 31 additions & 0 deletions cubed/tests/storage/test_zarr.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import pytest
import zarr
from numcodecs.registry import get_codec

from cubed import config
from cubed.storage.backend import open_backend_array
from cubed.storage.zarr import lazy_zarr_array

ZARR_PYTHON_V3 = zarr.__version__[0] == "3"


def test_lazy_zarr_array(tmp_path):
zarr_path = tmp_path / "lazy.zarr"
Expand All @@ -14,3 +20,28 @@ def test_lazy_zarr_array(tmp_path):
arr.create()
assert zarr_path.exists()
arr.open()


@pytest.mark.skipif(
ZARR_PYTHON_V3, reason="setting zarr compressor not yet possible for Zarr Python v3"
)
@pytest.mark.parametrize(
"compressor",
[None, {"id": "zstd"}, {"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}],
)
def test_compression(tmp_path, compressor):
zarr_path = tmp_path / "lazy.zarr"

arr = lazy_zarr_array(
zarr_path, shape=(3, 3), dtype=int, chunks=(2, 2), compressor=compressor
)
arr.create()

# open with zarr python (for zarr python v2 and tensorstore)
with config.set({"storage_name": "zarr-python"}):
z = open_backend_array(zarr_path, mode="r")

if compressor is None:
assert z.compressor is None
else:
assert z.compressor == get_codec(compressor)
11 changes: 11 additions & 0 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,17 @@ def test_default_spec_config_override():
assert_array_equal(b.compute(), -np.ones((20000, 1000)))


@pytest.mark.parametrize(
"compressor",
[None, {"id": "zstd"}, {"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}],
)
def test_spec_compressor(tmp_path, compressor):
spec = cubed.Spec(tmp_path, allowed_mem=100000, zarr_compressor=compressor)
a = xp.ones((3, 3), chunks=(2, 2), spec=spec)
b = xp.negative(a)
assert_array_equal(b.compute(), -np.ones((3, 3)))


def test_different_specs(tmp_path):
spec1 = cubed.Spec(tmp_path, allowed_mem=100000)
spec2 = cubed.Spec(tmp_path, allowed_mem=200000)
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-networkx.*]
ignore_missing_imports = True
[mypy-numcodecs.*]
ignore_missing_imports = True
[mypy-numpy.*]
ignore_missing_imports = True
[mypy-pandas.*]
Expand Down

0 comments on commit a70d4ce

Please sign in to comment.