Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-process Python executor should not retry #275

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cubed/runtime/executors/python.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from typing import Any, Callable, Optional, Sequence

from networkx import MultiDiGraph
from tenacity import retry, stop_after_attempt

from cubed.core.array import Callback, Spec, TaskEndEvent
from cubed.core.plan import visit_nodes
from cubed.primitive.types import CubedPipeline
from cubed.runtime.types import DagExecutor


@retry(reraise=True, stop=stop_after_attempt(3))
def exec_stage_func(func: Callable[..., Any], *args, **kwargs):
return func(*args, **kwargs)

Expand Down
10 changes: 9 additions & 1 deletion cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cubed.extensions.timeline import TimelineVisualizationCallback
from cubed.extensions.tqdm import TqdmProgressBar
from cubed.primitive.blockwise import apply_blockwise
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
from cubed.tests.utils import (
ALL_EXECUTORS,
MAIN_EXECUTORS,
Expand Down Expand Up @@ -467,7 +468,12 @@ def mock_apply_blockwise(*args, **kwargs):
return apply_blockwise(*args, **kwargs)


@pytest.mark.skipif(
platform.system() == "Windows", reason="measuring memory does not run on windows"
)
def test_retries(mocker, spec):
# Use AsyncPythonDagExecutor since PythonDagExecutor doesn't support retries
executor = AsyncPythonDagExecutor()
# Inject faults into the primitive layer
mocker.patch(
"cubed.primitive.blockwise.apply_blockwise", side_effect=mock_apply_blockwise
Expand All @@ -476,7 +482,9 @@ def test_retries(mocker, spec):
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
c = xp.add(a, b)
assert_array_equal(c.compute(), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]))
assert_array_equal(
c.compute(executor=executor), np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]])
)


@pytest.mark.skipif(
Expand Down