From 8edba46ccc09f515876c49932d66e38f33c6486d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 26 Mar 2020 13:22:06 +0000 Subject: [PATCH 1/9] Add worker key reference counting --- distributed/worker.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/distributed/worker.py b/distributed/worker.py index ba25c91d97..aff39b1f5e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -330,6 +330,7 @@ def __init__( ): self.tasks = dict() self.task_state = dict() + self.task_refs = dict() self.dep_state = dict() self.dependencies = dict() self.dependents = dict() @@ -1299,6 +1300,11 @@ def update_data(self, comm=None, data=None, report=True, serializers=None): if key in self.dep_state: self.transition_dep(key, "memory", value=value) + if key in self.task_refs: + self.task_refs[key] += 1 + else: + self.task_refs[key] = 1 + self.log.append((key, "receive-from-scatter")) if report: @@ -1309,6 +1315,14 @@ def update_data(self, comm=None, data=None, report=True, serializers=None): async def delete_data(self, comm=None, keys=None, report=True): if keys: for key in list(keys): + if key in self.task_refs: + self.task_refs[key] -= 1 + if self.task_refs[key] > 0: + keys.remove(key) + continue + else: + del self.task_refs[key] + self.log.append((key, "delete")) if key in self.task_state: self.release_key(key) From 53691d2c22c9a62d6d8bd3f0a68461079bb9682d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 26 Mar 2020 13:24:16 +0000 Subject: [PATCH 2/9] Add test --- distributed/tests/test_worker_client.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index 14a2d30f7d..d35f74f382 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -315,3 +315,16 @@ def test_submit_different_names(s, a, b): assert fut > 0 finally: yield c.close() + + +@gen_cluster(client=True) +async def test_task_unique_groups_scatter(c, s, a, b): + """ This test ensure that tasks are correctly deleted when using scatter/submit + """ + a = await c.scatter([0, 1], hash=True) + x = await c.submit(sum, a) + del a + del x + b = await c.scatter([1, 2], hash=True) + y = await c.submit(sum, b) + assert s.task_prefixes["sum"].states["memory"] == 1 From a799dd59f436672039d5d76736737a16d8092653 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 26 Mar 2020 14:01:40 +0000 Subject: [PATCH 3/9] Cast keys to list as it sometimes is a tuple --- distributed/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index aff39b1f5e..193b9ba31a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1313,8 +1313,9 @@ def update_data(self, comm=None, data=None, report=True, serializers=None): return info async def delete_data(self, comm=None, keys=None, report=True): + keys = list(keys) if keys: - for key in list(keys): + for key in keys: if key in self.task_refs: self.task_refs[key] -= 1 if self.task_refs[key] > 0: From 3b4b7795aa302d34ba0f60b117e9f90a380be1dd Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 26 Mar 2020 19:17:24 +0000 Subject: [PATCH 4/9] Update test --- distributed/tests/test_worker_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index d35f74f382..cf131c5430 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -327,4 +327,4 @@ async def test_task_unique_groups_scatter(c, s, a, b): del x b = await c.scatter([1, 2], hash=True) y = await c.submit(sum, b) - assert s.task_prefixes["sum"].states["memory"] == 1 + assert y.key in b.data and s.tasks[y.key].status == "memory" From 6d6f44cff63e47fa6d80e30a7de8cc1449c6e0a4 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 27 Mar 2020 08:48:34 +0000 Subject: [PATCH 5/9] Fix test --- distributed/tests/test_worker_client.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index cf131c5430..f801b2b4f3 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -321,10 +321,10 @@ def test_submit_different_names(s, a, b): async def test_task_unique_groups_scatter(c, s, a, b): """ This test ensure that tasks are correctly deleted when using scatter/submit """ - a = await c.scatter([0, 1], hash=True) - x = await c.submit(sum, a) - del a + n = await c.scatter([0, 1], hash=True) + x = await c.submit(sum, n) + del n del x - b = await c.scatter([1, 2], hash=True) - y = await c.submit(sum, b) - assert y.key in b.data and s.tasks[y.key].status == "memory" + m = await c.scatter([1, 2], hash=True) + y = await c.submit(sum, m) + assert y in list(b.data.values()) From 05b606972492b42ab4a3a106dcb54519bc47340a Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 27 Mar 2020 09:23:03 +0000 Subject: [PATCH 6/9] Check both workers for value --- distributed/tests/test_worker_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index f801b2b4f3..5db48bb8f2 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -327,4 +327,4 @@ async def test_task_unique_groups_scatter(c, s, a, b): del x m = await c.scatter([1, 2], hash=True) y = await c.submit(sum, m) - assert y in list(b.data.values()) + assert y in list(b.data.values()) or y in list(a.data.values()) From 3eaa30fa40d3d4e58fc5b72b42e946aae9dfa8e5 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 27 Mar 2020 13:52:52 +0000 Subject: [PATCH 7/9] Update test so that sum key is also the same both times --- distributed/tests/test_worker_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index 5db48bb8f2..06736aba36 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -325,6 +325,6 @@ async def test_task_unique_groups_scatter(c, s, a, b): x = await c.submit(sum, n) del n del x - m = await c.scatter([1, 2], hash=True) + m = await c.scatter([0, 1], hash=True) y = await c.submit(sum, m) assert y in list(b.data.values()) or y in list(a.data.values()) From 6114d4b3606d035e1ec31dd83f473848731e0220 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 25 Apr 2020 13:20:17 -0700 Subject: [PATCH 8/9] Create futures at the beginning of scatter rather than the end Otherwise there is a period where the user has requested that data exists, but we haven't incremented the reference counter yet. This way we've stated that yes, these keys are important to us, even while we wait for the data to transfer. This stops the client from sending an erroneous release message to the scheduler. --- distributed/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/client.py b/distributed/client.py index 52c0e2b420..af7507898e 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1985,6 +1985,7 @@ async def _scatter( if w.scheduler.address == self.scheduler.address: direct = True + out = {k: Future(k, self, inform=False) for k in data} if local_worker: # running within task local_worker.update_data(data=data, report=False) @@ -2024,7 +2025,6 @@ async def _scatter( timeout=timeout, ) - out = {k: Future(k, self, inform=False) for k in data} for key, typ in types.items(): self.futures[key].finish(type=typ) From e84ff97bc46f47fb723f68aea0c20732e2acaa8a Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Sat, 25 Apr 2020 13:34:39 -0700 Subject: [PATCH 9/9] Improve logging on lost data --- distributed/scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 82e5f812dd..294f95ef81 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1815,7 +1815,11 @@ def update_graph( if any( dep not in self.tasks and dep not in tasks for dep in deps ): # bad key - logger.info("User asked for computation on lost data, %s", k) + logger.info( + "User asked for computation %s lost data %s", + k, + [dep for dep in deps if dep not in self.tasks], + ) del tasks[k] del dependencies[k] if k in keys: