From f7a9f9d9e5b9e734f303241e0b344a1e4d9cff02 Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 6 Jul 2023 15:58:21 +0100 Subject: [PATCH] Ensure runtime memory is not less than allowed (Lithops and Modal) --- cubed/core/array.py | 4 +++- cubed/core/plan.py | 2 ++ cubed/runtime/executors/beam.py | 4 +++- cubed/runtime/executors/dask.py | 1 + cubed/runtime/executors/lithops.py | 15 ++++++++++++++- cubed/runtime/executors/modal.py | 23 ++++++++++++++++++++--- cubed/runtime/executors/modal_async.py | 14 ++++++++++++-- cubed/runtime/executors/python.py | 3 ++- cubed/runtime/executors/python_async.py | 5 ++++- cubed/tests/test_core.py | 14 ++++++++++++++ 10 files changed, 75 insertions(+), 10 deletions(-) diff --git a/cubed/core/array.py b/cubed/core/array.py index 82f1166c..fb77ae76 100644 --- a/cubed/core/array.py +++ b/cubed/core/array.py @@ -398,7 +398,8 @@ def compute( **kwargs, ): """Compute multiple arrays at once.""" - plan = arrays_to_plan(*arrays) # guarantees all arrays have same spec + spec = check_array_specs(arrays) # guarantees all arrays have same spec + plan = arrays_to_plan(*arrays) if executor is None: executor = arrays[0].spec.executor if executor is None: @@ -413,6 +414,7 @@ def compute( optimize_graph=optimize_graph, resume=resume, array_names=[a.name for a in arrays], + spec=spec, **kwargs, ) diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 365c00c3..5dfc19c3 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -156,6 +156,7 @@ def execute( callbacks=None, optimize_graph=True, resume=None, + spec=None, array_names=None, **kwargs, ): @@ -169,6 +170,7 @@ def execute( callbacks=callbacks, array_names=array_names, resume=resume, + spec=spec, **kwargs, ) if callbacks is not None: diff --git a/cubed/runtime/executors/beam.py b/cubed/runtime/executors/beam.py index 9bcb31cb..fc853add 100644 --- a/cubed/runtime/executors/beam.py +++ b/cubed/runtime/executors/beam.py @@ -83,7 +83,9 @@ def expand(self, pcoll): class BeamDagExecutor(DagExecutor): """An execution engine that uses Apache Beam.""" - def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs): + def execute_dag( + self, dag, callbacks=None, array_names=None, resume=None, spec=None, **kwargs + ): dag = dag.copy() pipeline = beam.Pipeline(**kwargs) diff --git a/cubed/runtime/executors/dask.py b/cubed/runtime/executors/dask.py index 24c10f1c..b95a31d8 100644 --- a/cubed/runtime/executors/dask.py +++ b/cubed/runtime/executors/dask.py @@ -18,6 +18,7 @@ def execute_dag( callbacks=None, array_names=None, resume=None, + spec=None, **compute_kwargs, ): # Note this currently only builds the task graph for each stage once it gets to that stage in computation diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py index f1cceffb..8b554bcd 100644 --- a/cubed/runtime/executors/lithops.py +++ b/cubed/runtime/executors/lithops.py @@ -20,7 +20,7 @@ from lithops.wait import ALWAYS, ANY_COMPLETED from networkx import MultiDiGraph -from cubed.core.array import Callback +from cubed.core.array import Callback, Spec from cubed.core.plan import visit_node_generations, visit_nodes from cubed.runtime.backup import should_launch_backup from cubed.runtime.executors.lithops_retries import ( @@ -166,11 +166,22 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, compute_arrays_in_parallel: Optional[bool] = None, **kwargs, ) -> None: use_backups = kwargs.pop("use_backups", False) + allowed_mem = spec.allowed_mem if spec is not None else None with FunctionExecutor(**kwargs) as executor: + runtime_memory_mb = executor.config[executor.backend].get( + "runtime_memory", None + ) + if runtime_memory_mb is not None and allowed_mem is not None: + runtime_memory = runtime_memory_mb * 1_000_000 + if runtime_memory < allowed_mem: + raise ValueError( + f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})" + ) if not compute_arrays_in_parallel: for name, node in visit_nodes(dag, resume=resume): pipeline = node["pipeline"] @@ -238,6 +249,7 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, **kwargs, ) -> None: merged_kwargs = {**self.kwargs, **kwargs} @@ -246,5 +258,6 @@ def execute_dag( callbacks=callbacks, array_names=array_names, resume=resume, + spec=spec, **merged_kwargs, ) diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py index 6d3541c6..4c4bb5dc 100644 --- a/cubed/runtime/executors/modal.py +++ b/cubed/runtime/executors/modal.py @@ -8,11 +8,13 @@ from networkx import MultiDiGraph from tenacity import retry, retry_if_exception_type, stop_after_attempt -from cubed.core.array import Callback +from cubed.core.array import Callback, Spec from cubed.core.plan import visit_nodes from cubed.runtime.types import DagExecutor from cubed.runtime.utils import execute_with_stats, handle_callbacks +RUNTIME_MEMORY_MIB = 2000 + stub = modal.Stub("cubed-stub") requirements_file = os.getenv("CUBED_MODAL_REQUIREMENTS_FILE") @@ -48,10 +50,20 @@ ) +def check_runtime_memory(spec): + allowed_mem = spec.allowed_mem if spec is not None else None + runtime_memory = RUNTIME_MEMORY_MIB * 1024 * 1024 + if allowed_mem is not None: + if runtime_memory < allowed_mem: + raise ValueError( + f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})" + ) + + @stub.function( image=aws_image, secret=modal.Secret.from_name("my-aws-secret"), - memory=2000, + memory=RUNTIME_MEMORY_MIB, retries=2, cloud="aws", ) @@ -66,7 +78,7 @@ def run_remotely(input, func=None, config=None): @stub.cls( image=gcp_image, secret=modal.Secret.from_name("my-googlecloud-secret"), - memory=2000, + memory=RUNTIME_MEMORY_MIB, retries=2, cloud="gcp", ) @@ -97,9 +109,12 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, cloud: Optional[str] = None, **kwargs, ) -> None: + if spec is not None: + check_runtime_memory(spec) with stub.run(): cloud = cloud or "aws" if cloud == "aws": @@ -138,6 +153,7 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, **kwargs, ) -> None: merged_kwargs = {**self.kwargs, **kwargs} @@ -146,5 +162,6 @@ def execute_dag( callbacks=callbacks, array_names=array_names, resume=resume, + spec=spec, **merged_kwargs, ) diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py index c327b47f..b1a56c21 100644 --- a/cubed/runtime/executors/modal_async.py +++ b/cubed/runtime/executors/modal_async.py @@ -11,10 +11,15 @@ from networkx import MultiDiGraph from tenacity import retry, retry_if_exception_type, stop_after_attempt -from cubed.core.array import Callback +from cubed.core.array import Callback, Spec from cubed.core.plan import visit_node_generations, visit_nodes from cubed.runtime.backup import should_launch_backup -from cubed.runtime.executors.modal import Container, run_remotely, stub +from cubed.runtime.executors.modal import ( + Container, + check_runtime_memory, + run_remotely, + stub, +) from cubed.runtime.types import DagExecutor from cubed.runtime.utils import handle_callbacks @@ -151,10 +156,13 @@ async def async_execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, cloud: Optional[str] = None, compute_arrays_in_parallel: Optional[bool] = None, **kwargs, ) -> None: + if spec is not None: + check_runtime_memory(spec) async with stub.run(): cloud = cloud or "aws" if cloud == "aws": @@ -195,6 +203,7 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, **kwargs, ) -> None: merged_kwargs = {**self.kwargs, **kwargs} @@ -204,6 +213,7 @@ def execute_dag( callbacks=callbacks, array_names=array_names, resume=resume, + spec=spec, **merged_kwargs, ) ) diff --git a/cubed/runtime/executors/python.py b/cubed/runtime/executors/python.py index 26852f12..9720f1b8 100644 --- a/cubed/runtime/executors/python.py +++ b/cubed/runtime/executors/python.py @@ -3,7 +3,7 @@ from networkx import MultiDiGraph from tenacity import retry, stop_after_attempt -from cubed.core.array import Callback, TaskEndEvent +from cubed.core.array import Callback, Spec, TaskEndEvent from cubed.core.plan import visit_nodes from cubed.primitive.types import CubedPipeline from cubed.runtime.types import DagExecutor @@ -23,6 +23,7 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, **kwargs, ) -> None: for name, node in visit_nodes(dag, resume=resume): diff --git a/cubed/runtime/executors/python_async.py b/cubed/runtime/executors/python_async.py index 3f5351a5..4ac7b8ac 100644 --- a/cubed/runtime/executors/python_async.py +++ b/cubed/runtime/executors/python_async.py @@ -9,7 +9,7 @@ from networkx import MultiDiGraph from tenacity import Retrying, stop_after_attempt -from cubed.core.array import Callback +from cubed.core.array import Callback, Spec from cubed.core.plan import visit_node_generations, visit_nodes from cubed.primitive.types import CubedPipeline from cubed.runtime.types import DagExecutor @@ -99,6 +99,7 @@ async def async_execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, compute_arrays_in_parallel: Optional[bool] = None, **kwargs, ) -> None: @@ -136,6 +137,7 @@ def execute_dag( callbacks: Optional[Sequence[Callback]] = None, array_names: Optional[Sequence[str]] = None, resume: Optional[bool] = None, + spec: Optional[Spec] = None, **kwargs, ) -> None: asyncio.run( @@ -144,6 +146,7 @@ def execute_dag( callbacks=callbacks, array_names=array_names, resume=resume, + spec=spec, **kwargs, ) ) diff --git a/cubed/tests/test_core.py b/cubed/tests/test_core.py index efd3c0aa..a9cd49c6 100644 --- a/cubed/tests/test_core.py +++ b/cubed/tests/test_core.py @@ -629,3 +629,17 @@ def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_para finally: fs = fsspec.open(tmp_path).fs fs.rm(tmp_path, recursive=True) + + +@pytest.mark.cloud +def test_check_runtime_memory_modal(spec, modal_executor): + tmp_path = "s3://cubed-unittest/check-runtime-memory" + spec = cubed.Spec(tmp_path, allowed_mem="4GB") # larger than Modal runtime memory + a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec) + b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec) + c = xp.add(a, b) + with pytest.raises( + ValueError, + match=r"Runtime memory \(2097152000\) is less than allowed_mem \(4000000000\)", + ): + c.compute(executor=modal_executor)