Skip to content

Commit

Permalink
avoid materialization of dependencies
Browse files (browse the repository at this point in the history)
  • Loading branch information
fjetter committed Aug 26, 2024
1 parent 9974bb0 commit 919ff4f
Showing 1 changed file with 5 additions and 21 deletions.
26 changes: 5 additions & 21 deletions distributed/scheduler.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -4619,7 +4619,7 @@ async def add_nanny(self, comm: Comm, address: str) -> None:
def _match_graph_with_tasks(
self,
dsk: dict[Key, T_runspec],
dependencies: dict[Key, set[Key]],
dependencies: DependenciesMapping,
keys: set[Key],
) -> set[Key]:
n = -1
Expand All @@ -4634,7 +4634,6 @@ def _match_graph_with_tasks(
lost_keys.add(k)
logger.info("User asked for computation on lost data, %s", k)
dsk.pop(k, None)
del dependencies[k]
if k in keys:
keys.remove(k)
del deps
Expand Down Expand Up @@ -4668,15 +4667,14 @@ def _match_graph_with_tasks(
stack.append(dep)
for anc in done:
dsk.pop(anc, None)
dependencies.pop(anc, None)
return lost_keys

def _create_taskstate_from_graph(
self,
*,
start: float,
dsk: dict[Key, T_runspec],
dependencies: dict,
dependencies: DependenciesMapping,
keys: set[Key],
ordered: dict[Key, int],
client: str,
Expand Down Expand Up @@ -4878,19 +4876,7 @@ async def update_graph(
logger.debug("Materialization done. Got %i tasks.", len(dsk))
del graph
if not internal_priority:
# Removing all non-local keys before calling order()
dsk_keys = set(
dsk
) # intersection() of sets is much faster than dict_keys
stripped_deps = {
k: v.intersection(dsk_keys)
for k, v in dependencies.items()
if k in dsk_keys
}

internal_priority = await offload(
dask.order.order, dsk=dsk, dependencies=stripped_deps
)
internal_priority = await offload(dask.order.order, dsk=dsk)
ordering_done = time()
logger.debug("Ordering done.")

Expand Down Expand Up @@ -9381,7 +9367,7 @@ def _materialize_graph(
global_annotations: dict[str, Any],
validate: bool,
keys: set[Key],
) -> tuple[dict[Key, T_runspec], dict[Key, set[Key]], dict[str, dict[Key, Any]]]:
) -> tuple[dict[Key, T_runspec], DependenciesMapping, dict[str, dict[Key, Any]]]:
dsk: dict = ensure_dict(graph)
if validate:
for k in dsk:
Expand Down Expand Up @@ -9410,7 +9396,5 @@ def _materialize_graph(
logger.debug(
"Removing aliases. Started with %i and got %i left", len(dsk2), len(dsk3)
)
# FIXME: There should be no need to fully materialize and copy this but some
# sections in the scheduler are mutating it.
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()}
dependencies = DependenciesMapping(dsk3)
return dsk3, dependencies, annotations_by_type

0 comments on commit 919ff4f

Please sign in to comment.