From a70d4ce459b4c7352f21031be19bc034a8bc744f Mon Sep 17 00:00:00 2001
From: Tom White
Date: Fri, 13 Sep 2024 08:36:25 +0100
Subject: [PATCH] Allow Zarr compression to be set for intermediate files
 (#572)

---
 cubed/core/ops.py                     |  2 ++
 cubed/primitive/blockwise.py          |  4 ++++
 cubed/runtime/utils.py                |  2 +-
 cubed/spec.py                         | 17 ++++++++++++++-
 cubed/storage/backends/tensorstore.py | 11 ++++++----
 cubed/storage/backends/zarr_python.py |  8 ++++++-
 cubed/tests/storage/test_zarr.py      | 31 +++++++++++++++++++++++++++
 cubed/tests/test_core.py              | 11 ++++++++++
 setup.cfg                             |  2 ++
 9 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/cubed/core/ops.py b/cubed/core/ops.py
index 36fd9dc0..bea16dcb 100644
--- a/cubed/core/ops.py
+++ b/cubed/core/ops.py
@@ -304,6 +304,7 @@ def blockwise(
         target_store=target_store,
         target_path=target_path,
         storage_options=spec.storage_options,
+        compressor=spec.zarr_compressor,
         shape=shape,
         dtype=dtype,
         chunks=_chunks,
@@ -367,6 +368,7 @@ def general_blockwise(
         target_store=target_store,
         target_path=target_path,
         storage_options=spec.storage_options,
+        compressor=spec.zarr_compressor,
         shape=shape,
         dtype=dtype,
         chunks=chunks,
diff --git a/cubed/primitive/blockwise.py b/cubed/primitive/blockwise.py
index 2190deee..b4571149 100644
--- a/cubed/primitive/blockwise.py
+++ b/cubed/primitive/blockwise.py
@@ -128,6 +128,7 @@ def blockwise(
     target_store: T_Store,
     target_path: Optional[str] = None,
     storage_options: Optional[Dict[str, Any]] = None,
+    compressor: Union[dict, str, None] = "default",
     shape: T_Shape,
     dtype: T_DType,
     chunks: T_Chunks,
@@ -215,6 +216,7 @@ def blockwise(
         target_store=target_store,
         target_path=target_path,
         storage_options=storage_options,
+        compressor=compressor,
         shape=shape,
         dtype=dtype,
         chunks=chunks,
@@ -236,6 +238,7 @@ def general_blockwise(
     target_store: T_Store,
     target_path: Optional[str] = None,
     storage_options: Optional[Dict[str, Any]] = None,
+    compressor: Union[dict, str, None] = "default",
     shape: T_Shape,
     dtype: T_DType,
     chunks: T_Chunks,
@@ -297,6 +300,7 @@ def general_blockwise(
         chunks=chunksize,
         path=target_path,
         storage_options=storage_options,
+        compressor=compressor,
     )
 
     func_kwargs = extra_func_kwargs or {}
diff --git a/cubed/runtime/utils.py b/cubed/runtime/utils.py
index a3bbbf98..4591842a 100644
--- a/cubed/runtime/utils.py
+++ b/cubed/runtime/utils.py
@@ -10,7 +10,7 @@
 try:
     import memray
 except ImportError:
-    memray = None
+    memray = None  # type: ignore
 
 sym_counter = 0
 
diff --git a/cubed/spec.py b/cubed/spec.py
index 39319859..3a81342c 100644
--- a/cubed/spec.py
+++ b/cubed/spec.py
@@ -21,6 +21,7 @@ def __init__(
         executor_name: Optional[str] = None,
         executor_options: Optional[dict] = None,
         storage_options: Union[dict, None] = None,
+        zarr_compressor: Union[dict, str, None] = "default",
     ):
         """
         Specify resources available to run a computation.
@@ -42,6 +43,13 @@ def __init__(
             The default executor for running computations.
         storage_options : dict, optional
             Storage options to be passed to fsspec.
+        zarr_compressor : dict or str, optional
+            The compressor used by Zarr for intermediate data.
+
+            If not specified, or set to ``"default"``, Zarr will use the default Blosc compressor.
+            If set to ``None``, compression is disabled, which can be a good option when using local storage.
+            Use a dictionary to configure arbitrary compression using Numcodecs. The following example specifies
+            Blosc compression: ``zarr_compressor={"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}``.
         """
 
         self._work_dir = work_dir
@@ -61,6 +69,7 @@ def __init__(
             self._executor = None
 
         self._storage_options = storage_options
+        self._zarr_compressor = zarr_compressor
 
     @property
     def work_dir(self) -> Optional[str]:
@@ -97,10 +106,15 @@ def storage_options(self) -> Optional[dict]:
         """Storage options to be passed to fsspec."""
         return self._storage_options
 
+    @property
+    def zarr_compressor(self) -> Union[dict, str, None]:
+        """The compressor used by Zarr for intermediate data."""
+        return self._zarr_compressor
+
     def __repr__(self) -> str:
         return (
             f"cubed.Spec(work_dir={self._work_dir}, allowed_mem={self._allowed_mem}, "
-            f"reserved_mem={self._reserved_mem}, executor={self._executor}, storage_options={self._storage_options})"
+            f"reserved_mem={self._reserved_mem}, executor={self._executor}, storage_options={self._storage_options}, zarr_compressor={self._zarr_compressor})"
         )
 
     def __eq__(self, other):
@@ -111,6 +125,7 @@ def __eq__(self, other):
                 and self.reserved_mem == other.reserved_mem
                 and self.executor == other.executor
                 and self.storage_options == other.storage_options
+                and self.zarr_compressor == other.zarr_compressor
             )
         else:
             return False
diff --git a/cubed/storage/backends/tensorstore.py b/cubed/storage/backends/tensorstore.py
index efd2adad..77c50503 100644
--- a/cubed/storage/backends/tensorstore.py
+++ b/cubed/storage/backends/tensorstore.py
@@ -1,6 +1,6 @@
 import dataclasses
 import math
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Union
 
 import numpy as np
 import tensorstore
@@ -73,7 +73,7 @@ def encode_dtype(d):
         return d.descr
 
 
-def get_metadata(dtype, chunks):
+def get_metadata(dtype, chunks, compressor):
     metadata = {}
     if dtype is not None:
         dtype = np.dtype(dtype)
@@ -82,6 +82,8 @@ def get_metadata(dtype, chunks):
         if isinstance(chunks, int):
             chunks = (chunks,)
         metadata["chunks"] = chunks
+    if compressor != "default":
+        metadata["compressor"] = compressor
     return metadata
 
 
@@ -93,6 +95,7 @@ def open_tensorstore_array(
     dtype: Optional[T_DType] = None,
     chunks: Optional[T_RegularChunks] = None,
     path: Optional[str] = None,
+    compressor: Union[dict, str, None] = "default",
     **kwargs,
 ):
     store = str(store)  # TODO: check if Path or str
@@ -121,7 +124,7 @@ def open_tensorstore_array(
         raise ValueError(f"Mode not supported: {mode}")
 
     if dtype is None or not hasattr(dtype, "fields") or dtype.fields is None:
-        metadata = get_metadata(dtype, chunks)
+        metadata = get_metadata(dtype, chunks, compressor)
         if metadata:
             spec["metadata"] = metadata
 
@@ -140,7 +143,7 @@ def open_tensorstore_array(
             spec["path"] = field_path
 
             field_dtype, _ = dtype.fields[field]
-            metadata = get_metadata(field_dtype, chunks)
+            metadata = get_metadata(field_dtype, chunks, compressor)
             if metadata:
                 spec["metadata"] = metadata
 
diff --git a/cubed/storage/backends/zarr_python.py b/cubed/storage/backends/zarr_python.py
index 47287d99..ae0383cf 100644
--- a/cubed/storage/backends/zarr_python.py
+++ b/cubed/storage/backends/zarr_python.py
@@ -1,6 +1,7 @@
-from typing import Optional
+from typing import Optional, Union
 
 import zarr
+from numcodecs.registry import get_codec
 
 from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store
 
@@ -13,8 +14,12 @@ def open_zarr_array(
     dtype: Optional[T_DType] = None,
     chunks: Optional[T_RegularChunks] = None,
     path: Optional[str] = None,
+    compressor: Union[dict, str, None] = "default",
     **kwargs,
 ):
+    if isinstance(compressor, dict):
+        compressor = get_codec(compressor)
+
     return zarr.open_array(
         store,
         mode=mode,
         shape=shape,
         dtype=dtype,
         chunks=chunks,
         path=path,
+        compressor=compressor,
         **kwargs,
     )
diff --git a/cubed/tests/storage/test_zarr.py b/cubed/tests/storage/test_zarr.py
index ce27dff8..29532d7d 100644
--- a/cubed/tests/storage/test_zarr.py
+++ b/cubed/tests/storage/test_zarr.py
@@ -1,7 +1,13 @@
 import pytest
+import zarr
+from numcodecs.registry import get_codec
 
+from cubed import config
+from cubed.storage.backend import open_backend_array
 from cubed.storage.zarr import lazy_zarr_array
 
+ZARR_PYTHON_V3 = zarr.__version__[0] == "3"
+
 
 def test_lazy_zarr_array(tmp_path):
     zarr_path = tmp_path / "lazy.zarr"
@@ -14,3 +20,28 @@ def test_lazy_zarr_array(tmp_path):
     arr.create()
     assert zarr_path.exists()
     arr.open()
+
+
+@pytest.mark.skipif(
+    ZARR_PYTHON_V3, reason="setting zarr compressor not yet possible for Zarr Python v3"
+)
+@pytest.mark.parametrize(
+    "compressor",
+    [None, {"id": "zstd"}, {"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}],
+)
+def test_compression(tmp_path, compressor):
+    zarr_path = tmp_path / "lazy.zarr"
+
+    arr = lazy_zarr_array(
+        zarr_path, shape=(3, 3), dtype=int, chunks=(2, 2), compressor=compressor
+    )
+    arr.create()
+
+    # open with zarr python (for zarr python v2 and tensorstore)
+    with config.set({"storage_name": "zarr-python"}):
+        z = open_backend_array(zarr_path, mode="r")
+
+    if compressor is None:
+        assert z.compressor is None
+    else:
+        assert z.compressor == get_codec(compressor)
diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py
index 715b81b7..93400bfb 100644
--- a/cubed/tests/test_core.py
+++ b/cubed/tests/test_core.py
@@ -349,6 +349,17 @@ def test_default_spec_config_override():
     assert_array_equal(b.compute(), -np.ones((20000, 1000)))
 
 
+@pytest.mark.parametrize(
+    "compressor",
+    [None, {"id": "zstd"}, {"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}],
+)
+def test_spec_compressor(tmp_path, compressor):
+    spec = cubed.Spec(tmp_path, allowed_mem=100000, zarr_compressor=compressor)
+    a = xp.ones((3, 3), chunks=(2, 2), spec=spec)
+    b = xp.negative(a)
+    assert_array_equal(b.compute(), -np.ones((3, 3)))
+
+
 def test_different_specs(tmp_path):
     spec1 = cubed.Spec(tmp_path, allowed_mem=100000)
     spec2 = cubed.Spec(tmp_path, allowed_mem=200000)
diff --git a/setup.cfg b/setup.cfg
index 0a761bc2..4437a544 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -50,6 +50,8 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 [mypy-networkx.*]
 ignore_missing_imports = True
+[mypy-numcodecs.*]
+ignore_missing_imports = True
 [mypy-numpy.*]
 ignore_missing_imports = True
 [mypy-pandas.*]
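
A minimal usage sketch of the zarr_compressor option added by this patch (not part of the patch itself; the work_dir path and array sizes below are illustrative, and the compressor dict mirrors the one used in the tests above):

import cubed
import cubed.array_api as xp

# Select Blosc/LZ4 for intermediate Zarr data via a Numcodecs config dict.
# Passing zarr_compressor=None would disable compression entirely, and omitting
# it (or passing "default") keeps Zarr's default Blosc compressor.
spec = cubed.Spec(
    "/tmp/cubed-work",  # illustrative work_dir
    allowed_mem=100_000,
    zarr_compressor={"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1},
)

a = xp.ones((3, 3), chunks=(2, 2), spec=spec)
b = xp.negative(a)
b.compute()  # intermediate Zarr arrays are written with the configured compressor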