Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add best practices page to Dask cuDF docs #16821

Merged
merged 31 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f01fd71
start best practices page for dask-cudf
rjzamora Sep 16, 2024
7aa8041
revisions
rjzamora Sep 17, 2024
b2ce634
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 17, 2024
1e028ea
address code review
rjzamora Sep 18, 2024
3332717
more revisions
rjzamora Sep 18, 2024
eee37f3
more revisions
rjzamora Sep 18, 2024
7c63c7e
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 18, 2024
a425405
add from_map note on meta
rjzamora Sep 18, 2024
9233524
add note on diagnostics
rjzamora Sep 18, 2024
bd144c2
fix typos
rjzamora Sep 18, 2024
5f854e7
tweak wording
rjzamora Sep 19, 2024
397efa7
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
6c8771b
fix map_partitions typo
rjzamora Sep 19, 2024
f7731b8
revisions
rjzamora Sep 19, 2024
581a69f
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
8515cb9
fix spelling error and add link to quick-start example
rjzamora Sep 19, 2024
a23deff
replace link to readme
rjzamora Sep 19, 2024
4c1b55d
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 19, 2024
8ecd536
add a bit more info about wait and CPU-GPU data movement
rjzamora Sep 20, 2024
251bf23
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
40a638e
update
rjzamora Sep 20, 2024
d082cac
Apply suggestions from code review
rjzamora Sep 20, 2024
8152fca
Apply suggestions from code review
rjzamora Sep 20, 2024
a653a5a
Merge remote-tracking branch 'upstream/branch-24.10' into dask-cudf-b…
rjzamora Sep 20, 2024
91d4fd5
fix lists
rjzamora Sep 20, 2024
d58a5ce
fix func list
rjzamora Sep 20, 2024
59e597a
roll back func change
rjzamora Sep 20, 2024
adbd22d
fix more double-colon mistakes
rjzamora Sep 20, 2024
216d5de
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
d76dbd6
Apply suggestions from code review
rjzamora Sep 20, 2024
da7308a
Merge branch 'branch-24.10' into dask-cudf-best-practices
rjzamora Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 320 additions & 0 deletions docs/dask_cudf/source/best_practices.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
.. _best-practices:

Dask cuDF Best Practices
========================

This page outlines several important guidelines for using `Dask cuDF
<https://docs.rapids.ai/api/dask-cudf/stable/>`__ effectively.

.. note::
Since Dask cuDF is a backend extension for
`Dask DataFrame <https://docs.dask.org/en/stable/dataframe.html>`__,
the guidelines discussed in the `Dask DataFrames Best Practices
<https://docs.dask.org/en/stable/dataframe-best-practices.html>`__
documentation also apply to Dask cuDF (excluding any pandas-specific
details).


Deployment and Configuration
----------------------------

Use Dask-CUDA
~~~~~~~~~~~~~

To execute a Dask workflow on multiple GPUs, a Dask cluster must
be deployed with `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/>`__
and `Dask.distributed <https://distributed.dask.org/en/stable/>`__.

When running on a single machine, the `LocalCUDACluster <https://docs.rapids.ai/api/dask-cuda/stable/api/#dask_cuda.LocalCUDACluster>`__
convenience function is strongly recommended. No matter how many GPUs are
available on the machine (even one!), using `Dask-CUDA has many advantages
<https://docs.rapids.ai/api/dask-cuda/stable/#motivation>`__
over default (threaded) execution. Just to list a few:

* Dask-CUDA makes it easy to pin workers to specific devices.
* Dask-CUDA makes it easy to configure memory-spilling options.
* The distributed scheduler collects useful diagnostic information that can be viewed on a dashboard in real time.
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

