Skip to content

Commit

Permalink
Remove ModalDagExecutor, since it is unused and has fewer features th…
Browse files Browse the repository at this point in the history
…an the async version (#431)
  • Loading branch information
tomwhite authored Mar 18, 2024
1 parent 49dd19f commit e185fca
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 94 deletions.
4 changes: 0 additions & 4 deletions cubed/runtime/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ def create_executor(name: str, executor_options: Optional[dict] = None) -> Execu
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor

return AsyncModalDagExecutor(**executor_options)
elif name == "modal-sync":
from cubed.runtime.executors.modal import ModalDagExecutor

return ModalDagExecutor(**executor_options)
elif name == "processes":
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor

Expand Down
83 changes: 1 addition & 82 deletions cubed/runtime/executors/modal.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,8 @@
import os
import time
from asyncio.exceptions import TimeoutError
from typing import Optional, Sequence

import modal
from modal.exception import ConnectionError
from networkx import MultiDiGraph
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from cubed.runtime.pipeline import visit_nodes
from cubed.runtime.types import Callback, DagExecutor
from cubed.runtime.utils import (
execute_with_stats,
handle_callbacks,
handle_operation_start_callbacks,
)
from cubed.spec import Spec
from cubed.runtime.utils import execute_with_stats

RUNTIME_MEMORY_MIB = 2000

Expand Down Expand Up @@ -105,71 +92,3 @@ def run_remotely(self, input, func=None, config=None, name=None, compute_id=None
# note we can't use the execution_stat decorator since it doesn't work with modal decorators
result, stats = execute_with_stats(func, input, config=config)
return result, stats


# This just retries the initial connection attempt, not the function calls
@retry(
reraise=True,
retry=retry_if_exception_type((TimeoutError, ConnectionError)),
stop=stop_after_attempt(3),
)
def execute_dag(
dag: MultiDiGraph,
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
cloud: Optional[str] = None,
**kwargs,
) -> None:
if spec is not None:
check_runtime_memory(spec)
with stub.run():
cloud = cloud or "aws"
if cloud == "aws":
app_function = run_remotely
elif cloud == "gcp":
app_function = Container().run_remotely
else:
raise ValueError(f"Unrecognized cloud: {cloud}")
for name, node in visit_nodes(dag, resume=resume):
handle_operation_start_callbacks(callbacks, name)
pipeline = node["pipeline"]
task_create_tstamp = time.time()
for _, stats in app_function.map(
pipeline.mappable,
order_outputs=False,
kwargs=dict(func=pipeline.function, config=pipeline.config),
):
stats["name"] = name
stats["task_create_tstamp"] = task_create_tstamp
handle_callbacks(callbacks, stats)


class ModalDagExecutor(DagExecutor):
"""An execution engine that uses Modal."""

def __init__(self, **kwargs):
self.kwargs = kwargs

@property
def name(self) -> str:
return "modal-sync"

def execute_dag(
self,
dag: MultiDiGraph,
callbacks: Optional[Sequence[Callback]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_id: Optional[str] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
execute_dag(
dag,
callbacks=callbacks,
resume=resume,
spec=spec,
compute_id=compute_id,
**merged_kwargs,
)
7 changes: 0 additions & 7 deletions cubed/tests/test_executor_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,6 @@ def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parall
@pytest.mark.cloud
@pytest.mark.parametrize("compute_arrays_in_parallel", [True, False])
def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_parallel):
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor

if not isinstance(modal_executor, AsyncModalDagExecutor):
pytest.skip(
f"{type(modal_executor)} does not support compute_arrays_in_parallel"
)

tmp_path = "s3://cubed-unittest/parallel_pipelines"
spec = cubed.Spec(tmp_path, allowed_mem=100000)
try:
Expand Down
1 change: 0 additions & 1 deletion cubed/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

try:
MODAL_EXECUTORS.append(create_executor("modal"))
MODAL_EXECUTORS.append(create_executor("modal-sync"))
except ImportError:
pass

Expand Down

0 comments on commit e185fca

Please sign in to comment.