Try to resolve missing items in cache
wjsi committed Feb 17, 2022
1 parent 4bea67e commit 6ee75c1
Showing 3 changed files with 90 additions and 11 deletions.
mars/services/scheduling/utils.py: 63 additions, 0 deletions
@@ -15,6 +15,9 @@
import asyncio
import contextlib
import sys
import time
from collections import OrderedDict
from typing import Dict, Mapping, Optional, TypeVar, Iterator

from ... import oscar as mo
from ...lib.aio import alru_cache
@@ -58,3 +61,63 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks):
)
await asyncio.wait(coros)
raise


ResultType = TypeVar("ResultType")


class ResultCache(Mapping[str, ResultType]):
_cache: Dict[str, ResultType]
_cache_time: Dict[str, float]
_duration: float

def __init__(self, duration: float = 120):
self._cache = dict()
self._cache_time = OrderedDict()
self._duration = duration

def __getitem__(self, item: str):
self._del_expired_items()
return self._cache[item]

def get(
self, key: str, default: Optional[ResultType] = None
) -> Optional[ResultType]:
self._del_expired_items()
return self._cache.get(key, default)

def _del_expired_items(self):
keys = []
expire_time = time.time() - self._duration
        for key, store_time in self._cache_time.items():
            if store_time >= expire_time:
                # entries are ordered oldest first; stop at the first one still valid
                break
            keys.append(key)
for key in keys:
self._delitem(key)

def __setitem__(self, key: str, value):
self._del_expired_items()
self._cache[key] = value
self._cache_time[key] = time.time()

def _delitem(self, key: str):
del self._cache[key]
self._cache_time.pop(key, None)

def __delitem__(self, key: str):
self._delitem(key)
self._del_expired_items()

def __contains__(self, item: str):
self._del_expired_items()
return item in self._cache

def __len__(self) -> int:
self._del_expired_items()
return len(self._cache)

def __iter__(self) -> Iterator[str]:
self._del_expired_items()
return iter(self._cache)
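
A quick usage sketch of the ResultCache defined above (not part of the commit; the one-second duration and the key name are illustrative only):

    import time

    from mars.services.scheduling.utils import ResultCache

    cache = ResultCache(duration=1)  # entries expire one second after insertion
    cache["subtask-1"] = "finished"
    assert cache.get("subtask-1") == "finished"

    time.sleep(1.5)
    # expired entries are evicted lazily on the next access
    assert cache.get("subtask-1") is None
    assert "subtask-1" not in cache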

mars/services/scheduling/worker/execution/actor.py: 16 additions, 9 deletions
@@ -26,6 +26,7 @@
from ....cluster import ClusterAPI
from ....core import ActorCallback
from ....subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus
from ...utils import ResultCache
from ..queues import SubtaskPrepareQueueActor, SubtaskExecutionQueueActor
from ..quota import QuotaActor
from ..slotmanager import SlotManagerActor
@@ -49,18 +50,21 @@ class SubtaskExecutionActor(mo.Actor):

_subtask_api: SubtaskAPI
_subtask_preparer: SubtaskPreparer
_subtask_result_cache: ResultCache[SubtaskResult]

def __init__(
self,
subtask_max_retries: int = None,
enable_kill_slot: bool = True,
):
self._pred_key_mapping_dag = DAG()
self._subtask_caches = dict()
self._subtask_executions = dict()
self._prepare_queue_ref = None
self._execution_queue_ref = None

self._subtask_caches = dict()
self._subtask_executions = dict()
self._subtask_result_cache = ResultCache()

self._subtask_max_retries = subtask_max_retries or DEFAULT_SUBTASK_MAX_RETRIES
self._enable_kill_slot = enable_kill_slot

@@ -222,6 +226,7 @@ async def submit_subtasks(
priorities: List[Tuple],
supervisor_address: str,
band_name: str,
reschedule: bool = False,
):
assert len(subtasks) == len(priorities)
logger.debug("%d subtasks submitted to SubtaskExecutionActor", len(subtasks))
@@ -230,12 +235,15 @@
for subtask, priority in zip(subtasks, priorities):
if isinstance(subtask, str):
try:
subtask = self._subtask_caches[subtask].subtask
except KeyError:
subtask = self._subtask_executions[subtask].subtask
except KeyError:
subtask = self._subtask_caches[subtask].subtask
try:
info = self._subtask_executions[subtask.subtask_id]
if info.result.status not in (
if subtask.subtask_id in self._subtask_executions:
result = self._subtask_executions[subtask.subtask_id].result
else:
result = self._subtask_result_cache[subtask.subtask_id]
if result.status not in (
SubtaskStatus.cancelled,
SubtaskStatus.errored,
):
@@ -249,7 +257,6 @@
supervisor_address=supervisor_address,
band_name=band_name,
)
self._subtask_caches.pop(subtask.subtask_id, None)
self._subtask_executions[subtask.subtask_id] = subtask_info
put_delays.append(
self._prepare_queue_ref.put.delay(
@@ -322,7 +329,7 @@ async def cancel_subtasks(
continue
if not subtask_info.result.status.is_done:
self._fill_result_with_exc(subtask_info, exc_cls=asyncio.CancelledError)
infos_to_report.append(subtask_info)
infos_to_report.append(subtask_info)
await self._report_subtask_results(infos_to_report)

async def wait_subtasks(self, subtask_ids: List[str]):
@@ -488,6 +495,7 @@ async def _execute_subtask_with_retry(self, subtask_info: SubtaskExecutionInfo):
subtask_info,
max_retries=subtask_info.max_retries if subtask.retryable else 0,
)
self._subtask_result_cache[subtask.subtask_id] = subtask_info.result
except Exception as ex: # noqa: E722 # nosec # pylint: disable=bare-except
if not subtask.retryable:
unretryable_op = [
@@ -654,6 +662,5 @@ async def _forward_subtask_info(self, subtask_info: SubtaskExecutionInfo):
await self._execution_queue_ref.put(
subtask_id, subtask_info.band_name, subtask_info.priority
)
self.uncache_subtasks([subtask_id])
except PrepareFastFailed:
self._subtask_executions.pop(subtask_id, None)
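
To summarize the submit_subtasks hunk above: when a resubmitted subtask arrives as a bare id, its last known result is now looked up in the live execution info first and in the new expiring result cache second. A condensed, hypothetical sketch of that lookup order (this helper function does not exist in the commit; attribute names are taken from the diff):

    def resolve_result(subtask_id, subtask_executions, result_cache):
        """Return the last known SubtaskResult for a subtask id (sketch only)."""
        # prefer the live execution info while the subtask is still tracked
        if subtask_id in subtask_executions:
            return subtask_executions[subtask_id].result
        # otherwise fall back to the expiring ResultCache that
        # _execute_subtask_with_retry now fills after each run
        return result_cache[subtask_id]
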
mars/services/subtask/worker/tests/subtask_processor.py: 11 additions, 2 deletions
@@ -50,10 +50,19 @@ def _execute_operand(self, ctx: Dict[str, Any], op: OperandType):
continue
self.assert_object_consistent(out, ctx[out.key])

async def done(self):
await super().done()
def _unregister_executors(self):
for op in self._operand_executors:
try:
op.unregister_executor()
except KeyError:
pass

async def _release_scheduling(self):
# once the operand stops running, the slot may be reused immediately
# thus executors must be cleaned in time
self._unregister_executors()
await super()._release_scheduling()

async def done(self):
await super().done()
self._unregister_executors()
