[WIP] Queueing and scheduling subtasks at worker side #2627

Draft: wants to merge 3 commits into master
68 changes: 34 additions & 34 deletions mars/deploy/oscar/tests/test_cmdline.py
@@ -111,38 +111,6 @@ def _get_labelled_port(label=None, create=True):

supervisor_cmd_start = [sys.executable, "-m", "mars.deploy.oscar.supervisor"]
worker_cmd_start = [sys.executable, "-m", "mars.deploy.oscar.worker"]
start_params = {
"bare_start": [
supervisor_cmd_start,
worker_cmd_start
+ [
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
False,
],
"with_supervisors": [
supervisor_cmd_start
+ [
"-e",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"-w",
lambda: str(_get_labelled_port("web")),
"--n-process",
"2",
],
worker_cmd_start
+ [
"-e",
lambda: f"127.0.0.1:{get_next_port(occupy=True)}",
"-s",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
True,
],
}


def _reload_args(args):
@@ -159,8 +127,40 @@ def _reload_args(args):

@pytest.mark.parametrize(
"supervisor_args,worker_args,use_web_addr",
list(start_params.values()),
ids=list(start_params.keys()),
[
pytest.param(
supervisor_cmd_start,
worker_cmd_start
+ [
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
False,
id="bare_start",
),
pytest.param(
supervisor_cmd_start
+ [
"-e",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"-w",
lambda: str(_get_labelled_port("web")),
"--n-process",
"2",
],
worker_cmd_start
+ [
"-e",
lambda: f"127.0.0.1:{get_next_port(occupy=True)}",
"-s",
lambda: f'127.0.0.1:{_get_labelled_port("supervisor")}',
"--config-file",
os.path.join(os.path.dirname(__file__), "local_test_config.yml"),
],
True,
id="with_supervisors",
),
],
)
@flaky(max_runs=10, rerun_filter=lambda err, *_: issubclass(err[0], _rerun_errors))
def test_cmdline_run(supervisor_args, worker_args, use_web_addr):
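Side note on the parametrization change above: the module-level start_params dict is folded into inline pytest.param(..., id=...) entries, which keeps each case's arguments and its test id together. A minimal, self-contained sketch of that style (the test and its arguments are illustrative, not from this PR):

import pytest

# Each pytest.param carries one case's arguments plus a readable id,
# so there is no separate {id: args} dict to keep in sync by hand.
@pytest.mark.parametrize(
    "left, right, expected",
    [
        pytest.param(1, 2, 3, id="small_numbers"),
        pytest.param(10, -10, 0, id="cancels_out"),
    ],
)
def test_add(left, right, expected):
    assert left + right == expected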
2 changes: 2 additions & 0 deletions mars/oscar/backends/ray/tests/test_ray_pool.py
@@ -30,6 +30,8 @@


class TestActor(mo.Actor):
__test__ = False

async def kill(self, address, uid):
actor_ref = await mo.actor_ref(address, uid)
task = asyncio.create_task(actor_ref.crash())
2 changes: 2 additions & 0 deletions mars/oscar/backends/test/pool.py
@@ -23,6 +23,8 @@


class TestMainActorPool(MainActorPool):
__test__ = False

@classmethod
def get_external_addresses(
cls, address: str, n_process: int = None, ports: List[int] = None
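The __test__ = False additions here and in test_ray_pool.py above exist because both classes start with the Test prefix, so pytest would otherwise collect them as test classes. A minimal sketch of the mechanism (the helper class is illustrative):

# pytest collects classes named Test* by default; the __test__ = False class
# attribute tells the collector to skip a class that only shares the naming
# convention but is not a test case.
class TestHelper:
    __test__ = False  # excluded from pytest collection

    def compute(self):
        return 42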
4 changes: 2 additions & 2 deletions mars/oscar/core.pyx
@@ -111,8 +111,8 @@ cdef class ActorRefMethod:
"""
Wrapper for an Actor method at the client side
"""
cdef ActorRef ref
cdef object method_name
cdef public ActorRef ref
cdef public object method_name
cdef object _options

def __init__(self, ref, method_name, options=None):
2 changes: 2 additions & 0 deletions mars/remote/core.py
@@ -57,6 +57,8 @@ def __init__(
n_output=None,
**kw,
):
function_args = function_args or []
function_kwargs = function_kwargs or {}
super().__init__(
_function=function,
_function_args=function_args,
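The two added lines normalize None defaults into empty containers before they are handed to super().__init__, so downstream code can unpack them without None checks. A minimal sketch of the same guard pattern outside Mars (names are illustrative):

def make_call(function, function_args=None, function_kwargs=None):
    # Normalize None into fresh empty containers so the call site can unpack
    # them safely and no mutable default is shared between calls.
    function_args = function_args or []
    function_kwargs = function_kwargs or {}
    return function(*function_args, **function_kwargs)

make_call(print)                          # no positional or keyword arguments
make_call(divmod, function_args=[7, 3])   # -> (2, 1)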
2 changes: 1 addition & 1 deletion mars/services/cluster/api/oscar.py
@@ -211,7 +211,7 @@ async def get_mars_versions(self) -> List[str]:
node_info_ref = await self._get_node_info_ref()
return await node_info_ref.get_mars_versions()

async def get_bands(self) -> Dict:
async def get_bands(self) -> Dict[BandType, int]:
"""
Get bands that can be used for computation on the current node.
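The tightened annotation documents that get_bands yields a mapping from a band to its slot count. Under the usual Mars convention that a band is an (address, band name) tuple (an assumption here, not restated in this diff), consuming the result looks roughly like:

from typing import Dict, Tuple

BandType = Tuple[str, str]  # assumed: (worker address, band name), e.g. ("127.0.0.1:12345", "numa-0")

def total_slots(bands: Dict[BandType, int]) -> int:
    # Sum the slot counts across every band reported by the node.
    return sum(bands.values())

print(total_slots({("127.0.0.1:12345", "numa-0"): 8}))  # -> 8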
39 changes: 39 additions & 0 deletions mars/services/core.py
@@ -20,6 +20,16 @@
import warnings
from typing import Dict, Iterable, List, Union

from .. import oscar as mo
from ..lib.aio import alru_cache
from ..serialization.serializables import (
Serializable,
BytesField,
StringField,
TupleField,
DictField,
)

_ModulesType = Union[List, str, None]


@@ -28,6 +38,35 @@ class NodeRole(enum.Enum):
WORKER = 1


class ActorCallback(Serializable):
actor_uid: bytes = BytesField("actor_uid")
actor_address: str = StringField("actor_address")
actor_method: str = StringField("actor_method")
args: tuple = TupleField("args")
kwargs: dict = DictField("kwargs")

def __init__(self, ref_method=None, **kw):
if ref_method is not None:
kw["actor_uid"] = ref_method.ref.uid
kw["actor_address"] = ref_method.ref.address
kw["actor_method"] = ref_method.method_name
kw["args"] = kw.get("args") or ()
kw["kwargs"] = kw.get("kwargs") or {}
super().__init__(**kw)

@classmethod
@alru_cache(cache_exceptions=False)
async def _get_ref(cls, actor_uid: bytes, actor_address: str):
return await mo.actor_ref(actor_uid, address=actor_address)

async def __call__(self, *args, **kwargs):
ref = await self._get_ref(self.actor_uid, self.actor_address)
args = self.args + args
kw = self.kwargs.copy()
kw.update(kwargs)
return await getattr(ref, self.actor_method)(*args, **kw)


class AbstractService(abc.ABC):
_instances = dict()

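The new ActorCallback captures an actor method as plain serializable fields (uid, address, method name, pre-bound arguments) and only resolves a live actor ref when it is awaited, so it can be shipped to another process and invoked later. A rough usage sketch on this PR branch; the stand-in ref-method object below is hypothetical and exists only to satisfy the constructor:

from mars.services.core import ActorCallback

class _FakeRef:
    uid, address = b"SubtaskManagerActor", "127.0.0.1:11111"

class _FakeRefMethod:
    # Mimics an ActorRefMethod: exposes .ref and .method_name as the constructor expects.
    ref = _FakeRef()
    method_name = "finish_subtasks"

cb = ActorCallback(_FakeRefMethod(), args=(["subtask-0"],))
print(cb.actor_method, cb.args)  # -> finish_subtasks (['subtask-0'],)
# Awaiting cb() would resolve the actor ref via mo.actor_ref and call
# finish_subtasks(["subtask-0"], ...) on it; that part needs a running actor pool.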
91 changes: 45 additions & 46 deletions mars/services/scheduling/api/oscar.py
@@ -24,14 +24,11 @@


class SchedulingAPI(AbstractSchedulingAPI):
def __init__(
self, session_id: str, address: str, manager_ref=None, queueing_ref=None
):
def __init__(self, session_id: str, address: str, manager_ref=None):
self._session_id = session_id
self._address = address

self._manager_ref = manager_ref
self._queueing_ref = queueing_ref

@classmethod
@alru_cache
@@ -41,20 +38,30 @@ async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
manager_ref = await mo.actor_ref(
SubtaskManagerActor.gen_uid(session_id), address=address
)
from ..supervisor.queueing import SubtaskQueueingActor

queueing_ref = await mo.actor_ref(
SubtaskQueueingActor.gen_uid(session_id), address=address
)

scheduling_api = SchedulingAPI(session_id, address, manager_ref, queueing_ref)
scheduling_api = SchedulingAPI(session_id, address, manager_ref)
return scheduling_api

async def get_subtask_schedule_summaries(
self, task_id: Optional[str] = None
) -> List[SubtaskScheduleSummary]:
return await self._manager_ref.get_schedule_summaries(task_id)

async def cache_subtasks(
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
):
"""
Add subtasks to the cache for fast forwarding

Parameters
----------
subtasks
list of subtasks to be submitted to the service
priorities
list of priorities of the subtasks
"""
await self._manager_ref.cache_subtasks(subtasks, priorities)

async def add_subtasks(
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
):
@@ -88,12 +95,12 @@ async def update_subtask_priority(self, subtask_id: str, priority: Tuple):

@update_subtask_priority.batch
async def update_subtask_priority(self, args_list, kwargs_list):
await self._queueing_ref.update_subtask_priority.batch(
*(
self._queueing_ref.update_subtask_priority.delay(*args, **kwargs)
for args, kwargs in zip(args_list, kwargs_list)
)
)
subtask_ids, priorities = [], []
for args, kwargs in zip(args_list, kwargs_list):
subtask_id, priority = self.update_subtask_priority.bind(*args, **kwargs)
subtask_ids.append(subtask_id)
priorities.append(priority)
await self._manager_ref.update_subtask_priorities(subtask_ids, priorities)

async def cancel_subtasks(
self, subtask_ids: List[str], kill_timeout: Union[float, int] = 5
@@ -110,51 +117,43 @@ async def cancel_subtasks(
"""
await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)

async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = True):
"""
Mark subtasks as finished, letting the scheduling service schedule
the next tasks in the ready queue

Parameters
----------
subtask_ids
ids of subtasks to mark as finished
schedule_next
whether to schedule succeeding subtasks
"""
await self._manager_ref.finish_subtasks(subtask_ids, schedule_next)


class MockSchedulingAPI(SchedulingAPI):
@classmethod
async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
from ..supervisor import GlobalSlotManagerActor, AutoscalerActor

await mo.create_actor(
GlobalSlotManagerActor,
uid=GlobalSlotManagerActor.default_uid(),
address=address,
)
await mo.create_actor(
AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
)
# from ..supervisor import AutoscalerActor
# await mo.create_actor(
# AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
# )

from .... import resource as mars_resource
from ..worker import (
SubtaskExecutionActor,
WorkerSlotManagerActor,
SubtaskPrepareQueueActor,
SubtaskExecutionQueueActor,
WorkerQuotaManagerActor,
SlotManagerActor,
)

await mo.create_actor(
SubtaskExecutionActor,
subtask_max_retries=0,
uid=SubtaskExecutionActor.default_uid(),
SlotManagerActor,
uid=SlotManagerActor.default_uid(),
address=address,
)
await mo.create_actor(
SubtaskPrepareQueueActor,
uid=SubtaskPrepareQueueActor.default_uid(),
address=address,
)
await mo.create_actor(
SubtaskExecutionQueueActor,
uid=SubtaskExecutionQueueActor.default_uid(),
address=address,
)
await mo.create_actor(
WorkerSlotManagerActor,
uid=WorkerSlotManagerActor.default_uid(),
SubtaskExecutionActor,
subtask_max_retries=0,
uid=SubtaskExecutionActor.default_uid(),
address=address,
)
await mo.create_actor(
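The batch handler for update_subtask_priority now gathers the per-call (subtask_id, priority) pairs and forwards them to the manager in a single update_subtask_priorities call, instead of fanning out delayed calls to a queueing actor. A generic sketch of that coalescing pattern (the helper functions are illustrative, not the Mars actor API):

import asyncio
from typing import List, Tuple

async def update_priorities_bulk(subtask_ids: List[str], priorities: List[Tuple]) -> None:
    # Stand-in for the single actor call that applies all updates at once.
    print(f"updating {len(subtask_ids)} subtasks in one round trip")

async def update_priority_batch(calls: List[Tuple[str, Tuple]]) -> None:
    # Collect per-call arguments first, then issue one bulk request,
    # mirroring how the batch handler forwards to the manager ref.
    subtask_ids = [subtask_id for subtask_id, _ in calls]
    priorities = [priority for _, priority in calls]
    await update_priorities_bulk(subtask_ids, priorities)

asyncio.run(update_priority_batch([("s1", (0, 1)), ("s2", (0, 2))]))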
2 changes: 0 additions & 2 deletions mars/services/scheduling/supervisor/__init__.py
@@ -14,7 +14,5 @@

from .assigner import AssignerActor
from .autoscale import AutoscalerActor
from .globalslot import GlobalSlotManagerActor
from .manager import SubtaskManagerActor
from .queueing import SubtaskQueueingActor
from .service import SchedulingSupervisorService
21 changes: 11 additions & 10 deletions mars/services/scheduling/supervisor/autoscale.py
@@ -46,11 +46,11 @@ async def __post_create__(self):
strategy_cls = getattr(importlib.import_module(module), name)
else:
strategy_cls = PendingTaskBacklogStrategy
from ..supervisor import GlobalSlotManagerActor

self.global_slot_ref = await mo.actor_ref(
GlobalSlotManagerActor.default_uid(), address=self.address
)
# from ..supervisor import GlobalSlotManagerActor
#
# self.global_slot_ref = await mo.actor_ref(
# GlobalSlotManagerActor.default_uid(), address=self.address
# )
self._cluster_api = await ClusterAPI.create(self.address)
self._strategy = await strategy_cls.create(self._autoscale_conf, self)
if self._enabled:
Expand All @@ -62,11 +62,12 @@ async def __pre_destroy__(self):
await self._strategy.stop()

async def register_session(self, session_id: str, address: str):
from .queueing import SubtaskQueueingActor

self.queueing_refs[session_id] = await mo.actor_ref(
SubtaskQueueingActor.gen_uid(session_id), address=address
)
pass
# from .queueing import SubtaskQueueingActor
#
# self.queueing_refs[session_id] = await mo.actor_ref(
# SubtaskQueueingActor.gen_uid(session_id), address=address
# )

async def unregister_session(self, session_id: str):
self.queueing_refs.pop(session_id, None)