diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py index af995d55a..c327b47fa 100644 --- a/cubed/runtime/executors/modal_async.py +++ b/cubed/runtime/executors/modal_async.py @@ -2,8 +2,8 @@ import copy import time from asyncio.exceptions import TimeoutError -from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence from functools import partial +from typing import Any, AsyncIterator, Dict, Iterable, Optional, Sequence from aiostream import stream from modal.exception import ConnectionError @@ -166,9 +166,7 @@ async def async_execute_dag( if not compute_arrays_in_parallel: # run one pipeline at a time for name, node in visit_nodes(dag, resume=resume): - st = pipeline_to_stream( - app_function, name, node["pipeline"], **kwargs - ) + st = pipeline_to_stream(app_function, name, node["pipeline"], **kwargs) async with st.stream() as streamer: async for _, stats in streamer: handle_callbacks(callbacks, stats) @@ -176,9 +174,7 @@ async def async_execute_dag( for gen in visit_node_generations(dag, resume=resume): # run pipelines in the same topological generation in parallel by merging their streams streams = [ - pipeline_to_stream( - app_function, name, node["pipeline"], **kwargs - ) + pipeline_to_stream(app_function, name, node["pipeline"], **kwargs) for name, node in gen ] merged_stream = stream.merge(*streams)