Add python templates for using Dask
One template for using Dask on a single machine and one for use on a
distributed system, specifically OLCF systems Andes, Crusher, and
Frontier.
bcaddy committed Oct 17, 2023
1 parent 92ebf3d commit 266a749
Showing 2 changed files with 179 additions and 0 deletions.
132 changes: 132 additions & 0 deletions python_scripts/dask_distributed_template.py
@@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""
This is a skeleton for running a Dask script on Andes at the OLCF. The required
CLI/batch commands are in this docstring, the major Dask steps are in
functions, and `main` is mostly empty with a clearly marked area for your
computations.
Requirements:
- Verified working with Dask v2023.6.0
- Install graphviz for python
  - `conda install -c conda-forge python-graphviz graphviz`
- Make sure your version of msgpack-python is at least v1.0.5; v1.0.3 had a bug
  - `conda install -c conda-forge msgpack-python=1.0.5`
Notes:
- This is entirely focused on getting Dask to run on Andes, Crusher, and
  Frontier. Other systems will likely need similar, though not identical, steps
- The Dask scheduler and workers need to be restarted between each Python
  script
- "--interface ib0" does not seem to be required but likely improves transfer
  speeds. On Crusher it throws an error, so just omit it there
- Dask likes to spit out lots of ugly messages on shutdown that look like
  something failed. Odds are that it worked fine and just didn't shut down
  gracefully
- On OLCF systems Dask seems to hang on setup if you use more than 256
  processes. I haven't dug too deeply into it, but for now it might be better
  to limit jobs to that size and run them longer, or run multiple jobs,
  potentially as an array job
- On OLCF systems Dask doesn't always end the job properly; the job will just
  keep running and do nothing. Either set short walltimes so it times out or
  keep an eye on it. Ending the script by sending an exit command (as
  `shutdown_dask` below does with `client.shutdown()`) can also help
################################################################################
#!/usr/bin/env bash
#SBATCH -A <allocation here>
#SBATCH -J <job name>
#SBATCH -o <slurm output file>/%x-%j.out
#SBATCH -t 04:00:00
#SBATCH -p batch
#SBATCH -N 32
#SBATCH --mail-user=<your email>
#SBATCH --mail-type=ALL

# Setup some parameters
DASK_SCHEDULE_FILE=$(pwd)/dask_schedule_file.json
DASK_NUM_WORKERS=$((SLURM_JOB_NUM_NODES*8))
# Add any scripts that you're importing to the PYTHONPATH, even ones in the same
# directory. The worker tasks have their own directories and won't find any of
# your scripts unless they're in the PYTHONPATH
export PYTHONPATH="${PYTHONPATH}:/your/path/here"
INTERFACE='--interface ib0' # For Andes
# INTERFACE='' # For Crusher
srun --exclusive --ntasks=1 dask scheduler $INTERFACE --scheduler-file $DASK_SCHEDULE_FILE --no-dashboard --no-show &
# Wait for the dask-scheduler to start
sleep 30
srun --exclusive --ntasks=$DASK_NUM_WORKERS dask worker --scheduler-file $DASK_SCHEDULE_FILE --memory-limit='auto' --worker-class distributed.Worker $INTERFACE --no-dashboard --local-directory <path to directory, might not be required> &
# Wait for workers to start
sleep 10
python -u ./dask_distributed_template.py --scheduler-file $DASK_SCHEDULE_FILE --num-workers $DASK_NUM_WORKERS
wait
################################################################################
"""

import dask
import dask.array as da
import dask.dataframe as dd
from dask.distributed import Client
from dask import graph_manipulation

import pathlib
import argparse

# ==============================================================================
def main():
    # Get command line arguments
    cli = argparse.ArgumentParser()
    # Required Arguments
    cli.add_argument('-N', '--num-workers', type=int, required=True, help='The number of workers to use')
    cli.add_argument('-s', '--scheduler-file', type=pathlib.Path, required=True, help='The path to the scheduler file')
    # Optional Arguments
    # none yet, feel free to add your own
    args = cli.parse_args()

    # Setup the Dask cluster
    client = startup_dask(args.scheduler_file, args.num_workers)

    # Perform your computation
    # ...
    # ...
    # ...
    # Some suggestions:
    # - If you're using Delayed then append all tasks to a list and execute them with `dask.compute(*command_list)`
    # - Visualize the task graph with `dask.visualize(*command_list, filename=str('filename.pdf'))`
    # - Add dependencies manually with `dask.graph_manipulation.bind(dependent_task, list_of_dependencies)`
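    # A minimal sketch of the Delayed pattern suggested above (hypothetical
    # `load_file` and `file_list` names; assumes `load_file` is decorated
    # with `@dask.delayed`):
    #     command_list = [load_file(f) for f in file_list]
    #     dask.visualize(*command_list, filename=str('task_graph.pdf'))
    #     results = dask.compute(*command_list)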
    # End of Computation

    # Shutdown the Dask cluster
    shutdown_dask(client)
# ==============================================================================

# ==============================================================================
def startup_dask(scheduler_file, num_workers):
    # Connect to the dask-cluster
    client = Client(scheduler_file=scheduler_file)
    print('client information ', client)

    # Block until num_workers are ready
    print(f'Waiting for {num_workers} workers...')
    client.wait_for_workers(n_workers=num_workers)

    num_connected_workers = len(client.scheduler_info()['workers'])
    print(f'{num_connected_workers} workers connected')

    return client
# ==============================================================================

# ==============================================================================
def shutdown_dask(client):
    print('Shutting down the cluster')
    # Retire the workers cleanly before shutting down the scheduler
    workers_list = list(client.scheduler_info()['workers'])
    client.retire_workers(workers_list, close_workers=True)
    client.shutdown()
# ==============================================================================

if __name__ == '__main__':
    main()
47 changes: 47 additions & 0 deletions python_scripts/dask_single_machine_template.py
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
================================================================================
Written by Robert Caddy.
A simple template for Dask scripts running on a single machine
================================================================================
"""

import dask
import dask.array as da
import dask.dataframe as dd
from dask import graph_manipulation

import argparse
import pathlib

# ==============================================================================
def main():
    cli = argparse.ArgumentParser()
    # Required Arguments
    # Optional Arguments
    cli.add_argument('-n', '--num-workers', type=int, default=8, help='The number of workers to use.')
    args = cli.parse_args()

    # Set the scheduler type. Options are 'threads', 'processes', 'single-threaded', and 'distributed'.
    # - 'threads' uses threads that share memory; often fastest on a single machine but can run into issues with the GIL
    # - 'processes' uses multiple processes that do not share memory; can be used to get around issues with the GIL
    # - 'single-threaded' is great for debugging
    dask.config.set(scheduler='processes', num_workers=args.num_workers)
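    # A sketch of scoping the scheduler choice to a single call instead of
    # setting it globally (hypothetical `my_task` Delayed object):
    #     with dask.config.set(scheduler='single-threaded'):
    #         result = my_task.compute()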

    # Perform your computation
    # ...
    # ...
    # ...
    # Some suggestions:
    # - If you're using Delayed then append all tasks to a list and execute them with `dask.compute(*command_list)`
    # - Visualize the task graph with `dask.visualize(*command_list, filename=str('filename.pdf'))`
    # - Add dependencies manually with `dask.graph_manipulation.bind(dependent_task, list_of_dependencies)`
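    # For example, the Delayed pattern from the suggestions above (hypothetical
    # `process_chunk` and `chunks` names; assumes `process_chunk` is decorated
    # with `@dask.delayed`):
    #     command_list = [process_chunk(c) for c in chunks]
    #     results = dask.compute(*command_list)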
    # End of Computation
# ==============================================================================

if __name__ == '__main__':
    from timeit import default_timer
    start = default_timer()
    main()
    print(f'\nTime to execute: {round(default_timer()-start,2)} seconds')
