Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure runtime memory is not less than allowed #274

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cubed/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ def compute(
**kwargs,
):
"""Compute multiple arrays at once."""
plan = arrays_to_plan(*arrays) # guarantees all arrays have same spec
spec = check_array_specs(arrays) # guarantees all arrays have same spec
plan = arrays_to_plan(*arrays)
if executor is None:
executor = arrays[0].spec.executor
if executor is None:
Expand All @@ -413,6 +414,7 @@ def compute(
optimize_graph=optimize_graph,
resume=resume,
array_names=[a.name for a in arrays],
spec=spec,
**kwargs,
)

Expand Down
2 changes: 2 additions & 0 deletions cubed/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def execute(
callbacks=None,
optimize_graph=True,
resume=None,
spec=None,
array_names=None,
**kwargs,
):
Expand All @@ -169,6 +170,7 @@ def execute(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**kwargs,
)
if callbacks is not None:
Expand Down
4 changes: 3 additions & 1 deletion cubed/runtime/executors/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ def expand(self, pcoll):
class BeamDagExecutor(DagExecutor):
"""An execution engine that uses Apache Beam."""

def execute_dag(self, dag, callbacks=None, array_names=None, resume=None, **kwargs):
def execute_dag(
self, dag, callbacks=None, array_names=None, resume=None, spec=None, **kwargs
):
dag = dag.copy()
pipeline = beam.Pipeline(**kwargs)

Expand Down
1 change: 1 addition & 0 deletions cubed/runtime/executors/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def execute_dag(
callbacks=None,
array_names=None,
resume=None,
spec=None,
**compute_kwargs,
):
# Note this currently only builds the task graph for each stage once it gets to that stage in computation
Expand Down
15 changes: 14 additions & 1 deletion cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from lithops.wait import ALWAYS, ANY_COMPLETED
from networkx import MultiDiGraph

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.executors.lithops_retries import (
Expand Down Expand Up @@ -166,11 +166,22 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
use_backups = kwargs.pop("use_backups", False)
allowed_mem = spec.allowed_mem if spec is not None else None
with FunctionExecutor(**kwargs) as executor:
runtime_memory_mb = executor.config[executor.backend].get(
"runtime_memory", None
)
if runtime_memory_mb is not None and allowed_mem is not None:
runtime_memory = runtime_memory_mb * 1_000_000
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)
if not compute_arrays_in_parallel:
for name, node in visit_nodes(dag, resume=resume):
pipeline = node["pipeline"]
Expand Down Expand Up @@ -238,6 +249,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
Expand All @@ -246,5 +258,6 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**merged_kwargs,
)
23 changes: 20 additions & 3 deletions cubed/runtime/executors/modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
from networkx import MultiDiGraph
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_nodes
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import execute_with_stats, handle_callbacks

RUNTIME_MEMORY_MIB = 2000

stub = modal.Stub("cubed-stub")

requirements_file = os.getenv("CUBED_MODAL_REQUIREMENTS_FILE")
Expand Down Expand Up @@ -48,10 +50,20 @@
)


def check_runtime_memory(spec):
allowed_mem = spec.allowed_mem if spec is not None else None
runtime_memory = RUNTIME_MEMORY_MIB * 1024 * 1024
if allowed_mem is not None:
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)


@stub.function(
image=aws_image,
secret=modal.Secret.from_name("my-aws-secret"),
memory=2000,
memory=RUNTIME_MEMORY_MIB,
retries=2,
cloud="aws",
)
Expand All @@ -66,7 +78,7 @@ def run_remotely(input, func=None, config=None):
@stub.cls(
image=gcp_image,
secret=modal.Secret.from_name("my-googlecloud-secret"),
memory=2000,
memory=RUNTIME_MEMORY_MIB,
retries=2,
cloud="gcp",
)
Expand Down Expand Up @@ -97,9 +109,12 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
cloud: Optional[str] = None,
**kwargs,
) -> None:
if spec is not None:
check_runtime_memory(spec)
with stub.run():
cloud = cloud or "aws"
if cloud == "aws":
Expand Down Expand Up @@ -138,6 +153,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
Expand All @@ -146,5 +162,6 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**merged_kwargs,
)
14 changes: 12 additions & 2 deletions cubed/runtime/executors/modal_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
from networkx import MultiDiGraph
from tenacity import retry, retry_if_exception_type, stop_after_attempt

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.executors.modal import Container, run_remotely, stub
from cubed.runtime.executors.modal import (
Container,
check_runtime_memory,
run_remotely,
stub,
)
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import handle_callbacks

Expand Down Expand Up @@ -151,10 +156,13 @@ async def async_execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
cloud: Optional[str] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
if spec is not None:
check_runtime_memory(spec)
async with stub.run():
cloud = cloud or "aws"
if cloud == "aws":
Expand Down Expand Up @@ -195,6 +203,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
merged_kwargs = {**self.kwargs, **kwargs}
Expand All @@ -204,6 +213,7 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**merged_kwargs,
)
)
3 changes: 2 additions & 1 deletion cubed/runtime/executors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from networkx import MultiDiGraph
from tenacity import retry, stop_after_attempt

from cubed.core.array import Callback, TaskEndEvent
from cubed.core.array import Callback, Spec, TaskEndEvent
from cubed.core.plan import visit_nodes
from cubed.primitive.types import CubedPipeline
from cubed.runtime.types import DagExecutor
Expand All @@ -23,6 +23,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
for name, node in visit_nodes(dag, resume=resume):
Expand Down
5 changes: 4 additions & 1 deletion cubed/runtime/executors/python_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from networkx import MultiDiGraph
from tenacity import Retrying, stop_after_attempt

from cubed.core.array import Callback
from cubed.core.array import Callback, Spec
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.primitive.types import CubedPipeline
from cubed.runtime.types import DagExecutor
Expand Down Expand Up @@ -99,6 +99,7 @@ async def async_execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
compute_arrays_in_parallel: Optional[bool] = None,
**kwargs,
) -> None:
Expand Down Expand Up @@ -136,6 +137,7 @@ def execute_dag(
callbacks: Optional[Sequence[Callback]] = None,
array_names: Optional[Sequence[str]] = None,
resume: Optional[bool] = None,
spec: Optional[Spec] = None,
**kwargs,
) -> None:
asyncio.run(
Expand All @@ -144,6 +146,7 @@ def execute_dag(
callbacks=callbacks,
array_names=array_names,
resume=resume,
spec=spec,
**kwargs,
)
)
14 changes: 14 additions & 0 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,17 @@ def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_para
finally:
fs = fsspec.open(tmp_path).fs
fs.rm(tmp_path, recursive=True)


@pytest.mark.cloud
def test_check_runtime_memory_modal(spec, modal_executor):
tmp_path = "s3://cubed-unittest/check-runtime-memory"
spec = cubed.Spec(tmp_path, allowed_mem="4GB") # larger than Modal runtime memory
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
c = xp.add(a, b)
with pytest.raises(
ValueError,
match=r"Runtime memory \(2097152000\) is less than allowed_mem \(4000000000\)",
):
c.compute(executor=modal_executor)