Skip to content

Commit

Permalink
Ensure runtime memory is not less than allowed (Lithops and Modal)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Jul 24, 2023
1 parent 8b939dd commit f7a9f9d
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 10 deletions.
4 changes: 3 additions & 1 deletion cubed/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ def compute(
**kwargs,
):
"""Compute multiple arrays at once."""
plan = arrays_to_plan(*arrays) # guarantees all arrays have same spec
spec = check_array_specs(arrays) # guarantees all arrays have same spec
plan = arrays_to_plan(*arrays)
if executor is None:
executor = arrays[0].spec.executor
if executor is None:
Expand All @@ -413,6 +414,7 @@ def compute(
optimize_graph=optimize_graph,
resume=resume,
array_names=[a.name for a in arrays],
spec=spec,
**kwargs,
)

Expand Down
2 changes: 2 additions & 0 deletions cubed/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def execute(
callbacks=None,
optimize_graph=True,
resume=None,
spec=None,
array_names=None,
**kwargs,
):
Expand All @@ -169,6 +170,7 @@ def execute(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**kwargs,
)
if callbacks is not None:
Expand Down
4 changes: 3 additions & 1 deletion cubed/runtime/executors/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def expand(self, pcoll):
class BeamDagExecutor(DagExecutor):
"""An execution engine that uses Apache Beam."""

def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs):
def execute_dag(
self, dag, callbacks=None, array_names=None, resume=None, spec=None, **kwargs
):
dag = dag.copy()
pipeline = beam.Pipeline(**kwargs)

Expand Down
1 change: 1 addition & 0 deletions cubed/runtime/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def execute_dag(
callbacks=None,
array_names=None,
resume=None,
spec=None,
**compute_kwargs,
):
# Note this currently only builds the task graph for each stage once it gets to that stage in computation
Expand Down
15 changes: 14 additions & 1 deletion cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from lithops.wait import ALWAYS, ANY_COMPLETED
from networkx import MultiDiGraph

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.executors.lithops_retries import (
Expand Down Expand Up @@ -166,11 +166,22 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
use_backups = kwargs.pop("use_backups", False)
allowed_mem = spec.allowed_mem if spec is not None else None
with FunctionExecutor(**kwargs) as executor:
runtime_memory_mb = executor.config[executor.backend].get(
"runtime_memory", None
)
if runtime_memory_mb is not None and allowed_mem is not None:
runtime_memory = runtime_memory_mb * 1_000_000
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)
if not compute_arrays_in_parallel:
for name, node in visit_nodes(dag, resume=resume):
pipeline = node["pipeline"]
Expand Down Expand Up @@ -238,6 +249,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
Expand All @@ -246,5 +258,6 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**merged_kwargs,
)
23 changes: 20 additions & 3 deletions cubed/runtime/executors/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from networkx import MultiDiGraph
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_nodes
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import execute_with_stats, handle_callbacks

RUNTIME_MEMORY_MIB = 2000

stub = modal.Stub("cubed-stub")

requirements_file = os.getenv("CUBED_MODAL_REQUIREMENTS_FILE")
Expand Down Expand Up @@ -48,10 +50,20 @@
)


def check_runtime_memory(spec):
allowed_mem = spec.allowed_mem if spec is not None else None
runtime_memory = RUNTIME_MEMORY_MIB * 1024 * 1024
if allowed_mem is not None:
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)


@stub.function(
image=aws_image,
secret=modal.Secret.from_name("my-aws-secret"),
memory=2000,
memory=RUNTIME_MEMORY_MIB,
retries=2,
cloud="aws",
)
Expand All @@ -66,7 +78,7 @@ def run_remotely(input, func=None, config=None):
@stub.cls(
image=gcp_image,
secret=modal.Secret.from_name("my-googlecloud-secret"),
memory=2000,
memory=RUNTIME_MEMORY_MIB,
retries=2,
cloud="gcp",
)
Expand Down Expand Up @@ -97,9 +109,12 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
cloud: Optional[str] = None,
**kwargs,
) -> None:
if spec is not None:
check_runtime_memory(spec)
with stub.run():
cloud = cloud or "aws"
if cloud == "aws":
Expand Down Expand Up @@ -138,6 +153,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
Expand All @@ -146,5 +162,6 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**merged_kwargs,
)
14 changes: 12 additions & 2 deletions cubed/runtime/executors/modal_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
from networkx import MultiDiGraph
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.executors.modal import Container, run_remotely, stub
from cubed.runtime.executors.modal import (
Container,
check_runtime_memory,
run_remotely,
stub,
)
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import handle_callbacks

Expand Down Expand Up @@ -151,10 +156,13 @@ async def async_execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
cloud: Optional[str] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
if spec is not None:
check_runtime_memory(spec)
async with stub.run():
cloud = cloud or "aws"
if cloud == "aws":
Expand Down Expand Up @@ -195,6 +203,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
Expand All @@ -204,6 +213,7 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**merged_kwargs,
)
)
3 changes: 2 additions & 1 deletion cubed/runtime/executors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from networkx import MultiDiGraph
from tenacity import retry, stop_after_attempt

from cubed.core.array import Callback, TaskEndEvent
from cubed.core.array import Callback, Spec, TaskEndEvent
from cubed.core.plan import visit_nodes
from cubed.primitive.types import CubedPipeline
from cubed.runtime.types import DagExecutor
Expand All @@ -23,6 +23,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
for name, node in visit_nodes(dag, resume=resume):
Expand Down
5 changes: 4 additions & 1 deletion cubed/runtime/executors/python_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from networkx import MultiDiGraph
from tenacity import Retrying, stop_after_attempt

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.primitive.types import CubedPipeline
from cubed.runtime.types import DagExecutor
Expand Down Expand Up @@ -99,6 +99,7 @@ async def async_execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
Expand Down Expand Up @@ -136,6 +137,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
asyncio.run(
Expand All @@ -144,6 +146,7 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**kwargs,
)
)
14 changes: 14 additions & 0 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,17 @@ def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_para
finally:
fs = fsspec.open(tmp_path).fs
fs.rm(tmp_path, recursive=True)


@pytest.mark.cloud
def test_check_runtime_memory_modal(spec, modal_executor):
tmp_path = "s3://cubed-unittest/check-runtime-memory"
spec = cubed.Spec(tmp_path, allowed_mem="4GB") # larger than Modal runtime memory
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
c = xp.add(a, b)
with pytest.raises(
ValueError,
match=r"Runtime memory \(2097152000\) is less than allowed_mem \(4000000000\)",
):
c.compute(executor=modal_executor)

0 comments on commit f7a9f9d

Please sign in to comment.