Please see `Dask-CUDA's API <https://docs.rapids.ai/api/dask-cuda/stable/>`__
and `Best Practices <https://docs.rapids.ai/api/dask-cuda/stable/examples/best-practices/>`__
documentation for detailed information. Typical ``LocalCUDACluster`` usage
is also illustrated within the multi-GPU section of `Dask cuDF's
<https://docs.rapids.ai/api/dask-cudf/stable/>`__ documentation.

.. note::
When running on cloud infrastructure or HPC systems, it is usually best to
leverage system-specific deployment libraries like `Dask Operator
<https://docs.dask.org/en/latest/deploying-kubernetes.html>`__ and `Dask-Jobqueue
<https://jobqueue.dask.org/en/latest/>`__.

Please see `the RAPIDS deployment documentation <https://docs.rapids.ai/deployment/stable/>`__
for further details and examples.


Use diagnostic tools
~~~~~~~~~~~~~~~~~~~~

The Dask ecosystem includes several diagnostic tools that you should absolutely use.
These tools include an intuitive `browser dashboard
<https://docs.dask.org/en/stable/dashboard.html>`__ as well as a dedicated
`API for collecting performance profiles
<https://distributed.dask.org/en/latest/diagnosing-performance.html#performance-reports>`__.

jacobtomlinson marked this conversation as resolved.
Show resolved Hide resolved
No matter the workflow, using the dashboard is strongly recommended.
It provides a visual representation of the worker resources and compute
progress. It also shows basic GPU memory and utilization metrics (under
the ``GPU`` tab). To visualize more detailed GPU metrics in JupyterLab,
use `NVDashboard <https://github.com/rapidsai/jupyterlab-nvdashboard>`__.


Enable cuDF spilling
~~~~~~~~~~~~~~~~~~~~

When using Dask cuDF for classic ETL workloads, it is usually best
to enable `native spilling support in cuDF
<https://docs.rapids.ai/api/cudf/stable/developer_guide/library_design/#spilling-to-host-memory>`__.
When using :func:`LocalCUDACluster`, this is easily accomplished by
setting ``enable_cudf_spill=True``.

When a Dask cuDF workflow includes conversion between DataFrame and Array
representations, native cuDF spilling may be insufficient. For these cases,
`JIT-unspill <https://docs.rapids.ai/api/dask-cuda/nightly/spilling/#jit-unspill>`__
is likely to produce better protection from out-of-memory (OOM) errors.
Please see `Dask-CUDA's spilling documentation
<https://docs.rapids.ai/api/dask-cuda/24.10/spilling/>`__ for further details
and guidance.

Use RMM
~~~~~~~

Memory allocations in cuDF are significantly faster and more efficient when
the `RAPIDS Memory Manager (RMM) <https://docs.rapids.ai/api/rmm/stable/>`__
library is configured appropriately on worker processes. In most cases, the best way to manage
memory is by initializing an RMM pool on each worker before executing a
workflow. When using :func:`LocalCUDACluster`, this is easily accomplished
by setting ``rmm_pool_size`` to a large fraction (e.g. ``0.9``).
VibhuJawa marked this conversation as resolved.
Show resolved Hide resolved

See the `Dask-CUDA memory-management documentation
<https://docs.rapids.ai/api/dask-cuda/nightly/examples/best-practices/#gpu-memory-management>`__
for more details.

Use the Dask DataFrame API
~~~~~~~~~~~~~~~~~~~~~~~~~~
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

Although Dask cuDF provides a public ``dask_cudf`` Python module, we
strongly recommended that you use the CPU/GPU portable ``dask.dataframe``
API instead. Simply `use the Dask configuration system
<https://docs.dask.org/en/stable/how-to/selecting-the-collection-backend.html>`__
to set the ``"dataframe.backend"`` option to ``"cudf"``, and the
``dask_cudf`` module will be imported and used implicitly.

Be sure to use the :func:`to_backend` method if you need to convert
between the different DataFrame backends. For example::

df = df.to_backend("pandas") # This gives us a pandas-backed collection

