Skip to content

Commit

Permalink
Feature: Add concurrent library to ParallelComputingInterface (#667)
Browse files Browse the repository at this point in the history
* Add concurrent to parallel computing interface

* Expand docstring

* Change how deprecation warning is raised

* Update example to use interface variable

* Fix formatting

---------

Co-authored-by: Rafael M Mudafort <rafmudaf@gmail.com>
  • Loading branch information
Bartdoekemeijer and rafmudaf authored Jul 3, 2023
1 parent d91953a commit 7674287
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 10 deletions.
5 changes: 3 additions & 2 deletions examples/12_optimize_yaw_in_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ def load_windrose():
)

# Pour this into a parallel computing interface
parallel_interface = "concurrent"
fi_aep_parallel = ParallelComputingInterface(
fi=fi_aep,
max_workers=max_workers,
n_wind_direction_splits=max_workers,
n_wind_speed_splits=1,
use_mpi4py=False,
interface=parallel_interface,
print_timings=True,
)

Expand Down Expand Up @@ -113,7 +114,7 @@ def load_windrose():
max_workers=max_workers,
n_wind_direction_splits=max_workers,
n_wind_speed_splits=1,
use_mpi4py=False,
interface=parallel_interface,
print_timings=True,
)

Expand Down
76 changes: 68 additions & 8 deletions floris/tools/parallel_computing_interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2022 Shell
import copy
import warnings
from time import perf_counter as timerpc

import numpy as np
Expand Down Expand Up @@ -72,7 +73,8 @@ def __init__(
max_workers,
n_wind_direction_splits,
n_wind_speed_splits=1,
use_mpi4py=False,
interface="multiprocessing", # Options are 'multiprocessing', 'mpi4py' or 'concurrent'
use_mpi4py=None,
propagate_flowfield_from_workers=False,
print_timings=False
):
Expand All @@ -83,17 +85,51 @@ def __init__(
fi (FlorisInterface or UncertaintyInterface object): Interactive FLORIS object used to
perform the wake and turbine calculations. Can either be a regular FlorisInterface
object or can be an UncertaintyInterface object.
max_workers (int): Number of parallel workers, typically equal to the number of cores
you have on your system or HPC.
n_wind_direction_splits (int): Number of sectors to split the wind direction array over.
This is typically equal to max_workers, or a multiple of it.
n_wind_speed_splits (int): Number of sectors to split the wind speed array over. This is
typically 1 or 2. Defaults to 1.
interface (str): Parallel computing interface to leverage. Recommended is 'concurrent'
or 'multiprocessing' for local (single-system) use, and 'mpi4py' for high performance
computing on multiple nodes. Defaults to 'multiprocessing'.
use_mpi4py (bool): Deprecated option to enable/disable the usage of 'mpi4py'. This option
has been superseded by 'interface'.
propagate_flowfield_from_workers (bool): By enabling this, the flow field from every
floris object (one for each worker) is exported, combined and sent back to the main
module. This is slow so unless it's needed, it's recommended to be disabled. Defaults
to False.
print_timings (bool): Print the computation time to the console. Defaults to False.
"""

# Load the correct library
if use_mpi4py:
# Set defaults for backward compatibility
if use_mpi4py is not None:
warnings.warn(
"The option 'use_mpi4py' will be removed in a future version. "
"Please use the option 'interface' instead."
)
if use_mpi4py:
interface = "mpi4py"
else:
interface = "multiprocessing"

if interface == "mpi4py":
import mpi4py.futures as mp
self._PoolExecutor = mp.MPIPoolExecutor
else:
elif interface == "multiprocessing":
import multiprocessing as mp
self._PoolExecutor = mp.Pool
if max_workers is None:
max_workers = mp.cpu_count()
elif interface == "concurrent":
from concurrent.futures import ProcessPoolExecutor
self._PoolExecutor = ProcessPoolExecutor
else:
raise UserWarning(
f"Interface '{interface}' not recognized. "
"Please use 'concurrent', 'multiprocessing' or 'mpi4py'."
)

# Initialize floris object and copy common properties
self.fi = fi.copy()
Expand All @@ -114,7 +150,7 @@ def __init__(
np.min([max_workers, self.n_wind_direction_splits * self.n_wind_speed_splits])
)
self.propagate_flowfield_from_workers = propagate_flowfield_from_workers
self.use_mpi4py = use_mpi4py
self.interface = interface
self.print_timings = print_timings

def copy(self):
Expand Down Expand Up @@ -171,7 +207,7 @@ def reinitialize(
max_workers=self._max_workers,
n_wind_direction_splits=self._n_wind_direction_splits,
n_wind_speed_splits=self._n_wind_speed_splits,
use_mpi4py=self.use_mpi4py,
interface=self.interface,
propagate_flowfield_from_workers=self.propagate_flowfield_from_workers,
print_timings=self.print_timings,
)
Expand Down Expand Up @@ -294,7 +330,15 @@ def get_turbine_powers(self, yaw_angles=None):
# Perform parallel calculation
t1 = timerpc()
with self._PoolExecutor(self.max_workers) as p:
out = p.starmap(_get_turbine_powers_serial, multiargs)
if (self.interface == "mpi4py") or (self.interface == "multiprocessing"):
out = p.starmap(_get_turbine_powers_serial, multiargs)
else:
out = p.map(
_get_turbine_powers_serial,
[j[0] for j in multiargs],
[j[1] for j in multiargs]
)
# out = list(out)
t_execution = timerpc() - t1

# Postprocessing: merge power production (and opt. flow field) from individual runs
Expand Down Expand Up @@ -491,7 +535,23 @@ def optimize_yaw_angles(
# Optimize yaw angles using parallel processing
print("Optimizing yaw angles with {:d} workers.".format(self.max_workers))
with self._PoolExecutor(self.max_workers) as p:
df_opt_splits = p.starmap(_optimize_yaw_angles_serial, multiargs)
if (self.interface == "mpi4py") or (self.interface == "multiprocessing"):
df_opt_splits = p.starmap(_optimize_yaw_angles_serial, multiargs)
else:
df_opt_splits = p.map(
_optimize_yaw_angles_serial,
[j[0] for j in multiargs],
[j[1] for j in multiargs],
[j[2] for j in multiargs],
[j[3] for j in multiargs],
[j[4] for j in multiargs],
[j[5] for j in multiargs],
[j[6] for j in multiargs],
[j[7] for j in multiargs],
[j[8] for j in multiargs],
[j[9] for j in multiargs],
[j[10] for j in multiargs]
)
t2 = timerpc()

# Combine all solutions from multiprocessing into single dataframe
Expand Down

0 comments on commit 7674287

Please sign in to comment.