From 09cd2af7b89b893677f4a11e44b676fc4f811b67 Mon Sep 17 00:00:00 2001 From: Tom White Date: Tue, 5 Dec 2023 15:19:38 +0000 Subject: [PATCH 1/5] Add VirtualInMemoryArray that keeps small arrays in memory rather than materializing to disk. --- cubed/array_api/creation_functions.py | 13 ++++----- cubed/storage/virtual.py | 39 +++++++++++++++++++++++++++ cubed/tests/test_core.py | 5 ++-- cubed/tests/test_executor_features.py | 8 +++--- cubed/tests/test_optimization.py | 8 +++--- 5 files changed, 56 insertions(+), 17 deletions(-) diff --git a/cubed/array_api/creation_functions.py b/cubed/array_api/creation_functions.py index 7aa6e0fb..c540bb95 100644 --- a/cubed/array_api/creation_functions.py +++ b/cubed/array_api/creation_functions.py @@ -7,9 +7,12 @@ from cubed.backend_array_api import namespace as nxp from cubed.core import Plan, gensym from cubed.core.ops import map_direct -from cubed.core.plan import new_temp_path -from cubed.storage.virtual import virtual_empty, virtual_full, virtual_offsets -from cubed.storage.zarr import lazy_from_array +from cubed.storage.virtual import ( + virtual_empty, + virtual_full, + virtual_in_memory, + virtual_offsets, +) from cubed.utils import to_chunksize from cubed.vendor.dask.array.core import normalize_chunks @@ -70,11 +73,9 @@ def asarray( if dtype is None: dtype = a.dtype - # write to zarr chunksize = to_chunksize(normalize_chunks(chunks, shape=a.shape, dtype=dtype)) name = gensym() - zarr_path = new_temp_path(name=name, spec=spec) - target = lazy_from_array(a, dtype=dtype, chunks=chunksize, store=zarr_path) + target = virtual_in_memory(a, chunks=chunksize) plan = Plan._new(name, "asarray", target) return Array(name, target, spec, plan) diff --git a/cubed/storage/virtual.py b/cubed/storage/virtual.py index 2a48f6ec..b510f8e1 100644 --- a/cubed/storage/virtual.py +++ b/cubed/storage/virtual.py @@ -97,6 +97,38 @@ def __getitem__(self, key): ) +class VirtualInMemoryArray: + """A small array that is held in memory but never 
materialized on disk.""" + + def __init__( + self, + array: np.ndarray, # TODO: generalise + chunks: T_RegularChunks, + ): + self.array = array + # use an in-memory Zarr array as a template since it normalizes its properties + # and is needed for oindex + template = zarr.empty( + array.shape, + dtype=array.dtype, + chunks=chunks, + store=zarr.storage.MemoryStore(), + ) + self.shape = template.shape + self.dtype = template.dtype + self.chunks = template.chunks + self.template = template + if array.size > 0: + template[...] = array + + def __getitem__(self, key): + return self.array.__getitem__(key) + + @property + def oindex(self): + return self.template.oindex + + def _key_to_index_tuple(selection): if isinstance(selection, slice): selection = (selection,) @@ -131,3 +163,10 @@ def virtual_full( def virtual_offsets(shape: T_Shape) -> VirtualOffsetsArray: return VirtualOffsetsArray(shape) + + +def virtual_in_memory( + array: np.ndarray, + chunks: T_RegularChunks, +) -> VirtualInMemoryArray: + return VirtualInMemoryArray(array, chunks) diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index 85e3a3f5..8518c8e7 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -220,9 +220,8 @@ def test_rechunk_same_chunks(spec): b = a.rechunk((2, 1)) task_counter = TaskCounter() res = b.compute(callbacks=[task_counter]) - # no tasks except array creation task should have run since chunks are same - num_created_arrays = 1 - assert task_counter.value == num_created_arrays + # no tasks should have run since chunks are same + assert task_counter.value == 0 assert_array_equal(res, np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])) diff --git a/cubed/tests/test_executor_features.py b/cubed/tests/test_executor_features.py index 944640d4..dcc8e2b4 100644 --- a/cubed/tests/test_executor_features.py +++ b/cubed/tests/test_executor_features.py @@ -93,7 +93,7 @@ def test_callbacks(spec, executor): np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]), ) - num_created_arrays = 
3 + num_created_arrays = 1 assert task_counter.value == num_created_arrays + 4 @@ -132,12 +132,12 @@ def test_resume(spec, executor): c = xp.add(a, b) d = xp.negative(c) - num_created_arrays = 4 # a, b, c, d + num_created_arrays = 2 # c, d assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 8 task_counter = TaskCounter() c.compute(executor=executor, callbacks=[task_counter], optimize_graph=False) - num_created_arrays = 3 # a, b, c + num_created_arrays = 1 # c assert task_counter.value == num_created_arrays + 4 # since c has already been computed, when computing d only 4 tasks are run, instead of 8 @@ -146,7 +146,7 @@ def test_resume(spec, executor): executor=executor, callbacks=[task_counter], optimize_graph=False, resume=True ) # the create arrays tasks are run again, even though they exist - num_created_arrays = 4 # a, b, c, d + num_created_arrays = 2 # c, d assert task_counter.value == num_created_arrays + 4 diff --git a/cubed/tests/test_optimization.py b/cubed/tests/test_optimization.py index 7a538ce6..8186bbd4 100644 --- a/cubed/tests/test_optimization.py +++ b/cubed/tests/test_optimization.py @@ -18,10 +18,10 @@ def test_fusion(spec): c = xp.astype(b, np.float32) d = xp.negative(c) - num_created_arrays = 4 # a, b, c, d + num_created_arrays = 3 # b, c, d assert d.plan.num_arrays(optimize_graph=False) == num_created_arrays assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12 - num_created_arrays = 2 # a, d + num_created_arrays = 1 # d assert d.plan.num_arrays(optimize_graph=True) == num_created_arrays assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4 @@ -41,9 +41,9 @@ def test_fusion_transpose(spec): c = xp.astype(b, np.float32) d = c.T - num_created_arrays = 4 # a, b, c, d + num_created_arrays = 3 # b, c, d assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12 - num_created_arrays = 2 # a, d + num_created_arrays = 1 # d assert d.plan.num_tasks(optimize_graph=True) == 
num_created_arrays + 4 task_counter = TaskCounter() From 88a7add01a31c46a48c5bad7ada8cb4b0c8047c8 Mon Sep 17 00:00:00 2001 From: Tom White Date: Sat, 9 Dec 2023 10:36:12 +0000 Subject: [PATCH 2/5] Fail if array created with `asarray` is greater than 1MB --- cubed/storage/virtual.py | 6 ++++++ cubed/tests/test_core.py | 9 +++++++++ 2 files changed, 15 insertions(+) diff --git a/cubed/storage/virtual.py b/cubed/storage/virtual.py index b510f8e1..2b3bf489 100644 --- a/cubed/storage/virtual.py +++ b/cubed/storage/virtual.py @@ -8,6 +8,7 @@ from cubed.backend_array_api import namespace as nxp from cubed.backend_array_api import numpy_array_to_backend_array from cubed.types import T_DType, T_RegularChunks, T_Shape +from cubed.utils import memory_repr class VirtualEmptyArray: @@ -104,7 +105,12 @@ def __init__( self, array: np.ndarray, # TODO: generalise chunks: T_RegularChunks, + max_nbytes: int = 10**6, ): + if array.nbytes > max_nbytes: + raise ValueError( + f"Size of in memory array is {memory_repr(array.nbytes)} which exceeds maximum of {memory_repr(max_nbytes)}. Consider loading the array from storage using `from_array`." 
+ ) self.array = array # use an in-memory Zarr array as a template since it normalizes its properties # and is needed for oindex diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index 8518c8e7..cc98566f 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -40,6 +40,15 @@ def modal_executor(request): return request.param +def test_as_array_fails(spec): + a = np.ones((1000, 1000)) + with pytest.raises( + ValueError, + match="Size of in memory array is 8.0 MB which exceeds maximum of 1.0 MB.", + ): + xp.asarray(a, chunks=(100, 100), spec=spec) + + def test_regular_chunks(spec): xp.ones((5, 5), chunks=((2, 2, 1), (5,)), spec=spec) with pytest.raises(ValueError): From 31722bc087035b7093851f0c93a40e88d51cfc4e Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 21 Dec 2023 14:26:02 +0000 Subject: [PATCH 3/5] Remove `initial_values` in LazyZarrArray as it is no longer used --- cubed/core/plan.py | 13 +------------ cubed/storage/zarr.py | 16 +--------------- cubed/tests/storage/test_zarr.py | 16 +--------------- 3 files changed, 3 insertions(+), 42 deletions(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 2df1e185..546c0389 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -6,7 +6,6 @@ import networkx as nx -from cubed.backend_array_api import backend_array_to_numpy_array from cubed.primitive.blockwise import can_fuse_pipelines, fuse from cubed.runtime.pipeline import visit_nodes from cubed.runtime.types import CubedPipeline @@ -366,17 +365,7 @@ def create_zarr_array(lazy_zarr_array, *, config=None): def create_zarr_arrays(lazy_zarr_arrays, reserved_mem): # projected memory is size of largest initial values, or dtype size if there aren't any projected_mem = ( - max( - [ - # TODO: calculate nbytes from size and dtype itemsize - backend_array_to_numpy_array(lza.initial_values).nbytes - if lza.initial_values is not None - else lza.dtype.itemsize - for lza in lazy_zarr_arrays - ], - default=0, - ) - + reserved_mem + 
max([lza.dtype.itemsize for lza in lazy_zarr_arrays], default=0) + reserved_mem ) num_tasks = len(lazy_zarr_arrays) diff --git a/cubed/storage/zarr.py b/cubed/storage/zarr.py index 60c33874..d1f17e05 100644 --- a/cubed/storage/zarr.py +++ b/cubed/storage/zarr.py @@ -1,9 +1,7 @@ -from typing import Any, Optional, Union +from typing import Any, Union import zarr -from numpy import ndarray -from cubed.backend_array_api import backend_array_to_numpy_array from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store @@ -21,7 +19,6 @@ def __init__( dtype: T_DType, chunks: T_RegularChunks, store: T_Store, - initial_values: Optional[ndarray] = None, fill_value: Any = None, **kwargs, ): @@ -35,7 +32,6 @@ def __init__( self.chunks = template.chunks self.store = store - self.initial_values = initial_values self.fill_value = fill_value self.kwargs = kwargs @@ -60,8 +56,6 @@ def create(self, mode: str = "w-") -> zarr.Array: fill_value=self.fill_value, **self.kwargs, ) - if self.initial_values is not None and self.initial_values.size > 0: - target[...] 
= backend_array_to_numpy_array(self.initial_values) return target def open(self) -> zarr.Array: @@ -91,14 +85,6 @@ def lazy_empty( return LazyZarrArray(shape, dtype, chunks, store, **kwargs) -def lazy_from_array( - array: ndarray, *, dtype: T_DType, chunks: T_RegularChunks, store: T_Store, **kwargs -) -> LazyZarrArray: - return LazyZarrArray( - array.shape, dtype, chunks, store, initial_values=array, **kwargs - ) - - def lazy_full( shape: T_Shape, fill_value: Any, diff --git a/cubed/tests/storage/test_zarr.py b/cubed/tests/storage/test_zarr.py index c7da964b..e4a6de44 100644 --- a/cubed/tests/storage/test_zarr.py +++ b/cubed/tests/storage/test_zarr.py @@ -2,7 +2,7 @@ import pytest from numpy.testing import assert_array_equal -from cubed.storage.zarr import lazy_empty, lazy_from_array, lazy_full +from cubed.storage.zarr import lazy_empty, lazy_full def test_lazy_empty(tmp_path): @@ -18,20 +18,6 @@ def test_lazy_empty(tmp_path): arr.open() -def test_lazy_from_array(tmp_path): - zarr_path = tmp_path / "lazy.zarr" - a = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=int) - arr = lazy_from_array(a, dtype=a.dtype, chunks=(2, 2), store=zarr_path) - - assert not zarr_path.exists() - with pytest.raises(ValueError): - arr.open() - - arr.create() - assert zarr_path.exists() - assert_array_equal(arr.open()[:], a) - - def test_lazy_full(tmp_path): zarr_path = tmp_path / "lazy.zarr" arr = lazy_full( From 47347b49d72ab8586ce0ed514e952f76f2cacd95 Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 21 Dec 2023 15:46:58 +0000 Subject: [PATCH 4/5] Update test for `num_arrays` --- cubed/tests/test_optimization.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cubed/tests/test_optimization.py b/cubed/tests/test_optimization.py index 8186bbd4..803a8777 100644 --- a/cubed/tests/test_optimization.py +++ b/cubed/tests/test_optimization.py @@ -18,11 +18,13 @@ def test_fusion(spec): c = xp.astype(b, np.float32) d = xp.negative(c) - num_created_arrays = 3 # b, 
c, d - assert d.plan.num_arrays(optimize_graph=False) == num_created_arrays + num_arrays = 4 # a, b, c, d + num_created_arrays = 3 # b, c, d (a is not created on disk) + assert d.plan.num_arrays(optimize_graph=False) == num_arrays assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12 - num_created_arrays = 1 # d - assert d.plan.num_arrays(optimize_graph=True) == num_created_arrays + num_arrays = 2 # a, d + num_created_arrays = 1 # d (a is not created on disk) + assert d.plan.num_arrays(optimize_graph=True) == num_arrays assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4 task_counter = TaskCounter() From 59dcdf37763d5b968789e00ca52c37e9ffb86255 Mon Sep 17 00:00:00 2001 From: Tom White Date: Fri, 22 Dec 2023 09:17:52 +0000 Subject: [PATCH 5/5] Update comment --- cubed/core/plan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 546c0389..bf5091fd 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -363,7 +363,7 @@ def create_zarr_array(lazy_zarr_array, *, config=None): def create_zarr_arrays(lazy_zarr_arrays, reserved_mem): - # projected memory is size of largest initial values, or dtype size if there aren't any + # projected memory is the largest dtype itemsize (enough for a fill value) plus reserved memory projected_mem = ( max([lza.dtype.itemsize for lza in lazy_zarr_arrays], default=0) + reserved_mem )