Cluster Dump SchedulerPlugin #5983

Merged · 11 commits · Mar 30, 2022
4 changes: 2 additions & 2 deletions distributed/client.py
@@ -3930,8 +3930,8 @@ async def _dump_cluster_state(
self,
filename: str = "dask-cluster-dump",
write_from_scheduler: bool | None = None,
exclude: Collection[str] = ("run_spec",),
format: Literal["msgpack", "yaml"] = "msgpack",
exclude: Collection[str] = cluster_dump.DEFAULT_CLUSTER_DUMP_EXCLUDE,
format: Literal["msgpack", "yaml"] = cluster_dump.DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options,
):
filename = str(filename)
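The hunk above only swaps the hard-coded defaults of `Client._dump_cluster_state` for the shared constants in `distributed.cluster_dump`; existing calls are unaffected. A minimal sketch of the public entry point under those defaults, assuming a reachable (or freshly started local) cluster; the file name shown is the method's own default:

```python
from distributed import Client

client = Client()  # assumes a local or already-running cluster

# exclude= and format= now fall back to the shared defaults in
# distributed.cluster_dump (("run_spec",) and "msgpack" respectively),
# so a plain call is enough for most debugging sessions.
client.dump_cluster_state(filename="dask-cluster-dump")

client.close()
```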
5 changes: 4 additions & 1 deletion distributed/cluster_dump.py
@@ -14,6 +14,9 @@
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story

DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack", "yaml"] = "msgpack"
DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)


def _tuple_to_list(node):
if isinstance(node, (list, tuple)):
@@ -27,7 +30,7 @@ def _tuple_to_list(node):
async def write_state(
get_state: Callable[[], Awaitable[Any]],
url: str,
format: Literal["msgpack", "yaml"],
format: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options: dict[str, Any],
) -> None:
"Await a cluster dump, then serialize and write it to a path"
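With `format` now defaulting to `DEFAULT_CLUSTER_DUMP_FORMAT`, callers of `write_state` can omit it. A short sketch of assumed usage; the `get_state` coroutine and output path below are illustrative stand-ins, not part of this diff:

```python
import asyncio

from distributed.cluster_dump import write_state


async def dump_example() -> None:
    async def get_state() -> dict:
        # Stand-in for the real state getter (e.g. the coroutine the
        # scheduler uses to assemble its cluster state); illustrative only.
        return {"scheduler": {"status": "running"}, "workers": {}}

    # format defaults to "msgpack" via DEFAULT_CLUSTER_DUMP_FORMAT,
    # so only the state getter and the target URL are required.
    await write_state(get_state, "example-cluster-dump.msgpack.gz")


asyncio.run(dump_example())
```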
37 changes: 37 additions & 0 deletions distributed/diagnostics/cluster_dump.py
@@ -0,0 +1,37 @@
from typing import Any, Collection, Dict, Literal

from distributed.cluster_dump import (
DEFAULT_CLUSTER_DUMP_EXCLUDE,
DEFAULT_CLUSTER_DUMP_FORMAT,
)
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.scheduler import Scheduler


class ClusterDump(SchedulerPlugin):
"""Dumps cluster state prior to Scheduler shutdown

The Scheduler may shut down when it is in an error state,
or when it has been unexpectedly idle for long periods of time.
This plugin dumps the cluster state prior to Scheduler shutdown
for debugging purposes.
"""

def __init__(
self,
scheduler: Scheduler,
url: str,
exclude: "Collection[str]" = DEFAULT_CLUSTER_DUMP_EXCLUDE,
format_: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options: Dict[str, Any],
):
self.scheduler = scheduler
self.url = url
self.exclude = exclude
self.format = format_
self.storage_options = storage_options

async def before_close(self):
await self.scheduler.dump_cluster_state_to_url(
self.url, self.exclude, self.format, **self.storage_options
)
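Because the plugin needs the `Scheduler` instance at construction time, it is registered on the scheduler side rather than through the client. A minimal sketch, assuming direct access to the scheduler object (for example from a preload script); the dump URL is illustrative only:

```python
from distributed import Scheduler
from distributed.diagnostics.cluster_dump import ClusterDump


async def start_scheduler_with_dump_plugin() -> Scheduler:
    # Start a scheduler on a random port in the current event loop.
    scheduler = await Scheduler(port=0)
    # Dump to a local file when the scheduler shuts down; any fsspec-style
    # URL (e.g. an s3:// bucket plus storage_options) should also work.
    scheduler.add_plugin(ClusterDump(scheduler, "scheduler-shutdown-dump"))
    return scheduler
```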
3 changes: 3 additions & 0 deletions distributed/diagnostics/plugin.py
@@ -61,6 +61,9 @@ async def start(self, scheduler: Scheduler) -> None:
This runs at the end of the Scheduler startup process
"""

async def before_close(self) -> None:
"""Runs prior to any Scheduler shutdown logic"""

async def close(self) -> None:
"""Run when the scheduler closes down

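The new `before_close` hook is a no-op by default, so existing plugins are unaffected; a plugin opts in by overriding it. An illustrative plugin (not part of this PR) that simply logs right before shutdown:

```python
import logging

from distributed.diagnostics.plugin import SchedulerPlugin

logger = logging.getLogger(__name__)


class LogBeforeClose(SchedulerPlugin):
    """Illustrative only: record that the scheduler is about to shut down.

    Because ``before_close`` runs ahead of the Scheduler's own shutdown
    logic, scheduler and worker state are still intact when it fires.
    """

    async def before_close(self) -> None:
        logger.info("Scheduler is about to close; state is still available")
```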
21 changes: 21 additions & 0 deletions distributed/diagnostics/tests/test_cluster_dump_plugin.py
@@ -0,0 +1,21 @@
from distributed.cluster_dump import DumpArtefact
from distributed.diagnostics.cluster_dump import ClusterDump
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True)
async def test_cluster_dump_plugin(c, s, *workers, tmp_path):
dump_file = tmp_path / "cluster_dump.msgpack.gz"

plugin = ClusterDump(s, str(dump_file))
s.add_plugin(plugin)

f1 = c.submit(inc, 1)
f2 = c.submit(inc, f1)

assert (await f2) == 3
await s.close(close_workers=True)

dump = DumpArtefact.from_url(str(dump_file))
assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys())
assert {f1.key, f2.key} == set(dump.worker_story(f1.key, f2.key).keys())
5 changes: 5 additions & 0 deletions distributed/scheduler.py
@@ -4242,6 +4242,11 @@ async def close(self, fast=False, close_workers=False):
if self.status in (Status.closing, Status.closed):
await self.finished()
return

await asyncio.gather(
*[plugin.before_close() for plugin in list(self.plugins.values())]
)

self.status = Status.closing

logger.info("Scheduler closing...")