Add support for configuring Dask distributed (#2049)
Co-authored-by: Valeriu Predoi <valeriu.predoi@gmail.com>
Co-authored-by: Rémi Kazeroni <remi.kazeroni@dlr.de>
3 people authored Jun 1, 2023
1 parent f656483 commit 1c1e6f1
Showing 12 changed files with 459 additions and 19 deletions.
2 changes: 2 additions & 0 deletions doc/conf.py
@@ -423,6 +423,8 @@
(f'https://docs.esmvaltool.org/projects/ESMValCore/en/{rtd_version}/',
None),
'esmvaltool': (f'https://docs.esmvaltool.org/en/{rtd_version}/', None),
'dask': ('https://docs.dask.org/en/stable/', None),
'distributed': ('https://distributed.dask.org/en/stable/', None),
'iris': ('https://scitools-iris.readthedocs.io/en/latest/', None),
'iris-esmf-regrid': ('https://iris-esmf-regrid.readthedocs.io/en/latest',
None),
155 changes: 155 additions & 0 deletions doc/quickstart/configure.rst
@@ -199,6 +199,161 @@ the user.
debugging, etc. You can even provide any config user value as a run flag
``--argument_name argument_value``

.. _config-dask:

Dask distributed configuration
==============================

The :ref:`preprocessor functions <preprocessor_functions>` and many of the
:ref:`Python diagnostics in ESMValTool <esmvaltool:recipes>` make use of the
:ref:`Iris <iris:iris_docs>` library to work with the data.
In Iris, data can be either :ref:`real or lazy <iris:real_and_lazy_data>`.
Lazy data is represented by `dask arrays <https://docs.dask.org/en/stable/array.html>`_.
Dask arrays consist of many small
`numpy arrays <https://numpy.org/doc/stable/user/absolute_beginners.html#what-is-an-array>`_
(called chunks) and, if possible, computations are run on those small arrays
in parallel.
In order to figure out what needs to be computed when, Dask makes use of a
'`scheduler <https://docs.dask.org/en/stable/scheduling.html>`_'.
The default scheduler in Dask is rather basic, so it can only run on a single
computer and it may not always find the optimal task scheduling solution,
resulting in excessive memory use when using e.g. the
:func:`esmvalcore.preprocessor.multi_model_statistics` preprocessor function.
Therefore it is recommended that you take a moment to configure the
`Dask distributed <https://distributed.dask.org>`_ scheduler.
A Dask scheduler and the 'workers' running the actual computations are
collectively called a 'Dask cluster'.

In ESMValCore, the Dask cluster can be configured by creating a file called
``~/.esmvaltool/dask.yml``, where ``~`` is short for your home directory.
In this file, under the ``client`` keyword, the arguments to
:obj:`distributed.Client` can be provided.
Under the ``cluster`` keyword, the type of cluster (e.g.
:obj:`distributed.LocalCluster`), as well as any arguments required to start
the cluster, can be provided.
Extensive documentation on setting up Dask clusters is available
`here <https://docs.dask.org/en/latest/deploying.html>`__.
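
For illustration, a minimal sketch of such a ``~/.esmvaltool/dask.yml`` using
both keywords could look like this (the values below are arbitrary
placeholders, not recommendations):

.. code:: yaml

  client:
    # Example argument forwarded to distributed.Client; the value is a placeholder.
    timeout: 60
  cluster:
    type: distributed.LocalCluster
    n_workers: 2
    memory_limit: 4 GiB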

.. warning::

The format of the ``~/.esmvaltool/dask.yml`` configuration file is not yet
fixed and may change in the next release of ESMValCore.

.. note::

If not all preprocessor functions support lazy data, computational
performance may be best with the default scheduler.
See `issue #674 <https://github.com/ESMValGroup/ESMValCore/issues/674>`_ for
progress on making all preprocessor functions lazy.

**Example configurations**

*Personal computer*

Create a Dask distributed cluster on the computer running ESMValCore using
all available resources:

.. code:: yaml

  cluster:
    type: distributed.LocalCluster

This should work well for most personal computers.

.. note::

If you run this configuration on a shared node of an HPC cluster, Dask will
try to use as many resources as it can find, and this may lead to the node
being overcrowded by a single user (you)!

*Shared computer*

Create a Dask distributed cluster on the computer running ESMValCore, with
2 workers with 4 threads/4 GiB of memory each (8 GiB in total):

.. code:: yaml

  cluster:
    type: distributed.LocalCluster
    n_workers: 2
    threads_per_worker: 4
    memory_limit: 4 GiB

This should work well for shared computers.

*Computer cluster*

Create a Dask distributed cluster on the
`Levante <https://docs.dkrz.de/doc/levante/running-jobs/index.html>`_
supercomputer using the `Dask-Jobqueue <https://jobqueue.dask.org/en/latest/>`_
package:

.. code:: yaml

  cluster:
    type: dask_jobqueue.SLURMCluster
    queue: shared
    account: bk1088
    cores: 8
    memory: 7680MiB
    processes: 2
    interface: ib0
    local_directory: "/scratch/b/b381141/dask-tmp"
    n_workers: 24

This will start 24 workers with ``cores / processes = 4`` threads each,
resulting in ``n_workers / processes = 12`` Slurm jobs, where each Slurm job
will request 8 CPU cores and 7680 MiB of memory and start ``processes = 2``
workers.
This example will use the fast InfiniBand network connection (called ``ib0``
on Levante) for communication between workers running on different nodes.
It is
`important to set the right location for temporary storage <https://docs.dask.org/en/latest/deploying-hpc.html#local-storage>`__;
in this case the ``/scratch`` space is used.
It is also possible to use environment variables to configure the temporary
storage location, if your cluster provides these.
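
For example, assuming your cluster exports an environment variable such as
``$SCRATCH`` that points to fast local storage (the variable name here is
hypothetical and cluster-specific), the ``local_directory`` setting above
could be sketched as:

.. code:: yaml

  cluster:
    type: dask_jobqueue.SLURMCluster
    # other settings as in the example above
    local_directory: "$SCRATCH/dask-tmp"  # hypothetical environment variable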

A configuration like this should work well for larger computations where it is
advantageous to use multiple nodes in a compute cluster.
See
`Deploying Dask Clusters on High Performance Computers <https://docs.dask.org/en/latest/deploying-hpc.html>`_
for more information.

*Externally managed Dask cluster*

Use an externally managed cluster, e.g. a cluster that you started using the
`Dask Jupyterlab extension <https://github.com/dask/dask-labextension#dask-jupyterlab-extension>`_:

.. code:: yaml

  client:
    address: '127.0.0.1:8786'

See `here <https://jobqueue.dask.org/en/latest/interactive.html>`_
for an example of how to configure this on a remote system.

For debugging purposes, it can be useful to start the cluster outside of
ESMValCore, because then the
`Dask dashboard <https://docs.dask.org/en/stable/dashboard.html>`_ remains
available after ESMValCore has finished running.

**Advice on choosing performant configurations**

The threads within a single worker can access the same memory, so chunks can
be passed between them without copying, while communicating a chunk between
workers requires copying it, which is (a bit) slower.
Therefore it is beneficial for performance to have multiple threads per worker.
However, due to a limitation of the CPython implementation (known as the Global
Interpreter Lock or GIL), only a single thread in a worker can execute Python
code at any given time (this limitation does not apply to compiled code called
by Python code, e.g. numpy).
Therefore, the best performing configurations will typically not use much more
than 10 threads per worker.
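
As a rough illustration only (the numbers below are placeholders and the best
values depend on your hardware and recipe), a computation-heavy workload on a
machine with 32 cores could use a few workers with several threads each:

.. code:: yaml

  cluster:
    type: distributed.LocalCluster
    n_workers: 4
    threads_per_worker: 8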

Due to limitations of the NetCDF library (it is not thread-safe), only one
of the threads in a worker can read or write to a NetCDF file at a time.
Therefore, it may be beneficial to use fewer threads per worker if the
computation is very simple and the runtime is determined by the
speed with which the data can be read from and/or written to disk.
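
Conversely, for such an I/O-bound workload on the same hypothetical 32-core
machine, more workers with fewer threads each may perform better, for example:

.. code:: yaml

  cluster:
    type: distributed.LocalCluster
    n_workers: 16
    threads_per_worker: 2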

.. _config-esgf:

4 changes: 3 additions & 1 deletion environment.yml
@@ -10,6 +10,8 @@ dependencies:
- cftime
- compilers
- dask
- dask-jobqueue
- distributed
- esgf-pyclient>=0.3.1
- esmpy!=8.1.0
- filelock
@@ -18,7 +20,7 @@ dependencies:
- geopy
- humanfriendly
- importlib_resources
- iris>=3.4.0
- iris>=3.6.0
- iris-esmf-regrid >=0.6.0 # to work with latest esmpy
- isodate
- jinja2
3 changes: 3 additions & 0 deletions esmvalcore/_main.py
@@ -74,6 +74,7 @@ def process_recipe(recipe_file: Path, session):
import shutil

from esmvalcore._recipe.recipe import read_recipe_file
from esmvalcore.config._dask import check_distributed_config
if not recipe_file.is_file():
import errno
raise OSError(errno.ENOENT, "Specified recipe file does not exist",
@@ -103,6 +104,8 @@ def process_recipe(recipe_file: Path, session):
logger.info("If you experience memory problems, try reducing "
"'max_parallel_tasks' in your user configuration file.")

check_distributed_config()

if session['compress_netcdf']:
logger.warning(
"You have enabled NetCDF compression. Accessing .nc files can be "
38 changes: 30 additions & 8 deletions esmvalcore/_task.py
@@ -19,9 +19,11 @@

import psutil
import yaml
from distributed import Client

from ._citation import _write_citation_files
from ._provenance import TrackedFile, get_task_provenance
from .config._dask import get_distributed_client
from .config._diagnostics import DIAGNOSTICS, TAGS


@@ -718,10 +720,22 @@ def run(self, max_parallel_tasks: Optional[int] = None) -> None:
max_parallel_tasks : int
Number of processes to run. If `1`, run the tasks sequentially.
"""
if max_parallel_tasks == 1:
self._run_sequential()
else:
self._run_parallel(max_parallel_tasks)
with get_distributed_client() as client:
if client is None:
address = None
else:
address = client.scheduler.address
for task in self.flatten():
if (isinstance(task, DiagnosticTask)
and Path(task.script).suffix.lower() == '.py'):
# Only insert the scheduler address if running a
# Python script.
task.settings['scheduler_address'] = address

if max_parallel_tasks == 1:
self._run_sequential()
else:
self._run_parallel(address, max_parallel_tasks)

def _run_sequential(self) -> None:
"""Run tasks sequentially."""
@@ -732,7 +746,7 @@ def _run_sequential(self) -> None:
for task in sorted(tasks, key=lambda t: t.priority):
task.run()

def _run_parallel(self, max_parallel_tasks=None):
def _run_parallel(self, scheduler_address, max_parallel_tasks):
"""Run tasks in parallel."""
scheduled = self.flatten()
running = {}
@@ -757,7 +771,8 @@ def done(task):
if len(running) >= max_parallel_tasks:
break
if all(done(t) for t in task.ancestors):
future = pool.apply_async(_run_task, [task])
future = pool.apply_async(_run_task,
[task, scheduler_address])
running[task] = future
scheduled.remove(task)

@@ -790,7 +805,14 @@ def _copy_results(task, future):
task.output_files, task.products = future.get()


def _run_task(task):
def _run_task(task, scheduler_address):
"""Run task and return the result."""
output_files = task.run()
if scheduler_address is None:
client = contextlib.nullcontext()
else:
client = Client(scheduler_address)

with client:
output_files = task.run()

return output_files, task.products
79 changes: 79 additions & 0 deletions esmvalcore/config/_dask.py
@@ -0,0 +1,79 @@
"""Configuration for Dask distributed."""
import contextlib
import importlib
import logging
from pathlib import Path

import yaml
from distributed import Client

logger = logging.getLogger(__name__)

CONFIG_FILE = Path.home() / '.esmvaltool' / 'dask.yml'


def check_distributed_config():
"""Check the Dask distributed configuration."""
if not CONFIG_FILE.exists():
logger.warning(
"Using the Dask basic scheduler. This may lead to slow "
"computations and out-of-memory errors. "
"Note that the basic scheduler may still be the best choice for "
"preprocessor functions that are not lazy. "
"In that case, you can safely ignore this warning. "
"See https://docs.esmvaltool.org/projects/ESMValCore/en/latest/"
"quickstart/configure.html#dask-distributed-configuration for "
"more information. ")


@contextlib.contextmanager
def get_distributed_client():
"""Get a Dask distributed client."""
dask_args = {}
if CONFIG_FILE.exists():
config = yaml.safe_load(CONFIG_FILE.read_text(encoding='utf-8'))
if config is not None:
dask_args = config

client_args = dask_args.get('client') or {}
cluster_args = dask_args.get('cluster') or {}

# Start a cluster, if requested
if 'address' in client_args:
# Use an externally managed cluster.
cluster = None
if cluster_args:
logger.warning(
"Not using Dask 'cluster' settings from %s because a cluster "
"'address' is already provided in 'client'.", CONFIG_FILE)
elif cluster_args:
# Start cluster.
cluster_type = cluster_args.pop(
'type',
'distributed.LocalCluster',
)
cluster_module_name, cluster_cls_name = cluster_type.rsplit('.', 1)
cluster_module = importlib.import_module(cluster_module_name)
cluster_cls = getattr(cluster_module, cluster_cls_name)
cluster = cluster_cls(**cluster_args)
client_args['address'] = cluster.scheduler_address
else:
# No cluster configured, use Dask basic scheduler, or a LocalCluster
# managed through Client.
cluster = None

# Start a client, if requested
if dask_args:
client = Client(**client_args)
logger.info("Dask dashboard: %s", client.dashboard_link)
else:
logger.info("Using the Dask basic scheduler.")
client = None

try:
yield client
finally:
if client is not None:
client.close()
if cluster is not None:
cluster.close()
3 changes: 2 additions & 1 deletion esmvalcore/experimental/recipe.py
@@ -10,7 +10,7 @@
import yaml

from esmvalcore._recipe.recipe import Recipe as RecipeEngine
from esmvalcore.config import CFG, Session
from esmvalcore.config import CFG, Session, _dask

from ._logging import log_to_dir
from .recipe_info import RecipeInfo
@@ -132,6 +132,7 @@ def run(
session['diagnostics'] = task

with log_to_dir(session.run_dir):
_dask.check_distributed_config()
self._engine = self._load(session=session)
self._engine.run()

