Skip to content

Commit

Permalink
Ensure uniqueness of steal stimulus ID (#5620)
Browse files Browse the repository at this point in the history
The stimulus ID is used to verify worker responses to distinguish
concurrently running steal requests. The windows clock in particular is
not strictly monotonic which caused duplicates in this test setup.
  • Loading branch information
fjetter authored Jan 7, 2022
1 parent d50cee9 commit 232cc0f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
7 changes: 5 additions & 2 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import uuid
from collections import defaultdict, deque
from math import log2
from time import time
Expand Down Expand Up @@ -233,7 +234,9 @@ def move_task_request(self, ts, victim, thief) -> str:
try:
if ts in self.in_flight:
return "in-flight"
stimulus_id = f"steal-{time()}"
# Stimulus IDs are used to verify the response, see
# `move_task_confirm`. Therefore, this must be truly unique.
stimulus_id = f"steal-{uuid.uuid4().hex}"

key = ts.key
self.remove_key_from_stealable(ts)
Expand Down Expand Up @@ -291,7 +294,7 @@ async def move_task_confirm(self, *, key, state, stimulus_id, worker=None):
self.in_flight[ts] = d
return
except KeyError:
self.log(("already-aborted", key, state, stimulus_id))
self.log(("already-aborted", key, state, worker, stimulus_id))
return

thief = d["thief"]
Expand Down
34 changes: 33 additions & 1 deletion distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import dask

from distributed import Nanny, Worker, wait, worker_client
from distributed import Lock, Nanny, Worker, wait, worker_client
from distributed.compatibility import LINUX, WINDOWS
from distributed.config import config
from distributed.metrics import time
Expand Down Expand Up @@ -1193,3 +1193,35 @@ async def test_correct_bad_time_estimate(c, s, *workers):
assert any(s.tasks[f.key] in steal.key_stealable for f in futures)
await wait(futures)
assert all(w.data for w in workers), [sorted(w.data) for w in workers]


@gen_cluster(client=True)
async def test_steal_stimulus_id_unique(c, s, *workers):
steal = s.extensions["stealing"]
num_futs = 1_000
async with Lock() as lock:

def blocked(x, lock):
lock.acquire()

# Setup all tasks on worker 0 such that victim/thief relation is the
# same for all tasks.
futures = c.map(
blocked, range(num_futs), lock=lock, workers=[workers[0].address]
)
# Ensure all tasks are assigned to the worker since otherwise the
# move_task_request fails.
while len(workers[0].tasks) != num_futs:
await asyncio.sleep(0.1)
tasks = [s.tasks[f.key] for f in futures]
w0 = s.workers[workers[0].address]
w1 = s.workers[workers[1].address]
# Generating the move task requests as fast as possible increases the
# chance of duplicates if the uniqueness is not guaranteed.
for ts in tasks:
steal.move_task_request(ts, w0, w1)
# Values stored in in_flight are used for response verification.
# Therefore all stimulus IDs are stored here and must be unique
stimulus_ids = {dct["stimulus_id"] for dct in steal.in_flight.values()}
assert len(stimulus_ids) == num_futs
await c.cancel(futures)

0 comments on commit 232cc0f

Please sign in to comment.