Surface rechunk intermediates in DAG
Assert DAG has a node with intermediate in name

Remove intermediate_array from CubedPipeline
tomwhite committed Jul 10, 2023
1 parent 2eed1bb commit 4ab3b31
Showing 8 changed files with 116 additions and 152 deletions.
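
The test described by the commit message ("Assert DAG has a node with intermediate in name") is in one of the changed files not expanded below. A minimal sketch of the idea, not the actual test, assuming a Spec whose allowed_mem is too small to rechunk (1, 100) chunks into (100, 1) in a single copy, and assuming the resulting array exposes its plan's DAG as plan.dag (the attribute used in cubed/core/plan.py):

import cubed
import cubed.array_api as xp
from cubed.core.ops import rechunk

# allowed_mem is deliberately too small for a single whole-array copy,
# so the rechunker plan needs an intermediate store
spec = cubed.Spec(allowed_mem=100_000)
a = xp.ones((100, 100), chunks=(1, 100), spec=spec)
b = rechunk(a, (100, 1))

# the intermediate copy now appears as a first-class DAG node named "<name>-int"
assert any("-int" in node for node in b.plan.dag.nodes())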
58 changes: 43 additions & 15 deletions cubed/core/ops.py
@@ -281,7 +281,6 @@ def blockwise(
name,
"blockwise",
pipeline.target_array,
None,
pipeline,
pipeline.projected_mem + extra_projected_mem,
spec.reserved_mem,
@@ -595,30 +594,59 @@ def rechunk(x, chunks, target_store=None):
spec = x.spec
if target_store is None:
target_store = new_temp_path(name=name, spec=spec)
temp_store = new_temp_path(name=f"{name}-intermediate", spec=spec)
pipeline = primitive_rechunk(
name_int = f"{name}-int"
temp_store = new_temp_path(name=name_int, spec=spec)
pipelines = primitive_rechunk(
x.zarray_maybe_lazy,
target_chunks=target_chunks,
allowed_mem=spec.allowed_mem,
reserved_mem=spec.reserved_mem,
target_store=target_store,
temp_store=temp_store,
)
plan = Plan._new(
name,
"rechunk",
pipeline.target_array,
pipeline.intermediate_array,
pipeline,
pipeline.projected_mem,
spec.reserved_mem,
pipeline.num_tasks,
x,
)

from cubed.array_api import Array

return Array(name, pipeline.target_array, spec, plan)
if len(pipelines) == 1:
pipeline = pipelines[0]
plan = Plan._new(
name,
"rechunk",
pipeline.target_array,
pipeline,
pipeline.projected_mem,
spec.reserved_mem,
pipeline.num_tasks,
x,
)
return Array(name, pipeline.target_array, spec, plan)

else:
pipeline1 = pipelines[0]
plan1 = Plan._new(
name_int,
"rechunk",
pipeline1.target_array,
pipeline1,
pipeline1.projected_mem,
spec.reserved_mem,
pipeline1.num_tasks,
x,
)
x_int = Array(name_int, pipeline1.target_array, spec, plan1)

pipeline2 = pipelines[1]
plan2 = Plan._new(
name,
"rechunk",
pipeline2.target_array,
pipeline2,
pipeline2.projected_mem,
spec.reserved_mem,
pipeline2.num_tasks,
x_int,
)
return Array(name, pipeline2.target_array, spec, plan2)


def reduction(
15 changes: 3 additions & 12 deletions cubed/core/plan.py
@@ -41,7 +41,6 @@ def _new(
name,
op_name,
target,
intermediate_target=None,
pipeline=None,
projected_mem=None,
reserved_mem=None,
@@ -78,16 +77,6 @@ def _new(
reserved_mem=reserved_mem,
num_tasks=num_tasks,
)
if intermediate_target is not None:
intermediate_name = f"{name}-intermediate"
dag.add_node(
intermediate_name,
name=intermediate_name,
op_name=op_name,
target=intermediate_target,
stack_summaries=stack_summaries,
hidden=True,
)
for x in source_arrays:
if hasattr(x, "name"):
dag.add_edge(x.name, name)
@@ -288,6 +277,8 @@ def visualize(
pipeline = d["pipeline"]
tooltip += f"\nprojected memory: {memory_repr(pipeline.projected_mem)}"
tooltip += f"\ntasks: {pipeline.num_tasks}"
if pipeline.write_chunks is not None:
tooltip += f"\nwrite chunks: {pipeline.write_chunks}"
if "stack_summaries" in d and d["stack_summaries"] is not None:
# add call stack information
stack_summaries = d["stack_summaries"]
@@ -416,4 +407,4 @@ def create_zarr_arrays(lazy_zarr_arrays, reserved_mem):
)
num_tasks = len(lazy_zarr_arrays)

return CubedPipeline(stages, None, None, None, projected_mem, num_tasks)
return CubedPipeline(stages, None, None, projected_mem, num_tasks, None)
4 changes: 2 additions & 2 deletions cubed/primitive/blockwise.py
@@ -228,7 +228,7 @@ def blockwise(
f"Projected blockwise memory ({projected_mem}) exceeds allowed_mem ({allowed_mem}), including reserved_mem ({reserved_mem})"
)

return CubedPipeline(stages, spec, target_array, None, projected_mem, num_tasks)
return CubedPipeline(stages, spec, target_array, projected_mem, num_tasks, None)


# Code for fusing pipelines
@@ -281,7 +281,7 @@ def fused_func(*args):
projected_mem = max(pipeline1.projected_mem, pipeline2.projected_mem)
num_tasks = pipeline2.num_tasks

return CubedPipeline(stages, spec, target_array, None, projected_mem, num_tasks)
return CubedPipeline(stages, spec, target_array, projected_mem, num_tasks, None)


# blockwise functions
101 changes: 31 additions & 70 deletions cubed/primitive/rechunk.py
@@ -1,12 +1,11 @@
from math import ceil, prod
from typing import Any, Dict, List, Optional, Tuple
from typing import List, Optional, Tuple

from cubed.primitive.types import CubedArrayProxy, CubedCopySpec, CubedPipeline
from cubed.runtime.pipeline import spec_to_pipeline
from cubed.storage.zarr import T_ZarrArray, lazy_empty
from cubed.types import T_RegularChunks, T_Shape, T_Store
from cubed.vendor.rechunker.algorithm import rechunking_plan
from cubed.vendor.rechunker.api import _validate_options


def rechunk(
@@ -16,7 +15,7 @@ def rechunk(
reserved_mem: int,
target_store: T_Store,
temp_store: Optional[T_Store] = None,
) -> CubedPipeline:
) -> List[CubedPipeline]:
"""Rechunk a Zarr array to have target_chunks.
Parameters
@@ -42,83 +41,51 @@ def rechunk(
# input and output array chunk/selection, so adjust appropriately
rechunker_max_mem = (allowed_mem - reserved_mem) // 4

copy_specs, intermediate, target = _setup_rechunk(
source=source,
# we assume that rechunker is going to use all the memory it is allowed to
projected_mem = allowed_mem

read_proxy, int_proxy, write_proxy = _setup_array_rechunk(
source_array=source,
target_chunks=target_chunks,
max_mem=rechunker_max_mem,
target_store=target_store,
temp_store=temp_store,
)

# source is a Zarr array, so only a single copy spec
if len(copy_specs) != 1: # pragma: no cover
raise ValueError(f"Source must be a Zarr array, but was {source}")
copy_spec = copy_specs[0]
intermediate = int_proxy.array
target = write_proxy.array

num_tasks = total_chunks(copy_spec.write.array.shape, copy_spec.write.chunks)
if intermediate is not None:
num_tasks += total_chunks(copy_spec.read.array.shape, copy_spec.read.chunks)
if intermediate is None:
copy_spec = CubedCopySpec(read_proxy, write_proxy)
num_tasks = total_chunks(write_proxy.array.shape, write_proxy.chunks)
return [spec_to_pipeline(copy_spec, target, projected_mem, num_tasks)]

# we assume that rechunker is going to use all the memory it is allowed to
projected_mem = allowed_mem
return spec_to_pipeline(copy_spec, target, projected_mem, num_tasks)
else:
# break spec into two if there's an intermediate
copy_spec1 = CubedCopySpec(read_proxy, int_proxy)
num_tasks = total_chunks(copy_spec1.write.array.shape, copy_spec1.write.chunks)
pipeline1 = spec_to_pipeline(copy_spec1, intermediate, projected_mem, num_tasks)

copy_spec2 = CubedCopySpec(int_proxy, write_proxy)
num_tasks = total_chunks(copy_spec2.write.array.shape, copy_spec2.write.chunks)
pipeline2 = spec_to_pipeline(copy_spec2, target, projected_mem, num_tasks)

# from rechunker, but simpler since it only has to handle Zarr arrays
def _setup_rechunk(
source: T_ZarrArray,
target_chunks: T_RegularChunks,
max_mem: int,
target_store: T_Store,
target_options: Optional[Dict[Any, Any]] = None,
temp_store: Optional[T_Store] = None,
temp_options: Optional[Dict[Any, Any]] = None,
) -> Tuple[List[CubedCopySpec], T_ZarrArray, T_ZarrArray]:
if temp_options is None:
temp_options = target_options
target_options = target_options or {}
temp_options = temp_options or {}

copy_spec = _setup_array_rechunk(
source,
target_chunks,
max_mem,
target_store,
target_options=target_options,
temp_store_or_group=temp_store,
temp_options=temp_options,
)
intermediate = copy_spec.intermediate.array
target = copy_spec.write.array
return [copy_spec], intermediate, target
return [pipeline1, pipeline2]


# from rechunker, but simpler since it only has to handle Zarr arrays
def _setup_array_rechunk(
source_array: T_ZarrArray,
target_chunks: T_RegularChunks,
max_mem: int,
target_store_or_group: T_Store,
target_options: Optional[Dict[Any, Any]] = None,
temp_store_or_group: Optional[T_Store] = None,
temp_options: Optional[Dict[Any, Any]] = None,
name: Optional[str] = None,
) -> CubedCopySpec:
_validate_options(target_options)
_validate_options(temp_options)
target_store: T_Store,
temp_store: Optional[T_Store] = None,
) -> Tuple[CubedArrayProxy, CubedArrayProxy, CubedArrayProxy]:
shape = source_array.shape
# source_chunks = (
# source_array.chunksize
# if isinstance(source_array, dask.array.Array)
# else source_array.chunks
# )
source_chunks = source_array.chunks
dtype = source_array.dtype
itemsize = dtype.itemsize

if target_chunks is None:
# this is just a pass-through copy
target_chunks = source_chunks

read_chunks, int_chunks, write_chunks = rechunking_plan(
shape,
source_chunks,
@@ -137,32 +104,26 @@ def _setup_array_rechunk(
shape,
dtype=dtype,
chunks=target_chunks,
store=target_store_or_group,
**(target_options or {}),
store=target_store,
)

if read_chunks == write_chunks:
int_array = None
else:
# do intermediate store
if temp_store_or_group is None:
raise ValueError(
"A temporary store location must be provided{}.".format(
f" (array={name})" if name else ""
)
)
if temp_store is None:
raise ValueError("A temporary store location must be provided.")
int_array = lazy_empty(
shape,
dtype=dtype,
chunks=int_chunks,
store=temp_store_or_group,
**(target_options or {}),
store=temp_store,
)

read_proxy = CubedArrayProxy(source_array, read_chunks)
int_proxy = CubedArrayProxy(int_array, int_chunks)
write_proxy = CubedArrayProxy(target_array, write_chunks)
return CubedCopySpec(read_proxy, int_proxy, write_proxy)
return read_proxy, int_proxy, write_proxy


def total_chunks(shape: T_Shape, chunks: T_RegularChunks) -> int:
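
Whether the primitive rechunk above returns one pipeline or two is decided by rechunker's planning step: an intermediate array is only created when the planned read and write chunks differ. A small illustration of that decision, assuming the vendored rechunking_plan keeps rechunker's argument order:

from cubed.vendor.rechunker.algorithm import rechunking_plan

# rechunk a 100x100 float64 array (80_000 bytes) from row chunks to column
# chunks, with max_mem far smaller than the whole array
read_chunks, int_chunks, write_chunks = rechunking_plan(
    (100, 100),  # shape
    (1, 100),    # source chunks
    (100, 1),    # target chunks
    8,           # itemsize (float64)
    25_000,      # max_mem in bytes
)
# with these sizes read_chunks != write_chunks, so _setup_array_rechunk
# creates an intermediate array and rechunk() returns two pipelines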
3 changes: 1 addition & 2 deletions cubed/primitive/types.py
@@ -15,9 +15,9 @@ class CubedPipeline:
stages: Sequence[Stage]
config: Config
target_array: Any
intermediate_array: Optional[Any]
projected_mem: int
num_tasks: int
write_chunks: Optional[T_RegularChunks]


class CubedArrayProxy:
@@ -36,5 +36,4 @@ class CubedCopySpec:
"""Generalisation of rechunker ``CopySpec`` with support for ``LazyZarrArray``."""

read: CubedArrayProxy
intermediate: CubedArrayProxy
write: CubedArrayProxy
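
With intermediate_array removed and write_chunks appended at the end, every positional CubedPipeline(...) call changes, as in the call sites above. A sketch of the new field order with placeholder values (not a realistic pipeline):

from cubed.primitive.types import CubedPipeline

pipeline = CubedPipeline(
    stages=[],              # Sequence[Stage]
    config=None,            # e.g. a CubedCopySpec
    target_array=None,      # the array being written
    projected_mem=100_000,  # bytes
    num_tasks=4,
    write_chunks=(100, 1),  # new field, surfaced in the visualize() tooltip
)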
46 changes: 8 additions & 38 deletions cubed/runtime/pipeline.py
@@ -47,50 +47,20 @@ def copy_read_to_write(chunk_key, *, config: CubedCopySpec):
config.write.open()[chunk_key] = data


def copy_read_to_intermediate(chunk_key, *, config: CubedCopySpec):
# workaround limitation of lithops.utils.verify_args
if isinstance(chunk_key, list):
chunk_key = tuple(chunk_key)
data = np.asarray(config.read.open()[chunk_key])
config.intermediate.open()[chunk_key] = data


def copy_intermediate_to_write(chunk_key, *, config: CubedCopySpec):
# workaround limitation of lithops.utils.verify_args
if isinstance(chunk_key, list):
chunk_key = tuple(chunk_key)
data = np.asarray(config.intermediate.open()[chunk_key])
config.write.open()[chunk_key] = data


def spec_to_pipeline(
spec: CubedCopySpec, target_array: Any, projected_mem: int, num_tasks: int
) -> CubedPipeline:
# typing won't work until we start using numpy types
shape = spec.read.array.shape # type: ignore
if spec.intermediate.array is None:
stages = [
Stage(
copy_read_to_write,
gensym("copy_read_to_write"),
mappable=ChunkKeys(shape, spec.write.chunks),
)
]
else:
stages = [
Stage(
copy_read_to_intermediate,
gensym("copy_read_to_intermediate"),
mappable=ChunkKeys(shape, spec.intermediate.chunks),
),
Stage(
copy_intermediate_to_write,
gensym("copy_intermediate_to_write"),
mappable=ChunkKeys(shape, spec.write.chunks),
),
]
stages = [
Stage(
copy_read_to_write,
gensym("copy_read_to_write"),
mappable=ChunkKeys(shape, spec.write.chunks),
)
]
return CubedPipeline(
stages, spec, target_array, spec.intermediate.array, projected_mem, num_tasks
stages, spec, target_array, projected_mem, num_tasks, spec.write.chunks
)

