From 49dd19f1e22c238d5a317463e584a5b4064dbdde Mon Sep 17 00:00:00 2001
From: Tom White
Date: Mon, 18 Mar 2024 10:30:23 +0000
Subject: [PATCH] Use executor name rather than class name (#430)

---
 cubed/tests/test_core.py              |  6 ++--
 cubed/tests/test_executor_features.py | 50 +++++++--------------------
 cubed/tests/test_random.py            |  6 +++-
 docs/user-guide/executors.md          |  3 +-
 4 files changed, 21 insertions(+), 44 deletions(-)

diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py
index 5d0e0eae..229a15b0 100644
--- a/cubed/tests/test_core.py
+++ b/cubed/tests/test_core.py
@@ -525,10 +525,8 @@ def test_array_pickle(spec, executor):
 def test_measure_reserved_mem(executor):
     pytest.importorskip("lithops")
 
-    from cubed.runtime.executors.lithops import LithopsDagExecutor
-
-    if not isinstance(executor, LithopsDagExecutor):
-        pytest.skip(f"{type(executor)} does not support measure_reserved_mem")
+    if executor.name != "lithops":
+        pytest.skip(f"{executor.name} executor does not support measure_reserved_mem")
 
     reserved_memory = cubed.measure_reserved_mem(executor=executor)
     assert reserved_memory > 1_000_000  # over 1MB
diff --git a/cubed/tests/test_executor_features.py b/cubed/tests/test_executor_features.py
index ba324b9c..850e6a75 100644
--- a/cubed/tests/test_executor_features.py
+++ b/cubed/tests/test_executor_features.py
@@ -13,7 +13,7 @@
 from cubed.extensions.timeline import TimelineVisualizationCallback
 from cubed.extensions.tqdm import TqdmProgressBar
 from cubed.primitive.blockwise import apply_blockwise
-from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
+from cubed.runtime.create import create_executor
 from cubed.tests.utils import (
     ALL_EXECUTORS,
     MAIN_EXECUTORS,
@@ -71,8 +71,8 @@ def mock_apply_blockwise(*args, **kwargs):
     platform.system() == "Windows", reason="measuring memory does not run on windows"
 )
 def test_retries(mocker, spec):
-    # Use AsyncPythonDagExecutor since PythonDagExecutor doesn't support retries
-    executor = AsyncPythonDagExecutor()
+    # Use threads executor since single-threaded executor doesn't support retries
+    executor = create_executor("threads")
     # Inject faults into the primitive layer
     mocker.patch(
         "cubed.primitive.blockwise.apply_blockwise", side_effect=mock_apply_blockwise
@@ -145,13 +145,8 @@ def test_callbacks_modal(spec, modal_executor):
 
 
 def test_resume(spec, executor):
-    try:
-        from cubed.runtime.executors.beam import BeamDagExecutor
-
-        if isinstance(executor, BeamDagExecutor):
-            pytest.skip(f"{type(executor)} does not support resume")
-    except ImportError:
-        pass
+    if executor.name == "beam":
+        pytest.skip(f"{executor.name} executor does not support resume")
 
     a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
     b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
@@ -178,15 +173,10 @@
 
 @pytest.mark.parametrize("compute_arrays_in_parallel", [True, False])
 def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parallel):
-    try:
-        from cubed.runtime.executors.beam import BeamDagExecutor
-
-        if isinstance(any_executor, BeamDagExecutor):
-            pytest.skip(
-                f"{type(any_executor)} does not support compute_arrays_in_parallel"
-            )
-    except ImportError:
-        pass
+    if any_executor.name == "beam":
+        pytest.skip(
+            f"{any_executor.name} executor does not support compute_arrays_in_parallel"
+        )
 
     a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
     b = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
@@ -227,15 +217,8 @@ def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_para
 def test_check_runtime_memory_dask(spec, executor):
     pytest.importorskip("dask.distributed")
 
-    try:
-        from cubed.runtime.executors.dask_distributed_async import (
-            AsyncDaskDistributedExecutor,
-        )
-
-        if not isinstance(executor, AsyncDaskDistributedExecutor):
-            pytest.skip(f"{type(executor)} does not support check_runtime_memory")
-    except ImportError:
-        pass
+    if executor.name != "dask":
+        pytest.skip(f"{executor.name} executor does not support check_runtime_memory")
 
     spec = cubed.Spec(spec.work_dir, allowed_mem="4GB")  # larger than runtime memory
     a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
@@ -253,15 +236,8 @@ def test_check_runtime_memory_dask(spec, executor):
 def test_check_runtime_memory_dask_no_workers(spec, executor):
     pytest.importorskip("dask.distributed")
 
-    try:
-        from cubed.runtime.executors.dask_distributed_async import (
-            AsyncDaskDistributedExecutor,
-        )
-
-        if not isinstance(executor, AsyncDaskDistributedExecutor):
-            pytest.skip(f"{type(executor)} does not support check_runtime_memory")
-    except ImportError:
-        pass
+    if executor.name != "dask":
+        pytest.skip(f"{executor.name} executor does not support check_runtime_memory")
 
     spec = cubed.Spec(spec.work_dir, allowed_mem=100000)
     a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
diff --git a/cubed/tests/test_random.py b/cubed/tests/test_random.py
index ed8ca854..5144538c 100644
--- a/cubed/tests/test_random.py
+++ b/cubed/tests/test_random.py
@@ -15,7 +15,11 @@ def spec(tmp_path):
     return cubed.Spec(tmp_path, allowed_mem=100000)
 
 
-@pytest.fixture(scope="module", params=MAIN_EXECUTORS)
+@pytest.fixture(
+    scope="module",
+    params=MAIN_EXECUTORS,
+    ids=[executor.name for executor in MAIN_EXECUTORS],
+)
 def executor(request):
     return request.param
 
diff --git a/docs/user-guide/executors.md b/docs/user-guide/executors.md
index 7f60db6b..103a2df6 100644
--- a/docs/user-guide/executors.md
+++ b/docs/user-guide/executors.md
@@ -25,12 +25,11 @@ An executor may be specified as a part of the {py:class}`Spec <cubed.Spec>`:
 
 ```python
 import cubed
-from cubed.runtime.executors.modal_async import AsyncModalDagExecutor
 
 spec = cubed.Spec(
     work_dir="s3://cubed-tomwhite-temp",
     allowed_mem="2GB",
-    executor=AsyncModalDagExecutor()
+    executor_name="modal"
 )
 ```
 
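For context, here is a minimal sketch (not part of the patch) of the name-based pattern the diff adopts. `create_executor("threads")` and the `executor.name` attribute are taken directly from the diff; the `Spec` arguments, work directory, and array values below are illustrative placeholders.

```python
# Sketch of the name-based executor checks introduced by this patch.
# Assumes a local cubed installation; "threads" is one of the registered
# executor names used in the diff.
import cubed
import cubed.array_api as xp
from cubed.runtime.create import create_executor

# Create an executor from its registered name instead of importing its class.
executor = create_executor("threads")

# Feature checks compare the lightweight `name` attribute, so callers no
# longer need optional executor imports wrapped in try/except ImportError.
if executor.name != "lithops":
    print(f"{executor.name} executor does not support measure_reserved_mem")

# Illustrative computation run on the chosen executor (paths/values are made up).
spec = cubed.Spec("/tmp/cubed-example", allowed_mem="500MB")
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
c = xp.add(a, b)
c.compute(executor=executor)
```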