.. note::
Although :func:`to_backend` makes it easy to move data between pandas
and cuDF, repetitive CPU-GPU data movement can degrade performance
significantly. For optimal results, keep your data on the GPU as much
as possible.

Avoid eager execution
~~~~~~~~~~~~~~~~~~~~~

Although Dask DataFrame collections are lazy by default, there are several
notable methods that will result in the immediate execution of the
underlying task graph:

:func:`compute`: Calling ``ddf.compute()`` will materialize the result of
``ddf`` and return a single cuDF object. This is done by executing the entire
task graph associated with ``ddf`` and concatenating its partitions in
local memory on the client process.

.. note::
Never call :func:`compute` on a large collection that cannot fit comfortably
in the memory of a single GPU!

:func:`persist`: Like :func:`compute`, calling ``ddf.persist()`` will
execute the entire task graph associated with ``ddf``. The most important
difference is that the computed partitions will remain in distributed
worker memory instead of being concatenated together on the client process.
Another difference is that :func:`persist` will return immediately when
executing on a distributed cluster. If you need a blocking synchronization
point in your workflow, simply use the :func:`wait` function::

ddf = ddf.persist()
wait(ddf)

.. note::
Avoid calling :func:`persist` on a large collection that cannot fit comfortably
in global worker memory. If the total sum of the partition sizes is larger
than the sum of all GPU memory, calling persist will result in significant
spilling from device memory. If the individual partition sizes are large, this
is likely to produce an OOM error.

:func:`len` / :func:`head` / :func:`tail`: Although these operations are used
often within pandas/cuDF code to quickly inspect data, it is best to avoid
them in Dask DataFrame. In most cases, these operations will execute some or all
of the underlying task graph to materialize the collection.

:func:`sort_values` / :func:`set_index` : These operations both require Dask to
eagerly collect quantile information about the column(s) being targeted by the
global sort operation. See `Avoid Sorting`__ for notes on sorting considerations.

.. note::
When using :func:`set_index`, be sure to pass in ``sort=False`` whenever the
global collection does not **need** to be sorted by the new index.

Avoid Sorting
~~~~~~~~~~~~~

`The design of Dask DataFrame <https://docs.dask.org/en/stable/dataframe-design.html#dask-dataframe-design>`__
makes it advantageous to work with data that is already sorted along its index at
creation time. For most other cases, it is best to avoid sorting unless the logic
of the workflow makes global ordering absolutely necessary.

If the purpose of a :func:`sort_values` operation is to ensure that all unique
values in ``by`` will be moved to the same output partition, then `shuffle
<https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.shuffle.html>`__
is often the better option.


Reading Data
------------

Tune the partition size
~~~~~~~~~~~~~~~~~~~~~~~

The ideal partition size is usually between 1/32 and 1/8 the memory
capacity of a single GPU. Increasing the partition size will typically
reduce the number of tasks in your workflow and improve the GPU utilization
for each task. However, if the partitions are too large, the risk of OOM
errors can become significant.

.. note::
As a general rule of thumb, start with 1/32-1/16 for shuffle-intensive workflows
(e.g. large-scale sorting and joining), and 1/16-1/8 otherwise. For pathologically
skewed data distributions, it may be necessary to target 1/64 or smaller.
This rule of thumb comes from anecdotal optimization and OOM-debugging
experience. Since every workflow is different, choosing the best partition
size is both an art and a science.

The easiest way to tune the partition size is when the DataFrame collection
is first created by a function like :func:`read_parquet`, :func:`read_csv`,
or :func:`from_map`. For example, both :func:`read_parquet` and :func:`read_csv`
expose a ``blocksize`` argument for adjusting the maximum partition size.

If the partition size cannot be tuned effectively at creation time, the
`repartition <https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html>`__
method can be used as a last resort.


Use Parquet
~~~~~~~~~~~

