Introduce RetryingFunctionExecutor
tomwhite committed Aug 14, 2023
1 parent 6ad5601 commit 027d291
Showing 4 changed files with 111 additions and 92 deletions.
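
At a glance, the commit folds the free functions map_with_retries and wait_with_retries into a RetryingFunctionExecutor class that wraps a Lithops FunctionExecutor, so the retry-aware map and wait become methods on the executor itself. A minimal before/after sketch of a call site (not part of the diff; the LocalhostExecutor and the toy work function are illustrative):

from lithops.executors import LocalhostExecutor

from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor


def work(x):
    # Any function acceptable to Lithops map() works here.
    return x + 1


# Before: free functions taking the Lithops executor as their first argument.
#   futures = map_with_retries(executor, work, [1, 2, 3], retries=2)
#   done, pending = wait_with_retries(executor, futures, throw_except=False)

# After: the wrapper carries the retry-aware map/wait methods.
with RetryingFunctionExecutor(LocalhostExecutor()) as executor:
    futures = executor.map(work, [1, 2, 3], retries=2)
    done, pending = executor.wait(futures, throw_except=False)
    results = set(f.result() for f in done)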
35 changes: 16 additions & 19 deletions cubed/runtime/executors/lithops.py
@@ -24,9 +24,8 @@
from cubed.core.plan import visit_node_generations, visit_nodes
from cubed.runtime.backup import should_launch_backup
from cubed.runtime.executors.lithops_retries import (
RetryingFunctionExecutor,
RetryingFuture,
map_with_retries,
wait_with_retries,
)
from cubed.runtime.types import DagExecutor
from cubed.runtime.utils import handle_callbacks
@@ -40,7 +39,7 @@ def run_func(input, func=None, config=None, name=None):


def map_unordered(
lithops_function_executor: FunctionExecutor,
lithops_function_executor: RetryingFunctionExecutor,
group_map_functions: Sequence[Callable[..., Any]],
group_map_iterdata: Sequence[
Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]]
@@ -88,8 +87,7 @@ def map_unordered(
partial_map_function = lambda x: map_function(x, **kwargs)
group_name_to_function[group_name] = partial_map_function

futures = map_with_retries(
lithops_function_executor,
futures = lithops_function_executor.map(
partial_map_function,
map_iterdata,
timeout=timeout,
@@ -101,8 +99,7 @@ def map_unordered(
pending.extend(futures)

while pending:
finished, pending = wait_with_retries(
lithops_function_executor,
finished, pending = lithops_function_executor.wait(
pending,
throw_except=False,
return_when=return_when,
@@ -142,8 +139,7 @@ def map_unordered(
):
input = future.input
logger.info("Running backup task for %s", input)
futures = map_with_retries(
lithops_function_executor,
futures = lithops_function_executor.map(
group_name_to_function[group_name],
[input],
timeout=timeout,
@@ -172,16 +168,17 @@ def execute_dag(
) -> None:
use_backups = kwargs.pop("use_backups", False)
allowed_mem = spec.allowed_mem if spec is not None else None
with FunctionExecutor(**kwargs) as executor:
runtime_memory_mb = executor.config[executor.backend].get(
"runtime_memory", None
)
if runtime_memory_mb is not None and allowed_mem is not None:
runtime_memory = runtime_memory_mb * 1_000_000
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)
function_executor = FunctionExecutor(**kwargs)
runtime_memory_mb = function_executor.config[function_executor.backend].get(
"runtime_memory", None
)
if runtime_memory_mb is not None and allowed_mem is not None:
runtime_memory = runtime_memory_mb * 1_000_000
if runtime_memory < allowed_mem:
raise ValueError(
f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
)
with RetryingFunctionExecutor(function_executor) as executor:
if not compute_arrays_in_parallel:
for name, node in visit_nodes(dag, resume=resume):
pipeline = node["pipeline"]
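
Note the reordering in execute_dag: the bare FunctionExecutor is now created first so its configured runtime_memory (megabytes) can be compared against the spec's allowed_mem (bytes) before the executor is wrapped in RetryingFunctionExecutor. A worked instance of that check (the 2048 MB and 4 GB figures are illustrative):

runtime_memory_mb = 2048                         # illustrative Lithops runtime_memory, in MB
allowed_mem = 4_000_000_000                      # illustrative cubed allowed_mem, in bytes

runtime_memory = runtime_memory_mb * 1_000_000   # 2_048_000_000 bytes
if runtime_memory < allowed_mem:                 # 2_048_000_000 < 4_000_000_000, so this raises
    raise ValueError(
        f"Runtime memory ({runtime_memory}) is less than allowed_mem ({allowed_mem})"
    )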
156 changes: 89 additions & 67 deletions cubed/runtime/executors/lithops_retries.py
@@ -2,7 +2,13 @@

from lithops import FunctionExecutor
from lithops.future import ResponseFuture
from lithops.wait import ALL_COMPLETED, ALWAYS, ANY_COMPLETED
from lithops.wait import (
ALL_COMPLETED,
ALWAYS,
ANY_COMPLETED,
THREADPOOL_SIZE,
WAIT_DUR_SEC,
)
from six import reraise


@@ -86,80 +92,96 @@ def result(self, throw_except: bool = True, internal_storage: Any = None):
return res


def map_with_retries(
function_executor: FunctionExecutor,
map_function: Callable[..., Any],
map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
timeout: Optional[int] = None,
include_modules: Optional[List[str]] = [],
retries: Optional[int] = None,
group_name: Optional[str] = None,
) -> List[RetryingFuture]:
class RetryingFunctionExecutor:
"""
A generalisation of Lithops `map`, with retries.
A wrapper around Lithops `FunctionExecutor` that supports retries.
"""

inputs = list(map_iterdata)
futures_list = function_executor.map(
map_function, inputs, timeout=timeout, include_modules=include_modules
)
return [
RetryingFuture(
f,
map_function=map_function,
input=i,
map_kwargs=dict(timeout=timeout, include_modules=include_modules),
retries=retries,
group_name=group_name,
)
for i, f in zip(inputs, futures_list)
]


def wait_with_retries(
function_executor: FunctionExecutor,
fs: List[RetryingFuture],
throw_except: Optional[bool] = True,
return_when: Optional[Any] = ALL_COMPLETED,
show_progressbar: Optional[bool] = True,
) -> Tuple[List[RetryingFuture], List[RetryingFuture]]:
"""
A generalisation of Lithops `wait`, with retries.
"""
def __init__(self, executor: FunctionExecutor):
self.executor = executor

lookup = {f.response_future: f for f in fs}
def __enter__(self):
self.executor.__enter__()
return self

while True:
response_futures = [f.response_future for f in fs]
def __exit__(self, exc_type, exc_value, traceback):
self.executor.__exit__(exc_type, exc_value, traceback)

done, pending = function_executor.wait(
response_futures,
throw_except=throw_except,
return_when=return_when,
show_progressbar=show_progressbar,
def map(
self,
map_function: Callable[..., Any],
map_iterdata: Iterable[Union[List[Any], Tuple[Any, ...], Dict[str, Any]]],
*,
retries: Optional[int] = None,
group_name: Optional[str] = None,
**kwargs,
) -> List[RetryingFuture]:
inputs = list(map_iterdata)
futures_list = self.executor.map(
map_function,
inputs,
**kwargs,
)

retrying_done = []
retrying_pending = [lookup[response_future] for response_future in pending]
for response_future in done:
retrying_future = lookup[response_future]
if response_future.error:
retrying_future._inc_failure_count()
if retrying_future._should_retry():
retrying_future._retry(function_executor)
# put back into pending since we are retrying this input
retrying_pending.append(retrying_future)
lookup[retrying_future.response_future] = retrying_future
return [
RetryingFuture(
f,
map_function=map_function,
input=i,
map_kwargs=kwargs,
retries=retries,
group_name=group_name,
)
for i, f in zip(inputs, futures_list)
]

def wait(
self,
fs: List[RetryingFuture],
throw_except: Optional[bool] = True,
return_when: Optional[Any] = ALL_COMPLETED,
download_results: Optional[bool] = False,
timeout: Optional[int] = None,
threadpool_size: Optional[int] = THREADPOOL_SIZE,
wait_dur_sec: Optional[int] = WAIT_DUR_SEC,
show_progressbar: Optional[bool] = True,
) -> Tuple[List[RetryingFuture], List[RetryingFuture]]:
lookup = {f.response_future: f for f in fs}

while True:
response_futures = [f.response_future for f in fs]

done, pending = self.executor.wait(
response_futures,
throw_except=throw_except,
return_when=return_when,
download_results=download_results,
timeout=timeout,
threadpool_size=threadpool_size,
wait_dur_sec=wait_dur_sec,
show_progressbar=show_progressbar,
)

retrying_done = []
retrying_pending = [lookup[response_future] for response_future in pending]
for response_future in done:
retrying_future = lookup[response_future]
if response_future.error:
retrying_future._inc_failure_count()
if retrying_future._should_retry():
retrying_future._retry(self.executor)
# put back into pending since we are retrying this input
retrying_pending.append(retrying_future)
lookup[retrying_future.response_future] = retrying_future
else:
retrying_done.append(retrying_future)
else:
retrying_done.append(retrying_future)
else:
retrying_done.append(retrying_future)

if return_when == ALWAYS:
break
elif return_when == ANY_COMPLETED and len(retrying_done) > 0:
break
elif return_when == ALL_COMPLETED and len(retrying_pending) == 0:
break
if return_when == ALWAYS:
break
elif return_when == ANY_COMPLETED and len(retrying_done) > 0:
break
elif return_when == ALL_COMPLETED and len(retrying_pending) == 0:
break

return retrying_done, retrying_pending
return retrying_done, retrying_pending
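
Because wait() resubmits a failed input until its retry budget is exhausted, a caller can poll with ANY_COMPLETED and consume results as they finish, which is how map_unordered drives the executor. A sketch of that pattern with a deliberately flaky task (the flaky function and the retry count are illustrative, not from the diff):

import random

from lithops.executors import LocalhostExecutor
from lithops.wait import ANY_COMPLETED

from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor


def flaky(x):
    # Fails roughly half the time; retries absorb the transient failures.
    if random.random() < 0.5:
        raise RuntimeError("transient failure")
    return x * 10


with RetryingFunctionExecutor(LocalhostExecutor()) as executor:
    pending = executor.map(flaky, [1, 2, 3, 4], retries=3)
    while pending:
        # throw_except=False lets wait() count a failure and resubmit the input
        # instead of raising; only inputs that exhaust their retries raise
        # later, when result() is called on their future.
        finished, pending = executor.wait(
            pending, throw_except=False, return_when=ANY_COMPLETED
        )
        for future in finished:
            print(future.result())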
3 changes: 2 additions & 1 deletion cubed/tests/runtime/test_lithops.py
@@ -10,11 +10,12 @@
from lithops.executors import LocalhostExecutor

from cubed.runtime.executors.lithops import map_unordered
from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor


def run_test(function, input, retries, timeout=10, use_backups=False):
outputs = set()
with LocalhostExecutor() as executor:
with RetryingFunctionExecutor(LocalhostExecutor()) as executor:
for output in map_unordered(
executor,
[function],
9 changes: 4 additions & 5 deletions cubed/tests/runtime/test_lithops_retries.py
@@ -4,20 +4,19 @@

from lithops.executors import LocalhostExecutor

from cubed.runtime.executors.lithops_retries import map_with_retries, wait_with_retries
from cubed.runtime.executors.lithops_retries import RetryingFunctionExecutor
from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure


def run_test(function, input, retries, timeout=10):
with LocalhostExecutor() as executor:
futures = map_with_retries(
executor,
with RetryingFunctionExecutor(LocalhostExecutor()) as executor:
futures = executor.map(
function,
input,
timeout=timeout,
retries=retries,
)
done, pending = wait_with_retries(executor, futures, throw_except=False)
done, pending = executor.wait(futures, throw_except=False)
assert len(pending) == 0
outputs = set(f.result() for f in done)
return outputs
