diff --git a/.github/workflows/slow-tests.yml b/.github/workflows/slow-tests.yml
index 3dd5393e..1cf63ab9 100644
--- a/.github/workflows/slow-tests.yml
+++ b/.github/workflows/slow-tests.yml
@@ -1,6 +1,7 @@
 name: Slow tests
 
 on:
+  pull_request:
   schedule:
     # Every weekday at 03:49 UTC, see https://crontab.guru/
     - cron: "49 3 * * 1-5"
diff --git a/cubed/runtime/executors/local.py b/cubed/runtime/executors/local.py
index ebb18aa3..588cdd71 100644
--- a/cubed/runtime/executors/local.py
+++ b/cubed/runtime/executors/local.py
@@ -179,11 +179,19 @@ async def async_execute_dag(
     if spec is not None:
         check_runtime_memory(spec, max_workers)
     if use_processes:
+        max_tasks_per_child = kwargs.pop("max_tasks_per_child", None)
         context = multiprocessing.get_context("spawn")
         # max_tasks_per_child is only supported from Python 3.11
-        concurrent_executor = ProcessPoolExecutor(
-            max_workers=max_workers, mp_context=context, max_tasks_per_child=1
-        )
+        if max_tasks_per_child is None:
+            concurrent_executor = ProcessPoolExecutor(
+                max_workers=max_workers, mp_context=context
+            )
+        else:
+            concurrent_executor = ProcessPoolExecutor(
+                max_workers=max_workers,
+                mp_context=context,
+                max_tasks_per_child=max_tasks_per_child,
+            )
     else:
         concurrent_executor = ThreadPoolExecutor(max_workers=max_workers)
     try:
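A minimal usage sketch of the new option (not part of the patch): the keyword popped from `kwargs` above is supplied through executor options, as the tests later in this diff do with `create_executor`. Requesting `max_tasks_per_child=1` needs Python 3.11+, since that is when `ProcessPoolExecutor` gained the keyword.

    # Sketch: ask for a fresh worker process per task (Python 3.11+ only).
    # create_executor and the "processes" executor name are taken from this
    # diff; the exact call is illustrative, not canonical.
    from cubed.runtime.create import create_executor

    executor = create_executor("processes", dict(max_tasks_per_child=1))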
diff --git a/cubed/tests/test_mem_utilization.py b/cubed/tests/test_mem_utilization.py
index 85f067c8..c26ef8cf 100644
--- a/cubed/tests/test_mem_utilization.py
+++ b/cubed/tests/test_mem_utilization.py
@@ -1,32 +1,60 @@
 import math
+import platform
 import shutil
+import sys
 from functools import partial, reduce
 
 import pytest
 
-from cubed.core.ops import partial_reduce
-from cubed.core.optimization import multiple_inputs_optimize_dag
-
-pytest.importorskip("lithops")
-
 import cubed
 import cubed.array_api as xp
 import cubed.random
 from cubed.backend_array_api import namespace as nxp
+from cubed.core.ops import partial_reduce
+from cubed.core.optimization import multiple_inputs_optimize_dag
 from cubed.extensions.history import HistoryCallback
 from cubed.extensions.mem_warn import MemoryWarningCallback
-from cubed.runtime.executors.lithops import LithopsExecutor
+from cubed.runtime.create import create_executor
 from cubed.tests.utils import LITHOPS_LOCAL_CONFIG
 
+ALLOWED_MEM = 2_000_000_000
+
+EXECUTORS = {}
+
+if platform.system() != "Windows":
+    EXECUTORS["processes"] = create_executor("processes")
+
+    # Run with max_tasks_per_child=1 so that each task is run in a new process,
+    # allowing us to perform a stronger check on peak memory
+    if sys.version_info >= (3, 11):
+        executor_options = dict(max_tasks_per_child=1)
+        EXECUTORS["processes-single-task"] = create_executor(
+            "processes", executor_options
+        )
+
+try:
+    executor_options = dict(config=LITHOPS_LOCAL_CONFIG, wait_dur_sec=0.1)
+    EXECUTORS["lithops"] = create_executor("lithops", executor_options)
+except ImportError:
+    pass
+
 
 @pytest.fixture()
 def spec(tmp_path, reserved_mem):
-    return cubed.Spec(tmp_path, allowed_mem=2_000_000_000, reserved_mem=reserved_mem)
+    return cubed.Spec(tmp_path, allowed_mem=ALLOWED_MEM, reserved_mem=reserved_mem)
+
+
+@pytest.fixture(
+    scope="module",
+    params=EXECUTORS.values(),
+    ids=EXECUTORS.keys(),
+)
+def executor(request):
+    return request.param
 
 
 @pytest.fixture(scope="module")
-def reserved_mem():
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
+def reserved_mem(executor):
     res = cubed.measure_reserved_mem(executor) * 1.1  # add some wiggle room
     return round_up_to_multiple(res, 10_000_000)  # round up to nearest multiple of 10MB
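The EXECUTORS dict above doubles as the parametrization source, so every test in the module runs once per available executor and gets a readable ID such as `processes-single-task`. A self-contained sketch of the pattern, with hypothetical stand-in values rather than real executors:

    # Sketch: dict-driven fixture parametrization; params and ids stay
    # aligned because both come from the same dict.
    import pytest

    REGISTRY = {"fast": 1, "careful": 2}  # hypothetical stand-ins

    @pytest.fixture(
        scope="module", params=list(REGISTRY.values()), ids=list(REGISTRY.keys())
    )
    def executor(request):
        return request.param

    def test_executor_ids(executor):
        # collected as test_executor_ids[fast] and test_executor_ids[careful]
        assert executor in (1, 2)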
@@ -40,46 +68,46 @@ def round_up_to_multiple(x, multiple=10):
 
 
 @pytest.mark.slow
-def test_index(tmp_path, spec):
+def test_index(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[1:, :]
-    run_operation(tmp_path, "index", b)
+    run_operation(tmp_path, executor, "index", b)
 
 
 @pytest.mark.slow
-def test_index_step(tmp_path, spec):
+def test_index_step(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = a[::2, :]
-    run_operation(tmp_path, "index_step", b)
+    run_operation(tmp_path, executor, "index_step", b)
 
 
 # Creation Functions
 
 
 @pytest.mark.slow
-def test_eye(tmp_path, spec):
+def test_eye(tmp_path, spec, executor):
     a = xp.eye(10000, 10000, chunks=(5000, 5000), spec=spec)
-    run_operation(tmp_path, "eye", a)
+    run_operation(tmp_path, executor, "eye", a)
 
 
 @pytest.mark.slow
-def test_tril(tmp_path, spec):
+def test_tril(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.tril(a)
-    run_operation(tmp_path, "tril", b)
+    run_operation(tmp_path, executor, "tril", b)
 
 
 # Elementwise Functions
 
 
 @pytest.mark.slow
-def test_add(tmp_path, spec):
+def test_add(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -87,11 +115,11 @@ def test_add(tmp_path, spec):
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.add(a, b)
-    run_operation(tmp_path, "add", c)
+    run_operation(tmp_path, executor, "add", c)
 
 
 @pytest.mark.slow
-def test_add_reduce_left(tmp_path, spec):
+def test_add_reduce_left(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold left.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -111,11 +139,13 @@ def test_add_reduce_left(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(x, y), arrs)
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_left", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_left", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_add_reduce_right(tmp_path, spec):
+def test_add_reduce_right(tmp_path, spec, executor):
     # Perform the `add` operation repeatedly on pairs of arrays, also known as fold right.
     # See https://en.wikipedia.org/wiki/Fold_(higher-order_function)
     #
@@ -137,23 +167,25 @@ def test_add_reduce_right(tmp_path, spec):
     ]
     result = reduce(lambda x, y: xp.add(y, x), reversed(arrs))
     opt_fn = partial(multiple_inputs_optimize_dag, max_total_source_arrays=n_arrays * 2)
-    run_operation(tmp_path, "add_reduce_right", result, optimize_function=opt_fn)
+    run_operation(
+        tmp_path, executor, "add_reduce_right", result, optimize_function=opt_fn
+    )
 
 
 @pytest.mark.slow
-def test_negative(tmp_path, spec):
+def test_negative(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.negative(a)
-    run_operation(tmp_path, "negative", b)
+    run_operation(tmp_path, executor, "negative", b)
 
 
 # Linear Algebra Functions
 
 
 @pytest.mark.slow
-def test_matmul(tmp_path, spec):
+def test_matmul(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -163,20 +195,20 @@ def test_matmul(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.matmul(c, d)
-    run_operation(tmp_path, "matmul", e)
+    run_operation(tmp_path, executor, "matmul", e)
 
 
 @pytest.mark.slow
-def test_matrix_transpose(tmp_path, spec):
+def test_matrix_transpose(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.matrix_transpose(a)
-    run_operation(tmp_path, "matrix_transpose", b)
+    run_operation(tmp_path, executor, "matrix_transpose", b)
 
 
 @pytest.mark.slow
-def test_tensordot(tmp_path, spec):
+def test_tensordot(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -186,14 +218,14 @@ def test_tensordot(tmp_path, spec):
     c = xp.astype(a, xp.float32)
     d = xp.astype(b, xp.float32)
     e = xp.tensordot(c, d, axes=1)
-    run_operation(tmp_path, "tensordot", e)
+    run_operation(tmp_path, executor, "tensordot", e)
 
 
 # Manipulation Functions
 
 
 @pytest.mark.slow
-def test_concat(tmp_path, spec):
+def test_concat(tmp_path, spec, executor):
     # Note 'a' has one fewer element in axis=0 to force chunking to cross array boundaries
     a = cubed.random.random(
         (9999, 10000), chunks=(5000, 5000), spec=spec
@@ -202,22 +234,22 @@ def test_concat(tmp_path, spec):
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.concat((a, b), axis=0)
-    run_operation(tmp_path, "concat", c)
+    run_operation(tmp_path, executor, "concat", c)
 
 
 @pytest.mark.slow
-def test_reshape(tmp_path, spec):
+def test_reshape(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     # need intermediate reshape due to limitations in Dask's reshape_rechunk
     b = xp.reshape(a, (5000, 2, 10000))
     c = xp.reshape(b, (5000, 20000))
-    run_operation(tmp_path, "reshape", c)
+    run_operation(tmp_path, executor, "reshape", c)
 
 
 @pytest.mark.slow
-def test_stack(tmp_path, spec):
+def test_stack(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
@@ -225,58 +257,57 @@ def test_stack(tmp_path, spec):
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     c = xp.stack((a, b), axis=0)
-    run_operation(tmp_path, "stack", c)
+    run_operation(tmp_path, executor, "stack", c)
 
 
 # Searching Functions
 
 
 @pytest.mark.slow
-def test_argmax(tmp_path, spec):
+def test_argmax(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.argmax(a, axis=0)
-    run_operation(tmp_path, "argmax", b)
+    run_operation(tmp_path, executor, "argmax", b)
 
 
 # Statistical Functions
 
 
 @pytest.mark.slow
-def test_max(tmp_path, spec):
+def test_max(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.max(a, axis=0)
-    run_operation(tmp_path, "max", b)
+    run_operation(tmp_path, executor, "max", b)
 
 
 @pytest.mark.slow
-def test_mean(tmp_path, spec):
+def test_mean(tmp_path, spec, executor):
     a = cubed.random.random(
         (10000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = xp.mean(a, axis=0)
-    run_operation(tmp_path, "mean", b)
+    run_operation(tmp_path, executor, "mean", b)
 
 
 @pytest.mark.slow
-def test_sum_partial_reduce(tmp_path, spec):
+def test_sum_partial_reduce(tmp_path, spec, executor):
     a = cubed.random.random(
         (40000, 10000), chunks=(5000, 5000), spec=spec
     )  # 200MB chunks
     b = partial_reduce(a, nxp.sum, split_every={0: 8})
-    run_operation(tmp_path, "sum_partial_reduce", b)
+    run_operation(tmp_path, executor, "sum_partial_reduce", b)
 
 
 # Internal functions
 
 
-def run_operation(tmp_path, name, result_array, *, optimize_function=None):
+def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
     # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
     # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
-    executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG)
     hist = HistoryCallback()
     mem_warn = MemoryWarningCallback()
     # use store=None to write to temporary zarr
@@ -291,8 +322,19 @@ def run_operation(tmp_path, name, result_array, *, optimize_function=None):
     df = hist.stats_df
     print(df)
 
+    # check that peak memory does not exceed allowed_mem
+    assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all()
+
+    # check that the change in peak memory is no more than projected_mem
+    assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()
+
     # check projected_mem_utilization does not exceed 1
-    assert (df["projected_mem_utilization"] <= 1.0).all()
+    # except on the processes executor, which runs multiple tasks in a process
+    if (
+        executor.name != "processes"
+        or executor.kwargs.get("max_tasks_per_child", None) == 1
+    ):
+        assert (df["projected_mem_utilization"] <= 1.0).all()
 
     # delete temp files for this test immediately since they are so large
     shutil.rmtree(tmp_path)
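A toy illustration of the three memory checks added to run_operation (not part of the patch). The column names are taken from the diff itself; the `// 1_000_000` scaling assumes the history callback reports memory in megabytes, and the values below are made up:

    # Sketch: the assertions run over the per-operation stats DataFrame.
    import pandas as pd

    ALLOWED_MEM = 2_000_000_000
    df = pd.DataFrame(
        {
            "peak_measured_mem_end_mb_max": [900, 1200],   # absolute peak per op, MB
            "peak_measured_mem_delta_mb_max": [150, 300],  # growth while running, MB
            "projected_mem_mb": [200, 400],                # cubed's projected bound, MB
            "projected_mem_utilization": [0.75, 0.75],     # ratio from the history callback
        }
    )
    assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all()
    assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()
    assert (df["projected_mem_utilization"] <= 1.0).all()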
run_operation(tmp_path, "argmax", b) + run_operation(tmp_path, executor, "argmax", b) # Statistical Functions @pytest.mark.slow -def test_max(tmp_path, spec): +def test_max(tmp_path, spec, executor): a = cubed.random.random( (10000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks b = xp.max(a, axis=0) - run_operation(tmp_path, "max", b) + run_operation(tmp_path, executor, "max", b) @pytest.mark.slow -def test_mean(tmp_path, spec): +def test_mean(tmp_path, spec, executor): a = cubed.random.random( (10000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks b = xp.mean(a, axis=0) - run_operation(tmp_path, "mean", b) + run_operation(tmp_path, executor, "mean", b) @pytest.mark.slow -def test_sum_partial_reduce(tmp_path, spec): +def test_sum_partial_reduce(tmp_path, spec, executor): a = cubed.random.random( (40000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks b = partial_reduce(a, nxp.sum, split_every={0: 8}) - run_operation(tmp_path, "sum_partial_reduce", b) + run_operation(tmp_path, executor, "sum_partial_reduce", b) # Internal functions -def run_operation(tmp_path, name, result_array, *, optimize_function=None): +def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None): # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False) # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function) - executor = LithopsExecutor(config=LITHOPS_LOCAL_CONFIG) hist = HistoryCallback() mem_warn = MemoryWarningCallback() # use store=None to write to temporary zarr @@ -291,8 +322,19 @@ def run_operation(tmp_path, name, result_array, *, optimize_function=None): df = hist.stats_df print(df) + # check peak memory does not exceed allowed mem + assert (df["peak_measured_mem_end_mb_max"] <= ALLOWED_MEM // 1_000_000).all() + + # check change in peak memory is no more than projected mem + assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all() + # check projected_mem_utilization does not exceed 1 - assert (df["projected_mem_utilization"] <= 1.0).all() + # except on processes executor that runs multiple tasks in a process + if ( + executor.name != "processes" + or executor.kwargs.get("max_tasks_per_child", None) == 1 + ): + assert (df["projected_mem_utilization"] <= 1.0).all() # delete temp files for this test immediately since they are so large shutil.rmtree(tmp_path) diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py index 28d10307..6e66e585 100644 --- a/cubed/tests/utils.py +++ b/cubed/tests/utils.py @@ -1,5 +1,4 @@ import platform -import sys from typing import Iterable import networkx as nx @@ -29,10 +28,8 @@ # ThreadsExecutor calls `peak_measured_mem` which is not supported on Windows ALL_EXECUTORS.append(create_executor("threads")) - # ProcessesExecutor uses an API available from 3.11 onwards (max_tasks_per_child) - if sys.version_info >= (3, 11): - ALL_EXECUTORS.append(create_executor("processes")) - MAIN_EXECUTORS.append(create_executor("processes")) + ALL_EXECUTORS.append(create_executor("processes")) + MAIN_EXECUTORS.append(create_executor("processes")) try: ALL_EXECUTORS.append(create_executor("beam"))