From e185fca6d12a9a2d95a82ec24be69e34cbfcefad Mon Sep 17 00:00:00 2001 From: Tom White Date: Mon, 18 Mar 2024 10:57:36 +0000 Subject: [PATCH] Remove ModalDagExecutor, since it is unused and has fewer features than the async version (#431) --- cubed/runtime/create.py | 4 -- cubed/runtime/executors/modal.py | 83 +-------------------------- cubed/tests/test_executor_features.py | 7 --- cubed/tests/utils.py | 1 - 4 files changed, 1 insertion(+), 94 deletions(-) diff --git a/cubed/runtime/create.py b/cubed/runtime/create.py index 3bced979..491b0a3c 100644 --- a/cubed/runtime/create.py +++ b/cubed/runtime/create.py @@ -28,10 +28,6 @@ def create_executor(name: str, executor_options: Optional[dict] = None) -> Execu from cubed.runtime.executors.modal_async import AsyncModalDagExecutor return AsyncModalDagExecutor(**executor_options) - elif name == "modal-sync": - from cubed.runtime.executors.modal import ModalDagExecutor - - return ModalDagExecutor(**executor_options) elif name == "processes": from cubed.runtime.executors.python_async import AsyncPythonDagExecutor diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py index c1a152f7..df81fce2 100644 --- a/cubed/runtime/executors/modal.py +++ b/cubed/runtime/executors/modal.py @@ -1,21 +1,8 @@ import os -import time -from asyncio.exceptions import TimeoutError -from typing import Optional, Sequence import modal -from modal.exception import ConnectionError -from networkx import MultiDiGraph -from tenacity import retry, retry_if_exception_type, stop_after_attempt -from cubed.runtime.pipeline import visit_nodes -from cubed.runtime.types import Callback, DagExecutor -from cubed.runtime.utils import ( - execute_with_stats, - handle_callbacks, - handle_operation_start_callbacks, -) -from cubed.spec import Spec +from cubed.runtime.utils import execute_with_stats RUNTIME_MEMORY_MIB = 2000 @@ -105,71 +92,3 @@ def run_remotely(self, input, func=None, config=None, name=None, compute_id=None # note we can't use the execution_stat decorator since it doesn't work with modal decorators result, stats = execute_with_stats(func, input, config=config) return result, stats - - -# This just retries the initial connection attempt, not the function calls -@retry( - reraise=True, - retry=retry_if_exception_type((TimeoutError, ConnectionError)), - stop=stop_after_attempt(3), -) -def execute_dag( - dag: MultiDiGraph, - callbacks: Optional[Sequence[Callback]] = None, - resume: Optional[bool] = None, - spec: Optional[Spec] = None, - cloud: Optional[str] = None, - **kwargs, -) -> None: - if spec is not None: - check_runtime_memory(spec) - with stub.run(): - cloud = cloud or "aws" - if cloud == "aws": - app_function = run_remotely - elif cloud == "gcp": - app_function = Container().run_remotely - else: - raise ValueError(f"Unrecognized cloud: {cloud}") - for name, node in visit_nodes(dag, resume=resume): - handle_operation_start_callbacks(callbacks, name) - pipeline = node["pipeline"] - task_create_tstamp = time.time() - for _, stats in app_function.map( - pipeline.mappable, - order_outputs=False, - kwargs=dict(func=pipeline.function, config=pipeline.config), - ): - stats["name"] = name - stats["task_create_tstamp"] = task_create_tstamp - handle_callbacks(callbacks, stats) - - -class ModalDagExecutor(DagExecutor): - """An execution engine that uses Modal.""" - - def __init__(self, **kwargs): - self.kwargs = kwargs - - @property - def name(self) -> str: - return "modal-sync" - - def execute_dag( - self, - dag: MultiDiGraph, - callbacks: Optional[Sequence[Callback]] = None, - resume: Optional[bool] = None, - spec: Optional[Spec] = None, - compute_id: Optional[str] = None, - **kwargs, - ) -> None: - merged_kwargs = {**self.kwargs, **kwargs} - execute_dag( - dag, - callbacks=callbacks, - resume=resume, - spec=spec, - compute_id=compute_id, - **merged_kwargs, - ) diff --git a/cubed/tests/test_executor_features.py b/cubed/tests/test_executor_features.py index 850e6a75..80588583 100644 --- a/cubed/tests/test_executor_features.py +++ b/cubed/tests/test_executor_features.py @@ -191,13 +191,6 @@ def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parall @pytest.mark.cloud @pytest.mark.parametrize("compute_arrays_in_parallel", [True, False]) def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_parallel): - from cubed.runtime.executors.modal_async import AsyncModalDagExecutor - - if not isinstance(modal_executor, AsyncModalDagExecutor): - pytest.skip( - f"{type(modal_executor)} does not support compute_arrays_in_parallel" - ) - tmp_path = "s3://cubed-unittest/parallel_pipelines" spec = cubed.Spec(tmp_path, allowed_mem=100000) try: diff --git a/cubed/tests/utils.py b/cubed/tests/utils.py index 7cdcd108..79747863 100644 --- a/cubed/tests/utils.py +++ b/cubed/tests/utils.py @@ -49,7 +49,6 @@ try: MODAL_EXECUTORS.append(create_executor("modal")) - MODAL_EXECUTORS.append(create_executor("modal-sync")) except ImportError: pass