Skip to content

Commit

Permalink
Remove duplicate copy of projected_mem and num_tasks from DAG (#271)
Browse files Browse the repository at this point in the history
Both are already on the `pipeline` object (which is on the DAG).
Also move `reserved_mem` from the DAG to the `pipeline` object.
  • Loading branch information
tomwhite authored Jul 21, 2023
1 parent 1b21371 commit ef38775
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 29 deletions.
13 changes: 1 addition & 12 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ def blockwise(
*zargs,
allowed_mem=spec.allowed_mem,
reserved_mem=spec.reserved_mem,
extra_projected_mem=extra_projected_mem,
target_store=target_store,
shape=shape,
dtype=dtype,
Expand All @@ -282,9 +283,6 @@ def blockwise(
"blockwise",
pipeline.target_array,
pipeline,
pipeline.projected_mem + extra_projected_mem,
spec.reserved_mem,
pipeline.num_tasks,
*source_arrays,
)
from cubed.array_api import Array
Expand Down Expand Up @@ -614,9 +612,6 @@ def rechunk(x, chunks, target_store=None):
"rechunk",
pipeline.target_array,
pipeline,
pipeline.projected_mem,
spec.reserved_mem,
pipeline.num_tasks,
x,
)
return Array(name, pipeline.target_array, spec, plan)
Expand All @@ -628,9 +623,6 @@ def rechunk(x, chunks, target_store=None):
"rechunk",
pipeline1.target_array,
pipeline1,
pipeline1.projected_mem,
spec.reserved_mem,
pipeline1.num_tasks,
x,
)
x_int = Array(name_int, pipeline1.target_array, spec, plan1)
Expand All @@ -641,9 +633,6 @@ def rechunk(x, chunks, target_store=None):
"rechunk",
pipeline2.target_array,
pipeline2,
pipeline2.projected_mem,
spec.reserved_mem,
pipeline2.num_tasks,
x_int,
)
return Array(name, pipeline2.target_array, spec, plan2)
Expand Down
11 changes: 3 additions & 8 deletions cubed/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ def _new(
op_name,
target,
pipeline=None,
projected_mem=None,
reserved_mem=None,
num_tasks=None,
*source_arrays,
):
# create an empty DAG or combine from sources
Expand Down Expand Up @@ -73,9 +70,6 @@ def _new(
target=target,
stack_summaries=stack_summaries,
pipeline=pipeline,
projected_mem=projected_mem,
reserved_mem=reserved_mem,
num_tasks=num_tasks,
)
for x in source_arrays:
if hasattr(x, "name"):
Expand Down Expand Up @@ -148,7 +142,6 @@ def create_lazy_zarr_arrays(self, dag):
target=None,
pipeline=pipeline,
projected_mem=pipeline.projected_mem,
reserved_mem=reserved_mem,
num_tasks=pipeline.num_tasks,
)
# make create arrays node a dependency of all lazy array nodes
Expand Down Expand Up @@ -407,4 +400,6 @@ def create_zarr_arrays(lazy_zarr_arrays, reserved_mem):
)
num_tasks = len(lazy_zarr_arrays)