`Parquet <https://parquet.apache.org/docs/file-format/>`__ is the recommended
file format for Dask cuDF. It provides efficient columnar storage and enables
Dask to perform valuable query optimizations like column projection and
predicate pushdown.

The most important arguments to :func:`read_parquet` are ``blocksize`` and
``aggregate_files``:

``blocksize``: Use this argument to specify the maximum partition size.
The default is `"256 MiB"`, but larger values are usually more performant
on GPUs with more than 8 GiB of memory. Dask will use the ``blocksize``
value to map a discrete number of Parquet row-groups (or files) to each
output partition. This mapping will only account for the uncompressed
storage size of each row group, which is usually smaller than the
correspondng ``cudf.DataFrame``.

``aggregate_files``: Use this argument to specify whether Dask should
map multiple files to the same DataFrame partition. The default is
``False``, but ``aggregate_files=True`` is usually more performant when
the dataset contains many files that are smaller than half of ``blocksize``.

If you know that your files correspond to a reasonable partition size
before splitting or aggregation, set ``blocksize=None`` to disallow
file splitting. In the absence of column-projection pushdown, this will
result in a simple 1-to-1 mapping between files and output partitions.

.. note::
If your workflow requires a strict 1-to-1 mapping between files and
partitions, use :func:`from_map` to manually construct your partitions
with ``cudf.read_parquet``. When :func:`dd.read_parquet` is used,
query-planning optimizations may automatically aggregate distinct files
into the same partition (even when ``aggregate_files=False``).

.. note::
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this note. Once we have more cloud IO specific optimizations it might make sense to add it to best practices or create a new one for cloud IO to discuss tips/tricks for those environments.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that we need a lot more remote-IO information. However, it doesn't feel like there is much to say yet :/

Metadata collection can be extremely slow when reading from remote
storage (e.g. S3 and GCS). When reading many remote files that all
correspond to a reasonable partition size, use ``blocksize=None``
to avoid unnecessary metadata collection.


Use :func:`from_map`
~~~~~~~~~~~~~~~~~~~~

To implement custom DataFrame-creation logic that is not covered by
existing APIs (like :func:`read_parquet`), use :func:`dask.dataframe.from_map`
whenever possible. The :func:`from_map` API has several advantages
over :func:`from_delayed`:

* It allows proper lazy execution of your custom logic
* It enables column projection (as long as the mapped function supports a ``columns`` key-word argument)

See the `from_map API documentation <https://docs.dask.org/en/stable/generated/dask_expr.from_map.html#dask_expr.from_map>`__
rjzamora marked this conversation as resolved.
Show resolved Hide resolved
for more details.

.. note::
Whenever possible, be sure to specify the ``meta`` argument to
:func:`from_map`. If this argument is excluded, Dask will need to
materialize the first partition eagerly. If a large RMM pool is in
use on the first visible device, this eager execution on the client
may lead to an OOM error.


Sorting, Joining, and Grouping
------------------------------

Sorting, joining, and grouping operations all have the potential to
require the global shuffling of data between distinct partitions.
When the initial data fits comfortably in global GPU memory, these
"all-to-all" operations are typically bound by worker-to-worker
communication. When the data is larger than global GPU memory, the
bottleneck is typically device-to-host memory spilling.

Although every workflow is different, the following guidelines
are often recommended:

* `Use a distributed cluster with Dask-CUDA workers <Use Dask-CUDA>`_
* `Use native cuDF spilling whenever possible <Enable cuDF Spilling>`_
* Avoid shuffling whenever possible
* Use ``split_out=1`` for low-cardinality groupby aggregations
* Use ``broadcast=True`` for joins when at least one collection comprises a small number of partitions (e.g. ``<=5``)
* `Use UCX <https://docs.rapids.ai/api/dask-cuda/nightly/examples/ucx/>`__ if communication is a bottleneck.

.. note::
UCX enables Dask-CUDA workers to communicate using high-performance
tansport technologies like `NVLink <https://www.nvidia.com/en-us/data-center/nvlink/>`__
and Infiniband. Without UCX, inter-process communication will rely
on TCP sockets.

