Skip to content

Commit

Permalink
Handle the case where an array is created by calling map_blocks with no input arrays (#343)
Browse files Browse the repository at this point in the history

* Handle the case where an array is created by calling `map_blocks` with no input arrays

* Use `map_blocks` rather than `map_direct` for creation functions

* Use `map_blocks` rather than `map_direct` for `from_array`
  • Loading branch information
tomwhite authored Jan 5, 2024
1 parent 2827308 commit ba947af
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 32 deletions.
24 changes: 8 additions & 16 deletions cubed/array_api/creation_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from cubed.backend_array_api import namespace as nxp
from cubed.core import Plan, gensym
from cubed.core.ops import map_direct
from cubed.core.ops import map_blocks
from cubed.storage.virtual import (
virtual_empty,
virtual_full,
Expand All @@ -26,18 +26,15 @@ def arange(
if stop is None:
start, stop = 0, start
num = int(max(math.ceil((stop - start) / step), 0))
shape = (num,)
if dtype is None:
dtype = nxp.arange(start, stop, step * num if num else step).dtype
chunks = normalize_chunks(chunks, shape=(num,), dtype=dtype)
chunksize = chunks[0][0]

return map_direct(
return map_blocks(
_arange,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=0,
spec=spec,
size=chunksize,
start=start,
Expand All @@ -47,7 +44,7 @@ def arange(
)


def _arange(x, *arrays, size, start, stop, step, arange_dtype, block_id=None):
def _arange(x, size, start, stop, step, arange_dtype, block_id=None):
i = block_id[0]
blockstart = start + (i * size * step)
blockstop = start + ((i + 1) * size * step)
Expand Down Expand Up @@ -120,19 +117,17 @@ def eye(
chunks = normalize_chunks(chunks, shape=shape, dtype=dtype)
chunksize = to_chunksize(chunks)[0]

return map_direct(
return map_blocks(
_eye,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=0,
spec=spec,
k=k,
chunksize=chunksize,
)


def _eye(x, *arrays, k=None, chunksize=None, block_id=None):
def _eye(x, k=None, chunksize=None, block_id=None):
i, j = block_id
bk = (j - i) * chunksize
if bk - chunksize <= k <= bk + chunksize:
Expand Down Expand Up @@ -198,21 +193,18 @@ def linspace(
if div == 0:
div = 1
step = float(range_) / div
shape = (num,)
if dtype is None:
dtype = np.float64
chunks = normalize_chunks(chunks, shape=shape, dtype=dtype)
chunks = normalize_chunks(chunks, shape=(num,), dtype=dtype)
chunksize = chunks[0][0]

if num == 0:
return asarray(0.0, dtype=dtype, spec=spec)

return map_direct(
return map_blocks(
_linspace,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=0,
spec=spec,
size=chunksize,
start=start,
Expand All @@ -222,7 +214,7 @@ def linspace(
)


def _linspace(x, *arrays, size, start, step, endpoint, linspace_dtype, block_id=None):
def _linspace(x, size, start, step, endpoint, linspace_dtype, block_id=None):
bs = x.shape[0]
i = block_id[0]
adjusted_bs = bs - 1 if endpoint else bs
Expand Down
27 changes: 20 additions & 7 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@ def from_array(x, chunks="auto", asarray=None, spec=None) -> "Array":
if asarray is None:
asarray = not hasattr(x, "__array_function__")

return map_direct(
return map_blocks(
_from_array,
x,
shape=x.shape,
dtype=x.dtype,
chunks=outchunks,
extra_projected_mem=0,
spec=spec,
input_array=x,
outchunks=outchunks,
asarray=asarray,
)


def _from_array(e, x, outchunks=None, asarray=None, block_id=None):
out = x[get_item(outchunks, block_id)]
def _from_array(block, input_array, outchunks=None, asarray=None, block_id=None):
out = input_array[get_item(outchunks, block_id)]
if asarray:
out = np.asarray(out)
out = numpy_array_to_backend_array(out)
Expand Down Expand Up @@ -457,9 +455,24 @@ def _target_chunk_selection(target_chunks, idx, selection):


def map_blocks(
func, *args: "Array", dtype=None, chunks=None, drop_axis=[], new_axis=None, **kwargs
func,
*args: "Array",
dtype=None,
chunks=None,
drop_axis=[],
new_axis=None,
spec=None,
**kwargs,
) -> "Array":
"""Apply a function to corresponding blocks from multiple input arrays."""

# Handle the case where an array is created by calling `map_blocks` with no input arrays
if len(args) == 0:
from cubed.array_api.creation_functions import empty_virtual_array

shape = tuple(map(sum, chunks))
args = (empty_virtual_array(shape, dtype=dtype, chunks=chunks, spec=spec),)

if has_keyword(func, "block_id"):
from cubed.array_api.creation_functions import offsets_virtual_array

Expand Down
12 changes: 3 additions & 9 deletions cubed/random.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from cubed.backend_array_api import namespace as nxp
from cubed.backend_array_api import numpy_array_to_backend_array
from cubed.core.ops import map_direct
from cubed.core.ops import map_blocks
from cubed.vendor.dask.array.core import normalize_chunks


Expand All @@ -18,23 +18,17 @@ def random(size, *, chunks=None, spec=None):
numblocks = tuple(map(len, chunks))
root_seed = pyrandom.getrandbits(128)

# no extra memory is projected to be needed since input is an empty array whose
# memory is never allocated, see https://pythonspeed.com/articles/measuring-memory-python/#phantom-memory
extra_projected_mem = 0

return map_direct(
return map_blocks(
_random,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=extra_projected_mem,
spec=spec,
numblocks=numblocks,
root_seed=root_seed,
)


def _random(x, *arrays, numblocks=None, root_seed=None, block_id=None):
def _random(x, numblocks=None, root_seed=None, block_id=None):
stream_id = block_id_to_offset(block_id, numblocks)
rg = Generator(Philox(key=root_seed + stream_id))
out = rg.random(x.shape)
Expand Down
13 changes: 13 additions & 0 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ def func(block, block_id=None, c=0):
)


def test_map_blocks_no_array_args(spec, executor):
    """Check that ``map_blocks`` can build an array when given no input arrays.

    Only ``dtype`` and ``chunks`` are supplied, so the output shape follows
    from the chunks. Each block is filled with the sum of its block
    coordinates, which makes the per-block ``block_id`` visible in the result.
    """

    def block_index_sum(block, block_id=None):
        # Same shape as the incoming block, filled with the block's index sum.
        fill_value = int(sum(block_id))
        return nxp.ones_like(block) * fill_value

    result = cubed.map_blocks(
        block_index_sum,
        dtype="int64",
        chunks=((5, 3),),
        spec=spec,
    )
    assert result.chunks == ((5, 3),)

    # Block 0 covers the first five elements, block 1 the remaining three.
    computed = result.compute(executor=executor)
    expected = np.array([0, 0, 0, 0, 0, 1, 1, 1], dtype="int64")
    assert_array_equal(computed, expected)


def test_map_blocks_with_different_block_shapes(spec):
def func(x, y):
return x
Expand Down

0 comments on commit ba947af

Please sign in to comment.