From c351d3b3621fb89109ddb6b0c02006642bd03098 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Tue, 6 Jun 2023 16:44:07 +0200 Subject: [PATCH] Add support for using a dask distributed scheduler (#3151) Co-authored-by: Valeriu Predoi --- esmvaltool/diag_scripts/shared/_base.py | 32 +++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/esmvaltool/diag_scripts/shared/_base.py b/esmvaltool/diag_scripts/shared/_base.py index c2c8210cb1..b5fc072875 100644 --- a/esmvaltool/diag_scripts/shared/_base.py +++ b/esmvaltool/diag_scripts/shared/_base.py @@ -9,6 +9,7 @@ import time from pathlib import Path +import distributed import iris import matplotlib.pyplot as plt import yaml @@ -169,6 +170,7 @@ class ProvenanceLogger: with ProvenanceLogger(cfg) as provenance_logger: provenance_logger.log(output_file, record) """ + def __init__(self, cfg): """Create a provenance logger.""" self._log_file = os.path.join(cfg['run_dir'], @@ -474,15 +476,24 @@ def main(cfg): parser.add_argument( '-f', '--force', - help=("Force emptying the output directories" + help=("Force emptying the output directories " "(useful when re-running the script)"), action='store_true', ) parser.add_argument( '-i', '--ignore-existing', - help=("Force running the script, even if output files exists." - "(useful when re-running the script, use at your own risk)"), + help=("Force running the script, even if output files exist " + "(useful when re-running the script, use at your own risk)."), + action='store_true', + ) + parser.add_argument( + '-n', + '--no-distributed', + help=("Do not use the Dask distributed 'scheduler_address' from the " + "configuration file " + "(useful when re-running the script and the scheduler is no " + "longer available)."), action='store_true', ) parser.add_argument( @@ -555,6 +566,19 @@ def main(cfg): logger.info("Removing %s from previous run.", provenance_file) os.remove(provenance_file) - yield cfg + if not args.no_distributed and 'scheduler_address' in cfg: + try: + client = distributed.Client(cfg['scheduler_address']) + except OSError as exc: + raise OSError( + "Unable to connect to the Dask distributed scheduler at " + f"{cfg['scheduler_address']}. If the scheduler is no longer " + "available, try re-running the diagnostic script with the " + "--no-distributed flag.", ) from exc + else: + client = contextlib.nullcontext() + + with client: + yield cfg logger.info("End of diagnostic script run.")