return CubedPipeline(stages, None, None, projected_mem, num_tasks, None)
return CubedPipeline(
stages, None, None, projected_mem, reserved_mem, num_tasks, None
)
2 changes: 1 addition & 1 deletion cubed/extensions/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def on_compute_start(self, dag, resume):
array_name=name,
op_name=node["op_name"],
projected_mem=pipeline.projected_mem,
reserved_mem=node["reserved_mem"],
reserved_mem=pipeline.reserved_mem,
num_tasks=pipeline.num_tasks,
)
)
Expand Down
15 changes: 12 additions & 3 deletions cubed/primitive/blockwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def blockwise(
new_axes: Optional[Dict[int, int]] = None,
in_names: Optional[List[str]] = None,
out_name: Optional[str] = None,
extra_projected_mem: int = 0,
**kwargs,
):
"""Apply a function across blocks from multiple source Zarr arrays.
Expand All @@ -123,6 +124,9 @@ def blockwise(
The chunks of the output array.
new_axes : dict
New indexes and their dimension lengths
extra_projected_mem : int
Extra memory projected to be needed (in bytes) in addition to the memory used reading
the input arrays and writing the output.
**kwargs : dict
Extra keyword arguments to pass to function
Expand Down Expand Up @@ -209,7 +213,7 @@ def blockwise(
]

# calculate projected memory
projected_mem = reserved_mem
projected_mem = reserved_mem + extra_projected_mem
# inputs
for array in arrays: # inputs
# memory for a compressed and an uncompressed input array chunk
Expand All @@ -228,7 +232,9 @@ def blockwise(
f"Projected blockwise memory ({projected_mem}) exceeds allowed_mem ({allowed_mem}), including reserved_mem ({reserved_mem})"
)

return CubedPipeline(stages, spec, target_array, projected_mem, num_tasks, None)
return CubedPipeline(
stages, spec, target_array, projected_mem, reserved_mem, num_tasks, None
)


# Code for fusing pipelines
Expand Down Expand Up @@ -279,9 +285,12 @@ def fused_func(*args):

target_array = pipeline2.target_array
projected_mem = max(pipeline1.projected_mem, pipeline2.projected_mem)
reserved_mem = max(pipeline1.reserved_mem, pipeline2.reserved_mem)
num_tasks = pipeline2.num_tasks

return CubedPipeline(stages, spec, target_array, projected_mem, num_tasks, None)
return CubedPipeline(
stages, spec, target_array, projected_mem, reserved_mem, num_tasks, None
)


# blockwise functions
Expand Down
12 changes: 9 additions & 3 deletions cubed/primitive/rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,23 @@ def rechunk(
if intermediate is None:
copy_spec = CubedCopySpec(read_proxy, write_proxy)
num_tasks = total_chunks(write_proxy.array.shape, write_proxy.chunks)
return [spec_to_pipeline(copy_spec, target, projected_mem, num_tasks)]
return [
spec_to_pipeline(copy_spec, target, projected_mem, reserved_mem, num_tasks)
]

else:
# break spec into two if there's an intermediate
copy_spec1 = CubedCopySpec(read_proxy, int_proxy)
num_tasks = total_chunks(copy_spec1.write.array.shape, copy_spec1.write.chunks)
pipeline1 = spec_to_pipeline(copy_spec1, intermediate, projected_mem, num_tasks)
pipeline1 = spec_to_pipeline(
copy_spec1, intermediate, projected_mem, reserved_mem, num_tasks
)

copy_spec2 = CubedCopySpec(int_proxy, write_proxy)
num_tasks = total_chunks(copy_spec2.write.array.shape, copy_spec2.write.chunks)
pipeline2 = spec_to_pipeline(copy_spec2, target, projected_mem, num_tasks)
pipeline2 = spec_to_pipeline(
copy_spec2, target, projected_mem, reserved_mem, num_tasks
)

return [pipeline1, pipeline2]

Expand Down
1 change: 1 addition & 0 deletions cubed/primitive/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class CubedPipeline:
config: Config
target_array: Any
projected_mem: int
reserved_mem: int
num_tasks: int
write_chunks: Optional[T_RegularChunks]

Expand Down
14 changes: 12 additions & 2 deletions cubed/runtime/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ def copy_read_to_write(chunk_key: Sequence[slice], *, config: CubedCopySpec) ->


def spec_to_pipeline(
spec: CubedCopySpec, target_array: Any, projected_mem: int, num_tasks: int
spec: CubedCopySpec,
target_array: Any,
projected_mem: int,
reserved_mem: int,
num_tasks: int,
) -> CubedPipeline:
# typing won't work until we start using numpy types
shape = spec.read.array.shape # type: ignore
Expand All @@ -60,7 +64,13 @@ def spec_to_pipeline(
)
]
return CubedPipeline(
stages, spec, target_array, projected_mem, num_tasks, spec.write.chunks
stages,
spec,
target_array,
projected_mem,
reserved_mem,
num_tasks,
spec.write.chunks,
)


Expand Down

0 comments on commit ef38775

Please sign in to comment.