Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queuing docs #7203

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 106 additions & 3 deletions docs/source/scheduling-policies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,21 @@ of downstream computations, since it can determine how much data will need to be
transferred between workers in the future. Different heuristics are used for these
different scenarios:

Initial Task Placement
~~~~~~~~~~~~~~~~~~~~~~
Initial Task Placement - queuing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When :ref:`queuing <queuing>` is enabled (the default), each initial task is simply
scheduled on the least-busy worker at the moment. If an initial task wants to run, but
all worker threads are occupied, then the task instead goes onto (or stays on) the
queue and is not sent to any worker.

.. _co-assignment:

Initial Task Placement - no queuing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Currently, this scheduling is only used when :ref:`queuing <queuing>` is disabled
(the ``distributed.scheduler.worker-saturation`` config value is set to ``inf``).

We want neighboring root tasks to run on the same worker, since there's a
good chance those neighbors will be combined in a downstream operation::
Expand Down Expand Up @@ -140,6 +153,7 @@ competing interests that might motivate our choice:
the memory footprint small
4. Run tasks that are related so that large chunks of work can be completely
eliminated before running new chunks of work
5. Run tasks that use existing work before starting tasks that create new work

Accomplishing all of these objectives simultaneously is impossible. Optimizing
for any of these objectives perfectly can result in costly overhead. The
Expand Down Expand Up @@ -192,6 +206,89 @@ at a *coarse* level, if not a fine-grained one.
Dask's scheduling policies are short-term-efficient and long-term-fair
to multiple clients.

.. _queuing:

Avoid over-saturating workers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When there are many initial tasks to run, workers don't need to know about all of them
up front::

o o o o o o o o o o
/ \ / \ / \ / \ / \ / \ / \ / \ / \ / \
o o o o o o o o o o o o o o o o o o o o
| | | | | | | | | | | | | | | | | | | |
* * * * * * * * * * * * * * * * * * * * <-- initial tasks

The scheduler only submits initial tasks (``*`` tasks in the figure above) to workers
until all worker threads are filled up [#]_. The remaining initial tasks are put in a queue
on the scheduler, ordered by priority.

Tasks are popped off this queue and scheduled whenever a thread opens up on a worker
*and* there are no other higher-priority tasks (``o`` tasks in this diagram) that could
run instead.

This ensures we finish existing streams of work before starting on new work. This keeps
memory use as low as possible and generally gives much more stable execution compared to
submitting all initial tasks at once.

There are two downsides to this queueing:

1. Initial tasks are not :ref:`co-assigned <co-assignment>`. This means that workers may
have to make data transfers which could have been avoided. This can cause a moderate
slow-down on some workloads compared to disabling queuing. However, in many of those
cases, disabling queuing might cause workers to run out of memory, so the slow-down
is usually a better tradeoff.
2. For embarrassingly-parallel workloads like a ``client.map``, there can be a minor
increase in overhead per task, because each time a task finishes, a
scheduler<->worker roundtrip message is required before the next task starts. In most
cases, this overhead is not even measureable and not something to worry about.

This will only matter if you have very fast tasks, or a very slow network—that is, if
your task runtime is the same order of magnitude as your network latency. For
example, if each task only takes 1ms, and a scheduler<->worker roundtrip message
takes 10ms, all those roundtrip messages will dominate the runtime.

This means you should make your tasks bigger (via larger chunksizes, or batching more
work into single Dask tasks). In general, task runtime should be significantly larger
than network latency for Dask to perform well.

.. [#] By default, it will actually submit slightly more tasks than threads per worker
(for example, 1 extra task for workers with <= 10 threads). This slight buffering
maintains better performance when tasks are very fast. See next section for
details.

.. _adjust-queuing:

Adjusting or disabling queuing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

*It's rare to need to adjust queuing. The default value works well for almost all cases.
Only advanced users wanting to tune performance in unusual cases might consider adjusing
this parameter.*

Queuing behavior is controlled by the ``distributed.scheduler.worker-saturation`` config
value. This is set via the :doc:`Dask configuration system <configuration>`. The config
value must be set on the scheduler, before the scheduler starts.

The value controls how many initial chunks of data workers will have in memory at once.
This is basically the "breadth" of execution through the graph. Specifically, up to
``ceil(worker-saturation * nthreads)`` initial tasks are sent to a worker at a time.

By default, ``worker-saturation`` is ``1.1``. This value was chosen to keep worker
memory relatively low (workers with <= 10 threads will only get 1 extra initial chunk in
memory each), while mitigating the effects of the extra latency for users running on
very slow networks.

* If workers are running out of memory, consider setting ``worker-saturation`` to
``1.0`` instead of ``1.1``.
* If your network is very slow, or your tasks are extremely fast, and you want to
decrease runtime, consider increasing ``worker-saturation``. This *may* speed things
up slightly, at the cost of increased memory use. Values above ``2.0`` usually have
little benefit.
* If your graph would benefit from :ref:`co-assignment <co-assignment>`, and you have
plenty of memory on the cluster, consider disabling queueing by setting
``worker-saturation`` to ``inf`` to speed up runtime.

Where these decisions are made
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -214,8 +311,14 @@ scheduler, and workers at various points in the computation.
submit, persist, map, or any operation that generates futures).
3. Whenever a task is ready to run (its dependencies, if any, are complete),
the scheduler assigns it to a worker. When multiple tasks are ready at once,
they are all submitted to workers, in priority order.
they are submitted to workers, in priority order. If scheduler-side queuing
is active, they are submitted until all workers are full, then any leftover
runnable tasks are put in the scheduler queue. If queuing is disabled, then
all runnable tasks are submitted at once.
4. However, when the worker receives these tasks, it considers their priorities
when determining which tasks to prioritize for fetching data or for
computation. The worker maintains a heap of all ready-to-run tasks ordered
by this priority.
5. If scheduler-side queuing is active: when any task completes on a worker,
if there are no other higher-priority tasks to run, the scheduler pops off
the next queued task and runs it on that worker.