From 4ab3b312ebd5696c7fde98589f00aa086d7a3946 Mon Sep 17 00:00:00 2001
From: Tom White
Date: Fri, 7 Jul 2023 16:49:20 +0100
Subject: [PATCH] Surface rechunk intermediates in DAG

Assert DAG has a node with intermediate in name

Remove intermediate_array from CubedPipeline
---
 cubed/core/ops.py                     |  58 +++++++++++----
 cubed/core/plan.py                    |  15 +---
 cubed/primitive/blockwise.py          |   4 +-
 cubed/primitive/rechunk.py            | 101 ++++++++------------------
 cubed/primitive/types.py              |   3 +-
 cubed/runtime/pipeline.py             |  46 ++----------
 cubed/tests/primitive/test_rechunk.py |  31 ++++----
 cubed/tests/test_core.py              |  10 +++
 8 files changed, 116 insertions(+), 152 deletions(-)

diff --git a/cubed/core/ops.py b/cubed/core/ops.py
index ee92a09e..6a64558c 100644
--- a/cubed/core/ops.py
+++ b/cubed/core/ops.py
@@ -281,7 +281,6 @@ def blockwise(
         name,
         "blockwise",
         pipeline.target_array,
-        None,
         pipeline,
         pipeline.projected_mem + extra_projected_mem,
         spec.reserved_mem,
@@ -595,8 +594,9 @@ def rechunk(x, chunks, target_store=None):
     spec = x.spec
     if target_store is None:
         target_store = new_temp_path(name=name, spec=spec)
-    temp_store = new_temp_path(name=f"{name}-intermediate", spec=spec)
-    pipeline = primitive_rechunk(
+    name_int = f"{name}-int"
+    temp_store = new_temp_path(name=name_int, spec=spec)
+    pipelines = primitive_rechunk(
         x.zarray_maybe_lazy,
         target_chunks=target_chunks,
         allowed_mem=spec.allowed_mem,
@@ -604,21 +604,49 @@ def rechunk(x, chunks, target_store=None):
         target_store=target_store,
         temp_store=temp_store,
     )
-    plan = Plan._new(
-        name,
-        "rechunk",
-        pipeline.target_array,
-        pipeline.intermediate_array,
-        pipeline,
-        pipeline.projected_mem,
-        spec.reserved_mem,
-        pipeline.num_tasks,
-        x,
-    )

     from cubed.array_api import Array

-    return Array(name, pipeline.target_array, spec, plan)
+    if len(pipelines) == 1:
+        pipeline = pipelines[0]
+        plan = Plan._new(
+            name,
+            "rechunk",
+            pipeline.target_array,
+            pipeline,
+            pipeline.projected_mem,
+            spec.reserved_mem,
+            pipeline.num_tasks,
+            x,
+        )
+        return Array(name, pipeline.target_array, spec, plan)
+
+    else:
+        pipeline1 = pipelines[0]
+        plan1 = Plan._new(
+            name_int,
+            "rechunk",
+            pipeline1.target_array,
+            pipeline1,
+            pipeline1.projected_mem,
+            spec.reserved_mem,
+            pipeline1.num_tasks,
+            x,
+        )
+        x_int = Array(name_int, pipeline1.target_array, spec, plan1)
+
+        pipeline2 = pipelines[1]
+        plan2 = Plan._new(
+            name,
+            "rechunk",
+            pipeline2.target_array,
+            pipeline2,
+            pipeline2.projected_mem,
+            spec.reserved_mem,
+            pipeline2.num_tasks,
+            x_int,
+        )
+        return Array(name, pipeline2.target_array, spec, plan2)


 def reduction(
diff --git a/cubed/core/plan.py b/cubed/core/plan.py
index 7f5168a6..7e60810c 100644
--- a/cubed/core/plan.py
+++ b/cubed/core/plan.py
@@ -41,7 +41,6 @@ def _new(
         name,
         op_name,
         target,
-        intermediate_target=None,
         pipeline=None,
         projected_mem=None,
         reserved_mem=None,
@@ -78,16 +77,6 @@ def _new(
                 reserved_mem=reserved_mem,
                 num_tasks=num_tasks,
             )
-        if intermediate_target is not None:
-            intermediate_name = f"{name}-intermediate"
-            dag.add_node(
-                intermediate_name,
-                name=intermediate_name,
-                op_name=op_name,
-                target=intermediate_target,
-                stack_summaries=stack_summaries,
-                hidden=True,
-            )
         for x in source_arrays:
             if hasattr(x, "name"):
                 dag.add_edge(x.name, name)
@@ -288,6 +277,8 @@ def visualize(
                 pipeline = d["pipeline"]
                 tooltip += f"\nprojected memory: {memory_repr(pipeline.projected_mem)}"
                 tooltip += f"\ntasks: {pipeline.num_tasks}"
+                if pipeline.write_chunks is not None:
+                    tooltip += f"\nwrite chunks: {pipeline.write_chunks}"
             if "stack_summaries" in d and d["stack_summaries"] is not None:
                 # add call stack information
                 stack_summaries = d["stack_summaries"]
@@ -416,4 +407,4 @@ def create_zarr_arrays(lazy_zarr_arrays, reserved_mem):
     )
     num_tasks = len(lazy_zarr_arrays)

-    return CubedPipeline(stages, None, None, None, projected_mem, num_tasks)
+    return CubedPipeline(stages, None, None, projected_mem, num_tasks, None)
diff --git a/cubed/primitive/blockwise.py b/cubed/primitive/blockwise.py
index 2be0af83..b0e657e3 100644
--- a/cubed/primitive/blockwise.py
+++ b/cubed/primitive/blockwise.py
@@ -228,7 +228,7 @@ def blockwise(
             f"Projected blockwise memory ({projected_mem}) exceeds allowed_mem ({allowed_mem}), including reserved_mem ({reserved_mem})"
         )

-    return CubedPipeline(stages, spec, target_array, None, projected_mem, num_tasks)
+    return CubedPipeline(stages, spec, target_array, projected_mem, num_tasks, None)


 # Code for fusing pipelines
@@ -281,7 +281,7 @@ def fused_func(*args):
     projected_mem = max(pipeline1.projected_mem, pipeline2.projected_mem)
     num_tasks = pipeline2.num_tasks

-    return CubedPipeline(stages, spec, target_array, None, projected_mem, num_tasks)
+    return CubedPipeline(stages, spec, target_array, projected_mem, num_tasks, None)


 # blockwise functions
diff --git a/cubed/primitive/rechunk.py b/cubed/primitive/rechunk.py
index b8d4de38..7089fd7d 100644
--- a/cubed/primitive/rechunk.py
+++ b/cubed/primitive/rechunk.py
@@ -1,12 +1,11 @@
 from math import ceil, prod
-from typing import Any, Dict, List, Optional, Tuple
+from typing import List, Optional, Tuple

 from cubed.primitive.types import CubedArrayProxy, CubedCopySpec, CubedPipeline
 from cubed.runtime.pipeline import spec_to_pipeline
 from cubed.storage.zarr import T_ZarrArray, lazy_empty
 from cubed.types import T_RegularChunks, T_Shape, T_Store
 from cubed.vendor.rechunker.algorithm import rechunking_plan
-from cubed.vendor.rechunker.api import _validate_options


 def rechunk(
@@ -16,7 +15,7 @@ def rechunk(
     reserved_mem: int,
     target_store: T_Store,
     temp_store: Optional[T_Store] = None,
-) -> CubedPipeline:
+) -> List[CubedPipeline]:
     """Rechunk a Zarr array to have target_chunks.

     Parameters
@@ -42,83 +41,51 @@ def rechunk(
     # rechunker doesn't take account of uncompressed and compressed copies of the
     # input and output array chunk/selection, so adjust appropriately
     rechunker_max_mem = (allowed_mem - reserved_mem) // 4

-    copy_specs, intermediate, target = _setup_rechunk(
-        source=source,
+    # we assume that rechunker is going to use all the memory it is allowed to
+    projected_mem = allowed_mem
+
+    read_proxy, int_proxy, write_proxy = _setup_array_rechunk(
+        source_array=source,
         target_chunks=target_chunks,
         max_mem=rechunker_max_mem,
         target_store=target_store,
         temp_store=temp_store,
     )
-    # source is a Zarr array, so only a single copy spec
-    if len(copy_specs) != 1:  # pragma: no cover
-        raise ValueError(f"Source must be a Zarr array, but was {source}")
-    copy_spec = copy_specs[0]
+    intermediate = int_proxy.array
+    target = write_proxy.array

-    num_tasks = total_chunks(copy_spec.write.array.shape, copy_spec.write.chunks)
-    if intermediate is not None:
-        num_tasks += total_chunks(copy_spec.read.array.shape, copy_spec.read.chunks)
+    if intermediate is None:
+        copy_spec = CubedCopySpec(read_proxy, write_proxy)
+        num_tasks = total_chunks(write_proxy.array.shape, write_proxy.chunks)
+        return [spec_to_pipeline(copy_spec, target, projected_mem, num_tasks)]

-    # we assume that rechunker is going to use all the memory it is allowed to
-    projected_mem = allowed_mem
-    return spec_to_pipeline(copy_spec, target, projected_mem, num_tasks)
+    else:
+        # break spec into two if there's an intermediate
+        copy_spec1 = CubedCopySpec(read_proxy, int_proxy)
+        num_tasks = total_chunks(copy_spec1.write.array.shape, copy_spec1.write.chunks)
+        pipeline1 = spec_to_pipeline(copy_spec1, intermediate, projected_mem, num_tasks)

+        copy_spec2 = CubedCopySpec(int_proxy, write_proxy)
+        num_tasks = total_chunks(copy_spec2.write.array.shape, copy_spec2.write.chunks)
+        pipeline2 = spec_to_pipeline(copy_spec2, target, projected_mem, num_tasks)

-# from rechunker, but simpler since it only has to handle Zarr arrays
-def _setup_rechunk(
-    source: T_ZarrArray,
-    target_chunks: T_RegularChunks,
-    max_mem: int,
-    target_store: T_Store,
-    target_options: Optional[Dict[Any, Any]] = None,
-    temp_store: Optional[T_Store] = None,
-    temp_options: Optional[Dict[Any, Any]] = None,
-) -> Tuple[List[CubedCopySpec], T_ZarrArray, T_ZarrArray]:
-    if temp_options is None:
-        temp_options = target_options
-    target_options = target_options or {}
-    temp_options = temp_options or {}
-
-    copy_spec = _setup_array_rechunk(
-        source,
-        target_chunks,
-        max_mem,
-        target_store,
-        target_options=target_options,
-        temp_store_or_group=temp_store,
-        temp_options=temp_options,
-    )
-    intermediate = copy_spec.intermediate.array
-    target = copy_spec.write.array
-    return [copy_spec], intermediate, target
+        return [pipeline1, pipeline2]


+# from rechunker, but simpler since it only has to handle Zarr arrays
 def _setup_array_rechunk(
     source_array: T_ZarrArray,
     target_chunks: T_RegularChunks,
     max_mem: int,
-    target_store_or_group: T_Store,
-    target_options: Optional[Dict[Any, Any]] = None,
-    temp_store_or_group: Optional[T_Store] = None,
-    temp_options: Optional[Dict[Any, Any]] = None,
-    name: Optional[str] = None,
-) -> CubedCopySpec:
-    _validate_options(target_options)
-    _validate_options(temp_options)
+    target_store: T_Store,
+    temp_store: Optional[T_Store] = None,
+) -> Tuple[CubedArrayProxy, CubedArrayProxy, CubedArrayProxy]:
     shape = source_array.shape
-    # source_chunks = (
-    #     source_array.chunksize
-    #     if isinstance(source_array, dask.array.Array)
-    #     else source_array.chunks
-    # )
     source_chunks = source_array.chunks
     dtype = source_array.dtype
     itemsize = dtype.itemsize

-    if target_chunks is None:
-        # this is just a pass-through copy
-        target_chunks = source_chunks
-
     read_chunks, int_chunks, write_chunks = rechunking_plan(
         shape,
         source_chunks,
@@ -137,32 +104,26 @@ def _setup_array_rechunk(
         shape,
         dtype=dtype,
         chunks=target_chunks,
-        store=target_store_or_group,
-        **(target_options or {}),
+        store=target_store,
     )

     if read_chunks == write_chunks:
         int_array = None
     else:
         # do intermediate store
-        if temp_store_or_group is None:
-            raise ValueError(
-                "A temporary store location must be provided{}.".format(
-                    f" (array={name})" if name else ""
-                )
-            )
+        if temp_store is None:
+            raise ValueError("A temporary store location must be provided.")
         int_array = lazy_empty(
             shape,
             dtype=dtype,
             chunks=int_chunks,
-            store=temp_store_or_group,
-            **(target_options or {}),
+            store=temp_store,
         )

     read_proxy = CubedArrayProxy(source_array, read_chunks)
     int_proxy = CubedArrayProxy(int_array, int_chunks)
     write_proxy = CubedArrayProxy(target_array, write_chunks)
-    return CubedCopySpec(read_proxy, int_proxy, write_proxy)
+    return read_proxy, int_proxy, write_proxy


 def total_chunks(shape: T_Shape, chunks: T_RegularChunks) -> int:
diff --git a/cubed/primitive/types.py b/cubed/primitive/types.py
index 3abab9d7..3af517cf 100644
--- a/cubed/primitive/types.py
+++ b/cubed/primitive/types.py
@@ -15,9 +15,9 @@ class CubedPipeline:
     stages: Sequence[Stage]
     config: Config
     target_array: Any
-    intermediate_array: Optional[Any]
     projected_mem: int
     num_tasks: int
+    write_chunks: Optional[T_RegularChunks]


 class CubedArrayProxy:
@@ -36,5 +36,4 @@ class CubedCopySpec:
     """Generalisation of rechunker ``CopySpec`` with support for ``LazyZarrArray``."""

     read: CubedArrayProxy
-    intermediate: CubedArrayProxy
     write: CubedArrayProxy
diff --git a/cubed/runtime/pipeline.py b/cubed/runtime/pipeline.py
index 93e7bf26..d21bc3a2 100644
--- a/cubed/runtime/pipeline.py
+++ b/cubed/runtime/pipeline.py
@@ -47,50 +47,20 @@ def copy_read_to_write(chunk_key, *, config: CubedCopySpec):
     config.write.open()[chunk_key] = data


-def copy_read_to_intermediate(chunk_key, *, config: CubedCopySpec):
-    # workaround limitation of lithops.utils.verify_args
-    if isinstance(chunk_key, list):
-        chunk_key = tuple(chunk_key)
-    data = np.asarray(config.read.open()[chunk_key])
-    config.intermediate.open()[chunk_key] = data
-
-
-def copy_intermediate_to_write(chunk_key, *, config: CubedCopySpec):
-    # workaround limitation of lithops.utils.verify_args
-    if isinstance(chunk_key, list):
-        chunk_key = tuple(chunk_key)
-    data = np.asarray(config.intermediate.open()[chunk_key])
-    config.write.open()[chunk_key] = data
-
-
 def spec_to_pipeline(
     spec: CubedCopySpec, target_array: Any, projected_mem: int, num_tasks: int
 ) -> CubedPipeline:
     # typing won't work until we start using numpy types
     shape = spec.read.array.shape  # type: ignore
-    if spec.intermediate.array is None:
-        stages = [
-            Stage(
-                copy_read_to_write,
-                gensym("copy_read_to_write"),
-                mappable=ChunkKeys(shape, spec.write.chunks),
-            )
-        ]
-    else:
-        stages = [
-            Stage(
-                copy_read_to_intermediate,
-                gensym("copy_read_to_intermediate"),
-                mappable=ChunkKeys(shape, spec.intermediate.chunks),
-            ),
-            Stage(
-                copy_intermediate_to_write,
-                gensym("copy_intermediate_to_write"),
-                mappable=ChunkKeys(shape, spec.write.chunks),
-            ),
-        ]
+    stages = [
+        Stage(
+            copy_read_to_write,
+            gensym("copy_read_to_write"),
+            mappable=ChunkKeys(shape, spec.write.chunks),
+        )
+    ]
     return CubedPipeline(
-        stages, spec, target_array, spec.intermediate.array, projected_mem, num_tasks
+        stages, spec, target_array, projected_mem, num_tasks, spec.write.chunks
     )
diff --git a/cubed/tests/primitive/test_rechunk.py b/cubed/tests/primitive/test_rechunk.py
index 9eef3ed4..e10f3ad1 100644
--- a/cubed/tests/primitive/test_rechunk.py
+++ b/cubed/tests/primitive/test_rechunk.py
@@ -24,7 +24,7 @@ def executor(request):
             0,
             (1, 4),
             1000,
-            1,
+            (1,),
         ),
         # everything still works with 100 bytes of reserved_mem
         (
@@ -34,7 +34,7 @@ def executor(request):
             100,
             (1, 4),
             1000,
-            1,
+            (1,),
         ),
         # only enough memory for one source/target chunk
        (
@@ -44,7 +44,7 @@ def executor(request):
            0,
            (4, 1),
            4 * 8 * 4,  # elts x itemsize x copies
-            8,
+            (16, 4),
        ),
    ],
 )
@@ -63,7 +63,7 @@ def test_rechunk(
     target_store = tmp_path / "target.zarr"
     temp_store = tmp_path / "temp.zarr"

-    pipeline = rechunk(
+    pipelines = rechunk(
         source,
         target_chunks=target_chunks,
         allowed_mem=allowed_mem,
@@ -72,20 +72,25 @@ def test_rechunk(
         temp_store=temp_store,
     )

-    assert pipeline.target_array.shape == shape
-    assert pipeline.target_array.dtype == source.dtype
-    assert pipeline.target_array.chunks == target_chunks
+    assert len(pipelines) == len(expected_num_tasks)

-    assert pipeline.projected_mem == expected_projected_mem
+    for i, pipeline in enumerate(pipelines):
+        assert pipeline.target_array.shape == shape
+        assert pipeline.target_array.dtype == source.dtype

-    assert pipeline.num_tasks == expected_num_tasks
+        assert pipeline.projected_mem == expected_projected_mem
+
+        assert pipeline.num_tasks == expected_num_tasks[i]
+
+    last_pipeline = pipelines[-1]
+    assert last_pipeline.target_array.chunks == target_chunks

     # create lazy zarr arrays
-    pipeline.target_array.create()
-    if pipeline.intermediate_array is not None:
-        pipeline.intermediate_array.create()
+    for pipeline in pipelines:
+        pipeline.target_array.create()

-    execute_pipeline(pipeline, executor=executor)
+    for pipeline in pipelines:
+        execute_pipeline(pipeline, executor=executor)

     res = zarr.open(target_store)
     assert_array_equal(res[:], np.ones(shape))
diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py
index 99ea8138..6a9cc333 100644
--- a/cubed/tests/test_core.py
+++ b/cubed/tests/test_core.py
@@ -220,6 +220,16 @@ def test_rechunk_same_chunks(spec):
     assert_array_equal(res, np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))


+# see also test_rechunk.py
+def test_rechunk_intermediate(tmp_path):
+    spec = cubed.Spec(tmp_path, allowed_mem=4 * 8 * 4)
+    a = xp.ones((4, 4), chunks=(1, 4), spec=spec)
+    b = a.rechunk((4, 1))
+    assert_array_equal(b.compute(), np.ones((4, 4)))
+    intermediates = [n for (n, d) in b.plan.dag.nodes(data=True) if "-int" in d["name"]]
+    assert len(intermediates) == 1
+
+
 def test_compute_is_idempotent(spec, executor):
     a = xp.ones((3, 3), chunks=(2, 2), spec=spec)
     b = xp.negative(a)