Skip to content

Commit

Permalink
Add support for using a dask distributed scheduler (#3151)
Browse files Browse the repository at this point in the history
Co-authored-by: Valeriu Predoi <valeriu.predoi@gmail.com>
  • Loading branch information
2 people authored and Javier Vegas-Regidor committed Jan 14, 2024
1 parent e3b5877 commit c351d3b
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions esmvaltool/diag_scripts/shared/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
from pathlib import Path

import distributed
import iris
import matplotlib.pyplot as plt
import yaml
Expand Down Expand Up @@ -169,6 +170,7 @@ class ProvenanceLogger:
with ProvenanceLogger(cfg) as provenance_logger:
provenance_logger.log(output_file, record)
"""

def __init__(self, cfg):
"""Create a provenance logger."""
self._log_file = os.path.join(cfg['run_dir'],
Expand Down Expand Up @@ -474,15 +476,24 @@ def main(cfg):
parser.add_argument(
'-f',
'--force',
help=("Force emptying the output directories"
help=("Force emptying the output directories "
"(useful when re-running the script)"),
action='store_true',
)
parser.add_argument(
'-i',
'--ignore-existing',
help=("Force running the script, even if output files exists."
"(useful when re-running the script, use at your own risk)"),
help=("Force running the script, even if output files exist "
"(useful when re-running the script, use at your own risk)."),
action='store_true',
)
parser.add_argument(
'-n',
'--no-distributed',
help=("Do not use the Dask distributed 'scheduler_address' from the "
"configuration file "
"(useful when re-running the script and the scheduler is no "
"longer available)."),
action='store_true',
)
parser.add_argument(
Expand Down Expand Up @@ -555,6 +566,19 @@ def main(cfg):
logger.info("Removing %s from previous run.", provenance_file)
os.remove(provenance_file)

yield cfg
if not args.no_distributed and 'scheduler_address' in cfg:
try:
client = distributed.Client(cfg['scheduler_address'])
except OSError as exc:
raise OSError(
"Unable to connect to the Dask distributed scheduler at "
f"{cfg['scheduler_address']}. If the scheduler is no longer "
"available, try re-running the diagnostic script with the "
"--no-distributed flag.", ) from exc
else:
client = contextlib.nullcontext()

with client:
yield cfg

logger.info("End of diagnostic script run.")

0 comments on commit c351d3b

Please sign in to comment.