Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add VirtualInMemoryArray that keeps small arrays in memory #336

Merged
merged 5 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions cubed/array_api/creation_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from cubed.backend_array_api import namespace as nxp
from cubed.core import Plan, gensym
from cubed.core.ops import map_direct
from cubed.core.plan import new_temp_path
from cubed.storage.virtual import virtual_empty, virtual_full, virtual_offsets
from cubed.storage.zarr import lazy_from_array
from cubed.storage.virtual import (
virtual_empty,
virtual_full,
virtual_in_memory,
virtual_offsets,
)
from cubed.utils import to_chunksize
from cubed.vendor.dask.array.core import normalize_chunks

Expand Down Expand Up @@ -70,11 +73,9 @@ def asarray(
if dtype is None:
dtype = a.dtype

# write to zarr
chunksize = to_chunksize(normalize_chunks(chunks, shape=a.shape, dtype=dtype))
name = gensym()
zarr_path = new_temp_path(name=name, spec=spec)
target = lazy_from_array(a, dtype=dtype, chunks=chunksize, store=zarr_path)
target = virtual_in_memory(a, chunks=chunksize)

plan = Plan._new(name, "asarray", target)
return Array(name, target, spec, plan)
Expand Down
15 changes: 2 additions & 13 deletions cubed/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import networkx as nx

from cubed.backend_array_api import backend_array_to_numpy_array
from cubed.primitive.blockwise import can_fuse_pipelines, fuse
from cubed.runtime.pipeline import visit_nodes
from cubed.runtime.types import CubedPipeline
Expand Down Expand Up @@ -364,19 +363,9 @@ def create_zarr_array(lazy_zarr_array, *, config=None):


def create_zarr_arrays(lazy_zarr_arrays, reserved_mem):
# projected memory is size of largest initial values, or dtype size if there aren't any
# projected memory is size of largest dtype size (for a fill value)
projected_mem = (
max(
[
# TODO: calculate nbytes from size and dtype itemsize
backend_array_to_numpy_array(lza.initial_values).nbytes
if lza.initial_values is not None
else lza.dtype.itemsize
for lza in lazy_zarr_arrays
],
default=0,
)
+ reserved_mem
max([lza.dtype.itemsize for lza in lazy_zarr_arrays], default=0) + reserved_mem
)
num_tasks = len(lazy_zarr_arrays)

Expand Down
45 changes: 45 additions & 0 deletions cubed/storage/virtual.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cubed.backend_array_api import namespace as nxp
from cubed.backend_array_api import numpy_array_to_backend_array
from cubed.types import T_DType, T_RegularChunks, T_Shape
from cubed.utils import memory_repr


class VirtualEmptyArray:
Expand Down Expand Up @@ -97,6 +98,43 @@ def __getitem__(self, key):
)


class VirtualInMemoryArray:
"""A small array that is held in memory but never materialized on disk."""

def __init__(
self,
array: np.ndarray, # TODO: generalise
chunks: T_RegularChunks,
max_nbytes: int = 10**6,
):
if array.nbytes > max_nbytes:
raise ValueError(
f"Size of in memory array is {memory_repr(array.nbytes)} which exceeds maximum of {memory_repr(max_nbytes)}. Consider loading the array from storage using `from_array`."
)
self.array = array
# use an in-memory Zarr array as a template since it normalizes its properties
# and is needed for oindex
template = zarr.empty(
array.shape,
dtype=array.dtype,
chunks=chunks,
store=zarr.storage.MemoryStore(),
)
self.shape = template.shape
self.dtype = template.dtype
self.chunks = template.chunks
self.template = template
if array.size > 0:
template[...] = array

def __getitem__(self, key):
return self.array.__getitem__(key)

@property
def oindex(self):
return self.template.oindex


def _key_to_index_tuple(selection):
if isinstance(selection, slice):
selection = (selection,)
Expand Down Expand Up @@ -131,3 +169,10 @@ def virtual_full(

def virtual_offsets(shape: T_Shape) -> VirtualOffsetsArray:
return VirtualOffsetsArray(shape)


def virtual_in_memory(
array: np.ndarray,
chunks: T_RegularChunks,
) -> VirtualInMemoryArray:
return VirtualInMemoryArray(array, chunks)
16 changes: 1 addition & 15 deletions cubed/storage/zarr.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from typing import Any, Optional, Union
from typing import Any, Union

import zarr
from numpy import ndarray

from cubed.backend_array_api import backend_array_to_numpy_array
from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store


Expand All @@ -21,7 +19,6 @@ def __init__(
dtype: T_DType,
chunks: T_RegularChunks,
store: T_Store,
initial_values: Optional[ndarray] = None,
fill_value: Any = None,
**kwargs,
):
Expand All @@ -35,7 +32,6 @@ def __init__(
self.chunks = template.chunks

self.store = store
self.initial_values = initial_values
self.fill_value = fill_value
self.kwargs = kwargs

Expand All @@ -60,8 +56,6 @@ def create(self, mode: str = "w-") -> zarr.Array:
fill_value=self.fill_value,
**self.kwargs,
)
if self.initial_values is not None and self.initial_values.size > 0:
target[...] = backend_array_to_numpy_array(self.initial_values)
return target

def open(self) -> zarr.Array:
Expand Down Expand Up @@ -91,14 +85,6 @@ def lazy_empty(
return LazyZarrArray(shape, dtype, chunks, store, **kwargs)


def lazy_from_array(
array: ndarray, *, dtype: T_DType, chunks: T_RegularChunks, store: T_Store, **kwargs
) -> LazyZarrArray:
return LazyZarrArray(
array.shape, dtype, chunks, store, initial_values=array, **kwargs
)


def lazy_full(
shape: T_Shape,
fill_value: Any,
Expand Down
16 changes: 1 addition & 15 deletions cubed/tests/storage/test_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest
from numpy.testing import assert_array_equal

from cubed.storage.zarr import lazy_empty, lazy_from_array, lazy_full
from cubed.storage.zarr import lazy_empty, lazy_full


def test_lazy_empty(tmp_path):
Expand All @@ -18,20 +18,6 @@ def test_lazy_empty(tmp_path):
arr.open()


def test_lazy_from_array(tmp_path):
zarr_path = tmp_path / "lazy.zarr"
a = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]], dtype=int)
arr = lazy_from_array(a, dtype=a.dtype, chunks=(2, 2), store=zarr_path)

