Skip to content

Commit

Permalink
Use executor name rather than class name (#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite authored Mar 18, 2024
1 parent 3fd2195 commit 49dd19f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 44 deletions.
6 changes: 2 additions & 4 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,8 @@ def test_array_pickle(spec, executor):
def test_measure_reserved_mem(executor):
pytest.importorskip("lithops")

from cubed.runtime.executors.lithops import LithopsDagExecutor

if not isinstance(executor, LithopsDagExecutor):
pytest.skip(f"{type(executor)} does not support measure_reserved_mem")
if executor.name != "lithops":
pytest.skip(f"{executor.name} executor does not support measure_reserved_mem")

reserved_memory = cubed.measure_reserved_mem(executor=executor)
assert reserved_memory > 1_000_000 # over 1MB
Expand Down
50 changes: 13 additions & 37 deletions cubed/tests/test_executor_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from cubed.extensions.timeline import TimelineVisualizationCallback
from cubed.extensions.tqdm import TqdmProgressBar
from cubed.primitive.blockwise import apply_blockwise
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
from cubed.runtime.create import create_executor
from cubed.tests.utils import (
ALL_EXECUTORS,
MAIN_EXECUTORS,
Expand Down Expand Up @@ -71,8 +71,8 @@ def mock_apply_blockwise(*args, **kwargs):
platform.system() == "Windows", reason="measuring memory does not run on windows"
)
def test_retries(mocker, spec):
# Use AsyncPythonDagExecutor since PythonDagExecutor doesn't support retries
executor = AsyncPythonDagExecutor()
# Use threads executor since single-threaded executor doesn't support retries
executor = create_executor("threads")
# Inject faults into the primitive layer
mocker.patch(
"cubed.primitive.blockwise.apply_blockwise", side_effect=mock_apply_blockwise
Expand Down Expand Up @@ -145,13 +145,8 @@ def test_callbacks_modal(spec, modal_executor):


def test_resume(spec, executor):
try:
from cubed.runtime.executors.beam import BeamDagExecutor

if isinstance(executor, BeamDagExecutor):
pytest.skip(f"{type(executor)} does not support resume")
except ImportError:
pass
if executor.name == "beam":
pytest.skip(f"{executor.name} executor does not support resume")

a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
Expand All @@ -178,15 +173,10 @@ def test_resume(spec, executor):

@pytest.mark.parametrize("compute_arrays_in_parallel", [True, False])
def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parallel):
try:
from cubed.runtime.executors.beam import BeamDagExecutor

if isinstance(any_executor, BeamDagExecutor):
pytest.skip(
f"{type(any_executor)} does not support compute_arrays_in_parallel"
)
except ImportError:
pass
if any_executor.name == "beam":
pytest.skip(
f"{any_executor.name} executor does not support compute_arrays_in_parallel"
)

a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
b = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
Expand Down Expand Up @@ -227,15 +217,8 @@ def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_para

def test_check_runtime_memory_dask(spec, executor):
pytest.importorskip("dask.distributed")
try:
from cubed.runtime.executors.dask_distributed_async import (
AsyncDaskDistributedExecutor,
)

if not isinstance(executor, AsyncDaskDistributedExecutor):
pytest.skip(f"{type(executor)} does not support check_runtime_memory")
except ImportError:
pass
if executor.name != "dask":
pytest.skip(f"{executor.name} executor does not support check_runtime_memory")

spec = cubed.Spec(spec.work_dir, allowed_mem="4GB") # larger than runtime memory
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
Expand All @@ -253,15 +236,8 @@ def test_check_runtime_memory_dask(spec, executor):

def test_check_runtime_memory_dask_no_workers(spec, executor):
pytest.importorskip("dask.distributed")
try:
from cubed.runtime.executors.dask_distributed_async import (
AsyncDaskDistributedExecutor,
)

if not isinstance(executor, AsyncDaskDistributedExecutor):
pytest.skip(f"{type(executor)} does not support check_runtime_memory")
except ImportError:
pass
if executor.name != "dask":
pytest.skip(f"{executor.name} executor does not support check_runtime_memory")

spec = cubed.Spec(spec.work_dir, allowed_mem=100000)
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
Expand Down
6 changes: 5 additions & 1 deletion cubed/tests/test_random.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ def spec(tmp_path):
return cubed.Spec(tmp_path, allowed_mem=100000)


@pytest.fixture(scope="module", params=MAIN_EXECUTORS)
@pytest.fixture(
scope="module",
params=MAIN_EXECUTORS,
ids=[executor.name for executor in MAIN_EXECUTORS],
)
def executor(request):
return request.param

Expand Down
3 changes: 1 addition & 2 deletions docs/user-guide/executors.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ An executor may be specified as a part of the {py:class}`Spec <cubed.Spec>`:

```python
import cubed
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor

spec = cubed.Spec(
work_dir="s3://cubed-tomwhite-temp",
allowed_mem="2GB",
executor=AsyncModalDagExecutor()
executor_name="modal"
)
```

Expand Down

0 comments on commit 49dd19f

Please sign in to comment.