Skip to content

Commit

Permalink
Some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 5, 2021
1 parent 8305049 commit bee8e03
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 160 deletions.
4 changes: 1 addition & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1969,7 +1969,6 @@ def __init__(
("processing", "erred"): self.transition_processing_erred,
("no-worker", "released"): self.transition_no_worker_released,
("no-worker", "waiting"): self.transition_no_worker_waiting,
# TODO: Write a test. Worker disconnects -> no-worker -> reconnect with task to memory. Triggered every few hundred times by test_handle_superfluous_data
("no-worker", "memory"): self.transition_no_worker_memory,
("released", "forgotten"): self.transition_released_forgotten,
("memory", "forgotten"): self.transition_memory_forgotten,
Expand Down Expand Up @@ -7769,14 +7768,13 @@ def _task_to_msg(state: SchedulerState, ts: TaskState, duration: double = -1) ->

if duration < 0:
duration = state.get_task_duration(ts)
import uuid

msg: dict = {
"op": "compute-task",
"key": ts._key,
"priority": ts._priority,
"duration": duration,
"stimulus_id": f"compute-task-{uuid.uuid4()}",
"stimulus_id": f"compute-task-{time()}",
"who_has": {},
}
if ts._resource_restrictions:
Expand Down
45 changes: 0 additions & 45 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,51 +185,6 @@ async def observe(dask_worker):
await res


# from contextlib import suppress


# @pytest.mark.repeat(100)
# @gen_cluster(client=True, Worker=Nanny)
# async def test_flight_to_executing_via_cancelled_resumed(c, s, a, b):
# fut1 = c.submit(
# slow_serialize, 1, delay=1, workers=[a.worker_address], allow_other_workers=True
# )
# await wait(fut1)

# def _id(x):
# return x.data

# fut2 = c.submit(_id, fut1, workers=[b.worker_address])

# async def observe(dask_worker):
# while (
# fut1.key not in dask_worker.tasks
# or dask_worker.tasks[fut1.key].state != "flight"
# ):
# await asyncio.sleep(0)

# await c.run(observe, workers=[b.worker_address])
# proc = psutil.Process(a.pid)

# # Close in scheduler to ensure we transition and reschedule task properly
# await s.close_worker(worker=a.worker_address)

# # Killing the process should cause a "Worker stream died during communication" error
# proc.kill()
# assert await fut2 == 1

# b_story = (
# await c.run(
# lambda dask_worker: dask_worker.story(fut1.key), workers=[b.worker_address]
# )
# )[b.worker_address]
# assert any("receive-dep-failed" in msg for msg in b_story)
# assert any("missing-dep" in msg for msg in b_story)
# if not any("cancelled" in msg for msg in b_story):
# breakpoint()
# assert any("resumed" in msg for msg in b_story)

# @pytest.mark.repeat(100)
@gen_cluster(client=True)
async def test_flight_to_executing_via_cancelled_resumed(c, s, a, b):
import asyncio
Expand Down
3 changes: 1 addition & 2 deletions distributed/tests/test_failed_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def __sizeof__(self) -> int:
return parse_bytes(dask.config.get("distributed.comm.offload")) + 1


@gen_cluster(client=True, timeout=345678)
@gen_cluster(client=True)
async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
"""This test is very sensitive to cluster state consistency. Timeouts often
indicate subtle deadlocks. Be mindful when marking flaky/repeat/etc."""
Expand Down Expand Up @@ -610,7 +610,6 @@ async def test_forget_data_not_supposed_to_have(s, a, b):
nthreads=[("127.0.0.1", 1) for _ in range(3)],
config={"distributed.comm.timeouts.connect": "1s"},
Worker=Nanny,
timeout=1000,
)
async def test_failing_worker_with_additional_replicas_on_cluster(c, s, *workers):
"""
Expand Down
2 changes: 2 additions & 0 deletions distributed/tests/test_stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def vsum(*args):
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 80)
async def test_stress_communication(c, s, *workers):
s.validate = False # very slow otherwise
for w in workers:
w.validate = False
da = pytest.importorskip("dask.array")
# Test consumes many file descriptors and can hang if the limit is too low
resource = pytest.importorskip("resource")
Expand Down
13 changes: 10 additions & 3 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2173,8 +2173,8 @@ def raise_exc(*args):
await asyncio.sleep(0.01)

expected_states = {
# f.key: "released",
# g.key: "released",
f.key: "released",
g.key: "released",
res.key: "error",
}

Expand Down Expand Up @@ -2367,6 +2367,7 @@ def raise_exc(*args):
assert_task_states_on_worker(expected_states_A, a)

expected_states_B = {
f.key: "released",
g.key: "memory",
h.key: "memory",
res.key: "error",
Expand All @@ -2376,12 +2377,16 @@ def raise_exc(*args):

g.release()

expected_states_A = {h.key: "memory"}
expected_states_A = {
g.key: "released",
h.key: "memory",
}
await asyncio.sleep(0.05)
assert_task_states_on_worker(expected_states_A, a)

# B must not forget any task, since each one still has a valid dependent
expected_states_B = {
f.key: "released",
h.key: "memory",
res.key: "error",
}
Expand All @@ -2392,6 +2397,8 @@ def raise_exc(*args):
expected_states_A = {}
assert_task_states_on_worker(expected_states_A, a)
expected_states_B = {
f.key: "released",
h.key: "released",
res.key: "error",
}

Expand Down
Loading

0 comments on commit bee8e03

Please sign in to comment.