From b4c2a1981c8ba501bac87b02466f2130d05895b2 Mon Sep 17 00:00:00 2001 From: Joe Hamman Date: Tue, 5 Dec 2023 10:22:13 +0100 Subject: [PATCH] Bootstrap v3 branch with zarrita (#1584) * Pull Zarrita into Zarr-Python @ 78274781ad64aef95772eb4b083f7ea9b7d03d06 No code changes to Zarrita were made. * apply zarr lint rules * zarrita -> v3 * v3/abc [wip] * use abcs plus implementation notes --- zarr/v3/__init__.py | 40 +++ zarr/v3/abc/__init__.py | 0 zarr/v3/abc/array.py | 140 ++++++++++ zarr/v3/abc/codec.py | 84 ++++++ zarr/v3/abc/group.py | 86 +++++++ zarr/v3/abc/store.py | 115 +++++++++ zarr/v3/array.py | 550 +++++++++++++++++++++++++++++++++++++++ zarr/v3/array_v2.py | 552 ++++++++++++++++++++++++++++++++++++++++ zarr/v3/codecs.py | 514 +++++++++++++++++++++++++++++++++++++ zarr/v3/common.py | 158 ++++++++++++ zarr/v3/group.py | 179 +++++++++++++ zarr/v3/group_v2.py | 218 ++++++++++++++++ zarr/v3/indexing.py | 208 +++++++++++++++ zarr/v3/metadata.py | 339 ++++++++++++++++++++++++ zarr/v3/sharding.py | 516 +++++++++++++++++++++++++++++++++++++ zarr/v3/store.py | 304 ++++++++++++++++++++++ zarr/v3/sync.py | 87 +++++++ 17 files changed, 4090 insertions(+) create mode 100644 zarr/v3/__init__.py create mode 100644 zarr/v3/abc/__init__.py create mode 100644 zarr/v3/abc/array.py create mode 100644 zarr/v3/abc/codec.py create mode 100644 zarr/v3/abc/group.py create mode 100644 zarr/v3/abc/store.py create mode 100644 zarr/v3/array.py create mode 100644 zarr/v3/array_v2.py create mode 100644 zarr/v3/codecs.py create mode 100644 zarr/v3/common.py create mode 100644 zarr/v3/group.py create mode 100644 zarr/v3/group_v2.py create mode 100644 zarr/v3/indexing.py create mode 100644 zarr/v3/metadata.py create mode 100644 zarr/v3/sharding.py create mode 100644 zarr/v3/store.py create mode 100644 zarr/v3/sync.py diff --git a/zarr/v3/__init__.py b/zarr/v3/__init__.py new file mode 100644 index 0000000000..bbf5aa0359 --- /dev/null +++ b/zarr/v3/__init__.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from typing import Union + +import zarr.v3.codecs # noqa: F401 +from zarr.v3.array import Array # noqa: F401 +from zarr.v3.array_v2 import ArrayV2 # noqa: F401 +from zarr.v3.group import Group # noqa: F401 +from zarr.v3.group_v2 import GroupV2 # noqa: F401 +from zarr.v3.metadata import RuntimeConfiguration, runtime_configuration # noqa: F401 +from zarr.v3.store import ( # noqa: F401 + LocalStore, + RemoteStore, + Store, + StoreLike, + StorePath, + make_store_path, +) +from zarr.v3.sync import sync as _sync + + +async def open_auto_async( + store: StoreLike, + runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(), +) -> Union[Array, ArrayV2, Group, GroupV2]: + store_path = make_store_path(store) + try: + return await Group.open_or_array(store_path, runtime_configuration=runtime_configuration_) + except KeyError: + return await GroupV2.open_or_array(store_path, runtime_configuration_) + + +def open_auto( + store: StoreLike, + runtime_configuration_: RuntimeConfiguration = RuntimeConfiguration(), +) -> Union[Array, ArrayV2, Group, GroupV2]: + return _sync( + open_auto_async(store, runtime_configuration_), + runtime_configuration_.asyncio_loop, + ) diff --git a/zarr/v3/abc/__init__.py b/zarr/v3/abc/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/zarr/v3/abc/array.py b/zarr/v3/abc/array.py new file mode 100644 index 0000000000..976aa48618 --- /dev/null +++ b/zarr/v3/abc/array.py @@ -0,0 +1,140 @@ +from __future__ import annotations +from abc import abstractproperty, 
abstractmethod, ABC +from typing import Tuple, Any, Dict + +import numpy as np + +from zarr.v3.abc.store import ReadStore, WriteStore +from zarr.v3.common import Selection + + +class BaseArray(ABC): + @abstractproperty + def store_path(self) -> str: # TODO: rename to `path`? + """Path to this array in the underlying store.""" + ... + + @abstractproperty + def dtype(self) -> np.dtype: + """Data type of the array elements. + + Returns + ------- + dtype + array data type + """ + ... + + @abstractproperty + def ndim(self) -> int: + """Number of array dimensions (axes). + + Returns + ------- + int + number of array dimensions (axes) + """ + ... + + @abstractproperty + def shape(self) -> Tuple[int, ...]: + """Array dimensions. + + Returns + ------- + tuple of int + array dimensions + """ + ... + + @abstractproperty + def size(self) -> int: + """Number of elements in the array. + + Returns + ------- + int + number of elements in an array. + """ + + @abstractproperty + def attrs(self) -> Dict[str, Any]: + """Array attributes. + + Returns + ------- + dict + user defined attributes + """ + ... + + @abstractproperty + def info(self) -> Any: + """Report some diagnostic information about the array. + + Returns + ------- + out + """ + ... + + +class AsynchronousArray(BaseArray): + """This class can be implemented as a v2 or v3 array""" + + @classmethod + @abstractmethod + async def from_json(cls, zarr_json: Any, store: ReadStore) -> AsynchronousArray: + ... + + @classmethod + @abstractmethod + async def open(cls, store: ReadStore) -> AsynchronousArray: + ... + + @classmethod + @abstractmethod + async def create(cls, store: WriteStore, *, shape, **kwargs) -> AsynchronousArray: + ... + + @abstractmethod + async def getitem(self, selection: Selection): + ... + + @abstractmethod + async def setitem(self, selection: Selection, value: np.ndarray) -> None: + ... + + +class SynchronousArray(BaseArray): + """ + This class can be implemented as a v2 or v3 array + """ + + @classmethod + @abstractmethod + def from_json(cls, zarr_json: Any, store: ReadStore) -> SynchronousArray: + ... + + @classmethod + @abstractmethod + def open(cls, store: ReadStore) -> SynchronousArray: + ... + + @classmethod + @abstractmethod + def create(cls, store: WriteStore, *, shape, **kwargs) -> SynchronousArray: + ... + + @abstractmethod + def __getitem__(self, selection: Selection): # TODO: type as np.ndarray | scalar + ... + + @abstractmethod + def __setitem__(self, selection: Selection, value: np.ndarray) -> None: + ... + + # some day ;) + # @property + # def __array_api_version__(self) -> str: + # return "2022.12" diff --git a/zarr/v3/abc/codec.py b/zarr/v3/abc/codec.py new file mode 100644 index 0000000000..f84fc74af9 --- /dev/null +++ b/zarr/v3/abc/codec.py @@ -0,0 +1,84 @@ +# Notes: +# 1. These are missing methods described in the spec. I expected to see these method definitions: +# def compute_encoded_representation_type(self, decoded_representation_type): +# def encode(self, decoded_value): +# def decode(self, encoded_value, decoded_representation_type): +# def partial_decode(self, input_handle, decoded_representation_type, decoded_regions): +# def compute_encoded_size(self, input_size): +# 2. 
Understand why array metadata is included on all codecs + + +from __future__ import annotations + +from abc import abstractmethod, ABC +from typing import TYPE_CHECKING, Optional + +import numpy as np + +from zarr.v3.common import BytesLike + + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + + +class Codec(ABC): + supports_partial_decode: bool + supports_partial_encode: bool + is_fixed_size: bool + array_metadata: CoreArrayMetadata + + @abstractmethod + def compute_encoded_size(self, input_byte_length: int) -> int: + pass + + def resolve_metadata(self) -> CoreArrayMetadata: + return self.array_metadata + + +class ArrayArrayCodec(Codec): + @abstractmethod + async def decode( + self, + chunk_array: np.ndarray, + ) -> np.ndarray: + pass + + @abstractmethod + async def encode( + self, + chunk_array: np.ndarray, + ) -> Optional[np.ndarray]: + pass + + +class ArrayBytesCodec(Codec): + @abstractmethod + async def decode( + self, + chunk_array: BytesLike, + ) -> np.ndarray: + pass + + @abstractmethod + async def encode( + self, + chunk_array: np.ndarray, + ) -> Optional[BytesLike]: + pass + + +class BytesBytesCodec(Codec): + @abstractmethod + async def decode( + self, + chunk_array: BytesLike, + ) -> BytesLike: + pass + + @abstractmethod + async def encode( + self, + chunk_array: BytesLike, + ) -> Optional[BytesLike]: + pass diff --git a/zarr/v3/abc/group.py b/zarr/v3/abc/group.py new file mode 100644 index 0000000000..02de819894 --- /dev/null +++ b/zarr/v3/abc/group.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from abc import abstractproperty, ABC +from collections.abc import MutableMapping +from typing import Dict, Any + + +class BaseGroup(ABC): + @abstractproperty + def attrs(self) -> Dict[str, Any]: + """User-defined attributes.""" + ... + + @abstractproperty + def info(self) -> Any: # TODO: type this later + """Return diagnostic information about the group.""" + ... + + +class AsynchronousGroup(BaseGroup): + pass + # TODO: (considering the following api) + # store_path (rename to path?) + # nchildren - number of child groups + arrays + # children (async iterator) + # contains - check if child exists + # getitem - get child + # group_keys (async iterator) + # groups (async iterator) + # array_keys (async iterator) + # arrays (async iterator) + # visit + # visitkeys + # visitvalues + # tree + # create_group + # require_group + # create_groups + # require_groups + # create_dataset + # require_dataset + # create + # empty + # zeros + # ones + # full + # array + # empty_like + # zeros_like + # ones_like + # full_like + # move + + +class SynchronousGroup(BaseGroup, MutableMapping): + # TODO - think about if we want to keep the MutableMapping abstraction or + pass + # store_path (rename to path?) 
+    # __enter__
+    # __exit__
+    # group_keys
+    # groups
+    # array_keys
+    # arrays
+    # visit
+    # visitkeys
+    # visitvalues
+    # visititems
+    # tree
+    # create_group
+    # require_group
+    # create_groups
+    # require_groups
+    # create_dataset
+    # require_dataset
+    # create
+    # empty
+    # zeros
+    # ones
+    # full
+    # array
+    # empty_like
+    # zeros_like
+    # ones_like
+    # full_like
+    # move
diff --git a/zarr/v3/abc/store.py b/zarr/v3/abc/store.py
new file mode 100644
index 0000000000..5469cafe6d
--- /dev/null
+++ b/zarr/v3/abc/store.py
@@ -0,0 +1,115 @@
+from abc import abstractmethod, ABC
+
+from typing import List, Optional, Tuple
+
+
+class Store(ABC):
+    pass
+
+
+class ReadStore(Store):
+    @abstractmethod
+    async def get(self, key: str) -> bytes:
+        """Retrieve the value associated with a given key.
+
+        Parameters
+        ----------
+        key : str
+
+        Returns
+        -------
+        bytes
+        """
+        ...
+
+    @abstractmethod
+    async def get_partial_values(self, key_ranges: List[Tuple[str, int]]) -> List[Optional[bytes]]:
+        """Retrieve possibly partial values from the given key_ranges.
+
+        Parameters
+        ----------
+        key_ranges : list[tuple[str, int]]
+            Ordered set of (key, range) pairs; a key may occur multiple times with different ranges.
+
+        Returns
+        -------
+        list[bytes]
+            List of values, in the order of the key_ranges; may contain None for missing keys.
+        """
+        ...
+
+
+class WriteStore(ReadStore):
+    @abstractmethod
+    async def set(self, key: str, value: bytes) -> None:
+        """Store a (key, value) pair.
+
+        Parameters
+        ----------
+        key : str
+        value : bytes
+        """
+        ...
+
+    @abstractmethod
+    async def set_partial_values(self, key_start_values: List[Tuple[str, int, bytes]]) -> None:
+        """Store values at the given keys, each starting at byte range_start.
+
+        Parameters
+        ----------
+        key_start_values : list[tuple[str, int, bytes]]
+            Set of (key, range_start, value) triples; a key may occur multiple times with
+            different range_starts. The range_starts (considering the length of the respective
+            values) must not specify overlapping ranges for the same key.
+        """
+        ...
+
+
+class ListMixin:
+    @abstractmethod
+    async def list(self) -> List[str]:
+        """Retrieve all keys in the store.
+
+        Returns
+        -------
+        list[str]
+        """
+        ...
+
+    @abstractmethod
+    async def list_prefix(self, prefix: str) -> List[str]:
+        """Retrieve all keys in the store that begin with a given prefix.
+
+        Parameters
+        ----------
+        prefix : str
+
+        Returns
+        -------
+        list[str]
+        """
+        ...
+
+    @abstractmethod
+    async def list_dir(self, prefix: str) -> List[str]:
+        """
+        Retrieve all keys and prefixes with a given prefix and which do not contain the character
+        "/" after the given prefix.
+
+        Parameters
+        ----------
+        prefix : str
+
+        Returns
+        -------
+        list[str]
+        """
+        ...
+
+
+class ReadListStore(ReadStore, ListMixin):
+    pass
+
+
+class WriteListStore(WriteStore, ListMixin):
+    pass
diff --git a/zarr/v3/array.py b/zarr/v3/array.py
new file mode 100644
index 0000000000..3c0d7eba5c
--- /dev/null
+++ b/zarr/v3/array.py
@@ -0,0 +1,550 @@
+# Notes on what I've changed here:
+# 1. Split Array into AsyncArray and Array
+# 2. Inherit from abc (SynchronousArray, AsynchronousArray)
+# 3. Added .size and .attrs methods
+# 4. Temporarily disabled the creation of ArrayV2
+# 5. Added from_json to AsyncArray
+
+# Questions to consider:
+# 1. Was splitting the array into two classes really necessary?
+# 2. Do we really need runtime_configuration? 
Specifically, the asyncio_loop seems problematic + +from __future__ import annotations + +import json +from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union + +import numpy as np +from attr import evolve, frozen + +from zarr.v3.abc.array import SynchronousArray, AsynchronousArray + +# from zarr.v3.array_v2 import ArrayV2 +from zarr.v3.codecs import CodecMetadata, CodecPipeline, bytes_codec +from zarr.v3.common import ( + ZARR_JSON, + ChunkCoords, + Selection, + SliceSelection, + concurrent_map, +) +from zarr.v3.indexing import BasicIndexer, all_chunk_coords, is_total_slice +from zarr.v3.metadata import ( + ArrayMetadata, + DataType, + DefaultChunkKeyEncodingConfigurationMetadata, + DefaultChunkKeyEncodingMetadata, + RegularChunkGridConfigurationMetadata, + RegularChunkGridMetadata, + RuntimeConfiguration, + V2ChunkKeyEncodingConfigurationMetadata, + V2ChunkKeyEncodingMetadata, + dtype_to_data_type, +) +from zarr.v3.sharding import ShardingCodec +from zarr.v3.store import StoreLike, StorePath, make_store_path +from zarr.v3.sync import sync + + +@frozen +class AsyncArray(AsynchronousArray): + metadata: ArrayMetadata + store_path: StorePath + runtime_configuration: RuntimeConfiguration + codec_pipeline: CodecPipeline + + @classmethod + async def create( + cls, + store: StoreLike, + *, + shape: ChunkCoords, + dtype: Union[str, np.dtype], + chunk_shape: ChunkCoords, + fill_value: Optional[Any] = None, + chunk_key_encoding: Union[ + Tuple[Literal["default"], Literal[".", "/"]], + Tuple[Literal["v2"], Literal[".", "/"]], + ] = ("default", "/"), + codecs: Optional[Iterable[CodecMetadata]] = None, + dimension_names: Optional[Iterable[str]] = None, + attributes: Optional[Dict[str, Any]] = None, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + exists_ok: bool = False, + ) -> AsyncArray: + store_path = make_store_path(store) + if not exists_ok: + assert not await (store_path / ZARR_JSON).exists_async() + + data_type = ( + DataType[dtype] if isinstance(dtype, str) else DataType[dtype_to_data_type[dtype.str]] + ) + + codecs = list(codecs) if codecs is not None else [bytes_codec()] + + if fill_value is None: + if data_type == DataType.bool: + fill_value = False + else: + fill_value = 0 + + metadata = ArrayMetadata( + shape=shape, + data_type=data_type, + chunk_grid=RegularChunkGridMetadata( + configuration=RegularChunkGridConfigurationMetadata(chunk_shape=chunk_shape) + ), + chunk_key_encoding=( + V2ChunkKeyEncodingMetadata( + configuration=V2ChunkKeyEncodingConfigurationMetadata( + separator=chunk_key_encoding[1] + ) + ) + if chunk_key_encoding[0] == "v2" + else DefaultChunkKeyEncodingMetadata( + configuration=DefaultChunkKeyEncodingConfigurationMetadata( + separator=chunk_key_encoding[1] + ) + ) + ), + fill_value=fill_value, + codecs=codecs, + dimension_names=tuple(dimension_names) if dimension_names else None, + attributes=attributes or {}, + ) + runtime_configuration = runtime_configuration or RuntimeConfiguration() + + array = cls( + metadata=metadata, + store_path=store_path, + runtime_configuration=runtime_configuration, + codec_pipeline=CodecPipeline.from_metadata( + metadata.codecs, metadata.get_core_metadata(runtime_configuration) + ), + ) + + await array._save_metadata() + return array + + @classmethod + def from_json( + cls, + store_path: StorePath, + zarr_json: Any, + runtime_configuration: RuntimeConfiguration, + ) -> AsyncArray: + metadata = ArrayMetadata.from_json(zarr_json) + async_array = cls( + metadata=metadata, + store_path=store_path, + 
runtime_configuration=runtime_configuration, + codec_pipeline=CodecPipeline.from_metadata( + metadata.codecs, metadata.get_core_metadata(runtime_configuration) + ), + ) + async_array._validate_metadata() + return async_array + + @classmethod + async def open( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> AsyncArray: + store_path = make_store_path(store) + zarr_json_bytes = await (store_path / ZARR_JSON).get_async() + assert zarr_json_bytes is not None + return cls.from_json( + store_path, + json.loads(zarr_json_bytes), + runtime_configuration=runtime_configuration, + ) + + @classmethod + async def open_auto( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> AsyncArray: # TODO: Union[AsyncArray, ArrayV2] + store_path = make_store_path(store) + v3_metadata_bytes = await (store_path / ZARR_JSON).get_async() + if v3_metadata_bytes is not None: + return cls.from_json( + store_path, + json.loads(v3_metadata_bytes), + runtime_configuration=runtime_configuration or RuntimeConfiguration(), + ) + else: + raise ValueError("no v2 support yet") + # return await ArrayV2.open_async(store_path) + + @property + def ndim(self) -> int: + return len(self.metadata.shape) + + @property + def shape(self) -> ChunkCoords: + return self.metadata.shape + + @property + def size(self) -> int: + return np.prod(self.metadata.shape) + + @property + def dtype(self) -> np.dtype: + return self.metadata.dtype + + @property + def attrs(self) -> dict: + return self.metadata.attributes + + async def getitem(self, selection: Selection): + indexer = BasicIndexer( + selection, + shape=self.metadata.shape, + chunk_shape=self.metadata.chunk_grid.configuration.chunk_shape, + ) + + # setup output array + out = np.zeros( + indexer.shape, + dtype=self.metadata.dtype, + order=self.runtime_configuration.order, + ) + + # reading chunks and decoding them + await concurrent_map( + [ + (chunk_coords, chunk_selection, out_selection, out) + for chunk_coords, chunk_selection, out_selection in indexer + ], + self._read_chunk, + self.runtime_configuration.concurrency, + ) + + if out.shape: + return out + else: + return out[()] + + async def _save_metadata(self) -> None: + self._validate_metadata() + + await (self.store_path / ZARR_JSON).set_async(self.metadata.to_bytes()) + + def _validate_metadata(self) -> None: + assert len(self.metadata.shape) == len( + self.metadata.chunk_grid.configuration.chunk_shape + ), "`chunk_shape` and `shape` need to have the same number of dimensions." + assert self.metadata.dimension_names is None or len(self.metadata.shape) == len( + self.metadata.dimension_names + ), "`dimension_names` and `shape` need to have the same number of dimensions." + assert self.metadata.fill_value is not None, "`fill_value` is required." 
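+
+    # Editor's note: a usage sketch for the async API above (not part of the
+    # original patch; the path is hypothetical and assumes `make_store_path`
+    # accepts a plain filesystem path string):
+    #
+    #     arr = await AsyncArray.create(
+    #         "/tmp/example.zarr",
+    #         shape=(1024, 1024),
+    #         dtype="float32",
+    #         chunk_shape=(256, 256),
+    #     )
+    #     await arr.setitem((slice(0, 256), slice(0, 256)), np.ones((256, 256), "float32"))
+    #     block = await arr.getitem((slice(0, 256), slice(0, 256)))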
+ + async def _read_chunk( + self, + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + out: np.ndarray, + ): + chunk_key_encoding = self.metadata.chunk_key_encoding + chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords) + store_path = self.store_path / chunk_key + + if len(self.codec_pipeline.codecs) == 1 and isinstance( + self.codec_pipeline.codecs[0], ShardingCodec + ): + chunk_array = await self.codec_pipeline.codecs[0].decode_partial( + store_path, chunk_selection + ) + if chunk_array is not None: + out[out_selection] = chunk_array + else: + out[out_selection] = self.metadata.fill_value + else: + chunk_bytes = await store_path.get_async() + if chunk_bytes is not None: + chunk_array = await self.codec_pipeline.decode(chunk_bytes) + tmp = chunk_array[chunk_selection] + out[out_selection] = tmp + else: + out[out_selection] = self.metadata.fill_value + + async def setitem(self, selection: Selection, value: np.ndarray) -> None: + chunk_shape = self.metadata.chunk_grid.configuration.chunk_shape + indexer = BasicIndexer( + selection, + shape=self.metadata.shape, + chunk_shape=chunk_shape, + ) + + sel_shape = indexer.shape + + # check value shape + if np.isscalar(value): + # setting a scalar value + pass + else: + if not hasattr(value, "shape"): + value = np.asarray(value, self.metadata.dtype) + assert value.shape == sel_shape + if value.dtype.name != self.metadata.dtype.name: + value = value.astype(self.metadata.dtype, order="A") + + # merging with existing data and encoding chunks + await concurrent_map( + [ + ( + value, + chunk_shape, + chunk_coords, + chunk_selection, + out_selection, + ) + for chunk_coords, chunk_selection, out_selection in indexer + ], + self._write_chunk, + self.runtime_configuration.concurrency, + ) + + async def _write_chunk( + self, + value: np.ndarray, + chunk_shape: ChunkCoords, + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + ): + chunk_key_encoding = self.metadata.chunk_key_encoding + chunk_key = chunk_key_encoding.encode_chunk_key(chunk_coords) + store_path = self.store_path / chunk_key + + if is_total_slice(chunk_selection, chunk_shape): + # write entire chunks + if np.isscalar(value): + chunk_array = np.empty( + chunk_shape, + dtype=self.metadata.dtype, + ) + chunk_array.fill(value) + else: + chunk_array = value[out_selection] + await self._write_chunk_to_store(store_path, chunk_array) + + elif len(self.codec_pipeline.codecs) == 1 and isinstance( + self.codec_pipeline.codecs[0], ShardingCodec + ): + sharding_codec = self.codec_pipeline.codecs[0] + # print("encode_partial", chunk_coords, chunk_selection, repr(self)) + await sharding_codec.encode_partial( + store_path, + value[out_selection], + chunk_selection, + ) + else: + # writing partial chunks + # read chunk first + chunk_bytes = await store_path.get_async() + + # merge new value + if chunk_bytes is None: + chunk_array = np.empty( + chunk_shape, + dtype=self.metadata.dtype, + ) + chunk_array.fill(self.metadata.fill_value) + else: + chunk_array = ( + await self.codec_pipeline.decode(chunk_bytes) + ).copy() # make a writable copy + chunk_array[chunk_selection] = value[out_selection] + + await self._write_chunk_to_store(store_path, chunk_array) + + async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): + if np.all(chunk_array == self.metadata.fill_value): + # chunks that only contain fill_value will be removed + await store_path.delete_async() + else: + chunk_bytes = await 
self.codec_pipeline.encode(chunk_array)
+            if chunk_bytes is None:
+                await store_path.delete_async()
+            else:
+                await store_path.set_async(chunk_bytes)
+
+    async def resize(self, new_shape: ChunkCoords) -> AsyncArray:
+        assert len(new_shape) == len(self.metadata.shape)
+        new_metadata = evolve(self.metadata, shape=new_shape)
+
+        # Remove all chunks outside of the new shape
+        chunk_shape = self.metadata.chunk_grid.configuration.chunk_shape
+        chunk_key_encoding = self.metadata.chunk_key_encoding
+        old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape))
+        new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape))
+
+        async def _delete_key(key: str) -> None:
+            await (self.store_path / key).delete_async()
+
+        await concurrent_map(
+            [
+                (chunk_key_encoding.encode_chunk_key(chunk_coords),)
+                for chunk_coords in old_chunk_coords.difference(new_chunk_coords)
+            ],
+            _delete_key,
+            self.runtime_configuration.concurrency,
+        )
+
+        # Write new metadata
+        await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes())
+        return evolve(self, metadata=new_metadata)
+
+    async def update_attributes(self, new_attributes: Dict[str, Any]) -> AsyncArray:
+        new_metadata = evolve(self.metadata, attributes=new_attributes)
+
+        # Write new metadata
+        await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes())
+        return evolve(self, metadata=new_metadata)
+
+    def __repr__(self):
+        return f"<AsyncArray {self.store_path} shape={self.shape} dtype={self.dtype}>"
+
+    async def info(self):
+        return NotImplemented
+
+
+@frozen
+class Array(SynchronousArray):
+    _async_array: AsyncArray
+
+    @classmethod
+    def create(
+        cls,
+        store: StoreLike,
+        *,
+        shape: ChunkCoords,
+        dtype: Union[str, np.dtype],
+        chunk_shape: ChunkCoords,
+        fill_value: Optional[Any] = None,
+        chunk_key_encoding: Union[
+            Tuple[Literal["default"], Literal[".", "/"]],
+            Tuple[Literal["v2"], Literal[".", "/"]],
+        ] = ("default", "/"),
+        codecs: Optional[Iterable[CodecMetadata]] = None,
+        dimension_names: Optional[Iterable[str]] = None,
+        attributes: Optional[Dict[str, Any]] = None,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
+        exists_ok: bool = False,
+    ) -> Array:
+        async_array = sync(
+            AsyncArray.create(
+                store=store,
+                shape=shape,
+                dtype=dtype,
+                chunk_shape=chunk_shape,
+                fill_value=fill_value,
+                chunk_key_encoding=chunk_key_encoding,
+                codecs=codecs,
+                dimension_names=dimension_names,
+                attributes=attributes,
+                runtime_configuration=runtime_configuration,
+                exists_ok=exists_ok,
+            ),
+            runtime_configuration.asyncio_loop,
+        )
+        return cls(async_array)
+
+    @classmethod
+    def from_json(
+        cls,
+        store_path: StorePath,
+        zarr_json: Any,
+        runtime_configuration: RuntimeConfiguration,
+    ) -> Array:
+        async_array = AsyncArray.from_json(
+            store_path=store_path, zarr_json=zarr_json, runtime_configuration=runtime_configuration
+        )
+        return cls(async_array)
+
+    @classmethod
+    def open(
+        cls,
+        store: StoreLike,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
+    ) -> Array:
+
+        async_array = sync(
+            AsyncArray.open(store, runtime_configuration=runtime_configuration),
+            runtime_configuration.asyncio_loop,
+        )
+        async_array._validate_metadata()
+        return cls(async_array)
+
+    @classmethod
+    def open_auto(
+        cls,
+        store: StoreLike,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
+    ) -> Array:  # TODO: Union[Array, ArrayV2]:
+        async_array = sync(
+            AsyncArray.open_auto(store, runtime_configuration),
+            runtime_configuration.asyncio_loop,
+        )
+        return cls(async_array)
+
+    @property
+    def ndim(self) -> int:
+        return self._async_array.ndim
+
+    @property
+    def shape(self) -> ChunkCoords:
+        return self._async_array.shape
+
+    @property
+    def size(self) -> int:
+        return self._async_array.size
+
+    @property
+    def dtype(self) -> np.dtype:
+        return self._async_array.dtype
+
+    @property
+    def attrs(self) -> dict:
+        return self._async_array.attrs
+
+    @property
+    def store_path(self) -> str:
+        return self._async_array.store_path
+
+    def __getitem__(self, selection: Selection):
+        return sync(
+            self._async_array.getitem(selection),
+            self._async_array.runtime_configuration.asyncio_loop,
+        )
+
+    def __setitem__(self, selection: Selection, value: np.ndarray) -> None:
+        sync(
+            self._async_array.setitem(selection, value),
+            self._async_array.runtime_configuration.asyncio_loop,
+        )
+
+    def resize(self, new_shape: ChunkCoords) -> Array:
+        return sync(
+            self._async_array.resize(new_shape),
+            self._async_array.runtime_configuration.asyncio_loop,
+        )
+
+    def update_attributes(self, new_attributes: Dict[str, Any]) -> Array:
+        return sync(
+            self._async_array.update_attributes(new_attributes),
+            self._async_array.runtime_configuration.asyncio_loop,
+        )
+
+    def __repr__(self):
+        return f"<Array {self.store_path} shape={self.shape} dtype={self.dtype}>"
+
+    def info(self):
+        return sync(
+            self._async_array.info(),
+            self._async_array.runtime_configuration.asyncio_loop,
+        )
diff --git a/zarr/v3/array_v2.py b/zarr/v3/array_v2.py
new file mode 100644
index 0000000000..a2f26f01b0
--- /dev/null
+++ b/zarr/v3/array_v2.py
@@ -0,0 +1,552 @@
+from __future__ import annotations
+
+import asyncio
+import json
+from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
+
+import numcodecs
+import numpy as np
+from attr import evolve, frozen
+from numcodecs.compat import ensure_bytes, ensure_ndarray
+
+from zarr.v3.common import (
+    ZARRAY_JSON,
+    ZATTRS_JSON,
+    BytesLike,
+    ChunkCoords,
+    Selection,
+    SliceSelection,
+    concurrent_map,
+    to_thread,
+)
+from zarr.v3.indexing import BasicIndexer, all_chunk_coords, is_total_slice
+from zarr.v3.metadata import ArrayV2Metadata, RuntimeConfiguration
+from zarr.v3.store import StoreLike, StorePath, make_store_path
+from zarr.v3.sync import sync
+
+if TYPE_CHECKING:
+    from zarr.v3.array import Array
+
+
+@frozen
+class _AsyncArrayProxy:
+    array: ArrayV2
+
+    def __getitem__(self, selection: Selection) -> _AsyncArraySelectionProxy:
+        return _AsyncArraySelectionProxy(self.array, selection)
+
+
+@frozen
+class _AsyncArraySelectionProxy:
+    array: ArrayV2
+    selection: Selection
+
+    async def get(self) -> np.ndarray:
+        return await self.array.get_async(self.selection)
+
+    async def set(self, value: np.ndarray):
+        return await self.array.set_async(self.selection, value)
+
+
+@frozen
+class ArrayV2:
+    metadata: ArrayV2Metadata
+    attributes: Optional[Dict[str, Any]]
+    store_path: StorePath
+    runtime_configuration: RuntimeConfiguration
+
+    @classmethod
+    async def create_async(
+        cls,
+        store: StoreLike,
+        *,
+        shape: ChunkCoords,
+        dtype: np.dtype,
+        chunks: ChunkCoords,
+        dimension_separator: Literal[".", "/"] = ".",
+        fill_value: Optional[Union[None, int, float]] = None,
+        order: Literal["C", "F"] = "C",
+        filters: Optional[List[Dict[str, Any]]] = None,
+        compressor: Optional[Dict[str, Any]] = None,
+        attributes: Optional[Dict[str, Any]] = None,
+        exists_ok: bool = False,
+        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
+    ) -> ArrayV2:
+        store_path = make_store_path(store)
+        if not exists_ok:
+            assert not await (store_path / ZARRAY_JSON).exists_async()
+
+        metadata = ArrayV2Metadata(
+            shape=shape,
+            dtype=np.dtype(dtype),
chunks=chunks, + order=order, + dimension_separator=dimension_separator, + fill_value=0 if fill_value is None else fill_value, + compressor=numcodecs.get_codec(compressor).get_config() + if compressor is not None + else None, + filters=[numcodecs.get_codec(filter).get_config() for filter in filters] + if filters is not None + else None, + ) + array = cls( + metadata=metadata, + store_path=store_path, + attributes=attributes, + runtime_configuration=runtime_configuration, + ) + await array._save_metadata() + return array + + @classmethod + def create( + cls, + store: StoreLike, + *, + shape: ChunkCoords, + dtype: np.dtype, + chunks: ChunkCoords, + dimension_separator: Literal[".", "/"] = ".", + fill_value: Optional[Union[None, int, float]] = None, + order: Literal["C", "F"] = "C", + filters: Optional[List[Dict[str, Any]]] = None, + compressor: Optional[Dict[str, Any]] = None, + attributes: Optional[Dict[str, Any]] = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> ArrayV2: + return sync( + cls.create_async( + store, + shape=shape, + dtype=dtype, + chunks=chunks, + order=order, + dimension_separator=dimension_separator, + fill_value=0 if fill_value is None else fill_value, + compressor=compressor, + filters=filters, + attributes=attributes, + exists_ok=exists_ok, + runtime_configuration=runtime_configuration, + ), + runtime_configuration.asyncio_loop, + ) + + @classmethod + async def open_async( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> ArrayV2: + store_path = make_store_path(store) + zarray_bytes, zattrs_bytes = await asyncio.gather( + (store_path / ZARRAY_JSON).get_async(), + (store_path / ZATTRS_JSON).get_async(), + ) + assert zarray_bytes is not None + return cls.from_json( + store_path, + zarray_json=json.loads(zarray_bytes), + zattrs_json=json.loads(zattrs_bytes) if zattrs_bytes is not None else None, + runtime_configuration=runtime_configuration, + ) + + @classmethod + def open( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> ArrayV2: + return sync( + cls.open_async(store, runtime_configuration), + runtime_configuration.asyncio_loop, + ) + + @classmethod + def from_json( + cls, + store_path: StorePath, + zarray_json: Any, + zattrs_json: Optional[Any], + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> ArrayV2: + metadata = ArrayV2Metadata.from_json(zarray_json) + out = cls( + store_path=store_path, + metadata=metadata, + attributes=zattrs_json, + runtime_configuration=runtime_configuration, + ) + out._validate_metadata() + return out + + async def _save_metadata(self) -> None: + self._validate_metadata() + + await (self.store_path / ZARRAY_JSON).set_async(self.metadata.to_bytes()) + if self.attributes is not None and len(self.attributes) > 0: + await (self.store_path / ZATTRS_JSON).set_async( + json.dumps(self.attributes).encode(), + ) + else: + await (self.store_path / ZATTRS_JSON).delete_async() + + def _validate_metadata(self) -> None: + assert len(self.metadata.shape) == len( + self.metadata.chunks + ), "`chunks` and `shape` need to have the same number of dimensions." 
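+
+    # Editor's note: a synchronous usage sketch (not part of the original
+    # patch; the path is hypothetical). The async surface is reached through
+    # the `.async_` proxy defined above:
+    #
+    #     arr = ArrayV2.create(
+    #         "/tmp/example_v2.zarr",
+    #         shape=(100,),
+    #         dtype=np.dtype("int32"),
+    #         chunks=(10,),
+    #     )
+    #     arr[:] = np.arange(100, dtype="int32")
+    #     first = await arr.async_[0:10].get()  # from within a coroutine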
+ + @property + def ndim(self) -> int: + return len(self.metadata.shape) + + @property + def shape(self) -> ChunkCoords: + return self.metadata.shape + + @property + def dtype(self) -> np.dtype: + return self.metadata.dtype + + @property + def async_(self) -> _AsyncArrayProxy: + return _AsyncArrayProxy(self) + + def __getitem__(self, selection: Selection): + return sync(self.get_async(selection), self.runtime_configuration.asyncio_loop) + + async def get_async(self, selection: Selection): + indexer = BasicIndexer( + selection, + shape=self.metadata.shape, + chunk_shape=self.metadata.chunks, + ) + + # setup output array + out = np.zeros( + indexer.shape, + dtype=self.metadata.dtype, + order=self.metadata.order, + ) + + # reading chunks and decoding them + await concurrent_map( + [ + (chunk_coords, chunk_selection, out_selection, out) + for chunk_coords, chunk_selection, out_selection in indexer + ], + self._read_chunk, + ) + + if out.shape: + return out + else: + return out[()] + + async def _read_chunk( + self, + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + out: np.ndarray, + ): + store_path = self.store_path / self._encode_chunk_key(chunk_coords) + + chunk_array = await self._decode_chunk(await store_path.get_async()) + if chunk_array is not None: + tmp = chunk_array[chunk_selection] + out[out_selection] = tmp + else: + out[out_selection] = self.metadata.fill_value + + async def _decode_chunk(self, chunk_bytes: Optional[BytesLike]) -> Optional[np.ndarray]: + if chunk_bytes is None: + return None + + if self.metadata.compressor is not None: + compressor = numcodecs.get_codec(self.metadata.compressor) + chunk_array = ensure_ndarray(await to_thread(compressor.decode, chunk_bytes)) + else: + chunk_array = ensure_ndarray(chunk_bytes) + + # ensure correct dtype + if str(chunk_array.dtype) != self.metadata.dtype: + chunk_array = chunk_array.view(self.metadata.dtype) + + # apply filters in reverse order + if self.metadata.filters is not None: + for filter_metadata in self.metadata.filters[::-1]: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.decode, chunk_array) + + # ensure correct chunk shape + if chunk_array.shape != self.metadata.chunks: + chunk_array = chunk_array.reshape( + self.metadata.chunks, + order=self.metadata.order, + ) + + return chunk_array + + def __setitem__(self, selection: Selection, value: np.ndarray) -> None: + sync(self.set_async(selection, value), self.runtime_configuration.asyncio_loop) + + async def set_async(self, selection: Selection, value: np.ndarray) -> None: + chunk_shape = self.metadata.chunks + indexer = BasicIndexer( + selection, + shape=self.metadata.shape, + chunk_shape=chunk_shape, + ) + + sel_shape = indexer.shape + + # check value shape + if np.isscalar(value): + # setting a scalar value + pass + else: + if not hasattr(value, "shape"): + value = np.asarray(value, self.metadata.dtype) + assert value.shape == sel_shape + if value.dtype != self.metadata.dtype: + value = value.astype(self.metadata.dtype, order="A") + + # merging with existing data and encoding chunks + await concurrent_map( + [ + ( + value, + chunk_shape, + chunk_coords, + chunk_selection, + out_selection, + ) + for chunk_coords, chunk_selection, out_selection in indexer + ], + self._write_chunk, + ) + + async def _write_chunk( + self, + value: np.ndarray, + chunk_shape: ChunkCoords, + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + ): + store_path = 
self.store_path / self._encode_chunk_key(chunk_coords) + + if is_total_slice(chunk_selection, chunk_shape): + # write entire chunks + if np.isscalar(value): + chunk_array = np.empty( + chunk_shape, + dtype=self.metadata.dtype, + order=self.metadata.order, + ) + chunk_array.fill(value) + else: + chunk_array = value[out_selection] + await self._write_chunk_to_store(store_path, chunk_array) + + else: + # writing partial chunks + # read chunk first + tmp = await self._decode_chunk(await store_path.get_async()) + + # merge new value + if tmp is None: + chunk_array = np.empty( + chunk_shape, + dtype=self.metadata.dtype, + order=self.metadata.order, + ) + chunk_array.fill(self.metadata.fill_value) + else: + chunk_array = tmp.copy( + order=self.metadata.order, + ) # make a writable copy + chunk_array[chunk_selection] = value[out_selection] + + await self._write_chunk_to_store(store_path, chunk_array) + + async def _write_chunk_to_store(self, store_path: StorePath, chunk_array: np.ndarray): + chunk_bytes: Optional[BytesLike] + if np.all(chunk_array == self.metadata.fill_value): + # chunks that only contain fill_value will be removed + await store_path.delete_async() + else: + chunk_bytes = await self._encode_chunk(chunk_array) + if chunk_bytes is None: + await store_path.delete_async() + else: + await store_path.set_async(chunk_bytes) + + async def _encode_chunk(self, chunk_array: np.ndarray) -> Optional[BytesLike]: + chunk_array = chunk_array.ravel(order=self.metadata.order) + + if self.metadata.filters is not None: + for filter_metadata in self.metadata.filters: + filter = numcodecs.get_codec(filter_metadata) + chunk_array = await to_thread(filter.encode, chunk_array) + + if self.metadata.compressor is not None: + compressor = numcodecs.get_codec(self.metadata.compressor) + if not chunk_array.flags.c_contiguous and not chunk_array.flags.f_contiguous: + chunk_array = chunk_array.copy(order="A") + encoded_chunk_bytes = ensure_bytes(await to_thread(compressor.encode, chunk_array)) + else: + encoded_chunk_bytes = ensure_bytes(chunk_array) + + return encoded_chunk_bytes + + def _encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + chunk_identifier = self.metadata.dimension_separator.join(map(str, chunk_coords)) + return "0" if chunk_identifier == "" else chunk_identifier + + async def resize_async(self, new_shape: ChunkCoords) -> ArrayV2: + assert len(new_shape) == len(self.metadata.shape) + new_metadata = evolve(self.metadata, shape=new_shape) + + # Remove all chunks outside of the new shape + chunk_shape = self.metadata.chunks + old_chunk_coords = set(all_chunk_coords(self.metadata.shape, chunk_shape)) + new_chunk_coords = set(all_chunk_coords(new_shape, chunk_shape)) + + async def _delete_key(key: str) -> None: + await (self.store_path / key).delete_async() + + await concurrent_map( + [ + (self._encode_chunk_key(chunk_coords),) + for chunk_coords in old_chunk_coords.difference(new_chunk_coords) + ], + _delete_key, + ) + + # Write new metadata + await (self.store_path / ZARRAY_JSON).set_async(new_metadata.to_bytes()) + return evolve(self, metadata=new_metadata) + + def resize(self, new_shape: ChunkCoords) -> ArrayV2: + return sync(self.resize_async(new_shape), self.runtime_configuration.asyncio_loop) + + async def convert_to_v3_async(self) -> Array: + from sys import byteorder as sys_byteorder + + from zarr.v3.array import Array + from zarr.v3.common import ZARR_JSON + from zarr.v3.metadata import ( + ArrayMetadata, + BloscCodecConfigurationMetadata, + BloscCodecMetadata, + 
BytesCodecConfigurationMetadata,
+            BytesCodecMetadata,
+            CodecMetadata,
+            DataType,
+            GzipCodecConfigurationMetadata,
+            GzipCodecMetadata,
+            RegularChunkGridConfigurationMetadata,
+            RegularChunkGridMetadata,
+            TransposeCodecConfigurationMetadata,
+            TransposeCodecMetadata,
+            V2ChunkKeyEncodingConfigurationMetadata,
+            V2ChunkKeyEncodingMetadata,
+            blosc_shuffle_int_to_str,
+            dtype_to_data_type,
+        )
+
+        data_type = DataType[dtype_to_data_type[self.metadata.dtype.str]]
+        endian: Literal["little", "big"]
+        if self.metadata.dtype.byteorder == "=":
+            endian = sys_byteorder
+        elif self.metadata.dtype.byteorder == ">":
+            endian = "big"
+        else:
+            endian = "little"
+
+        assert (
+            self.metadata.filters is None or len(self.metadata.filters) == 0
+        ), "Filters are not supported by v3."
+
+        codecs: List[CodecMetadata] = []
+
+        if self.metadata.order == "F":
+            codecs.append(
+                TransposeCodecMetadata(configuration=TransposeCodecConfigurationMetadata(order="F"))
+            )
+        codecs.append(
+            BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian=endian))
+        )
+
+        if self.metadata.compressor is not None:
+            v2_codec = numcodecs.get_codec(self.metadata.compressor).get_config()
+            assert v2_codec["id"] in (
+                "blosc",
+                "gzip",
+            ), "Only blosc and gzip are supported by v3."
+            if v2_codec["id"] == "blosc":
+                shuffle = blosc_shuffle_int_to_str[v2_codec.get("shuffle", 0)]
+                codecs.append(
+                    BloscCodecMetadata(
+                        configuration=BloscCodecConfigurationMetadata(
+                            typesize=data_type.byte_count,
+                            cname=v2_codec["cname"],
+                            clevel=v2_codec["clevel"],
+                            shuffle=shuffle,
+                            blocksize=v2_codec.get("blocksize", 0),
+                        )
+                    )
+                )
+            elif v2_codec["id"] == "gzip":
+                codecs.append(
+                    GzipCodecMetadata(
+                        configuration=GzipCodecConfigurationMetadata(level=v2_codec.get("level", 5))
+                    )
+                )
+
+        new_metadata = ArrayMetadata(
+            shape=self.metadata.shape,
+            chunk_grid=RegularChunkGridMetadata(
+                configuration=RegularChunkGridConfigurationMetadata(
+                    chunk_shape=self.metadata.chunks
+                )
+            ),
+            data_type=data_type,
+            fill_value=0 if self.metadata.fill_value is None else self.metadata.fill_value,
+            chunk_key_encoding=V2ChunkKeyEncodingMetadata(
+                configuration=V2ChunkKeyEncodingConfigurationMetadata(
+                    separator=self.metadata.dimension_separator
+                )
+            ),
+            codecs=codecs,
+            attributes=self.attributes or {},
+        )
+
+        new_metadata_bytes = new_metadata.to_bytes()
+        await (self.store_path / ZARR_JSON).set_async(new_metadata_bytes)
+
+        return Array.from_json(
+            store_path=self.store_path,
+            zarr_json=json.loads(new_metadata_bytes),
+            runtime_configuration=self.runtime_configuration,
+        )
+
+    async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> ArrayV2:
+        await (self.store_path / ZATTRS_JSON).set_async(json.dumps(new_attributes).encode())
+        return evolve(self, attributes=new_attributes)
+
+    def update_attributes(self, new_attributes: Dict[str, Any]) -> ArrayV2:
+        return sync(
+            self.update_attributes_async(new_attributes),
+            self.runtime_configuration.asyncio_loop,
+        )
+
+    def convert_to_v3(self) -> Array:
+        return sync(self.convert_to_v3_async(), loop=self.runtime_configuration.asyncio_loop)
+
+    def __repr__(self):
+        return f"<ArrayV2 {self.store_path} shape={self.shape} dtype={self.dtype}>"
diff --git a/zarr/v3/codecs.py b/zarr/v3/codecs.py
new file mode 100644
index 0000000000..ff913c42b2
--- /dev/null
+++ b/zarr/v3/codecs.py
@@ -0,0 +1,514 @@
+from __future__ import annotations
+
+from functools import reduce
+from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Tuple, Union
+from warnings import warn
+
+import numcodecs
+import numpy as np
+from attr import 
asdict, evolve, frozen +from crc32c import crc32c +from numcodecs.blosc import Blosc +from numcodecs.gzip import GZip +from zstandard import ZstdCompressor, ZstdDecompressor + +from zarr.v3.abc.codec import Codec, ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec +from zarr.v3.common import BytesLike, to_thread +from zarr.v3.metadata import ( + BloscCodecConfigurationMetadata, + BloscCodecMetadata, + BytesCodecConfigurationMetadata, + BytesCodecMetadata, + CodecMetadata, + Crc32cCodecMetadata, + GzipCodecConfigurationMetadata, + GzipCodecMetadata, + ShardingCodecConfigurationMetadata, + ShardingCodecMetadata, + TransposeCodecConfigurationMetadata, + TransposeCodecMetadata, + ZstdCodecConfigurationMetadata, + ZstdCodecMetadata, +) + +if TYPE_CHECKING: + from zarr.v3.metadata import CoreArrayMetadata + +# See https://zarr.readthedocs.io/en/stable/tutorial.html#configuring-blosc +numcodecs.blosc.use_threads = False + + +@frozen +class CodecPipeline: + codecs: List[Codec] + + @classmethod + def from_metadata( + cls, + codecs_metadata: Iterable[CodecMetadata], + array_metadata: CoreArrayMetadata, + ) -> CodecPipeline: + out: List[Codec] = [] + for codec_metadata in codecs_metadata or []: + if codec_metadata.name == "endian": + codec_metadata = evolve(codec_metadata, name="bytes") # type: ignore + + codec: Codec + if codec_metadata.name == "blosc": + codec = BloscCodec.from_metadata(codec_metadata, array_metadata) + elif codec_metadata.name == "gzip": + codec = GzipCodec.from_metadata(codec_metadata, array_metadata) + elif codec_metadata.name == "zstd": + codec = ZstdCodec.from_metadata(codec_metadata, array_metadata) + elif codec_metadata.name == "transpose": + codec = TransposeCodec.from_metadata(codec_metadata, array_metadata) + elif codec_metadata.name == "bytes": + codec = BytesCodec.from_metadata(codec_metadata, array_metadata) + elif codec_metadata.name == "crc32c": + codec = Crc32cCodec.from_metadata(codec_metadata, array_metadata) + elif codec_metadata.name == "sharding_indexed": + from zarr.v3.sharding import ShardingCodec + + codec = ShardingCodec.from_metadata(codec_metadata, array_metadata) + else: + raise RuntimeError(f"Unsupported codec: {codec_metadata}") + + out.append(codec) + array_metadata = codec.resolve_metadata() + CodecPipeline._validate_codecs(out, array_metadata) + return cls(out) + + @staticmethod + def _validate_codecs(codecs: List[Codec], array_metadata: CoreArrayMetadata) -> None: + from zarr.v3.sharding import ShardingCodec + + assert any( + isinstance(codec, ArrayBytesCodec) for codec in codecs + ), "Exactly one array-to-bytes codec is required." + + prev_codec: Optional[Codec] = None + for codec in codecs: + if prev_codec is not None: + assert not isinstance(codec, ArrayBytesCodec) or not isinstance( + prev_codec, ArrayBytesCodec + ), ( + f"ArrayBytesCodec '{type(codec)}' cannot follow after " + + f"ArrayBytesCodec '{type(prev_codec)}' because exactly " + + "1 ArrayBytesCodec is allowed." + ) + assert not isinstance(codec, ArrayBytesCodec) or not isinstance( + prev_codec, BytesBytesCodec + ), ( + f"ArrayBytesCodec '{type(codec)}' cannot follow after " + + f"BytesBytesCodec '{type(prev_codec)}'." + ) + assert not isinstance(codec, ArrayArrayCodec) or not isinstance( + prev_codec, ArrayBytesCodec + ), ( + f"ArrayArrayCodec '{type(codec)}' cannot follow after " + + f"ArrayBytesCodec '{type(prev_codec)}'." 
+ ) + assert not isinstance(codec, ArrayArrayCodec) or not isinstance( + prev_codec, BytesBytesCodec + ), ( + f"ArrayArrayCodec '{type(codec)}' cannot follow after " + + f"BytesBytesCodec '{type(prev_codec)}'." + ) + + if isinstance(codec, ShardingCodec): + assert len(codec.configuration.chunk_shape) == len(array_metadata.shape), ( + "The shard's `chunk_shape` and array's `shape` need to have the " + + "same number of dimensions." + ) + assert all( + s % c == 0 + for s, c in zip( + array_metadata.chunk_shape, + codec.configuration.chunk_shape, + ) + ), ( + "The array's `chunk_shape` needs to be divisible by the " + + "shard's inner `chunk_shape`." + ) + prev_codec = codec + + if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: + warn( + "Combining a `sharding_indexed` codec disables partial reads and " + + "writes, which may lead to inefficient performance." + ) + + def _array_array_codecs(self) -> List[ArrayArrayCodec]: + return [codec for codec in self.codecs if isinstance(codec, ArrayArrayCodec)] + + def _array_bytes_codec(self) -> ArrayBytesCodec: + return next(codec for codec in self.codecs if isinstance(codec, ArrayBytesCodec)) + + def _bytes_bytes_codecs(self) -> List[BytesBytesCodec]: + return [codec for codec in self.codecs if isinstance(codec, BytesBytesCodec)] + + async def decode(self, chunk_bytes: BytesLike) -> np.ndarray: + for bb_codec in self._bytes_bytes_codecs()[::-1]: + chunk_bytes = await bb_codec.decode(chunk_bytes) + + chunk_array = await self._array_bytes_codec().decode(chunk_bytes) + + for aa_codec in self._array_array_codecs()[::-1]: + chunk_array = await aa_codec.decode(chunk_array) + + return chunk_array + + async def encode(self, chunk_array: np.ndarray) -> Optional[BytesLike]: + for aa_codec in self._array_array_codecs(): + chunk_array_maybe = await aa_codec.encode(chunk_array) + if chunk_array_maybe is None: + return None + chunk_array = chunk_array_maybe + + chunk_bytes_maybe = await self._array_bytes_codec().encode(chunk_array) + if chunk_bytes_maybe is None: + return None + chunk_bytes = chunk_bytes_maybe + + for bb_codec in self._bytes_bytes_codecs(): + chunk_bytes_maybe = await bb_codec.encode(chunk_bytes) + if chunk_bytes_maybe is None: + return None + chunk_bytes = chunk_bytes_maybe + + return chunk_bytes + + def compute_encoded_size(self, byte_length: int) -> int: + return reduce(lambda acc, codec: codec.compute_encoded_size(acc), self.codecs, byte_length) + + +@frozen +class BloscCodec(BytesBytesCodec): + array_metadata: CoreArrayMetadata + configuration: BloscCodecConfigurationMetadata + blosc_codec: Blosc + is_fixed_size = False + + @classmethod + def from_metadata( + cls, codec_metadata: BloscCodecMetadata, array_metadata: CoreArrayMetadata + ) -> BloscCodec: + configuration = codec_metadata.configuration + if configuration.typesize == 0: + configuration = evolve(configuration, typesize=array_metadata.data_type.byte_count) + config_dict = asdict(codec_metadata.configuration) + config_dict.pop("typesize", None) + map_shuffle_str_to_int = {"noshuffle": 0, "shuffle": 1, "bitshuffle": 2} + config_dict["shuffle"] = map_shuffle_str_to_int[config_dict["shuffle"]] + return cls( + array_metadata=array_metadata, + configuration=configuration, + blosc_codec=Blosc.from_config(config_dict), + ) + + async def decode( + self, + chunk_bytes: bytes, + ) -> BytesLike: + return await to_thread(self.blosc_codec.decode, chunk_bytes) + + async def encode( + self, + chunk_bytes: bytes, + ) -> Optional[BytesLike]: + chunk_array = 
np.frombuffer(chunk_bytes, dtype=self.array_metadata.dtype) + return await to_thread(self.blosc_codec.encode, chunk_array) + + def compute_encoded_size(self, _input_byte_length: int) -> int: + raise NotImplementedError + + +@frozen +class BytesCodec(ArrayBytesCodec): + array_metadata: CoreArrayMetadata + configuration: BytesCodecConfigurationMetadata + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: BytesCodecMetadata, array_metadata: CoreArrayMetadata + ) -> BytesCodec: + assert ( + array_metadata.dtype.itemsize == 1 or codec_metadata.configuration.endian is not None + ), "The `endian` configuration needs to be specified for multi-byte data types." + return cls( + array_metadata=array_metadata, + configuration=codec_metadata.configuration, + ) + + def _get_byteorder(self, array: np.ndarray) -> Literal["big", "little"]: + if array.dtype.byteorder == "<": + return "little" + elif array.dtype.byteorder == ">": + return "big" + else: + import sys + + return sys.byteorder + + async def decode( + self, + chunk_bytes: BytesLike, + ) -> np.ndarray: + if self.array_metadata.dtype.itemsize > 0: + if self.configuration.endian == "little": + prefix = "<" + else: + prefix = ">" + dtype = np.dtype(f"{prefix}{self.array_metadata.data_type.to_numpy_shortname()}") + else: + dtype = np.dtype(f"|{self.array_metadata.data_type.to_numpy_shortname()}") + chunk_array = np.frombuffer(chunk_bytes, dtype) + + # ensure correct chunk shape + if chunk_array.shape != self.array_metadata.chunk_shape: + chunk_array = chunk_array.reshape( + self.array_metadata.chunk_shape, + ) + return chunk_array + + async def encode( + self, + chunk_array: np.ndarray, + ) -> Optional[BytesLike]: + if chunk_array.dtype.itemsize > 1: + byteorder = self._get_byteorder(chunk_array) + if self.configuration.endian != byteorder: + new_dtype = chunk_array.dtype.newbyteorder(self.configuration.endian) + chunk_array = chunk_array.astype(new_dtype) + return chunk_array.tobytes() + + def compute_encoded_size(self, input_byte_length: int) -> int: + return input_byte_length + + +@frozen +class TransposeCodec(ArrayArrayCodec): + array_metadata: CoreArrayMetadata + order: Tuple[int, ...] + is_fixed_size = True + + @classmethod + def from_metadata( + cls, codec_metadata: TransposeCodecMetadata, array_metadata: CoreArrayMetadata + ) -> TransposeCodec: + configuration = codec_metadata.configuration + if configuration.order == "F": + order = tuple(array_metadata.ndim - x - 1 for x in range(array_metadata.ndim)) + + elif configuration.order == "C": + order = tuple(range(array_metadata.ndim)) + + else: + assert len(configuration.order) == array_metadata.ndim, ( + "The `order` tuple needs have as many entries as " + + f"there are dimensions in the array. Got: {configuration.order}" + ) + assert len(configuration.order) == len(set(configuration.order)), ( + "There must not be duplicates in the `order` tuple. " + + f"Got: {configuration.order}" + ) + assert all(0 <= x < array_metadata.ndim for x in configuration.order), ( + "All entries in the `order` tuple must be between 0 and " + + f"the number of dimensions in the array. 
Got: {configuration.order}"
+            )
+            order = tuple(configuration.order)
+
+        return cls(
+            array_metadata=array_metadata,
+            order=order,
+        )
+
+    def resolve_metadata(self) -> CoreArrayMetadata:
+        from zarr.v3.metadata import CoreArrayMetadata
+
+        return CoreArrayMetadata(
+            shape=tuple(
+                self.array_metadata.shape[self.order[i]] for i in range(self.array_metadata.ndim)
+            ),
+            chunk_shape=tuple(
+                self.array_metadata.chunk_shape[self.order[i]]
+                for i in range(self.array_metadata.ndim)
+            ),
+            data_type=self.array_metadata.data_type,
+            fill_value=self.array_metadata.fill_value,
+            runtime_configuration=self.array_metadata.runtime_configuration,
+        )
+
+    async def decode(
+        self,
+        chunk_array: np.ndarray,
+    ) -> np.ndarray:
+        # apply the inverse permutation (inverse_order[order[x]] == x)
+        inverse_order = [0 for _ in range(self.array_metadata.ndim)]
+        for x, i in enumerate(self.order):
+            inverse_order[i] = x
+        chunk_array = chunk_array.transpose(inverse_order)
+        return chunk_array
+
+    async def encode(
+        self,
+        chunk_array: np.ndarray,
+    ) -> Optional[np.ndarray]:
+        chunk_array = chunk_array.transpose(self.order)
+        return chunk_array
+
+    def compute_encoded_size(self, input_byte_length: int) -> int:
+        return input_byte_length
+
+
+@frozen
+class GzipCodec(BytesBytesCodec):
+    array_metadata: CoreArrayMetadata
+    configuration: GzipCodecConfigurationMetadata
+    is_fixed_size = True
+
+    @classmethod
+    def from_metadata(
+        cls, codec_metadata: GzipCodecMetadata, array_metadata: CoreArrayMetadata
+    ) -> GzipCodec:
+        return cls(
+            array_metadata=array_metadata,
+            configuration=codec_metadata.configuration,
+        )
+
+    async def decode(
+        self,
+        chunk_bytes: bytes,
+    ) -> BytesLike:
+        return await to_thread(GZip(self.configuration.level).decode, chunk_bytes)
+
+    async def encode(
+        self,
+        chunk_bytes: bytes,
+    ) -> Optional[BytesLike]:
+        return await to_thread(GZip(self.configuration.level).encode, chunk_bytes)
+
+    def compute_encoded_size(self, _input_byte_length: int) -> int:
+        raise NotImplementedError
+
+
+@frozen
+class ZstdCodec(BytesBytesCodec):
+    array_metadata: CoreArrayMetadata
+    configuration: ZstdCodecConfigurationMetadata
+    is_fixed_size = True
+
+    @classmethod
+    def from_metadata(
+        cls, codec_metadata: ZstdCodecMetadata, array_metadata: CoreArrayMetadata
+    ) -> ZstdCodec:
+        return cls(
+            array_metadata=array_metadata,
+            configuration=codec_metadata.configuration,
+        )
+
+    def _compress(self, data: bytes) -> bytes:
+        ctx = ZstdCompressor(
+            level=self.configuration.level, write_checksum=self.configuration.checksum
+        )
+        return ctx.compress(data)
+
+    def _decompress(self, data: bytes) -> bytes:
+        ctx = ZstdDecompressor()
+        return ctx.decompress(data)
+
+    async def decode(
+        self,
+        chunk_bytes: bytes,
+    ) -> BytesLike:
+        return await to_thread(self._decompress, chunk_bytes)
+
+    async def encode(
+        self,
+        chunk_bytes: bytes,
+    ) -> Optional[BytesLike]:
+        return await to_thread(self._compress, chunk_bytes)
+
+    def compute_encoded_size(self, _input_byte_length: int) -> int:
+        raise NotImplementedError
+
+
+@frozen
+class Crc32cCodec(BytesBytesCodec):
+    array_metadata: CoreArrayMetadata
+    is_fixed_size = True
+
+    @classmethod
+    def from_metadata(
+        cls, codec_metadata: Crc32cCodecMetadata, array_metadata: CoreArrayMetadata
+    ) -> Crc32cCodec:
+        return cls(array_metadata=array_metadata)
+
+    async def decode(
+        self,
+        chunk_bytes: bytes,
+    ) -> BytesLike:
+        crc32_bytes = chunk_bytes[-4:]
+        inner_bytes = chunk_bytes[:-4]
+
+        assert np.uint32(crc32c(inner_bytes)).tobytes() == bytes(crc32_bytes)
+        return inner_bytes
+
+    async def encode(
+        self,
+        chunk_bytes: 
bytes, + ) -> Optional[BytesLike]: + return chunk_bytes + np.uint32(crc32c(chunk_bytes)).tobytes() + + def compute_encoded_size(self, input_byte_length: int) -> int: + return input_byte_length + 4 + + +def blosc_codec( + typesize: int, + cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd", + clevel: int = 5, + shuffle: Literal["noshuffle", "shuffle", "bitshuffle"] = "noshuffle", + blocksize: int = 0, +) -> BloscCodecMetadata: + return BloscCodecMetadata( + configuration=BloscCodecConfigurationMetadata( + cname=cname, + clevel=clevel, + shuffle=shuffle, + blocksize=blocksize, + typesize=typesize, + ) + ) + + +def bytes_codec(endian: Optional[Literal["big", "little"]] = "little") -> BytesCodecMetadata: + return BytesCodecMetadata(configuration=BytesCodecConfigurationMetadata(endian)) + + +def transpose_codec(order: Union[Tuple[int, ...], Literal["C", "F"]]) -> TransposeCodecMetadata: + return TransposeCodecMetadata(configuration=TransposeCodecConfigurationMetadata(order)) + + +def gzip_codec(level: int = 5) -> GzipCodecMetadata: + return GzipCodecMetadata(configuration=GzipCodecConfigurationMetadata(level)) + + +def zstd_codec(level: int = 0, checksum: bool = False) -> ZstdCodecMetadata: + return ZstdCodecMetadata(configuration=ZstdCodecConfigurationMetadata(level, checksum)) + + +def crc32c_codec() -> Crc32cCodecMetadata: + return Crc32cCodecMetadata() + + +def sharding_codec( + chunk_shape: Tuple[int, ...], + codecs: Optional[List[CodecMetadata]] = None, + index_codecs: Optional[List[CodecMetadata]] = None, +) -> ShardingCodecMetadata: + codecs = codecs or [bytes_codec()] + index_codecs = index_codecs or [bytes_codec(), crc32c_codec()] + return ShardingCodecMetadata( + configuration=ShardingCodecConfigurationMetadata(chunk_shape, codecs, index_codecs) + ) diff --git a/zarr/v3/common.py b/zarr/v3/common.py new file mode 100644 index 0000000000..0e55a7c1fd --- /dev/null +++ b/zarr/v3/common.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import asyncio +import contextvars +import functools +from typing import ( + Any, + Awaitable, + Callable, + Dict, + List, + Literal, + Optional, + Tuple, + TypeVar, + Union, +) + +import numpy as np +from cattr import Converter + +ZARR_JSON = "zarr.json" +ZARRAY_JSON = ".zarray" +ZGROUP_JSON = ".zgroup" +ZATTRS_JSON = ".zattrs" + +BytesLike = Union[bytes, bytearray, memoryview] +ChunkCoords = Tuple[int, ...] +SliceSelection = Tuple[slice, ...] 
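+# A minimal orientation sketch, assuming a 2-d array (this comment is illustrative, +# not part of the original aliases): a ChunkCoords value such as (0, 1) addresses one +# chunk in the chunk grid, and a SliceSelection such as (slice(0, 10), slice(5, 8)) +# carries one slice per dimension. The `Selection` alias below additionally admits a +# single bare slice, e.g. arr[slice(2, 5)].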
+Selection = Union[slice, SliceSelection] + + +def make_cattr(): + from zarr.v3.metadata import ( + BloscCodecMetadata, + BytesCodecMetadata, + ChunkKeyEncodingMetadata, + CodecMetadata, + Crc32cCodecMetadata, + DefaultChunkKeyEncodingMetadata, + GzipCodecMetadata, + ShardingCodecMetadata, + TransposeCodecMetadata, + V2ChunkKeyEncodingMetadata, + ZstdCodecMetadata, + ) + + converter = Converter() + + def _structure_chunk_key_encoding_metadata(d: Dict[str, Any], _t) -> ChunkKeyEncodingMetadata: + if d["name"] == "default": + return converter.structure(d, DefaultChunkKeyEncodingMetadata) + if d["name"] == "v2": + return converter.structure(d, V2ChunkKeyEncodingMetadata) + raise KeyError + + converter.register_structure_hook( + ChunkKeyEncodingMetadata, _structure_chunk_key_encoding_metadata + ) + + def _structure_codec_metadata(d: Dict[str, Any], _t=None) -> CodecMetadata: + if d["name"] == "endian": + d["name"] = "bytes" + + if d["name"] == "blosc": + return converter.structure(d, BloscCodecMetadata) + if d["name"] == "bytes": + return converter.structure(d, BytesCodecMetadata) + if d["name"] == "transpose": + return converter.structure(d, TransposeCodecMetadata) + if d["name"] == "gzip": + return converter.structure(d, GzipCodecMetadata) + if d["name"] == "zstd": + return converter.structure(d, ZstdCodecMetadata) + if d["name"] == "sharding_indexed": + return converter.structure(d, ShardingCodecMetadata) + if d["name"] == "crc32c": + return converter.structure(d, Crc32cCodecMetadata) + raise KeyError + + converter.register_structure_hook(CodecMetadata, _structure_codec_metadata) + + converter.register_structure_hook_factory( + lambda t: str(t) == "ForwardRef('CodecMetadata')", + lambda t: _structure_codec_metadata, + ) + + def _structure_order(d: Any, _t=None) -> Union[Literal["C", "F"], Tuple[int, ...]]: + if d == "C": + return "C" + if d == "F": + return "F" + if isinstance(d, list): + return tuple(d) + raise KeyError + + converter.register_structure_hook_factory( + lambda t: str(t) == "typing.Union[typing.Literal['C', 'F'], typing.Tuple[int, ...]]", + lambda t: _structure_order, + ) + + # Needed for v2 fill_value + def _structure_fill_value(d: Any, _t=None) -> Union[None, int, float]: + if d is None: + return None + try: + return int(d) + except ValueError: + pass + try: + return float(d) + except ValueError: + pass + raise ValueError + + converter.register_structure_hook_factory( + lambda t: str(t) == "typing.Union[NoneType, int, float]", + lambda t: _structure_fill_value, + ) + + # Needed for v2 dtype + converter.register_structure_hook( + np.dtype, + lambda d, _: np.dtype(d), + ) + + return converter + + +def product(tup: ChunkCoords) -> int: + return functools.reduce(lambda x, y: x * y, tup, 1) + + +T = TypeVar("T", bound=Tuple) +V = TypeVar("V") + + +async def concurrent_map( + items: List[T], func: Callable[..., Awaitable[V]], limit: Optional[int] = None +) -> List[V]: + if limit is None: + return await asyncio.gather(*[func(*item) for item in items]) + + else: + sem = asyncio.Semaphore(limit) + + async def run(item): + async with sem: + return await func(*item) + + return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items]) + + +async def to_thread(func, /, *args, **kwargs): + loop = asyncio.get_running_loop() + ctx = contextvars.copy_context() + func_call = functools.partial(ctx.run, func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) diff --git a/zarr/v3/group.py b/zarr/v3/group.py new file mode 100644 index 0000000000..aa43c706a5 
--- /dev/null +++ b/zarr/v3/group.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +import json +from typing import Any, Dict, Literal, Optional, Union + +from attr import asdict, evolve, field, frozen + +from zarr.v3.array import Array +from zarr.v3.common import ZARR_JSON, make_cattr +from zarr.v3.metadata import RuntimeConfiguration +from zarr.v3.store import StoreLike, StorePath, make_store_path +from zarr.v3.sync import sync + + +@frozen +class GroupMetadata: + attributes: Dict[str, Any] = field(factory=dict) + zarr_format: Literal[3] = 3 + node_type: Literal["group"] = "group" + + def to_bytes(self) -> bytes: + return json.dumps(asdict(self)).encode() + + @classmethod + def from_json(cls, zarr_json: Any) -> GroupMetadata: + return make_cattr().structure(zarr_json, GroupMetadata) + + +@frozen +class Group: + metadata: GroupMetadata + store_path: StorePath + runtime_configuration: RuntimeConfiguration + + @classmethod + async def create_async( + cls, + store: StoreLike, + *, + attributes: Optional[Dict[str, Any]] = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Group: + store_path = make_store_path(store) + if not exists_ok: + assert not await (store_path / ZARR_JSON).exists_async() + group = cls( + metadata=GroupMetadata(attributes=attributes or {}), + store_path=store_path, + runtime_configuration=runtime_configuration, + ) + await group._save_metadata() + return group + + @classmethod + def create( + cls, + store: StoreLike, + *, + attributes: Optional[Dict[str, Any]] = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Group: + return sync( + cls.create_async( + store, + attributes=attributes, + exists_ok=exists_ok, + runtime_configuration=runtime_configuration, + ), + runtime_configuration.asyncio_loop, + ) + + @classmethod + async def open_async( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Group: + store_path = make_store_path(store) + zarr_json_bytes = await (store_path / ZARR_JSON).get_async() + assert zarr_json_bytes is not None + return cls.from_json(store_path, json.loads(zarr_json_bytes), runtime_configuration) + + @classmethod + def open( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Group: + return sync( + cls.open_async(store, runtime_configuration), + runtime_configuration.asyncio_loop, + ) + + @classmethod + def from_json( + cls, + store_path: StorePath, + zarr_json: Any, + runtime_configuration: RuntimeConfiguration, + ) -> Group: + group = cls( + metadata=GroupMetadata.from_json(zarr_json), + store_path=store_path, + runtime_configuration=runtime_configuration, + ) + return group + + @classmethod + async def open_or_array( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Union[Array, Group]: + store_path = make_store_path(store) + zarr_json_bytes = await (store_path / ZARR_JSON).get_async() + if zarr_json_bytes is None: + raise KeyError + zarr_json = json.loads(zarr_json_bytes) + if zarr_json["node_type"] == "group": + return cls.from_json(store_path, zarr_json, runtime_configuration) + if zarr_json["node_type"] == "array": + return Array.from_json( + store_path, zarr_json, runtime_configuration=runtime_configuration + ) + raise KeyError + + async def _save_metadata(self) -> None: + await (self.store_path / ZARR_JSON).set_async(self.metadata.to_bytes()) + + 
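# A minimal usage sketch, assuming a local directory store (the path is hypothetical +# and `LocalStore` comes from zarr.v3.store; not part of the original class): +# +# store = LocalStore("/tmp/demo.zarr") +# group = Group.create(store, attributes={"title": "demo"}) +# subgroup = group.create_group("sub") +# assert isinstance(group["sub"], Group) + +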
async def get_async(self, path: str) -> Union[Array, Group]: + return await self.__class__.open_or_array( + self.store_path / path, self.runtime_configuration + ) + + def __getitem__(self, path: str) -> Union[Array, Group]: + return sync(self.get_async(path), self.runtime_configuration.asyncio_loop) + + async def create_group_async(self, path: str, **kwargs) -> Group: + runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) + return await self.__class__.create_async( + self.store_path / path, + runtime_configuration=runtime_configuration, + **kwargs, + ) + + def create_group(self, path: str, **kwargs) -> Group: + return sync(self.create_group_async(path, **kwargs), self.runtime_configuration.asyncio_loop) + + async def create_array_async(self, path: str, **kwargs) -> Array: + runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) + return await Array.create_async( + self.store_path / path, + runtime_configuration=runtime_configuration, + **kwargs, + ) + + def create_array(self, path: str, **kwargs) -> Array: + return sync( + self.create_array_async(path, **kwargs), + self.runtime_configuration.asyncio_loop, + ) + + async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> Group: + new_metadata = evolve(self.metadata, attributes=new_attributes) + + # Write new metadata + await (self.store_path / ZARR_JSON).set_async(new_metadata.to_bytes()) + return evolve(self, metadata=new_metadata) + + def update_attributes(self, new_attributes: Dict[str, Any]) -> Group: + return sync( + self.update_attributes_async(new_attributes), + self.runtime_configuration.asyncio_loop, + ) + + def __repr__(self): + return f"<Group {self.store_path}>" diff --git a/zarr/v3/group_v2.py b/zarr/v3/group_v2.py new file mode 100644 index 0000000000..3b1a369ae2 --- /dev/null +++ b/zarr/v3/group_v2.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import asyncio +import json +from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union + +from attr import asdict, evolve, frozen + +from zarr.v3.array_v2 import ArrayV2 +from zarr.v3.common import ZARRAY_JSON, ZATTRS_JSON, ZGROUP_JSON, make_cattr +from zarr.v3.metadata import RuntimeConfiguration +from zarr.v3.store import StoreLike, StorePath, make_store_path +from zarr.v3.sync import sync + +if TYPE_CHECKING: + from zarr.v3.group import Group + + +@frozen +class GroupV2Metadata: + zarr_format: Literal[2] = 2 + + def to_bytes(self) -> bytes: + return json.dumps(asdict(self)).encode() + + @classmethod + def from_json(cls, zarr_json: Any) -> GroupV2Metadata: + return make_cattr().structure(zarr_json, cls) + + +@frozen +class GroupV2: + metadata: GroupV2Metadata + store_path: StorePath + runtime_configuration: RuntimeConfiguration + attributes: Optional[Dict[str, Any]] = None + + @classmethod + async def create_async( + cls, + store: StoreLike, + *, + attributes: Optional[Dict[str, Any]] = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> GroupV2: + store_path = make_store_path(store) + if not exists_ok: + assert not await (store_path / ZGROUP_JSON).exists_async() + group = cls( + metadata=GroupV2Metadata(), + attributes=attributes, + store_path=store_path, + runtime_configuration=runtime_configuration, + ) + await group._save_metadata() + return group + + @classmethod + def create( + cls, + store: StoreLike, + *, + attributes: Optional[Dict[str, Any]] = None, + exists_ok: bool = False, + runtime_configuration: RuntimeConfiguration =
RuntimeConfiguration(), + ) -> GroupV2: + return sync( + cls.create_async( + store, + attributes=attributes, + exists_ok=exists_ok, + runtime_configuration=runtime_configuration, + ), + runtime_configuration.asyncio_loop if runtime_configuration else None, + ) + + @classmethod + async def open_async( + cls, + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> GroupV2: + store_path = make_store_path(store) + zgroup_bytes = await (store_path / ZGROUP_JSON).get_async() + assert zgroup_bytes is not None + zattrs_bytes = await (store_path / ZATTRS_JSON).get_async() + metadata = json.loads(zgroup_bytes) + attributes = json.loads(zattrs_bytes) if zattrs_bytes is not None else None + + return cls.from_json( + store_path, + metadata, + runtime_configuration, + attributes, + ) + + @classmethod + def open( + cls, + store_path: StorePath, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> GroupV2: + return sync( + cls.open_async(store_path, runtime_configuration), + runtime_configuration.asyncio_loop, + ) + + @classmethod + def from_json( + cls, + store_path: StorePath, + zarr_json: Any, + runtime_configuration: RuntimeConfiguration, + attributes: Optional[Dict[str, Any]] = None, + ) -> GroupV2: + group = cls( + metadata=GroupV2Metadata.from_json(zarr_json), + store_path=store_path, + runtime_configuration=runtime_configuration, + attributes=attributes, + ) + return group + + @staticmethod + async def open_or_array( + store: StoreLike, + runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(), + ) -> Union[ArrayV2, GroupV2]: + store_path = make_store_path(store) + zgroup_bytes, zattrs_bytes = await asyncio.gather( + (store_path / ZGROUP_JSON).get_async(), + (store_path / ZATTRS_JSON).get_async(), + ) + attributes = json.loads(zattrs_bytes) if zattrs_bytes is not None else None + if zgroup_bytes is not None: + return GroupV2.from_json( + store_path, json.loads(zgroup_bytes), runtime_configuration, attributes + ) + zarray_bytes = await (store_path / ZARRAY_JSON).get_async() + if zarray_bytes is not None: + return ArrayV2.from_json( + store_path, json.loads(zarray_bytes), attributes, runtime_configuration + ) + raise KeyError + + async def _save_metadata(self) -> None: + await (self.store_path / ZGROUP_JSON).set_async(self.metadata.to_bytes()) + if self.attributes is not None and len(self.attributes) > 0: + await (self.store_path / ZATTRS_JSON).set_async( + json.dumps(self.attributes).encode(), + ) + else: + await (self.store_path / ZATTRS_JSON).delete_async() + + async def get_async(self, path: str) -> Union[ArrayV2, GroupV2]: + return await self.__class__.open_or_array( + self.store_path / path, self.runtime_configuration + ) + + def __getitem__(self, path: str) -> Union[ArrayV2, GroupV2]: + return sync(self.get_async(path), self.runtime_configuration.asyncio_loop) + + async def create_group_async(self, path: str, **kwargs) -> GroupV2: + runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) + return await self.__class__.create_async( + self.store_path / path, + runtime_configuration=runtime_configuration, + **kwargs, + ) + + def create_group(self, path: str, **kwargs) -> GroupV2: + return sync(self.create_group_async(path, **kwargs), self.runtime_configuration.asyncio_loop) + + async def create_array_async(self, path: str, **kwargs) -> ArrayV2: + runtime_configuration = kwargs.pop("runtime_configuration", self.runtime_configuration) + return await ArrayV2.create_async( + self.store_path / path, + runtime_configuration=runtime_configuration, + **kwargs, + ) + + def create_array(self, path: str, **kwargs) -> ArrayV2: + return sync( + self.create_array_async(path, **kwargs), + self.runtime_configuration.asyncio_loop, + ) + + async def convert_to_v3_async(self) -> Group: + from zarr.v3.common import ZARR_JSON + from zarr.v3.group import Group, GroupMetadata + + new_metadata = GroupMetadata(attributes=self.attributes or {}) + new_metadata_bytes = new_metadata.to_bytes() + + await (self.store_path / ZARR_JSON).set_async(new_metadata_bytes) + + return Group.from_json( + store_path=self.store_path, + zarr_json=json.loads(new_metadata_bytes), + runtime_configuration=self.runtime_configuration, + ) + + async def update_attributes_async(self, new_attributes: Dict[str, Any]) -> GroupV2: + await (self.store_path / ZATTRS_JSON).set_async(json.dumps(new_attributes).encode()) + return evolve(self, attributes=new_attributes) + + def update_attributes(self, new_attributes: Dict[str, Any]) -> GroupV2: + return sync( + self.update_attributes_async(new_attributes), + self.runtime_configuration.asyncio_loop, + ) + + def convert_to_v3(self) -> Group: + return sync(self.convert_to_v3_async(), loop=self.runtime_configuration.asyncio_loop) + + def __repr__(self): + return f"<GroupV2 {self.store_path}>" diff --git a/zarr/v3/indexing.py b/zarr/v3/indexing.py new file mode 100644 index 0000000000..15adad111d --- /dev/null +++ b/zarr/v3/indexing.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +import itertools +import math +from typing import Iterator, List, NamedTuple, Optional, Tuple + +from zarr.v3.common import ChunkCoords, Selection, SliceSelection, product + + +def _ensure_tuple(v: Selection) -> SliceSelection: + if not isinstance(v, tuple): + v = (v,) + return v + + +def _err_too_many_indices(selection: SliceSelection, shape: ChunkCoords): + raise IndexError( + "too many indices for array; expected {}, got {}".format(len(shape), len(selection)) + ) + + +def _err_negative_step(): + raise IndexError("only slices with step >= 1 are supported") + + +def _check_selection_length(selection: SliceSelection, shape: ChunkCoords): + if len(selection) > len(shape): + _err_too_many_indices(selection, shape) + + +def _ensure_selection( + selection: Selection, + shape: ChunkCoords, +) -> SliceSelection: + selection = _ensure_tuple(selection) + + # fill out selection if not completely specified + if len(selection) < len(shape): + selection += (slice(None),) * (len(shape) - len(selection)) + + # check selection not too long + _check_selection_length(selection, shape) + + return selection + + +class _ChunkDimProjection(NamedTuple): + dim_chunk_ix: int + dim_chunk_sel: slice + dim_out_sel: Optional[slice] + + +def _ceildiv(a, b): + return math.ceil(a / b) + + +class _SliceDimIndexer: + dim_sel: slice + dim_len: int + dim_chunk_len: int + nitems: int + + start: int + stop: int + step: int + + def __init__(self, dim_sel: slice, dim_len: int, dim_chunk_len: int): + self.start, self.stop, self.step = dim_sel.indices(dim_len) + if self.step < 1: + _err_negative_step() + + self.dim_len = dim_len + self.dim_chunk_len = dim_chunk_len + self.nitems = max(0, _ceildiv((self.stop - self.start), self.step)) + self.nchunks = _ceildiv(self.dim_len, self.dim_chunk_len) + + def __iter__(self) -> Iterator[_ChunkDimProjection]: + # figure out the range of chunks we need to visit + dim_chunk_ix_from = self.start // self.dim_chunk_len + dim_chunk_ix_to = _ceildiv(self.stop, self.dim_chunk_len) + + # iterate over chunks in range + for dim_chunk_ix in
range(dim_chunk_ix_from, dim_chunk_ix_to): + # compute offsets for chunk within overall array + dim_offset = dim_chunk_ix * self.dim_chunk_len + dim_limit = min(self.dim_len, (dim_chunk_ix + 1) * self.dim_chunk_len) + + # determine chunk length, accounting for trailing chunk + dim_chunk_len = dim_limit - dim_offset + + if self.start < dim_offset: + # selection starts before current chunk + dim_chunk_sel_start = 0 + remainder = (dim_offset - self.start) % self.step + if remainder: + dim_chunk_sel_start += self.step - remainder + # compute number of previous items, provides offset into output array + dim_out_offset = _ceildiv((dim_offset - self.start), self.step) + + else: + # selection starts within current chunk + dim_chunk_sel_start = self.start - dim_offset + dim_out_offset = 0 + + if self.stop > dim_limit: + # selection ends after current chunk + dim_chunk_sel_stop = dim_chunk_len + + else: + # selection ends within current chunk + dim_chunk_sel_stop = self.stop - dim_offset + + dim_chunk_sel = slice(dim_chunk_sel_start, dim_chunk_sel_stop, self.step) + dim_chunk_nitems = _ceildiv((dim_chunk_sel_stop - dim_chunk_sel_start), self.step) + dim_out_sel = slice(dim_out_offset, dim_out_offset + dim_chunk_nitems) + + yield _ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel) + + +class _ChunkProjection(NamedTuple): + chunk_coords: ChunkCoords + chunk_selection: SliceSelection + out_selection: SliceSelection + + +class BasicIndexer: + dim_indexers: List[_SliceDimIndexer] + shape: ChunkCoords + + def __init__( + self, + selection: Selection, + shape: Tuple[int, ...], + chunk_shape: Tuple[int, ...], + ): + # setup per-dimension indexers + self.dim_indexers = [ + _SliceDimIndexer(dim_sel, dim_len, dim_chunk_len) + for dim_sel, dim_len, dim_chunk_len in zip( + _ensure_selection(selection, shape), shape, chunk_shape + ) + ] + self.shape = tuple(s.nitems for s in self.dim_indexers) + + def __iter__(self) -> Iterator[_ChunkProjection]: + for dim_projections in itertools.product(*self.dim_indexers): + chunk_coords = tuple(p.dim_chunk_ix for p in dim_projections) + chunk_selection = tuple(p.dim_chunk_sel for p in dim_projections) + out_selection = tuple( + p.dim_out_sel for p in dim_projections if p.dim_out_sel is not None + ) + + yield _ChunkProjection(chunk_coords, chunk_selection, out_selection) + + +def morton_order_iter(chunk_shape: ChunkCoords) -> Iterator[ChunkCoords]: + def decode_morton(z: int, chunk_shape: ChunkCoords) -> ChunkCoords: + # Inspired by compressed morton code as implemented in Neuroglancer + # https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/volume.md#compressed-morton-code + bits = tuple(math.ceil(math.log2(c)) for c in chunk_shape) + max_coords_bits = max(bits) + input_bit = 0 + input_value = z + out = [0 for _ in range(len(chunk_shape))] + + for coord_bit in range(max_coords_bits): + for dim in range(len(chunk_shape)): + if coord_bit < bits[dim]: + bit = (input_value >> input_bit) & 1 + out[dim] |= bit << coord_bit + input_bit += 1 + return tuple(out) + + for i in range(product(chunk_shape)): + yield decode_morton(i, chunk_shape) + + +def c_order_iter(chunks_per_shard: ChunkCoords) -> Iterator[ChunkCoords]: + return itertools.product(*(range(x) for x in chunks_per_shard)) + + +def is_total_slice(item: Selection, shape: ChunkCoords): + """Determine whether `item` specifies a complete slice of array with the + given `shape`. 
Used to optimize __setitem__ operations on the Chunk + class.""" + + # N.B., assume shape is normalized + if item == slice(None): + return True + if isinstance(item, slice): + item = (item,) + if isinstance(item, tuple): + return all( + ( + isinstance(dim_sel, slice) + and ( + (dim_sel == slice(None)) + or ((dim_sel.stop - dim_sel.start == dim_len) and (dim_sel.step in [1, None])) + ) + ) + for dim_sel, dim_len in zip(item, shape) + ) + else: + raise TypeError("expected slice or tuple of slices, found %r" % item) + + +def all_chunk_coords(shape: ChunkCoords, chunk_shape: ChunkCoords) -> Iterator[ChunkCoords]: + return itertools.product(*(range(0, _ceildiv(s, c)) for s, c in zip(shape, chunk_shape))) diff --git a/zarr/v3/metadata.py b/zarr/v3/metadata.py new file mode 100644 index 0000000000..1fc43b19f0 --- /dev/null +++ b/zarr/v3/metadata.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +import json +from asyncio import AbstractEventLoop +from enum import Enum +from typing import Any, Dict, List, Literal, Optional, Tuple, Union + +import numpy as np +from attr import asdict, field, frozen + +from zarr.v3.common import ChunkCoords, make_cattr + + +@frozen +class RuntimeConfiguration: + order: Literal["C", "F"] = "C" + concurrency: Optional[int] = None + asyncio_loop: Optional[AbstractEventLoop] = None + + +def runtime_configuration( + order: Literal["C", "F"], concurrency: Optional[int] = None +) -> RuntimeConfiguration: + return RuntimeConfiguration(order=order, concurrency=concurrency) + + +class DataType(Enum): + bool = "bool" + int8 = "int8" + int16 = "int16" + int32 = "int32" + int64 = "int64" + uint8 = "uint8" + uint16 = "uint16" + uint32 = "uint32" + uint64 = "uint64" + float32 = "float32" + float64 = "float64" + + @property + def byte_count(self) -> int: + data_type_byte_counts = { + DataType.bool: 1, + DataType.int8: 1, + DataType.int16: 2, + DataType.int32: 4, + DataType.int64: 8, + DataType.uint8: 1, + DataType.uint16: 2, + DataType.uint32: 4, + DataType.uint64: 8, + DataType.float32: 4, + DataType.float64: 8, + } + return data_type_byte_counts[self] + + def to_numpy_shortname(self) -> str: + data_type_to_numpy = { + DataType.bool: "bool", + DataType.int8: "i1", + DataType.int16: "i2", + DataType.int32: "i4", + DataType.int64: "i8", + DataType.uint8: "u1", + DataType.uint16: "u2", + DataType.uint32: "u4", + DataType.uint64: "u8", + DataType.float32: "f4", + DataType.float64: "f8", + } + return data_type_to_numpy[self] + + +dtype_to_data_type = { + "|b1": "bool", + "bool": "bool", + "|i1": "int8", + "<i2": "int16", + "<i4": "int32", + "<i8": "int64", + "|u1": "uint8", + "<u2": "uint16", + "<u4": "uint32", + "<u8": "uint64", + "<f4": "float32", + "<f8": "float64", +} + + +@frozen +class RegularChunkGridConfigurationMetadata: + chunk_shape: ChunkCoords + + +@frozen +class RegularChunkGridMetadata: + configuration: RegularChunkGridConfigurationMetadata + name: Literal["regular"] = "regular" + + +@frozen +class DefaultChunkKeyEncodingConfigurationMetadata: + separator: Literal[".", "/"] = "/" + + +@frozen +class DefaultChunkKeyEncodingMetadata: + configuration: DefaultChunkKeyEncodingConfigurationMetadata = ( + DefaultChunkKeyEncodingConfigurationMetadata() + ) + name: Literal["default"] = "default" + + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + if chunk_key == "c": + return () + return tuple(map(int, chunk_key[1:].split(self.configuration.separator))) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + return self.configuration.separator.join(map(str, ("c",) + chunk_coords)) + + +@frozen +class V2ChunkKeyEncodingConfigurationMetadata: + separator: Literal[".", "/"] = "."
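+ + +# A small sketch of the two chunk-key encodings defined here, assuming the +# defaults above and below (values inferred from encode_chunk_key; treat as +# assumptions): +# +# DefaultChunkKeyEncodingMetadata().encode_chunk_key((1, 2)) # "c/1/2" +# V2ChunkKeyEncodingMetadata().encode_chunk_key((1, 2)) # "1.2" +# V2ChunkKeyEncodingMetadata().encode_chunk_key(()) # "0"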
+ + +@frozen +class V2ChunkKeyEncodingMetadata: + configuration: V2ChunkKeyEncodingConfigurationMetadata = ( + V2ChunkKeyEncodingConfigurationMetadata() + ) + name: Literal["v2"] = "v2" + + def decode_chunk_key(self, chunk_key: str) -> ChunkCoords: + return tuple(map(int, chunk_key.split(self.configuration.separator))) + + def encode_chunk_key(self, chunk_coords: ChunkCoords) -> str: + chunk_identifier = self.configuration.separator.join(map(str, chunk_coords)) + return "0" if chunk_identifier == "" else chunk_identifier + + +ChunkKeyEncodingMetadata = Union[DefaultChunkKeyEncodingMetadata, V2ChunkKeyEncodingMetadata] + + +BloscShuffle = Literal["noshuffle", "shuffle", "bitshuffle"] + + +@frozen +class BloscCodecConfigurationMetadata: + typesize: int + cname: Literal["lz4", "lz4hc", "blosclz", "zstd", "snappy", "zlib"] = "zstd" + clevel: int = 5 + shuffle: BloscShuffle = "noshuffle" + blocksize: int = 0 + + +blosc_shuffle_int_to_str: Dict[int, BloscShuffle] = { + 0: "noshuffle", + 1: "shuffle", + 2: "bitshuffle", +} + + +@frozen +class BloscCodecMetadata: + configuration: BloscCodecConfigurationMetadata + name: Literal["blosc"] = "blosc" + + +@frozen +class BytesCodecConfigurationMetadata: + endian: Optional[Literal["big", "little"]] = "little" + + +@frozen +class BytesCodecMetadata: + configuration: BytesCodecConfigurationMetadata + name: Literal["bytes"] = "bytes" + + +@frozen +class TransposeCodecConfigurationMetadata: + order: Union[Literal["C", "F"], Tuple[int, ...]] = "C" + + +@frozen +class TransposeCodecMetadata: + configuration: TransposeCodecConfigurationMetadata + name: Literal["transpose"] = "transpose" + + +@frozen +class GzipCodecConfigurationMetadata: + level: int = 5 + + +@frozen +class GzipCodecMetadata: + configuration: GzipCodecConfigurationMetadata + name: Literal["gzip"] = "gzip" + + +@frozen +class ZstdCodecConfigurationMetadata: + level: int = 0 + checksum: bool = False + + +@frozen +class ZstdCodecMetadata: + configuration: ZstdCodecConfigurationMetadata + name: Literal["zstd"] = "zstd" + + +@frozen +class Crc32cCodecMetadata: + name: Literal["crc32c"] = "crc32c" + + +@frozen +class ShardingCodecConfigurationMetadata: + chunk_shape: ChunkCoords + codecs: List["CodecMetadata"] + index_codecs: List["CodecMetadata"] + + +@frozen +class ShardingCodecMetadata: + configuration: ShardingCodecConfigurationMetadata + name: Literal["sharding_indexed"] = "sharding_indexed" + + +CodecMetadata = Union[ + BloscCodecMetadata, + BytesCodecMetadata, + TransposeCodecMetadata, + GzipCodecMetadata, + ZstdCodecMetadata, + ShardingCodecMetadata, + Crc32cCodecMetadata, +] + + +@frozen +class CoreArrayMetadata: + shape: ChunkCoords + chunk_shape: ChunkCoords + data_type: DataType + fill_value: Any + runtime_configuration: RuntimeConfiguration + + @property + def dtype(self) -> np.dtype: + return np.dtype(self.data_type.value) + + @property + def ndim(self) -> int: + return len(self.shape) + + +@frozen +class ArrayMetadata: + shape: ChunkCoords + data_type: DataType + chunk_grid: RegularChunkGridMetadata + chunk_key_encoding: ChunkKeyEncodingMetadata + fill_value: Any + codecs: List[CodecMetadata] + attributes: Dict[str, Any] = field(factory=dict) + dimension_names: Optional[Tuple[str, ...]] = None + zarr_format: Literal[3] = 3 + node_type: Literal["array"] = "array" + + @property + def dtype(self) -> np.dtype: + return np.dtype(self.data_type.value) + + @property + def ndim(self) -> int: + return len(self.shape) + + def get_core_metadata(self, runtime_configuration: RuntimeConfiguration) 
-> CoreArrayMetadata: + return CoreArrayMetadata( + shape=self.shape, + chunk_shape=self.chunk_grid.configuration.chunk_shape, + data_type=self.data_type, + fill_value=self.fill_value, + runtime_configuration=runtime_configuration, + ) + + def to_bytes(self) -> bytes: + def _json_convert(o): + if isinstance(o, DataType): + return o.name + raise TypeError + + return json.dumps( + asdict( + self, + filter=lambda attr, value: attr.name != "dimension_names" or value is not None, + ), + default=_json_convert, + ).encode() + + @classmethod + def from_json(cls, zarr_json: Any) -> ArrayMetadata: + return make_cattr().structure(zarr_json, cls) + + +@frozen +class ArrayV2Metadata: + shape: ChunkCoords + chunks: ChunkCoords + dtype: np.dtype + fill_value: Union[None, int, float] = 0 + order: Literal["C", "F"] = "C" + filters: Optional[List[Dict[str, Any]]] = None + dimension_separator: Literal[".", "/"] = "." + compressor: Optional[Dict[str, Any]] = None + zarr_format: Literal[2] = 2 + + @property + def ndim(self) -> int: + return len(self.shape) + + def to_bytes(self) -> bytes: + def _json_convert(o): + if isinstance(o, np.dtype): + if o.fields is None: + return o.str + else: + return o.descr + raise TypeError + + return json.dumps(asdict(self), default=_json_convert).encode() + + @classmethod + def from_json(cls, zarr_json: Any) -> ArrayV2Metadata: + return make_cattr().structure(zarr_json, cls) diff --git a/zarr/v3/sharding.py b/zarr/v3/sharding.py new file mode 100644 index 0000000000..3c5b4bd12d --- /dev/null +++ b/zarr/v3/sharding.py @@ -0,0 +1,516 @@ +from __future__ import annotations + +from typing import Iterator, List, Mapping, NamedTuple, Optional, Set, Tuple + +import numpy as np +from attrs import frozen + +from zarr.v3.codecs import ArrayBytesCodec, CodecPipeline +from zarr.v3.common import ( + BytesLike, + ChunkCoords, + SliceSelection, + concurrent_map, + product, +) +from zarr.v3.indexing import ( + BasicIndexer, + c_order_iter, + is_total_slice, + morton_order_iter, +) +from zarr.v3.metadata import ( + CoreArrayMetadata, + DataType, + ShardingCodecConfigurationMetadata, + ShardingCodecMetadata, +) +from zarr.v3.store import StorePath + +MAX_UINT_64 = 2**64 - 1 + + +class _ShardIndex(NamedTuple): + # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) + offsets_and_lengths: np.ndarray + + def _localize_chunk(self, chunk_coords: ChunkCoords) -> ChunkCoords: + return tuple( + chunk_i % shard_i + for chunk_i, shard_i in zip(chunk_coords, self.offsets_and_lengths.shape) + ) + + def is_all_empty(self) -> bool: + return bool(np.array_equiv(self.offsets_and_lengths, MAX_UINT_64)) + + def get_chunk_slice(self, chunk_coords: ChunkCoords) -> Optional[Tuple[int, int]]: + localized_chunk = self._localize_chunk(chunk_coords) + chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk] + if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64): + return None + else: + return (int(chunk_start), int(chunk_start + chunk_len)) + + def set_chunk_slice(self, chunk_coords: ChunkCoords, chunk_slice: Optional[slice]) -> None: + localized_chunk = self._localize_chunk(chunk_coords) + if chunk_slice is None: + self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64) + else: + self.offsets_and_lengths[localized_chunk] = ( + chunk_slice.start, + chunk_slice.stop - chunk_slice.start, + ) + + def is_dense(self, chunk_byte_length: int) -> bool: + sorted_offsets_and_lengths = sorted( + [ + (offset, length) + for offset, length in self.offsets_and_lengths + if offset != 
MAX_UINT_64 + ], + key=lambda entry: entry[0], + ) + + # Are all non-empty offsets unique? + if len( + set(offset for offset, _ in sorted_offsets_and_lengths if offset != MAX_UINT_64) + ) != len(sorted_offsets_and_lengths): + return False + + return all( + offset % chunk_byte_length == 0 and length == chunk_byte_length + for offset, length in sorted_offsets_and_lengths + ) + + @classmethod + def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardIndex: + offsets_and_lengths = np.zeros(chunks_per_shard + (2,), dtype="<u8") + offsets_and_lengths.fill(MAX_UINT_64) + return cls(offsets_and_lengths) + + +class _ShardProxy(Mapping): + index: _ShardIndex + buf: BytesLike + + @classmethod + async def from_bytes(cls, buf: BytesLike, codec: ShardingCodec) -> _ShardProxy: + obj = cls() + obj.buf = memoryview(buf) + obj.index = await codec._decode_shard_index(obj.buf[-codec._shard_index_size() :]) + return obj + + @classmethod + def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardProxy: + index = _ShardIndex.create_empty(chunks_per_shard) + obj = cls() + obj.buf = memoryview(b"") + obj.index = index + return obj + + def __getitem__(self, chunk_coords: ChunkCoords) -> Optional[BytesLike]: + chunk_byte_slice = self.index.get_chunk_slice(chunk_coords) + if chunk_byte_slice: + return self.buf[chunk_byte_slice[0] : chunk_byte_slice[1]] + return None + + def __len__(self) -> int: + return int(self.index.offsets_and_lengths.size / 2) + + def __iter__(self) -> Iterator[ChunkCoords]: + return c_order_iter(self.index.offsets_and_lengths.shape[:-1]) + + +class _ShardBuilder(_ShardProxy): + buf: bytearray + index: _ShardIndex + + @classmethod + def merge_with_morton_order( + cls, + chunks_per_shard: ChunkCoords, + tombstones: Set[ChunkCoords], + *shard_dicts: Mapping[ChunkCoords, BytesLike], + ) -> _ShardBuilder: + obj = cls.create_empty(chunks_per_shard) + for chunk_coords in morton_order_iter(chunks_per_shard): + if tombstones is not None and chunk_coords in tombstones: + continue + for shard_dict in shard_dicts: + maybe_value = shard_dict.get(chunk_coords, None) + if maybe_value is not None: + obj.append(chunk_coords, maybe_value) + break + return obj + + @classmethod + def create_empty(cls, chunks_per_shard: ChunkCoords) -> _ShardBuilder: + obj = cls() + obj.buf = bytearray() + obj.index = _ShardIndex.create_empty(chunks_per_shard) + return obj + + def append(self, chunk_coords: ChunkCoords, value: BytesLike): + chunk_start = len(self.buf) + chunk_length = len(value) + self.buf.extend(value) + self.index.set_chunk_slice(chunk_coords, slice(chunk_start, chunk_start + chunk_length)) + + def finalize(self, index_bytes: BytesLike) -> BytesLike: + self.buf.extend(index_bytes) + return self.buf + + +@frozen +class ShardingCodec(ArrayBytesCodec): + array_metadata: CoreArrayMetadata + configuration: ShardingCodecConfigurationMetadata + codec_pipeline: CodecPipeline + index_codec_pipeline: CodecPipeline + chunks_per_shard: Tuple[int, ...]
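+ + # Layout sketch, derived from the code in this class (informational comment, +# not part of the original patch): a shard is the concatenation of the encoded +# inner chunks followed by the encoded shard index, a uint64 array of shape +# chunks_per_shard + (2,) holding an (offset, nbytes) pair per inner chunk; +# (2**64 - 1, 2**64 - 1) marks an empty chunk. The raw index is +# 16 * product(chunks_per_shard) bytes before the index codecs (e.g. crc32c) +# add their overhead, and it is read back from the tail of the shard via +# _shard_index_size().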
+ + @classmethod + def from_metadata( + cls, + codec_metadata: ShardingCodecMetadata, + array_metadata: CoreArrayMetadata, + ) -> ShardingCodec: + chunks_per_shard = tuple( + s // c + for s, c in zip( + array_metadata.chunk_shape, + codec_metadata.configuration.chunk_shape, + ) + ) + # rewriting the metadata to scope it to the shard + shard_metadata = CoreArrayMetadata( + shape=array_metadata.chunk_shape, + chunk_shape=codec_metadata.configuration.chunk_shape, + data_type=array_metadata.data_type, + fill_value=array_metadata.fill_value, + runtime_configuration=array_metadata.runtime_configuration, + ) + codec_pipeline = CodecPipeline.from_metadata( + codec_metadata.configuration.codecs, shard_metadata + ) + index_codec_pipeline = CodecPipeline.from_metadata( + codec_metadata.configuration.index_codecs, + CoreArrayMetadata( + shape=chunks_per_shard + (2,), + chunk_shape=chunks_per_shard + (2,), + data_type=DataType.uint64, + fill_value=MAX_UINT_64, + runtime_configuration=array_metadata.runtime_configuration, + ), + ) + return cls( + array_metadata=array_metadata, + configuration=codec_metadata.configuration, + codec_pipeline=codec_pipeline, + index_codec_pipeline=index_codec_pipeline, + chunks_per_shard=chunks_per_shard, + ) + + async def decode( + self, + shard_bytes: BytesLike, + ) -> np.ndarray: + # print("decode") + shard_shape = self.array_metadata.chunk_shape + chunk_shape = self.configuration.chunk_shape + + indexer = BasicIndexer( + tuple(slice(0, s) for s in shard_shape), + shape=shard_shape, + chunk_shape=chunk_shape, + ) + + # setup output array + out = np.zeros( + shard_shape, + dtype=self.array_metadata.dtype, + order=self.array_metadata.runtime_configuration.order, + ) + shard_dict = await _ShardProxy.from_bytes(shard_bytes, self) + + if shard_dict.index.is_all_empty(): + out.fill(self.array_metadata.fill_value) + return out + + # decoding chunks and writing them into the output buffer + await concurrent_map( + [ + ( + shard_dict, + chunk_coords, + chunk_selection, + out_selection, + out, + ) + for chunk_coords, chunk_selection, out_selection in indexer + ], + self._read_chunk, + self.array_metadata.runtime_configuration.concurrency, + ) + + return out + + async def decode_partial( + self, + store_path: StorePath, + selection: SliceSelection, + ) -> Optional[np.ndarray]: + # print("decode_partial") + shard_shape = self.array_metadata.chunk_shape + chunk_shape = self.configuration.chunk_shape + + indexer = BasicIndexer( + selection, + shape=shard_shape, + chunk_shape=chunk_shape, + ) + + # setup output array + out = np.zeros( + indexer.shape, + dtype=self.array_metadata.dtype, + order=self.array_metadata.runtime_configuration.order, + ) + + indexed_chunks = list(indexer) + all_chunk_coords = set(chunk_coords for chunk_coords, _, _ in indexed_chunks) + + # reading bytes of all requested chunks + shard_dict: Mapping[ChunkCoords, BytesLike] = {} + if self._is_total_shard(all_chunk_coords): + # read entire shard + shard_dict_maybe = await self._load_full_shard_maybe(store_path) + if shard_dict_maybe is None: + return None + shard_dict = shard_dict_maybe + else: + # read some chunks within the shard + shard_index = await self._load_shard_index_maybe(store_path) + if shard_index is None: + return None + shard_dict = {} + for chunk_coords in all_chunk_coords: + chunk_byte_slice = shard_index.get_chunk_slice(chunk_coords) + if chunk_byte_slice: + chunk_bytes = await store_path.get_async(chunk_byte_slice) + if chunk_bytes: + shard_dict[chunk_coords] = chunk_bytes + + # decoding chunks 
and writing them into the output buffer + await concurrent_map( + [ + ( + shard_dict, + chunk_coords, + chunk_selection, + out_selection, + out, + ) + for chunk_coords, chunk_selection, out_selection in indexed_chunks + ], + self._read_chunk, + self.array_metadata.runtime_configuration.concurrency, + ) + + return out + + async def _read_chunk( + self, + shard_dict: Mapping[ChunkCoords, Optional[BytesLike]], + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + out: np.ndarray, + ): + chunk_bytes = shard_dict.get(chunk_coords, None) + if chunk_bytes is not None: + chunk_array = await self.codec_pipeline.decode(chunk_bytes) + tmp = chunk_array[chunk_selection] + out[out_selection] = tmp + else: + out[out_selection] = self.array_metadata.fill_value + + async def encode( + self, + shard_array: np.ndarray, + ) -> Optional[BytesLike]: + shard_shape = self.array_metadata.chunk_shape + chunk_shape = self.configuration.chunk_shape + + indexer = list( + BasicIndexer( + tuple(slice(0, s) for s in shard_shape), + shape=shard_shape, + chunk_shape=chunk_shape, + ) + ) + + async def _write_chunk( + shard_array: np.ndarray, + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + ) -> Tuple[ChunkCoords, Optional[BytesLike]]: + if is_total_slice(chunk_selection, chunk_shape): + chunk_array = shard_array[out_selection] + else: + # handling writing partial chunks + chunk_array = np.empty( + chunk_shape, + dtype=self.array_metadata.dtype, + ) + chunk_array.fill(self.array_metadata.fill_value) + chunk_array[chunk_selection] = shard_array[out_selection] + if not np.array_equiv(chunk_array, self.array_metadata.fill_value): + return ( + chunk_coords, + await self.codec_pipeline.encode(chunk_array), + ) + return (chunk_coords, None) + + # assembling and encoding chunks within the shard + encoded_chunks: List[Tuple[ChunkCoords, Optional[BytesLike]]] = await concurrent_map( + [ + (shard_array, chunk_coords, chunk_selection, out_selection) + for chunk_coords, chunk_selection, out_selection in indexer + ], + _write_chunk, + self.array_metadata.runtime_configuration.concurrency, + ) + if len(encoded_chunks) == 0: + return None + + shard_builder = _ShardBuilder.create_empty(self.chunks_per_shard) + for chunk_coords, chunk_bytes in encoded_chunks: + if chunk_bytes is not None: + shard_builder.append(chunk_coords, chunk_bytes) + + return shard_builder.finalize(await self._encode_shard_index(shard_builder.index)) + + async def encode_partial( + self, + store_path: StorePath, + shard_array: np.ndarray, + selection: SliceSelection, + ) -> None: + # print("encode_partial") + shard_shape = self.array_metadata.chunk_shape + chunk_shape = self.configuration.chunk_shape + + old_shard_dict = ( + await self._load_full_shard_maybe(store_path) + ) or _ShardProxy.create_empty(self.chunks_per_shard) + new_shard_builder = _ShardBuilder.create_empty(self.chunks_per_shard) + tombstones: Set[ChunkCoords] = set() + + indexer = list( + BasicIndexer( + selection, + shape=shard_shape, + chunk_shape=chunk_shape, + ) + ) + + async def _write_chunk( + chunk_coords: ChunkCoords, + chunk_selection: SliceSelection, + out_selection: SliceSelection, + ) -> Tuple[ChunkCoords, Optional[BytesLike]]: + chunk_array = None + if is_total_slice(chunk_selection, self.configuration.chunk_shape): + chunk_array = shard_array[out_selection] + else: + # handling writing partial chunks + # read chunk first + chunk_bytes = old_shard_dict.get(chunk_coords, None) + + # merge new 
value + if chunk_bytes is None: + chunk_array = np.empty( + self.configuration.chunk_shape, + dtype=self.array_metadata.dtype, + ) + chunk_array.fill(self.array_metadata.fill_value) + else: + chunk_array = ( + await self.codec_pipeline.decode(chunk_bytes) + ).copy() # make a writable copy + chunk_array[chunk_selection] = shard_array[out_selection] + + if not np.array_equiv(chunk_array, self.array_metadata.fill_value): + return ( + chunk_coords, + await self.codec_pipeline.encode(chunk_array), + ) + else: + return (chunk_coords, None) + + encoded_chunks: List[Tuple[ChunkCoords, Optional[BytesLike]]] = await concurrent_map( + [ + ( + chunk_coords, + chunk_selection, + out_selection, + ) + for chunk_coords, chunk_selection, out_selection in indexer + ], + _write_chunk, + self.array_metadata.runtime_configuration.concurrency, + ) + + for chunk_coords, chunk_bytes in encoded_chunks: + if chunk_bytes is not None: + new_shard_builder.append(chunk_coords, chunk_bytes) + else: + tombstones.add(chunk_coords) + + shard_builder = _ShardBuilder.merge_with_morton_order( + self.chunks_per_shard, tombstones, new_shard_builder, old_shard_dict + ) + + if shard_builder.index.is_all_empty(): + await store_path.delete_async() + else: + await store_path.set_async( + shard_builder.finalize(await self._encode_shard_index(shard_builder.index)) + ) + + def _is_total_shard(self, all_chunk_coords: Set[ChunkCoords]) -> bool: + return len(all_chunk_coords) == product(self.chunks_per_shard) and all( + chunk_coords in all_chunk_coords for chunk_coords in c_order_iter(self.chunks_per_shard) + ) + + async def _decode_shard_index(self, index_bytes: BytesLike) -> _ShardIndex: + return _ShardIndex(await self.index_codec_pipeline.decode(index_bytes)) + + async def _encode_shard_index(self, index: _ShardIndex) -> BytesLike: + index_bytes = await self.index_codec_pipeline.encode(index.offsets_and_lengths) + assert index_bytes is not None + return index_bytes + + def _shard_index_size(self) -> int: + return self.index_codec_pipeline.compute_encoded_size(16 * product(self.chunks_per_shard)) + + async def _load_shard_index_maybe(self, store_path: StorePath) -> Optional[_ShardIndex]: + index_bytes = await store_path.get_async((-self._shard_index_size(), None)) + if index_bytes is not None: + return await self._decode_shard_index(index_bytes) + return None + + async def _load_shard_index(self, store_path: StorePath) -> _ShardIndex: + return (await self._load_shard_index_maybe(store_path)) or _ShardIndex.create_empty( + self.chunks_per_shard + ) + + async def _load_full_shard_maybe(self, store_path: StorePath) -> Optional[_ShardProxy]: + shard_bytes = await store_path.get_async() + + return await _ShardProxy.from_bytes(shard_bytes, self) if shard_bytes else None + + def compute_encoded_size(self, input_byte_length: int) -> int: + return input_byte_length + self._shard_index_size() diff --git a/zarr/v3/store.py b/zarr/v3/store.py new file mode 100644 index 0000000000..f7472c68d2 --- /dev/null +++ b/zarr/v3/store.py @@ -0,0 +1,304 @@ +# TODO: +# 1. Stores should inherit from zarr.v3.abc.store classes +# 2. remove "_async" suffix from all methods? + +# Changes I've made here: +# 1. 
Make delay import of fsspec + +from __future__ import annotations + +import asyncio +import io +from pathlib import Path +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union + +from zarr.v3.common import BytesLike, to_thread + +if TYPE_CHECKING: + from upath import UPath + from fsspec.asyn import AsyncFileSystem + + +def _dereference_path(root: str, path: str) -> str: + assert isinstance(root, str) + assert isinstance(path, str) + root = root.rstrip("/") + path = f"{root}/{path}" if root != "" else path + path = path.rstrip("/") + return path + + +class StorePath: + store: Store + path: str + + def __init__(self, store: Store, path: Optional[str] = None): + self.store = store + self.path = path or "" + + @classmethod + def from_path(cls, pth: Path) -> StorePath: + return cls(Store.from_path(pth)) + + async def get_async( + self, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + return await self.store.get_async(self.path, byte_range) + + async def set_async( + self, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + await self.store.set_async(self.path, value, byte_range) + + async def delete_async(self) -> None: + await self.store.delete_async(self.path) + + async def exists_async(self) -> bool: + return await self.store.exists_async(self.path) + + def __truediv__(self, other: str) -> StorePath: + return self.__class__(self.store, _dereference_path(self.path, other)) + + def __str__(self) -> str: + return _dereference_path(str(self.store), self.path) + + def __repr__(self) -> str: + return f"StorePath({self.store.__class__.__name__}, {repr(str(self))})" + + +class Store: + supports_partial_writes = False + + @classmethod + def from_path(cls, pth: Path) -> Store: + try: + from upath import UPath + from upath.implementations.local import PosixUPath, WindowsUPath + + if isinstance(pth, UPath) and not isinstance(pth, (PosixUPath, WindowsUPath)): + storage_options = pth._kwargs.copy() + storage_options.pop("_url", None) + return RemoteStore(str(pth), **storage_options) + except ImportError: + pass + + return LocalStore(pth) + + async def multi_get_async( + self, keys: List[Tuple[str, Optional[Tuple[int, int]]]] + ) -> List[Optional[BytesLike]]: + return await asyncio.gather(*[self.get_async(key, byte_range) for key, byte_range in keys]) + + async def get_async( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + raise NotImplementedError + + async def multi_set_async( + self, key_values: List[Tuple[str, BytesLike, Optional[Tuple[int, int]]]] + ) -> None: + await asyncio.gather( + *[self.set_async(key, value, byte_range) for key, value, byte_range in key_values] + ) + + async def set_async( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + raise NotImplementedError + + async def delete_async(self, key: str) -> None: + raise NotImplementedError + + async def exists_async(self, key: str) -> bool: + raise NotImplementedError + + def __truediv__(self, other: str) -> StorePath: + return StorePath(self, other) + + +class LocalStore(Store): + supports_partial_writes = True + root: Path + auto_mkdir: bool + + def __init__(self, root: Union[Path, str], auto_mkdir: bool = True): + if isinstance(root, str): + root = Path(root) + assert isinstance(root, Path) + + self.root = root + self.auto_mkdir = auto_mkdir + + def _cat_file( + self, path: Path, start: Optional[int] = None, end: Optional[int] = None + ) -> BytesLike: + if 
start is None and end is None: + return path.read_bytes() + with path.open("rb") as f: + size = f.seek(0, io.SEEK_END) + if start is not None: + if start >= 0: + f.seek(start) + else: + f.seek(max(0, size + start)) + if end is not None: + if end < 0: + end = size + end + return f.read(end - f.tell()) + return f.read() + + def _put_file( + self, + path: Path, + value: BytesLike, + start: Optional[int] = None, + ): + if self.auto_mkdir: + path.parent.mkdir(parents=True, exist_ok=True) + if start is not None: + with path.open("r+b") as f: + f.seek(start) + f.write(value) + else: + return path.write_bytes(value) + + async def get_async( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + assert isinstance(key, str) + path = self.root / key + + try: + value = await ( + to_thread(self._cat_file, path, byte_range[0], byte_range[1]) + if byte_range is not None + else to_thread(self._cat_file, path) + ) + except (FileNotFoundError, IsADirectoryError, NotADirectoryError): + return None + + return value + + async def set_async( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + assert isinstance(key, str) + path = self.root / key + + if byte_range is not None: + await to_thread(self._put_file, path, value, byte_range[0]) + else: + await to_thread(self._put_file, path, value) + + async def delete_async(self, key: str) -> None: + path = self.root / key + await to_thread(path.unlink, True) + + async def exists_async(self, key: str) -> bool: + path = self.root / key + return await to_thread(path.exists) + + def __str__(self) -> str: + return f"file://{self.root}" + + def __repr__(self) -> str: + return f"LocalStore({repr(str(self))})" + + +class RemoteStore(Store): + root: UPath + + def __init__(self, url: Union[UPath, str], **storage_options: Dict[str, Any]): + from upath import UPath + import fsspec + + if isinstance(url, str): + self.root = UPath(url, **storage_options) + else: + assert len(storage_options) == 0, ( + "If constructed with a UPath object, no additional " + + "storage_options are allowed." + ) + self.root = url.rstrip("/") + # test instantiate file system + fs, _ = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) + assert fs.__class__.async_impl, "FileSystem needs to support async operations." + + def make_fs(self) -> Tuple[AsyncFileSystem, str]: + import fsspec + + storage_options = self.root._kwargs.copy() + storage_options.pop("_url", None) + fs, root = fsspec.core.url_to_fs(str(self.root), asynchronous=True, **self.root._kwargs) + assert fs.__class__.async_impl, "FileSystem needs to support async operations." 
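+ + # An assumed example, not verified in this patch: a root such as +# UPath("s3://bucket/data.zarr", anon=True) would resolve via +# fsspec.core.url_to_fs to an async S3 filesystem plus the root string +# "bucket/data.zarr"; get_async/set_async then join keys onto that root with +# _dereference_path.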
+ return fs, root + + async def get_async( + self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None + ) -> Optional[BytesLike]: + assert isinstance(key, str) + fs, root = self.make_fs() + path = _dereference_path(root, key) + + try: + value = await ( + fs._cat_file(path, start=byte_range[0], end=byte_range[1]) + if byte_range + else fs._cat_file(path) + ) + except (FileNotFoundError, IsADirectoryError, NotADirectoryError): + return None + + return value + + async def set_async( + self, key: str, value: BytesLike, byte_range: Optional[Tuple[int, int]] = None + ) -> None: + assert isinstance(key, str) + fs, root = self.make_fs() + path = _dereference_path(root, key) + + # write data + if byte_range: + with fs._open(path, "r+b") as f: + f.seek(byte_range[0]) + f.write(value) + else: + await fs._pipe_file(path, value) + + async def delete_async(self, key: str) -> None: + fs, root = self.make_fs() + path = _dereference_path(root, key) + if await fs._exists(path): + await fs._rm(path) + + async def exists_async(self, key: str) -> bool: + fs, root = self.make_fs() + path = _dereference_path(root, key) + return await fs._exists(path) + + def __str__(self) -> str: + return str(self.root) + + def __repr__(self) -> str: + return f"RemoteStore({repr(str(self))})" + + +StoreLike = Union[Store, StorePath, Path, str] + + +def make_store_path(store_like: StoreLike) -> StorePath: + if isinstance(store_like, StorePath): + return store_like + elif isinstance(store_like, Store): + return StorePath(store_like) + elif isinstance(store_like, Path): + return StorePath(Store.from_path(store_like)) + elif isinstance(store_like, str): + try: + from upath import UPath + + return StorePath(Store.from_path(UPath(store_like))) + except ImportError: + return StorePath(LocalStore(Path(store_like))) + raise TypeError diff --git a/zarr/v3/sync.py b/zarr/v3/sync.py new file mode 100644 index 0000000000..ef3a6e08c0 --- /dev/null +++ b/zarr/v3/sync.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import asyncio +import threading +from typing import Any, Coroutine, List, Optional + +# From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py + +iothread: List[Optional[threading.Thread]] = [None] # dedicated IO thread +loop: List[Optional[asyncio.AbstractEventLoop]] = [ + None +] # global event loop for any non-async instance +_lock: Optional[threading.Lock] = None # global lock placeholder +get_running_loop = asyncio.get_running_loop + + +def _get_lock() -> threading.Lock: + """Allocate or return a threading lock. + + The lock is allocated on first use to allow setting one lock per forked process. + """ + global _lock + if not _lock: + _lock = threading.Lock() + return _lock + + +async def _runner(event: threading.Event, coro: Coroutine, result_box: List[Optional[Any]]): + try: + result_box[0] = await coro + except Exception as ex: + result_box[0] = ex + finally: + event.set() + + +def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None): + """ + Make loop run coroutine until it returns. 
Runs in another thread. + + Examples + -------- + >>> sync(async_function(), existing_loop) + """ + if loop is None: + # NB: if the loop is not running *yet*, it is OK to submit work + # and we will wait for it + loop = _get_loop() + if loop is None or loop.is_closed(): + raise RuntimeError("Loop is not running") + try: + loop0 = asyncio.events.get_running_loop() + if loop0 is loop: + raise NotImplementedError("Calling sync() from within a running loop") + except RuntimeError: + pass + result_box: List[Optional[Any]] = [None] + event = threading.Event() + asyncio.run_coroutine_threadsafe(_runner(event, coro, result_box), loop) + while True: + # this loop allows the thread to be interrupted + if event.wait(1): + break + + return_result = result_box[0] + if isinstance(return_result, BaseException): + raise return_result + else: + return return_result + + +def _get_loop(): + """Create or return the default fsspec IO loop. + + The loop will be running on a separate thread. + """ + if loop[0] is None: + with _get_lock(): + # repeat the check just in case the loop got filled between the + # previous two calls from another thread + if loop[0] is None: + loop[0] = asyncio.new_event_loop() + th = threading.Thread(target=loop[0].run_forever, name="zarrIO") + th.daemon = True + th.start() + iothread[0] = th + return loop[0]
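
A minimal end-to-end sketch of how the pieces in this patch compose. The store location and the exact keyword arguments accepted by Array.create are assumptions drawn from the diff above and from zarr/v3/array.py (not shown in this excerpt); treat it as an illustration under those assumptions, not a tested snippet.

    import numpy as np

    from zarr.v3 import Array, LocalStore
    from zarr.v3.codecs import bytes_codec, gzip_codec

    store = LocalStore("/tmp/v3-demo.zarr")  # hypothetical location
    arr = Array.create(
        store / "a",  # Store.__truediv__ yields a StorePath
        shape=(16, 16),
        dtype="float32",
        chunk_shape=(8, 8),
        codecs=[bytes_codec(), gzip_codec(level=1)],  # factory helpers from zarr/v3/codecs.py
    )
    arr[:, :] = np.arange(256, dtype="float32").reshape(16, 16)
    assert float(arr[0, 1]) == 1.0

The synchronous __setitem__/__getitem__ wrappers run their async counterparts on the dedicated IO loop created by _get_loop() above, so the caller does not need a running event loop.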