diff --git a/cubed/runtime/executors/python.py b/cubed/runtime/executors/python.py index 9720f1b8..b9349680 100644 --- a/cubed/runtime/executors/python.py +++ b/cubed/runtime/executors/python.py @@ -1,7 +1,6 @@ from typing import Any, Callable, Optional, Sequence from networkx import MultiDiGraph -from tenacity import retry, stop_after_attempt from cubed.core.array import Callback, Spec, TaskEndEvent from cubed.core.plan import visit_nodes @@ -9,7 +8,6 @@ from cubed.runtime.types import DagExecutor -@retry(reraise=True, stop=stop_after_attempt(3)) def exec_stage_func(func: Callable[..., Any], *args, **kwargs): return func(*args, **kwargs) diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index eb564769..483351b1 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -15,6 +15,7 @@ from cubed.extensions.timeline import TimelineVisualizationCallback from cubed.extensions.tqdm import TqdmProgressBar from cubed.primitive.blockwise import apply_blockwise +from cubed.runtime.executors.python_async import AsyncPythonDagExecutor from cubed.tests.utils import ( ALL_EXECUTORS, MAIN_EXECUTORS, @@ -467,7 +468,12 @@ def mock_apply_blockwise(*args, **kwargs): return apply_blockwise(*args, **kwargs) +@pytest.mark.skipif( + platform.system() == "Windows", reason="measuring memory does not run on windows" +) def test_retries(mocker, spec): + # Use AsyncPythonDagExecutor since PythonDagExecutor doesn't support retries + executor = AsyncPythonDagExecutor() # Inject faults into the primitive layer mocker.patch( "cubed.primitive.blockwise.apply_blockwise", side_effect=mock_apply_blockwise @@ -476,7 +482,9 @@ def test_retries(mocker, spec): a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec) b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec) c = xp.add(a, b) - assert_array_equal(c.compute(), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]])) + assert_array_equal( + c.compute(executor=executor), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]) + ) @pytest.mark.skipif(