From 919ff4ff43dfd7c09cab17a52bc076a7c4ab7c24 Mon Sep 17 00:00:00 2001 From: fjetter Date: Mon, 26 Aug 2024 13:56:03 +0200 Subject: [PATCH] avoid materialization of dependencies --- distributed/scheduler.py | 26 +++++--------------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 23100b61e0..3f43a17c33 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4619,7 +4619,7 @@ async def add_nanny(self, comm: Comm, address: str) -> None: def _match_graph_with_tasks( self, dsk: dict[Key, T_runspec], - dependencies: dict[Key, set[Key]], + dependencies: DependenciesMapping, keys: set[Key], ) -> set[Key]: n = -1 @@ -4634,7 +4634,6 @@ def _match_graph_with_tasks( lost_keys.add(k) logger.info("User asked for computation on lost data, %s", k) dsk.pop(k, None) - del dependencies[k] if k in keys: keys.remove(k) del deps @@ -4668,7 +4667,6 @@ def _match_graph_with_tasks( stack.append(dep) for anc in done: dsk.pop(anc, None) - dependencies.pop(anc, None) return lost_keys def _create_taskstate_from_graph( @@ -4676,7 +4674,7 @@ def _create_taskstate_from_graph( *, start: float, dsk: dict[Key, T_runspec], - dependencies: dict, + dependencies: DependenciesMapping, keys: set[Key], ordered: dict[Key, int], client: str, @@ -4878,19 +4876,7 @@ async def update_graph( logger.debug("Materialization done. Got %i tasks.", len(dsk)) del graph if not internal_priority: - # Removing all non-local keys before calling order() - dsk_keys = set( - dsk - ) # intersection() of sets is much faster than dict_keys - stripped_deps = { - k: v.intersection(dsk_keys) - for k, v in dependencies.items() - if k in dsk_keys - } - - internal_priority = await offload( - dask.order.order, dsk=dsk, dependencies=stripped_deps - ) + internal_priority = await offload(dask.order.order, dsk=dsk) ordering_done = time() logger.debug("Ordering done.") @@ -9381,7 +9367,7 @@ def _materialize_graph( global_annotations: dict[str, Any], validate: bool, keys: set[Key], -) -> tuple[dict[Key, T_runspec], dict[Key, set[Key]], dict[str, dict[Key, Any]]]: +) -> tuple[dict[Key, T_runspec], DependenciesMapping, dict[str, dict[Key, Any]]]: dsk: dict = ensure_dict(graph) if validate: for k in dsk: @@ -9410,7 +9396,5 @@ def _materialize_graph( logger.debug( "Removing aliases. Started with %i and got %i left", len(dsk2), len(dsk3) ) - # FIXME: There should be no need to fully materialize and copy this but some - # sections in the scheduler are mutating it. - dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()} + dependencies = DependenciesMapping(dsk3) return dsk3, dependencies, annotations_by_type