Skip to content

Commit

Permalink
Fix a race condition which would allow a rescheduled task to be
Browse files Browse the repository at this point in the history
reported missing even though it is not
  • Loading branch information
fjetter committed Sep 8, 2021
1 parent 0b7510e commit 9a90025
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 9 deletions.
34 changes: 34 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2718,3 +2718,37 @@ async def test_gather_dep_exception_one_task_2(c, s, a, b):
s.handle_missing_data(key="f1", errant_worker=a.address)

await fut2


async def test_gather_dep_do_not_handle_response_of_not_requested_tasks(c, s, a, b):
"""At time of writing, the gather_dep implementation filtered tasks again
for in-flight state. The response parser, however, did not distinguish
resulting in unwanted missing-data signals to the scheduler, causing
potential rescheduling or data leaks.
This test may become obsolete if the implementation changes significantly.
"""
import distributed

with mock.patch.object(distributed.worker.Worker, "gather_dep") as mocked_gather:
fut1 = c.submit(inc, 1, workers=[a.address], key="f1")
fut2 = c.submit(inc, fut1, workers=[a.address], key="f2")
await fut2
fut4 = c.submit(sum, fut1, fut2, workers=[b.address], key="f4")
fut3 = c.submit(inc, fut1, workers=[b.address], key="f3")

fut2_key = fut2.key

while fut2_key not in b.tasks or b.tasks[fut2_key].state != "flight":
await asyncio.sleep(0)

fut4.release()
while fut4.key in b.tasks:
await asyncio.sleep(0)

story_before = b.story(fut2.key)
assert fut2.key in mocked_gather.call_args.kwargs["to_gather"]
await Worker.gather_dep(b, **mocked_gather.call_args.kwargs)
story_after = b.story(fut2.key)
assert story_before == story_after
await fut3
14 changes: 5 additions & 9 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2462,30 +2462,26 @@ async def gather_dep(
busy = response.get("status", "") == "busy"
data = response.get("data", {})

# FIXME: We should not handle keys which were skipped by this coro. to_gather_keys is only a subset
assert set(to_gather_keys).issubset(
set(self.in_flight_workers.get(worker))
)

for d in self.in_flight_workers.pop(worker):
ts = self.tasks.get(d)
try:
if not busy and d in data:
if d not in to_gather_keys or ts is None:
continue
elif not busy and d in data:
self.transition(ts, "memory", value=data[d])
elif ts is None or ts.state == "executing":
elif ts.state == "executing":
self.log.append(("already-executing", d))
self.release_key(d, reason="already executing at gather")
elif ts.state == "flight" and not ts.dependents:
self.log.append(("flight no-dependents", d))
self.release_key(
d, reason="In-flight task no longer has dependents."
)
elif (
not busy
and d not in data
and ts.dependents
and ts.state != "memory"
):
elif not busy and d not in data and ts.state == "flight":
ts.who_has.discard(worker)
self.has_what[worker].discard(ts.key)
self.log.append(("missing-dep", d))
Expand Down

0 comments on commit 9a90025

Please sign in to comment.