assert not zarr_path.exists()
with pytest.raises(ValueError):
arr.open()

arr.create()
assert zarr_path.exists()
assert_array_equal(arr.open()[:], a)


def test_lazy_full(tmp_path):
zarr_path = tmp_path / "lazy.zarr"
arr = lazy_full(
Expand Down
14 changes: 11 additions & 3 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ def modal_executor(request):
return request.param


def test_as_array_fails(spec):
a = np.ones((1000, 1000))
with pytest.raises(
ValueError,
match="Size of in memory array is 8.0 MB which exceeds maximum of 1.0 MB.",
):
xp.asarray(a, chunks=(100, 100), spec=spec)


def test_regular_chunks(spec):
xp.ones((5, 5), chunks=((2, 2, 1), (5,)), spec=spec)
with pytest.raises(ValueError):
Expand Down Expand Up @@ -220,9 +229,8 @@ def test_rechunk_same_chunks(spec):
b = a.rechunk((2, 1))
task_counter = TaskCounter()
res = b.compute(callbacks=[task_counter])
# no tasks except array creation task should have run since chunks are same
num_created_arrays = 1
assert task_counter.value == num_created_arrays
# no tasks should have run since chunks are same
assert task_counter.value == 0

assert_array_equal(res, np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))

Expand Down
8 changes: 4 additions & 4 deletions cubed/tests/test_executor_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def test_callbacks(spec, executor):
np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]),
)

num_created_arrays = 3
num_created_arrays = 1
assert task_counter.value == num_created_arrays + 4


Expand Down Expand Up @@ -132,12 +132,12 @@ def test_resume(spec, executor):
c = xp.add(a, b)
d = xp.negative(c)

num_created_arrays = 4 # a, b, c, d
num_created_arrays = 2 # c, d
assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 8

task_counter = TaskCounter()
c.compute(executor=executor, callbacks=[task_counter], optimize_graph=False)
num_created_arrays = 3 # a, b, c
num_created_arrays = 1 # c
assert task_counter.value == num_created_arrays + 4

# since c has already been computed, when computing d only 4 tasks are run, instead of 8
Expand All @@ -146,7 +146,7 @@ def test_resume(spec, executor):
executor=executor, callbacks=[task_counter], optimize_graph=False, resume=True
)
# the create arrays tasks are run again, even though they exist
num_created_arrays = 4 # a, b, c, d
num_created_arrays = 2 # c, d
assert task_counter.value == num_created_arrays + 4


Expand Down
14 changes: 8 additions & 6 deletions cubed/tests/test_optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ def test_fusion(spec):
c = xp.astype(b, np.float32)
d = xp.negative(c)

num_created_arrays = 4 # a, b, c, d
assert d.plan.num_arrays(optimize_graph=False) == num_created_arrays
num_arrays = 4 # a, b, c, d
num_created_arrays = 3 # b, c, d (a is not created on disk)
assert d.plan.num_arrays(optimize_graph=False) == num_arrays
assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12
num_created_arrays = 2 # a, d
assert d.plan.num_arrays(optimize_graph=True) == num_created_arrays
num_arrays = 2 # a, d
num_created_arrays = 1 # d (a is not created on disk)
assert d.plan.num_arrays(optimize_graph=True) == num_arrays
assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4

task_counter = TaskCounter()
Expand All @@ -41,9 +43,9 @@ def test_fusion_transpose(spec):
c = xp.astype(b, np.float32)
d = c.T

num_created_arrays = 4 # a, b, c, d
num_created_arrays = 3 # b, c, d
assert d.plan.num_tasks(optimize_graph=False) == num_created_arrays + 12
num_created_arrays = 2 # a, d
num_created_arrays = 1 # d
assert d.plan.num_tasks(optimize_graph=True) == num_created_arrays + 4

task_counter = TaskCounter()
Expand Down