
rebalance() resilience to computations #4968

Merged: 44 commits into dask:main on Jul 14, 2021

Conversation

crusaderky (Collaborator) commented Jun 24, 2021:

Partial fix for #4906

In scope

  • Let rebalance() gracefully handle all possible race conditions that could be caused by a computation running at the same time on the cluster
  • Thorough unit test coverage for all the above race condition cases
  • Code deduplication with replicate(), which also gains increased resiliency as a result

Out of scope, left to future PRs

  • Let computations gracefully handle having keys suddenly disappear due to a rebalance()
  • Proper resiliency review of replicate()
  • Preempt the most common race conditions between rebalance() and computation; namely, skip in-memory tasks that are a dependency of a queued or running task (see the sketch after this list)
  • Investigate bulk vs. ad-hoc scheduler<->worker comms. At the moment the code uses Server.rpc like it did before.
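
To make the third bullet concrete, here is a purely hypothetical sketch of such a filter, assuming scheduler-side TaskState objects that expose state and dependents (future work, not code from this PR):

    # Hypothetical sketch only: this filtering is future work, not part of this PR.
    def rebalance_candidates(tasks):
        """Yield keys of in-memory tasks that no queued or running task depends on."""
        for ts in tasks.values():
            if ts.state != "memory":
                continue
            if any(dep.state in ("waiting", "processing") for dep in ts.dependents):
                # Moving this key now could race with the ongoing computation.
                continue
            yield ts.key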

CC @mrocklin @jrbourbeau @fjetter @gjoseph92

distributed/scheduler.py (outdated, resolved)
"""
result = await retry_operation(
self.rpc(addr=worker_address).gather, who_has=who_has
)
Member:

What happens if the worker disconnects?

crusaderky (Collaborator, author):

I'm working on baking graceful handling for that into self.rpc (in scope for this PR but not implemented yet; the unit tests for it are already there, though).

crusaderky (Collaborator, author):

it's now in
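
For context, a rough sketch of the shape this handling takes on the scheduler side: treat a broken comm as "all requested keys failed" instead of letting the exception propagate. Method and reply-format details are assumptions, not the exact merged code:

    async def _gather_on_worker(self, worker_address, who_has):
        """Ask a worker to fetch keys from peers; return the set of keys that failed."""
        try:
            result = await retry_operation(
                self.rpc(addr=worker_address).gather, who_has=who_has
            )
        except OSError:
            # The worker disconnected or the comm broke mid-transfer: report every
            # requested key as failed so rebalance()/replicate() can skip them.
            return set(who_has)
        if result["status"] == "OK":
            return set()
        # Assumed reply shape on partial failure: the keys that could not be fetched.
        return set(result["keys"])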

distributed/scheduler.py (outdated, resolved)
return {"status": "OK"}
missing_keys = {k for r in failed_keys_by_recipient.values() for k in r}
if missing_keys:
return {"status": "missing-data", "keys": list(missing_keys)}
Member:

This is currently forwarded to the user/caller of replicate/rebalance, correct? We should probably document what exactly missing-data means in this context. IIUC it does not necessarily mean that the task is lost, merely that it was not where we expected it to be, for whatever reason.

crusaderky (Collaborator, author) commented Jun 28, 2021:

I've changed Client.rebalance to raise KeyError if it receives missing-data and the client explicitly listed futures to be rebalanced; if futures were not specified, the status message is ignored.

crusaderky (Collaborator, author):

Correction: the logic described above actually lives in Scheduler.rebalance.
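
A simplified sketch of the behaviour described above, with the explicit-keys check living in Scheduler.rebalance (the helper name is illustrative, not the PR's actual code):

    async def rebalance(self, comm=None, keys=None, workers=None):
        # `keys` is the optional set of keys the client explicitly asked to rebalance.
        missing_keys = await self._move_keys(keys, workers)  # illustrative helper
        if missing_keys and keys is not None:
            # The caller named specific futures; surface the problem so that
            # Client.rebalance can turn this status into a KeyError.
            return {"status": "missing-data", "keys": list(missing_keys)}
        # Opportunistic rebalance of whatever happens to be in memory:
        # keys that moved or were released mid-flight are silently skipped.
        return {"status": "OK"}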

Comment on lines 5953 to 5955
await asyncio.gather(
    *(self._delete_worker_data(r, v) for r, v in to_senders.items())
)
Member:

What happens to the dependents of the to-be-deleted keys on the worker? IIUC the worker state machine is currently not equipped to deal with the transitions required for something like this. The necessary transitions on the worker side would be

    transition(ts, "memory" -> "fetch")
    for dep in ts.dependents: transition(dep, "ready" -> "waiting")

The delete/free/release path on the worker side is not that sophisticated. I remember KeyErrors popping up while debugging the recent deadlocks when I removed keys too eagerly.
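
Purely to illustrate the transitions sketched above (hypothetical helper and method names, not the real Worker API, and not implemented in this PR):

    def on_key_removed_by_rebalance(worker, ts):
        # The data left this worker, so it must be fetched again before it can be used.
        worker.transition(ts, "fetch")
        for dep in ts.dependents:
            # Anything that was ready to run is now missing an input again.
            if dep.state == "ready":
                worker.transition(dep, "waiting")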

crusaderky (Collaborator, author):

Resilience of a computation to having needed keys removed from under its feet is out of scope for this PR. As of this PR, it is still not robust to run rebalance() during a compute: rebalance will no longer crash, but the compute still will.

@crusaderky crusaderky closed this Jul 1, 2021
@crusaderky crusaderky reopened this Jul 1, 2021
@@ -329,7 +329,6 @@ async def test_remove_worker_from_scheduler(s, a, b):
await s.remove_worker(address=a.address)
assert a.address not in s.nthreads
assert len(s.workers[b.address].processing) == len(dsk) # b owns everything
s.validate_state()
crusaderky (Collaborator, author):

Redundant - it's already called by the gen_cluster cleanup code.

crusaderky (Collaborator, author):

@fjetter @mrocklin @jrbourbeau ready for final review. Unit tests have been extensively stress-tested.

@crusaderky crusaderky marked this pull request as ready for review July 2, 2021 12:34
fjetter (Member) left a comment:

Apart from a few nitpicks, the big things I want to discuss and settle before merging are:

  • Error handling around asyncio.gather
  • Use of transitions instead of a plain del parent.tasks

Otherwise the changes LGTM.

distributed/tests/test_worker.py (outdated, resolved)
distributed/worker.py (outdated, resolved)
distributed/scheduler.py (outdated, resolved)
@@ -6073,19 +6146,14 @@ async def replicate(
wws._address for wws in ts._who_has
]

results = await asyncio.gather(
await asyncio.gather(
Member:

In case of an exception in one of the _gather_on_worker tasks, the exception is, by default, raised immediately. Even though an exception is raised, the not-yet-completed tasks will continue running and are not cancelled. Regardless of whether we cancel them or not, we always lose the results of the successful ones. If any of the gather_on_worker calls fail, we'd get no logs at all.

I would suggest either moving all the log_event calls into the coroutines themselves, so that each one logs its own event, or adding the argument return_exceptions=True to asyncio.gather and handling the exceptions explicitly here.

The latter would also reduce the chance of introducing subtle bugs once the logic below this gather becomes more complicated (if ever). I don't have a strong opinion about which way we go, but I think we should handle this case properly.
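
A minimal sketch of the second option, assuming a to_recipients mapping of worker address to who_has dict and _gather_on_worker returning the set of failed keys (illustrative, not the PR's code):

    results = await asyncio.gather(
        *(self._gather_on_worker(w, d) for w, d in to_recipients.items()),
        return_exceptions=True,
    )
    for worker, result in zip(to_recipients, results):
        if isinstance(result, Exception):
            # A failure in one coroutine no longer hides the outcome of the others.
            logger.error("gather_on_worker failed for %s", worker, exc_info=result)
        else:
            self.log_event(worker, {"action": "rebalance", "failed-keys": list(result)})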

crusaderky (Collaborator, author) commented Jul 13, 2021:

Neither _gather_on_worker nor _delete_worker_data ever raises exceptions, though. Literally the only case where they do is event loop shutdown.
I've added comments to highlight this.

Member:

Well, that's not 100% true. At the very least, _delete_worker_data performs state transitions, which may raise. That likely doesn't justify dedicated exception handling here, so I'm good.

distributed/scheduler.py (resolved)
distributed/scheduler.py (resolved)
distributed/tests/test_scheduler.py (outdated, resolved)
distributed/tests/test_scheduler.py (outdated, resolved)

await cc.close_rpc()


crusaderky (Collaborator, author):

This is old garbage. The 'delete_data' handler does not exist. Note the 'dont_' prefix in the function name: this was never executed.

crusaderky (Collaborator, author):

@fjetter all review comments have been addressed

crusaderky (Collaborator, author) commented Jul 14, 2021:

@fjetter synced with main and ready for merge as soon as tests pass

@fjetter fjetter merged commit 5f01fe6 into dask:main Jul 14, 2021
@crusaderky crusaderky deleted the rebalance_in_compute branch July 14, 2021 13:34