rjzamora marked this conversation as resolved.
Show resolved Hide resolved
rjzamora marked this conversation as resolved.
Show resolved Hide resolved

rjzamora marked this conversation as resolved.
Show resolved Hide resolved
User-defined functions
----------------------

Most real-world Dask DataFrame workflows use `map_partitions
<https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.map_partitions.html>`__
to map user-defined functions across every partition of the underlying data.
This API is a fantastic way to apply custom operations in an intuitive and
scalable way. With that said, the :func:`map_partitions` method will produce
an opaque DataFrame expression that blocks the query-planning `optimizer
<https://docs.dask.org/en/stable/dataframe-optimizer.html>`__ from performing
useful optimizations (like projection and filter pushdown).

Since column-projection pushdown is often the most effective optimization,
it is important to select the necessary columns both before and after calling
:func:`map_partitions`. You can also add explicit filter operations to further
mitigate the loss of filter pushdown.
26 changes: 15 additions & 11 deletions docs/dask_cudf/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ as the ``"cudf"`` dataframe backend for
.. note::
Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU
or multi-node execution on their own. You must also deploy a
`dask.distributed <https://distributed.dask.org/en/stable/>` cluster
`dask.distributed <https://distributed.dask.org/en/stable/>`__ cluster
to leverage multiple GPUs. We strongly recommend using `Dask-CUDA
<https://docs.rapids.ai/api/dask-cuda/stable/>`__ to simplify the
setup of the cluster, taking advantage of all features of the GPU
Expand All @@ -29,14 +29,18 @@ minutes to Dask
by `10 minutes to cuDF and Dask cuDF
<https://docs.rapids.ai/api/cudf/stable/user_guide/10min.html>`__.

After reviewing the sections below, please see the
:ref:`Best Practices <best-practices>` page for further guidance on
using Dask cuDF effectively.


Using Dask cuDF
---------------

The Dask DataFrame API (Recommended)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Simply use the `Dask configuration <dask:configuration>` system to
Simply use the `Dask configuration <dask:configuration>`__ system to
set the ``"dataframe.backend"`` option to ``"cudf"``. From Python,
this can be achieved like so::

Expand All @@ -50,14 +54,14 @@ environment before running your code.
Once this is done, the public Dask DataFrame API will leverage
``cudf`` automatically when a new DataFrame collection is created
from an on-disk format using any of the following ``dask.dataframe``
functions::
functions:

* :func:`dask.dataframe.read_parquet`
* :func:`dask.dataframe.read_json`
* :func:`dask.dataframe.read_csv`
* :func:`dask.dataframe.read_orc`
* :func:`dask.dataframe.read_hdf`
* :func:`dask.dataframe.from_dict`
* :func:`read_parquet`
* :func:`read_json`
* :func:`read_csv`
* :func:`read_orc`
* :func:`read_hdf`
* :func:`from_dict`

For example::

Expand Down Expand Up @@ -112,8 +116,8 @@ performance benefit over the CPU/GPU-portable ``dask.dataframe`` API.
Also, using some parts of the explicit API are incompatible with
automatic query planning (see the next section).

The explicit Dask cuDF API
~~~~~~~~~~~~~~~~~~~~~~~~~~
Query Planning
~~~~~~~~~~~~~~

Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+).
As long as the ``"dataframe.query-planning"`` configuration is set to
Expand Down
1 change: 1 addition & 0 deletions python/dask_cudf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to
## Resources

- [Dask cuDF documentation](https://docs.rapids.ai/api/dask-cudf/stable/)
- [Best practices](https://docs.rapids.ai/api/dask-cudf/stable/best_practices/)
- [cuDF documentation](https://docs.rapids.ai/api/cudf/stable/)
- [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/)
- [Dask-CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/)
Expand Down
Loading