Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Handle the case where an array is created by calling map_blocks with no input arrays #343

Merged
merged 3 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions cubed/array_api/creation_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from cubed.backend_array_api import namespace as nxp
from cubed.core import Plan, gensym
from cubed.core.ops import map_direct
from cubed.core.ops import map_blocks
from cubed.storage.virtual import (
virtual_empty,
virtual_full,
Expand All @@ -26,18 +26,15 @@ def arange(
if stop is None:
start, stop = 0, start
num = int(max(math.ceil((stop - start) / step), 0))
shape = (num,)
if dtype is None:
dtype = nxp.arange(start, stop, step * num if num else step).dtype
chunks = normalize_chunks(chunks, shape=(num,), dtype=dtype)
chunksize = chunks[0][0]

return map_direct(
return map_blocks(
_arange,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=0,
spec=spec,
size=chunksize,
start=start,
Expand All @@ -47,7 +44,7 @@ def arange(
)


def _arange(x, *arrays, size, start, stop, step, arange_dtype, block_id=None):
def _arange(x, size, start, stop, step, arange_dtype, block_id=None):
i = block_id[0]
blockstart = start + (i * size * step)
blockstop = start + ((i + 1) * size * step)
Expand Down Expand Up @@ -120,19 +117,17 @@ def eye(
chunks = normalize_chunks(chunks, shape=shape, dtype=dtype)
chunksize = to_chunksize(chunks)[0]

return map_direct(
return map_blocks(
_eye,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=0,
spec=spec,
k=k,
chunksize=chunksize,
)


def _eye(x, *arrays, k=None, chunksize=None, block_id=None):
def _eye(x, k=None, chunksize=None, block_id=None):
i, j = block_id
bk = (j - i) * chunksize
if bk - chunksize <= k <= bk + chunksize:
Expand Down Expand Up @@ -198,21 +193,18 @@ def linspace(
if div == 0:
div = 1
step = float(range_) / div
shape = (num,)
if dtype is None:
dtype = np.float64
chunks = normalize_chunks(chunks, shape=shape, dtype=dtype)
chunks = normalize_chunks(chunks, shape=(num,), dtype=dtype)
chunksize = chunks[0][0]

if num == 0:
return asarray(0.0, dtype=dtype, spec=spec)

return map_direct(
return map_blocks(
_linspace,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=0,
spec=spec,
size=chunksize,
start=start,
Expand All @@ -222,7 +214,7 @@ def linspace(
)


def _linspace(x, *arrays, size, start, step, endpoint, linspace_dtype, block_id=None):
def _linspace(x, size, start, step, endpoint, linspace_dtype, block_id=None):
bs = x.shape[0]
i = block_id[0]
adjusted_bs = bs - 1 if endpoint else bs
Expand Down
27 changes: 20 additions & 7 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@ def from_array(x, chunks="auto", asarray=None, spec=None) -> "Array":
if asarray is None:
asarray = not hasattr(x, "__array_function__")

return map_direct(
return map_blocks(
_from_array,
x,
shape=x.shape,
dtype=x.dtype,
chunks=outchunks,
extra_projected_mem=0,
spec=spec,
input_array=x,
outchunks=outchunks,
asarray=asarray,
)


def _from_array(e, x, outchunks=None, asarray=None, block_id=None):
out = x[get_item(outchunks, block_id)]
def _from_array(block, input_array, outchunks=None, asarray=None, block_id=None):
out = input_array[get_item(outchunks, block_id)]
if asarray:
out = np.asarray(out)
out = numpy_array_to_backend_array(out)
Expand Down Expand Up @@ -457,9 +455,24 @@ def _target_chunk_selection(target_chunks, idx, selection):


def map_blocks(
func, *args: "Array", dtype=None, chunks=None, drop_axis=[], new_axis=None, **kwargs
func,
*args: "Array",
dtype=None,
chunks=None,
drop_axis=[],
new_axis=None,
spec=None,
**kwargs,
) -> "Array":
"""Apply a function to corresponding blocks from multiple input arrays."""

# Handle the case where an array is created by calling `map_blocks` with no input arrays
if len(args) == 0:
from cubed.array_api.creation_functions import empty_virtual_array

shape = tuple(map(sum, chunks))
args = (empty_virtual_array(shape, dtype=dtype, chunks=chunks, spec=spec),)

if has_keyword(func, "block_id"):
from cubed.array_api.creation_functions import offsets_virtual_array

Expand Down
12 changes: 3 additions & 9 deletions cubed/random.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from cubed.backend_array_api import namespace as nxp
from cubed.backend_array_api import numpy_array_to_backend_array
from cubed.core.ops import map_direct
from cubed.core.ops import map_blocks
from cubed.vendor.dask.array.core import normalize_chunks


Expand All @@ -18,23 +18,17 @@ def random(size, *, chunks=None, spec=None):
numblocks = tuple(map(len, chunks))
root_seed = pyrandom.getrandbits(128)

# no extra memory is projected to be needed since input is an empty array whose
# memory is never allocated, see https://pythonspeed.com/articles/measuring-memory-python/#phantom-memory
extra_projected_mem = 0

return map_direct(
return map_blocks(
_random,
shape=shape,
dtype=dtype,
chunks=chunks,
extra_projected_mem=extra_projected_mem,
spec=spec,
numblocks=numblocks,
root_seed=root_seed,
)


def _random(x, *arrays, numblocks=None, root_seed=None, block_id=None):
def _random(x, numblocks=None, root_seed=None, block_id=None):
stream_id = block_id_to_offset(block_id, numblocks)
rg = Generator(Philox(key=root_seed + stream_id))
out = rg.random(x.shape)
Expand Down
13 changes: 13 additions & 0 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,19 @@ def func(block, block_id=None, c=0):
)


def test_map_blocks_no_array_args(spec, executor):
    """Check that ``map_blocks`` can build an array when given no input arrays.

    Every output block is filled with the sum of its block id, so the two
    chunks (sizes 5 and 3) are expected to hold 0s and 1s respectively.
    """

    def func(block, block_id=None):
        # Only the shape and dtype of ``block`` are used here (via ones_like);
        # the values written depend solely on the block's position.
        fill_value = int(sum(block_id))
        return nxp.ones_like(block) * fill_value

    expected_chunks = ((5, 3),)
    a = cubed.map_blocks(func, dtype="int64", chunks=expected_chunks, spec=spec)
    assert a.chunks == expected_chunks

    expected = np.array([0, 0, 0, 0, 0, 1, 1, 1], dtype="int64")
    actual = a.compute(executor=executor)
    assert_array_equal(actual, expected)


def test_map_blocks_with_different_block_shapes(spec):
def func(x, y):
return x
Expand Down