From 927d36a401a8eb6e62b8075079216ecf11439099 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Wed, 18 Sep 2024 14:11:03 +0200 Subject: [PATCH 1/3] Increase rootish dependencies --- distributed/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ab5b28fffc..5fd0824a6f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3090,8 +3090,8 @@ def is_rootish(self, ts: TaskState) -> bool: # TODO short-circuit to True if `not ts.dependencies`? return ( len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 5 - and sum(map(len, tg.dependencies)) < 5 + and len(tg.dependencies) < 100 + and sum(map(len, tg.dependencies)) < 500 ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None: From 2ce050decbf875af168ab9e269655f96b5ccecf5 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Wed, 18 Sep 2024 15:01:16 +0200 Subject: [PATCH 2/3] Use config variables --- distributed/distributed.yaml | 2 ++ distributed/scheduler.py | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 1e7505a116..7227aea918 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -10,6 +10,8 @@ distributed: # tornado.application: error scheduler: + rootish-tg: 100 + rootish-tg-dependencies: 500 allowed-failures: 3 # number of retries before a task is considered bad bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth blocked-handlers: [] diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 5fd0824a6f..286d0ef7aa 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1840,6 +1840,11 @@ def __init__( + repr(self.WORKER_SATURATION) ) + self.rootish_tg_threshold = dask.config.get("distributed.scheduler.rootish-tg") + 
self.rootish_tg_dependencies_threshold = dask.config.get( + "distributed.scheduler.rootish-tg-dependencies" + ) + @abstractmethod def log_event(self, topic: str | Collection[str], msg: Any) -> None: ... @@ -3090,8 +3095,8 @@ def is_rootish(self, ts: TaskState) -> bool: # TODO short-circuit to True if `not ts.dependencies`? return ( len(tg) > self.total_nthreads * 2 - and len(tg.dependencies) < 100 - and sum(map(len, tg.dependencies)) < 500 + and len(tg.dependencies) < self.rootish_tg_threshold + and sum(map(len, tg.dependencies)) < self.rootish_tg_dependencies_threshold ) def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None: From 3a2755efb36301a7608c4ad7d28f0dc6351e68b2 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Thu, 17 Oct 2024 12:16:49 +0200 Subject: [PATCH 3/3] Add configurations for rootish taskgroup threshold --- distributed/distributed-schema.yaml | 26 ++++++++++++++++++++++++++ distributed/distributed.yaml | 4 ++-- distributed/scheduler.py | 6 ++++-- distributed/tests/test_scheduler.py | 12 ++++++++++++ 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index f7e452383c..09dd294c7d 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -133,6 +133,32 @@ properties: generally leave `worker-saturation` at 1.0, though 1.25-1.5 could slightly improve performance if ample memory is available. + rootish-taskgroup: + type: + - integer + + description: | + Controls when a specific task group is identified as rootish when + worker saturation is set. + + A task group is identified as rootish if it has fewer than a certain number + of dependencies (5 by default). This can be faulty for very large datasets + where the number of data tasks from xarray can be higher than 5. 
+ + Increasing this limit will capture these root tasks successfully but increase + the risk of misidentifying task groups as rootish, which can have + performance implications. + + rootish-taskgroup-dependencies: + type: + - integer + + description: | + Controls the number of transitive dependencies a task group can have to be considered rootish. + It checks the number of dependencies each dependency of a rootish task group has. + + The same caveats as for `rootish-taskgroup` apply. + worker-ttl: type: - string diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 7227aea918..11c986093f 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -10,8 +10,6 @@ distributed: # tornado.application: error scheduler: - rootish-tg: 100 - rootish-tg-dependencies: 500 allowed-failures: 3 # number of retries before a task is considered bad bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth blocked-handlers: [] @@ -25,6 +23,8 @@ distributed: work-stealing: True # workers should steal tasks from each other work-stealing-interval: 100ms # Callback time for work stealing worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers + rootish-taskgroup: 5 # number of dependencies of a rootish tg + rootish-taskgroup-dependencies: 5 # number of dependencies of the dependencies of the rootish tg worker-ttl: "5 minutes" # like '60s'. Time to live for workers. 
They must heartbeat faster than this preload: [] # Run custom modules with Scheduler preload-argv: [] # See https://docs.dask.org/en/latest/how-to/customize-initialization.html diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 93521709d5..790c519623 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1840,9 +1840,11 @@ def __init__( + repr(self.WORKER_SATURATION) ) - self.rootish_tg_threshold = dask.config.get("distributed.scheduler.rootish-tg") + self.rootish_tg_threshold = dask.config.get( + "distributed.scheduler.rootish-taskgroup" + ) self.rootish_tg_dependencies_threshold = dask.config.get( - "distributed.scheduler.rootish-tg-dependencies" + "distributed.scheduler.rootish-taskgroup-dependencies" ) @abstractmethod diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index cf3b8b8d4f..b081ed9285 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -5277,3 +5277,15 @@ async def before_close(self): assert s.plugins["before_close"].call_count == 1 lines = caplog.getvalue().split("\n") assert sum("Closing scheduler" in line for line in lines) == 1 + + +@gen_cluster( + client=True, + config={ + "distributed.scheduler.rootish-taskgroup": 10, + "distributed.scheduler.rootish-taskgroup-dependencies": 15, + }, +) +async def test_rootish_taskgroup_configuration(c, s, *workers): + assert s.rootish_tg_threshold == 10 + assert s.rootish_tg_dependencies_threshold == 15