
Document Scheduler and Worker state machine #6948

Merged
merged 14 commits into from
Aug 30, 2022

Conversation

crusaderky
Collaborator

@crusaderky crusaderky commented Aug 24, 2022

@crusaderky crusaderky self-assigned this Aug 24, 2022
@crusaderky crusaderky added the documentation Improve or add to documentation label Aug 24, 2022
@github-actions
Contributor

github-actions bot commented Aug 24, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    15 files ±0      15 suites ±0      6h 17m 26s ⏱️ −11m 42s
 3 052 tests ±0    2 968 ✔️ +1     83 💤 ±0    1 ❌ −1
22 577 runs  ±0   21 603 ✔️ +2    973 💤 −1    1 ❌ −1

For more details on these failures, see this check.

Results for commit cdbcac5. ± Comparison against base commit 6a1b089.

♻️ This comment has been updated with latest results.

@fjetter
Member

fjetter commented Aug 25, 2022

@martindurant @jakirkham you might be interested

Collaborator

@gjoseph92 gjoseph92 left a comment


Excellent! Very clear and useful documentation; this would be great to read as a new developer interested in working on the Worker. All comments are just naming/grammar nits.

distributed/worker_state_machine.py
@@ -5,15 +5,12 @@ digraph{
];
released1 [label=released];
released2 [label=released];
new -> released1;
released1 -> waiting;
Collaborator


FYI, merge conflict with #6614 for this file

docs/source/images/worker-execute-state.dot
docs/source/scheduling-state.rst
docs/source/worker-state.rst
and only when the message reaches the worker will it be released there too.


Flow control
Collaborator


Reading this section of the docs makes me so happy with the design of the worker state machine. Having state transformation strongly separated out from IO and concurrency like this is so nice, and such a big improvement. Nice work!

@martindurant
Member

This is a great document to have available for reference.

I have a couple of high-level thoughts before getting into detail. None of these mean I am requesting changes in the implementation.

  • I do not see the need for the RESUMED state, why not just go to the target state?
  • Similarly for CONSTRAINED, which seems identical to READY, which is also constrained but on the thread pool
  • Reschedule appears to cause a task to be rescheduled, but forgotten? From https://distributed.dask.org/en/stable/api.html#distributed.Reschedule I understand that the point is to clear its state so that the scheduler can restart its life-cycle.

Some diagrams are disjoint, which makes them confusing to follow. For example, the big diagram at the top of Computing shows rescheduled->released->forgotten, but two diagrams later we see that ERROR and MEMORY have exactly the same paths.

I would change some names to make them clearer, if more verbose is allowed. Something like
WAITING -> WAITING_ON_DEPS_TO_COMPLETE
READY -> READY_TO_RUN
FETCH -> WAITING_TO_FETCH_DEPS
FLIGHT -> DEPS_IN_FLIGHT
RESCHEDULE -> RESCHEDULE_RAISED

I would add a clear and specific definition and consequence of every state. Some of this is in TaskStates, but I would add specific details about the data structures affected.
For example:

  • EXECUTING, the associated function is currently running on a thread on this worker and appears as a value of that thread ID in worker.active_threads
  • MEMORY, this key is contained in the worker.data dict, the value being the result returned from executing the task's function
  • FORGOTTEN, this key is no longer in worker.tasks (or any other structure?) and will soon be garbage collected
  • ERROR, an uncaught exception was raised either during the execution of a task's function, or during serialization/deserialization. The content of the exception and traceback are held in the task, so that they are relayed to the client and raised there when the client requests the corresponding future's result. Tasks that depend on this task will also get the status ERROR (?).
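The per-state data-structure invariants suggested above can be sketched with a toy model. `MiniWorker` and its methods are hypothetical names for illustration only; they mirror the attributes mentioned in the bullets (`tasks`, `data`, `active_threads`) but are not the distributed implementation:

```python
from dataclasses import dataclass, field

@dataclass
class MiniWorker:
    tasks: dict = field(default_factory=dict)           # key -> state name
    data: dict = field(default_factory=dict)            # key -> result, for MEMORY tasks
    active_threads: dict = field(default_factory=dict)  # thread id -> key, for EXECUTING tasks

    def start_executing(self, key: str, thread_id: int) -> None:
        # EXECUTING: the task's function is running on a thread of this worker
        self.tasks[key] = "executing"
        self.active_threads[thread_id] = key

    def finish(self, key: str, thread_id: int, result: object) -> None:
        # MEMORY: the value returned by the function is held in the data dict
        self.tasks[key] = "memory"
        self.data[key] = result
        del self.active_threads[thread_id]

    def forget(self, key: str) -> None:
        # FORGOTTEN: the key leaves every structure and can be garbage collected
        self.tasks.pop(key, None)
        self.data.pop(key, None)

w = MiniWorker()
w.start_executing("x", thread_id=1)
w.finish("x", thread_id=1, result=42)
w.forget("x")
assert "x" not in w.tasks and "x" not in w.data
```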

Why is the initial state of any task apparently RELEASED?

There is no mention anywhere of Actors, even though a lot of code is dedicated to them.

@gjoseph92
Collaborator

I also just realized I don't think you mention the TaskState.done attribute in here. I think it's worth noting since the name is confusing (it sounds like it refers to the task being in a terminal state, as opposed to the execute/fetch coroutine that was responsible for it being complete).

crusaderky and others added 10 commits August 30, 2022 10:49
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
@crusaderky
Collaborator Author

crusaderky commented Aug 30, 2022

I also just realized I don't think you mention the TaskState.done attribute in here. I think it's worth noting since the name is confusing (it sounds like it refers to the task being in a terminal state, as opposed to the execute/fetch coroutine that was responsible for it being complete).

I overhauled the docstring of the flag.

  • I do not see the need for the RESUMED state, why not just go to the target state?

Because you'd have a task in flight which is actually taking up a thread from the threadpool, and which will terminate with one of the subclasses of ExecuteDoneEvent instead of GatherDepDoneEvent; or you'd have a task in executing or long-running which is actually taking up network resources and will terminate with GatherDepDoneEvent instead of ExecuteDoneEvent.

Additionally, when such a task fails, you need to try doing what the scheduler originally asked for. This is actually the norm when a worker dies and the scheduler notices before its peers:

  1. worker dies
  2. scheduler notices and cancels fetches for all keys which existed exclusively on the dead worker
  3. scheduler sends a compute-task message to a random worker
  4. if the worker receiving the compute-task message just happens to be one that previously had the task in flight, it will need to wait for the TCP connection to fall over, thus sending GatherDepNetworkFailureEvent to the state machine, and then start the computation.

This use case is also documented in the sphinx documents linked above.
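The four-step scenario above can be condensed into a toy transition table. The function and state names here are simplified stand-ins (the real event classes live in `distributed.worker_state_machine`; `resumed(target=waiting)` is shorthand for the RESUMED state remembering the scheduler's intent):

```python
def on_compute_task(state: str) -> str:
    """Toy handler: the scheduler asks this worker to compute a key."""
    if state == "flight":
        # The gather coroutine can't be aborted mid-way: park the task in
        # RESUMED and remember what the scheduler actually wants.
        return "resumed(target=waiting)"
    return "waiting"

def on_gather_dep_network_failure(state: str) -> str:
    """Toy handler: the TCP connection to the dead peer finally falls over."""
    if state == "resumed(target=waiting)":
        return "waiting"  # now honour the scheduler's request and compute locally
    return "released"

state = on_compute_task("flight")             # step 3: compute-task arrives
state = on_gather_dep_network_failure(state)  # step 4: gather fails, computation can start
assert state == "waiting"
```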

  • Similarly for CONSTRAINED, which seems identical to READY, which is also constrained but on the thread pool

It's a lot more efficient to have two different pipelines for tasks with resource constraints and tasks without, so that a task without resources is never blocked by tasks that have them.
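A minimal sketch of the two-pipeline idea, assuming a priority heap for unconstrained tasks and a separate queue for resource-constrained ones (all variable names are hypothetical, not the WorkerState internals):

```python
import heapq

ready = []               # unconstrained pipeline: plain priority heap
constrained = []         # constrained pipeline: tasks waiting on named resources
available = {"GPU": 0}   # no GPU free right now

heapq.heappush(ready, (0, "cheap-task"))
constrained.append(("gpu-task", {"GPU": 1}))

runnable = []
# The unconstrained pipeline drains regardless of blocked constrained tasks:
while ready:
    runnable.append(heapq.heappop(ready)[1])
# The constrained pipeline only releases tasks whose resources are free:
for key, req in list(constrained):
    if all(available.get(r, 0) >= n for r, n in req.items()):
        constrained.remove((key, req))
        runnable.append(key)
```

With a single shared queue, `cheap-task` could end up stuck behind `gpu-task`; with two pipelines it runs immediately while `gpu-task` keeps waiting for a GPU.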

Reschedule causes the task to be immediately forgotten on the worker and released on the scheduler, which restarts its life-cycle.
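The behaviour described here can be illustrated with a self-contained stand-in; in real code you would raise `distributed.Reschedule` from inside a task, and the worker and scheduler would do the forgetting and releasing:

```python
# Stand-in exception so this sketch runs without distributed installed;
# real code: `from distributed import Reschedule`.
class Reschedule(Exception):
    """Raised by a task to ask the scheduler to restart its life-cycle."""

def flaky_task(load_is_high: bool) -> str:
    if load_is_high:
        # Worker forgets the task immediately; scheduler releases and retries it,
        # typically on a different worker.
        raise Reschedule
    return "done"
```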

Some diagrams are disjoint. It makes it confusing to follow. For example, the big diagram at the top of Computing shows rescheduled->released->forgotten, but two diagrams later we see that ERROR and MEMORY have exactly the same paths.

Yes, this is on purpose to highlight how reschedule immediately transitions to released and forgotten, while error/memory won't transition to released until the scheduler asks to. I updated the diagrams and the "Forgetting tasks" section.

I would change some names to make them clearer, if more verbose is allowed.

This would make things seriously hard to read considering how many times these labels appear throughout the code.
It would also make it necessary to use enums to avoid misspellings, further aggravating the code verbosity.

FETCH -> WAITING_TO_FETCH_DEPS
FLIGHT -> DEPS_IN_FLIGHT

No: when a task is in fetch or flight state, it is itself waiting to be fetched or in flight.
If a task is waiting for its dependencies to be gathered, it is in waiting state.
Additionally, a task may be in fetch or flight without being a dependency, as it may have been replicated by the Active Memory Manager.

I would add a clear and specific definition and consequence of every state. Some of this is in TaskStates, but I would add specific details about the data structures affected. For example: [...]

Added clarifications.

Why is the initial state of any task apparently RELEASED?

Historical reasons. There used to be two separate states: new at the beginning of a task's life and released at the end; they were later merged into one, even though neither name quite fits the other's role. In practice you can find a task in released state towards the end of its lifetime, whereas a brand new task will immediately transition to waiting or fetch within a single transitions() call.
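A toy version of that last point: a brand-new task never rests in released; the first transitions() pass moves it straight to its next state. `transitions` and `should_compute` are illustrative names only, not the real WorkerState API:

```python
def transitions(start: str, *, should_compute: bool) -> list[str]:
    """Toy transition chain: `released` is only a momentary starting point."""
    chain = [start]
    if start == "released":
        # A task this worker must compute goes to `waiting`;
        # a task it must gather from a peer goes to `fetch`.
        chain.append("waiting" if should_compute else "fetch")
    return chain

assert transitions("released", should_compute=True) == ["released", "waiting"]
```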

There is no mention anywhere of Actors, even though a lot of code is dedicated to them.

Actually, Actors have nothing whatsoever to do with the worker state machine - they're just tasks like any other. They are handled exclusively in Worker.

@crusaderky
Collaborator Author

All review comments have been addressed

Member

@fjetter fjetter left a comment


Good job!

@crusaderky crusaderky merged commit 817ead3 into dask:main Aug 30, 2022
@crusaderky crusaderky deleted the workerstate_doc branch August 30, 2022 16:11
gjoseph92 pushed a commit to gjoseph92/distributed that referenced this pull request Oct 31, 2022
Successfully merging this pull request may close these issues.

[DEV DOCS] Documentation of Scheduler and Worker state machine
4 participants