diff --git a/docs/dask_cudf/source/best_practices.rst b/docs/dask_cudf/source/best_practices.rst new file mode 100644 index 00000000000..142124163af --- /dev/null +++ b/docs/dask_cudf/source/best_practices.rst @@ -0,0 +1,320 @@ +.. _best-practices: + +Dask cuDF Best Practices +======================== + +This page outlines several important guidelines for using `Dask cuDF +`__ effectively. + +.. note:: + Since Dask cuDF is a backend extension for + `Dask DataFrame `__, + the guidelines discussed in the `Dask DataFrames Best Practices + `__ + documentation also apply to Dask cuDF (excluding any pandas-specific + details). + + +Deployment and Configuration +---------------------------- + +Use Dask-CUDA +~~~~~~~~~~~~~ + +To execute a Dask workflow on multiple GPUs, a Dask cluster must +be deployed with `Dask-CUDA `__ +and `Dask.distributed `__. + +When running on a single machine, the `LocalCUDACluster `__ +convenience function is strongly recommended. No matter how many GPUs are +available on the machine (even one!), using `Dask-CUDA has many advantages +`__ +over default (threaded) execution. Just to list a few: + +* Dask-CUDA makes it easy to pin workers to specific devices. +* Dask-CUDA makes it easy to configure memory-spilling options. +* The distributed scheduler collects useful diagnostic information that can be viewed on a dashboard in real time. + +Please see `Dask-CUDA's API `__ +and `Best Practices `__ +documentation for detailed information. Typical ``LocalCUDACluster`` usage +is also illustrated within the multi-GPU section of `Dask cuDF's +`__ documentation. + +.. note:: + When running on cloud infrastructure or HPC systems, it is usually best to + leverage system-specific deployment libraries like `Dask Operator + `__ and `Dask-Jobqueue + `__. + + Please see `the RAPIDS deployment documentation `__ + for further details and examples. + + +Use diagnostic tools +~~~~~~~~~~~~~~~~~~~~ + +The Dask ecosystem includes several diagnostic tools that you should absolutely use. +These tools include an intuitive `browser dashboard +`__ as well as a dedicated +`API for collecting performance profiles +`__. + +No matter the workflow, using the dashboard is strongly recommended. +It provides a visual representation of the worker resources and compute +progress. It also shows basic GPU memory and utilization metrics (under +the ``GPU`` tab). To visualize more detailed GPU metrics in JupyterLab, +use `NVDashboard `__. + + +Enable cuDF spilling +~~~~~~~~~~~~~~~~~~~~ + +When using Dask cuDF for classic ETL workloads, it is usually best +to enable `native spilling support in cuDF +`__. +When using :func:`LocalCUDACluster`, this is easily accomplished by +setting ``enable_cudf_spill=True``. + +When a Dask cuDF workflow includes conversion between DataFrame and Array +representations, native cuDF spilling may be insufficient. For these cases, +`JIT-unspill `__ +is likely to produce better protection from out-of-memory (OOM) errors. +Please see `Dask-CUDA's spilling documentation +`__ for further details +and guidance. + +Use RMM +~~~~~~~ + +Memory allocations in cuDF are significantly faster and more efficient when +the `RAPIDS Memory Manager (RMM) `__ +library is configured appropriately on worker processes. In most cases, the best way to manage +memory is by initializing an RMM pool on each worker before executing a +workflow. When using :func:`LocalCUDACluster`, this is easily accomplished +by setting ``rmm_pool_size`` to a large fraction (e.g. ``0.9``). + +See the `Dask-CUDA memory-management documentation +`__ +for more details. + +Use the Dask DataFrame API +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Although Dask cuDF provides a public ``dask_cudf`` Python module, we +strongly recommended that you use the CPU/GPU portable ``dask.dataframe`` +API instead. Simply `use the Dask configuration system +`__ +to set the ``"dataframe.backend"`` option to ``"cudf"``, and the +``dask_cudf`` module will be imported and used implicitly. + +Be sure to use the :func:`to_backend` method if you need to convert +between the different DataFrame backends. For example:: + + df = df.to_backend("pandas") # This gives us a pandas-backed collection + +.. note:: + Although :func:`to_backend` makes it easy to move data between pandas + and cuDF, repetitive CPU-GPU data movement can degrade performance + significantly. For optimal results, keep your data on the GPU as much + as possible. + +Avoid eager execution +~~~~~~~~~~~~~~~~~~~~~ + +Although Dask DataFrame collections are lazy by default, there are several +notable methods that will result in the immediate execution of the +underlying task graph: + +:func:`compute`: Calling ``ddf.compute()`` will materialize the result of +``ddf`` and return a single cuDF object. This is done by executing the entire +task graph associated with ``ddf`` and concatenating its partitions in +local memory on the client process. + +.. note:: + Never call :func:`compute` on a large collection that cannot fit comfortably + in the memory of a single GPU! + +:func:`persist`: Like :func:`compute`, calling ``ddf.persist()`` will +execute the entire task graph associated with ``ddf``. The most important +difference is that the computed partitions will remain in distributed +worker memory instead of being concatenated together on the client process. +Another difference is that :func:`persist` will return immediately when +executing on a distributed cluster. If you need a blocking synchronization +point in your workflow, simply use the :func:`wait` function:: + + ddf = ddf.persist() + wait(ddf) + +.. note:: + Avoid calling :func:`persist` on a large collection that cannot fit comfortably + in global worker memory. If the total sum of the partition sizes is larger + than the sum of all GPU memory, calling persist will result in significant + spilling from device memory. If the individual partition sizes are large, this + is likely to produce an OOM error. + +:func:`len` / :func:`head` / :func:`tail`: Although these operations are used +often within pandas/cuDF code to quickly inspect data, it is best to avoid +them in Dask DataFrame. In most cases, these operations will execute some or all +of the underlying task graph to materialize the collection. + +:func:`sort_values` / :func:`set_index` : These operations both require Dask to +eagerly collect quantile information about the column(s) being targeted by the +global sort operation. See `Avoid Sorting`__ for notes on sorting considerations. + +.. note:: + When using :func:`set_index`, be sure to pass in ``sort=False`` whenever the + global collection does not **need** to be sorted by the new index. + +Avoid Sorting +~~~~~~~~~~~~~ + +`The design of Dask DataFrame `__ +makes it advantageous to work with data that is already sorted along its index at +creation time. For most other cases, it is best to avoid sorting unless the logic +of the workflow makes global ordering absolutely necessary. + +If the purpose of a :func:`sort_values` operation is to ensure that all unique +values in ``by`` will be moved to the same output partition, then `shuffle +`__ +is often the better option. + + +Reading Data +------------ + +Tune the partition size +~~~~~~~~~~~~~~~~~~~~~~~ + +The ideal partition size is usually between 1/32 and 1/8 the memory +capacity of a single GPU. Increasing the partition size will typically +reduce the number of tasks in your workflow and improve the GPU utilization +for each task. However, if the partitions are too large, the risk of OOM +errors can become significant. + +.. note:: + As a general rule of thumb, start with 1/32-1/16 for shuffle-intensive workflows + (e.g. large-scale sorting and joining), and 1/16-1/8 otherwise. For pathologically + skewed data distributions, it may be necessary to target 1/64 or smaller. + This rule of thumb comes from anecdotal optimization and OOM-debugging + experience. Since every workflow is different, choosing the best partition + size is both an art and a science. + +The easiest way to tune the partition size is when the DataFrame collection +is first created by a function like :func:`read_parquet`, :func:`read_csv`, +or :func:`from_map`. For example, both :func:`read_parquet` and :func:`read_csv` +expose a ``blocksize`` argument for adjusting the maximum partition size. + +If the partition size cannot be tuned effectively at creation time, the +`repartition `__ +method can be used as a last resort. + + +Use Parquet +~~~~~~~~~~~ + +`Parquet `__ is the recommended +file format for Dask cuDF. It provides efficient columnar storage and enables +Dask to perform valuable query optimizations like column projection and +predicate pushdown. + +The most important arguments to :func:`read_parquet` are ``blocksize`` and +``aggregate_files``: + +``blocksize``: Use this argument to specify the maximum partition size. +The default is `"256 MiB"`, but larger values are usually more performant +on GPUs with more than 8 GiB of memory. Dask will use the ``blocksize`` +value to map a discrete number of Parquet row-groups (or files) to each +output partition. This mapping will only account for the uncompressed +storage size of each row group, which is usually smaller than the +correspondng ``cudf.DataFrame``. + +``aggregate_files``: Use this argument to specify whether Dask should +map multiple files to the same DataFrame partition. The default is +``False``, but ``aggregate_files=True`` is usually more performant when +the dataset contains many files that are smaller than half of ``blocksize``. + +If you know that your files correspond to a reasonable partition size +before splitting or aggregation, set ``blocksize=None`` to disallow +file splitting. In the absence of column-projection pushdown, this will +result in a simple 1-to-1 mapping between files and output partitions. + +.. note:: + If your workflow requires a strict 1-to-1 mapping between files and + partitions, use :func:`from_map` to manually construct your partitions + with ``cudf.read_parquet``. When :func:`dd.read_parquet` is used, + query-planning optimizations may automatically aggregate distinct files + into the same partition (even when ``aggregate_files=False``). + +.. note:: + Metadata collection can be extremely slow when reading from remote + storage (e.g. S3 and GCS). When reading many remote files that all + correspond to a reasonable partition size, use ``blocksize=None`` + to avoid unnecessary metadata collection. + + +Use :func:`from_map` +~~~~~~~~~~~~~~~~~~~~ + +To implement custom DataFrame-creation logic that is not covered by +existing APIs (like :func:`read_parquet`), use :func:`dask.dataframe.from_map` +whenever possible. The :func:`from_map` API has several advantages +over :func:`from_delayed`: + +* It allows proper lazy execution of your custom logic +* It enables column projection (as long as the mapped function supports a ``columns`` key-word argument) + +See the `from_map API documentation `__ +for more details. + +.. note:: + Whenever possible, be sure to specify the ``meta`` argument to + :func:`from_map`. If this argument is excluded, Dask will need to + materialize the first partition eagerly. If a large RMM pool is in + use on the first visible device, this eager execution on the client + may lead to an OOM error. + + +Sorting, Joining, and Grouping +------------------------------ + +Sorting, joining, and grouping operations all have the potential to +require the global shuffling of data between distinct partitions. +When the initial data fits comfortably in global GPU memory, these +"all-to-all" operations are typically bound by worker-to-worker +communication. When the data is larger than global GPU memory, the +bottleneck is typically device-to-host memory spilling. + +Although every workflow is different, the following guidelines +are often recommended: + +* `Use a distributed cluster with Dask-CUDA workers `_ +* `Use native cuDF spilling whenever possible `_ +* Avoid shuffling whenever possible + * Use ``split_out=1`` for low-cardinality groupby aggregations + * Use ``broadcast=True`` for joins when at least one collection comprises a small number of partitions (e.g. ``<=5``) +* `Use UCX `__ if communication is a bottleneck. + +.. note:: + UCX enables Dask-CUDA workers to communicate using high-performance + tansport technologies like `NVLink `__ + and Infiniband. Without UCX, inter-process communication will rely + on TCP sockets. + + +User-defined functions +---------------------- + +Most real-world Dask DataFrame workflows use `map_partitions +`__ +to map user-defined functions across every partition of the underlying data. +This API is a fantastic way to apply custom operations in an intuitive and +scalable way. With that said, the :func:`map_partitions` method will produce +an opaque DataFrame expression that blocks the query-planning `optimizer +`__ from performing +useful optimizations (like projection and filter pushdown). + +Since column-projection pushdown is often the most effective optimization, +it is important to select the necessary columns both before and after calling +:func:`map_partitions`. You can also add explicit filter operations to further +mitigate the loss of filter pushdown. diff --git a/docs/dask_cudf/source/index.rst b/docs/dask_cudf/source/index.rst index 7fe6cbd45fa..23ca7e49753 100644 --- a/docs/dask_cudf/source/index.rst +++ b/docs/dask_cudf/source/index.rst @@ -15,7 +15,7 @@ as the ``"cudf"`` dataframe backend for .. note:: Neither Dask cuDF nor Dask DataFrame provide support for multi-GPU or multi-node execution on their own. You must also deploy a - `dask.distributed ` cluster + `dask.distributed `__ cluster to leverage multiple GPUs. We strongly recommend using `Dask-CUDA `__ to simplify the setup of the cluster, taking advantage of all features of the GPU @@ -29,6 +29,10 @@ minutes to Dask by `10 minutes to cuDF and Dask cuDF `__. +After reviewing the sections below, please see the +:ref:`Best Practices ` page for further guidance on +using Dask cuDF effectively. + Using Dask cuDF --------------- @@ -36,7 +40,7 @@ Using Dask cuDF The Dask DataFrame API (Recommended) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Simply use the `Dask configuration ` system to +Simply use the `Dask configuration `__ system to set the ``"dataframe.backend"`` option to ``"cudf"``. From Python, this can be achieved like so:: @@ -50,14 +54,14 @@ environment before running your code. Once this is done, the public Dask DataFrame API will leverage ``cudf`` automatically when a new DataFrame collection is created from an on-disk format using any of the following ``dask.dataframe`` -functions:: +functions: -* :func:`dask.dataframe.read_parquet` -* :func:`dask.dataframe.read_json` -* :func:`dask.dataframe.read_csv` -* :func:`dask.dataframe.read_orc` -* :func:`dask.dataframe.read_hdf` -* :func:`dask.dataframe.from_dict` +* :func:`read_parquet` +* :func:`read_json` +* :func:`read_csv` +* :func:`read_orc` +* :func:`read_hdf` +* :func:`from_dict` For example:: @@ -112,8 +116,8 @@ performance benefit over the CPU/GPU-portable ``dask.dataframe`` API. Also, using some parts of the explicit API are incompatible with automatic query planning (see the next section). -The explicit Dask cuDF API -~~~~~~~~~~~~~~~~~~~~~~~~~~ +Query Planning +~~~~~~~~~~~~~~ Dask cuDF now provides automatic query planning by default (RAPIDS 24.06+). As long as the ``"dataframe.query-planning"`` configuration is set to diff --git a/python/dask_cudf/README.md b/python/dask_cudf/README.md index 4655d2165f0..69e1524be39 100644 --- a/python/dask_cudf/README.md +++ b/python/dask_cudf/README.md @@ -16,6 +16,7 @@ See the [RAPIDS install page](https://docs.rapids.ai/install) for the most up-to ## Resources - [Dask cuDF documentation](https://docs.rapids.ai/api/dask-cudf/stable/) +- [Best practices](https://docs.rapids.ai/api/dask-cudf/stable/best_practices/) - [cuDF documentation](https://docs.rapids.ai/api/cudf/stable/) - [10 Minutes to cuDF and Dask cuDF](https://docs.rapids.ai/api/cudf/stable/user_guide/10min/) - [Dask-CUDA documentation](https://docs.rapids.ai/api/dask-cuda/stable/)