
Worker State should be exclusively modified through batched comms #6604

Open
crusaderky opened this issue Jun 21, 2022 · 1 comment
Labels
stability Issue or feature related to cluster stability (e.g. deadlock)

Comments

@crusaderky
Collaborator

This is a high level epic.

The Worker State Machine (distributed/worker_state_machine.py) can be updated exclusively through the Worker.handle_stimulus handler. Most state-changing calls coming from the scheduler are handled through batched comms, which have the notable feature of being strictly sequential. This makes it a lot harder to introduce subtle race conditions where the worker state is not where the scheduler thinks it is.

There are three notable offenders that bypass the batched comms and use RPC instead: rebalance, replicate, and scatter.

If you use any of these calls, you may run into the following race:

  1. a rebalance/replicate/scatter command is fired through RPC by the scheduler
  2. another command is fired by the scheduler through batched send, e.g. free-keys
  3. the two commands land on the worker in the opposite order from that in which the scheduler sent them

For example, the scheduler may send free-keys because it wants the worker to forget the key, and then shortly afterwards it may scatter data with the same key to the worker; but the worker will instead receive the scattered data first, which will transition the key to memory, and then free-keys, which will make it lose the freshly scattered data.
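The reordering above can be sketched with a toy asyncio simulation. The worker loop, the queue, and the command names here are hypothetical simplifications, not the actual distributed internals; the point is only that a worker applying commands strictly in arrival order loses data when the arrival order differs from the send order:

```python
import asyncio

async def worker(inbox: asyncio.Queue, state: dict):
    # Hypothetical worker loop: applies commands strictly in arrival order.
    while True:
        op, key, payload = await inbox.get()
        if op == "scatter":
            state[key] = payload      # key transitions to memory
        elif op == "free-keys":
            state.pop(key, None)      # key is forgotten
        inbox.task_done()

async def main():
    state: dict = {}
    inbox: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(inbox, state))

    # Scheduler intent: free the old key, then scatter fresh data for it.
    # Because scatter travels over RPC while free-keys travels over the
    # batched stream, they can arrive in the opposite order:
    await inbox.put(("scatter", "x", 123))     # arrives first (RPC)
    await inbox.put(("free-keys", "x", None))  # arrives second (batched)

    await inbox.join()
    task.cancel()
    return state

state = asyncio.run(main())
print(state)  # {} — the freshly scattered data was lost
```

If both commands travelled over the same strictly sequential channel, free-keys would arrive first and the scattered data would survive.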

CC @fjetter @gjoseph92

@crusaderky crusaderky added the stability Issue or feature related to cluster stability (e.g. deadlock) label Jun 21, 2022
@gjoseph92
Collaborator

I'm generally in favor of this. Note, though, that async stream handlers are still processed concurrently:

```python
handler = self.stream_handlers[op]
if is_coroutine_function(handler):
    self.loop.add_callback(handler, **merge(extra, msg))
    await gen.sleep(0)
else:
    handler(**merge(extra, msg))
```

So if replicate and free-keys both had async handlers (free-keys is currently sync, but just imagine), they would always be invoked in the correct order, but the handlers would still need to be written to work correctly when running concurrently. I'm not saying that's a reason not to make the change you're proposing; it's just something to be aware of.
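A minimal illustration of that pitfall, with made-up handler names rather than the real distributed handlers: two coroutine handlers are *started* in order (as add_callback would start them), yet the second one can finish before the first as soon as the first awaits anything:

```python
import asyncio

events = []

async def replicate_handler():
    # Hypothetical async handler that does real I/O before finishing.
    events.append("replicate: start")
    await asyncio.sleep(0.02)   # e.g. fetching data over the network
    events.append("replicate: done")

async def free_keys_handler():
    # Hypothetical async handler that yields control only briefly.
    events.append("free-keys: start")
    await asyncio.sleep(0)
    events.append("free-keys: done")

async def main():
    # Handlers are started in order, but run concurrently:
    t1 = asyncio.create_task(replicate_handler())
    t2 = asyncio.create_task(free_keys_handler())
    await asyncio.gather(t1, t2)

asyncio.run(main())
print(events)  # free-keys finishes before replicate
```

In-order invocation alone therefore does not guarantee in-order *completion* once handlers are coroutines.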

The main difficulty I see is that batched comms are fire-and-forget. The scheduler gets neither a return value nor even confirmation that the operation happened. The current implementation of scatter at least seems to rely on a) getting an nbytes response back from each worker, and b) blocking until all the scatter calls have completed. So this would have to be refactored.
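One conceivable refactoring direction (purely a sketch; none of these names or message shapes exist in distributed) is to keep scatter on the ordered stream but attach a request id, so the worker can send back the nbytes response and the scheduler can still block on confirmation:

```python
import asyncio
import itertools

_ids = itertools.count()

async def worker_loop(inbox: asyncio.Queue, replies: dict):
    # Hypothetical worker loop: processes the ordered stream and fulfils
    # the scheduler-side future matching each request id.
    while True:
        msg = await inbox.get()
        if msg["op"] == "scatter":
            replies[msg["reply_id"]].set_result({"nbytes": len(msg["data"])})
        inbox.task_done()

async def scatter(inbox: asyncio.Queue, replies: dict, data: bytes):
    # Send over the ordered stream, then await the worker's confirmation.
    reply_id = next(_ids)
    fut = asyncio.get_running_loop().create_future()
    replies[reply_id] = fut
    await inbox.put({"op": "scatter", "data": data, "reply_id": reply_id})
    return await fut

async def main():
    inbox, replies = asyncio.Queue(), {}
    task = asyncio.create_task(worker_loop(inbox, replies))
    result = await scatter(inbox, replies, b"hello")
    task.cancel()
    return result

result = asyncio.run(main())
print(result)  # {'nbytes': 5}
```

This keeps the strict ordering of the stream while restoring the request/response shape that scatter's current implementation appears to rely on.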

But overall I think this is a good idea, and ensuring state-modifying commands arrive in the order they're sent seems like an essential thing to guarantee!
