Batches #459

Merged (11 commits) on Apr 20, 2023
22 changes: 22 additions & 0 deletions docs/source/explanations/optimization/internal_optimizers.md
@@ -88,6 +88,28 @@
convergence criteria that clearly are the same are implemented slightly differently in
different optimizers. However, complete transparency is possible and we try to document
the exact meaning of all options for all optimizers.

## Algorithms that parallelize

Some algorithms can evaluate the criterion function in parallel. To make such a parallel
algorithm fully compatible with estimagic (including history collection and benchmarking
functionality), the following conditions must be fulfilled:

- The algorithm has an argument called `n_cores`, which determines how many cores are
used for the parallelization.
- The algorithm has an argument called `batch_evaluator`, and all parallelization is
done using a built-in or user-provided batch evaluator.

Moreover, we strongly suggest complying with the following convention (illustrated by
the sketch below):

- The algorithm has an argument called `batch_size`, an integer that is greater than or
equal to `n_cores`. Setting `batch_size` larger than `n_cores` simulates how the
algorithm would behave with `n_cores=batch_size` while only using `n_cores` cores.
This makes it possible to simulate and benchmark the parallelizability of an
algorithm even if no parallel hardware is available.

If the mandatory conditions are not fulfilled, the algorithm should disable all history
collection by using `mark_minimizer(..., disable_history=True)`.
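
Below is a minimal sketch of what such an interface could look like. The algorithm
(`minimize_random_search`), its tuning parameters, the import path of
`process_batch_evaluator`, and the batch evaluator call signature are illustrative
assumptions, not part of estimagic's API guarantees:

```python
import numpy as np

from estimagic.batch_evaluators import process_batch_evaluator


def minimize_random_search(
    criterion, x, *, n_cores=1, batch_size=None, batch_evaluator="joblib", n_iterations=10
):
    """Hypothetical parallel random search following the convention above."""
    batch_size = n_cores if batch_size is None else batch_size
    batch_evaluator = process_batch_evaluator(batch_evaluator)
    rng = np.random.default_rng(1234)

    best_x, best_value = x, criterion(x)
    for _ in range(n_iterations):
        # propose `batch_size` candidates per iteration but evaluate them on only
        # `n_cores` cores; batch_size > n_cores thus simulates a more parallel run
        candidates = [
            best_x + rng.normal(scale=0.1, size=len(x)) for _ in range(batch_size)
        ]
        values = batch_evaluator(criterion, candidates, n_cores=n_cores)
        best_index = int(np.argmin(values))
        if values[best_index] < best_value:
            best_x, best_value = candidates[best_index], values[best_index]

    return {"solution_x": best_x, "solution_criterion": best_value}
```

Since both mandatory conditions are met, an algorithm like this could keep history
collection enabled.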

## Nonlinear constraints

Estimagic can pass nonlinear constraints to the internal optimizer. The internal
4 changes: 3 additions & 1 deletion src/estimagic/benchmarking/process_benchmark_results.py
@@ -60,7 +60,9 @@ def create_convergence_histories(
time_sr.name = "walltime"
criterion_sr = _get_history_as_stacked_sr_from_results(results, "criterion_history")
x_dist_sr = _get_history_of_the_parameter_distance(results, x_opt)
df = pd.concat([time_sr, criterion_sr, x_dist_sr], axis=1)
batch_sr = _get_history_as_stacked_sr_from_results(results, "batches_history")
batch_sr.name = "n_batches"
df = pd.concat([time_sr, criterion_sr, x_dist_sr, batch_sr], axis=1)

df.index = df.index.rename({"evaluation": "n_evaluations"})
df = df.sort_index().reset_index()
3 changes: 3 additions & 0 deletions src/estimagic/benchmarking/run_benchmark.py
@@ -165,6 +165,7 @@ def _get_results(names, raw_results, kwargs_list):
[tree_just_flatten(p, registry=registry) for p in history["params"]]
)
criterion_history = pd.Series(history["criterion"])
batches_history = pd.Series(history["batches"])
time_history = pd.Series(history["runtime"])
elif isinstance(result, str):
_criterion = inputs["criterion"]
@@ -175,6 +176,7 @@ def _get_results(names, raw_results, kwargs_list):
criterion_history = pd.Series(_criterion(inputs["params"])["value"])

time_history = pd.Series([np.inf])
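# a failed run gets a single placeholder batch, matching its one-entry criterion history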
batches_history = pd.Series([0])
else:
raise TypeError(
"'result' object is expected to be of type 'dict' or 'str'."
Expand All @@ -184,6 +186,7 @@ def _get_results(names, raw_results, kwargs_list):
"params_history": params_history,
"criterion_history": criterion_history,
"time_history": time_history,
"batches_history": batches_history,
"solution": result,
}

173 changes: 98 additions & 75 deletions src/estimagic/optimization/get_algorithm.py
@@ -60,8 +60,13 @@ def get_final_algorithm(
):
"""Get algorithm-function with partialled options.

The resulting function only depends on ``x``, the relevant criterion functions
and derivatives and ``step_id``.
The resulting function only depends on ``x``, the relevant criterion functions,
derivatives and ``step_id``. The remaining options are partialled in.

Moreover, we add the following capabilities on top of the internal algorithms:
- log the algorithm progress in a database (if logging is True)
- collect the history of parameters and criterion values as well as runtime and
batch information.

Args:
algorithm (str or callable): String with the name of an algorithm or internal
@@ -98,14 +103,11 @@
logging=logging,
database=database,
)

is_parallel = internal_options.get("n_cores") not in (None, 1)
is_parallel = algo_info.parallelizes
batch_size = internal_options.get("batch_size", internal_options.get("n_cores", 1))

if collect_history and not algo_info.disable_history:
algorithm = _add_history_collection_via_criterion(algorithm)
if is_parallel:
algorithm = _add_history_collection_via_batch_evaluator(algorithm)
algorithm = _add_history_processing(algorithm, is_parallel)
algorithm = _add_history_collection(algorithm, is_parallel, batch_size)

return algorithm

Expand Down Expand Up @@ -154,104 +156,125 @@ def wrapper_add_logging_algorithm(**kwargs):
return decorator_add_logging_to_algorithm


def _add_history_collection_via_criterion(algorithm):
"""Partial a history container into all functions that define the optimization."""
def _add_history_collection(algorithm, is_parallel, batch_size):
"""Add history collection to the algorithm.

@functools.wraps(algorithm)
def wrapper_add_history_collection_via_criterion(**kwargs):
func_names = {"criterion", "derivative", "criterion_and_derivative"}
The history collection is done jointly by the internal criterion function and the
batch evaluator. Using the batch evaluator for history collection is necessary
for two reasons:
1. The batch information is only known inside the batch evaluator.
2. The normal approach of appending information to a list that is partialled into
the internal criterion function breaks down when the batch evaluator pickles
the internal criterion function.

We make sure that optimizers that do some function evaluations via the batch
evaluator and others directly are handled correctly.

The interplay between the two methods of history collection is as follows:
- If the function is called directly, all relevant information is appended to a list
that is partialled into the internal criterion function.
- If the function is called via the batch evaluator, we signal this to the internal
criterion via the two arguments ``history_container`` (which is set to None) and
``return_history_entry`` (which is set to True). The returned history entries
are then collected inside the batch evaluator and appended to the history
container after all evaluations are done.

Args:
algorithm (callable): The algorithm.
is_parallel (bool): Whether the algorithm can parallelize.
batch_size (int): Number of criterion evaluations that share one batch number.

"""

@functools.wraps(algorithm)
def algorithm_with_history_collection(**kwargs):
# initialize the shared history container
container = []

# add history collection via the internal criterion functions
func_names = {"criterion", "derivative", "criterion_and_derivative"}
_kwargs = kwargs.copy()
for name in func_names:
if name in kwargs:
_kwargs[name] = partial(kwargs[name], history_container=container)

# add history collection via the batch evaluator
if is_parallel:
raw_be = kwargs.get("batch_evaluator", "joblib")
batch_evaluator = process_batch_evaluator(raw_be)

_kwargs["batch_evaluator"] = _get_history_collecting_batch_evaluator(
batch_evaluator=batch_evaluator,
container=container,
batch_size=batch_size,
)

# call the algorithm
out = algorithm(**_kwargs)

# add the history container to the algorithm output
if "history" not in out:
out["history"] = container
else:
out["history"] = out["history"] + container
return out

return wrapper_add_history_collection_via_criterion
# process the history
out["history"] = _process_collected_history(out["history"])

return out

def _add_history_collection_via_batch_evaluator(algorithm):
"""Wrap the batch_evaluator argument such that histories are collected."""

@functools.wraps(algorithm)
def wrapper_add_history_collection_via_batch_evaluator(**kwargs):
raw_be = kwargs.get("batch_evaluator", "joblib")
batch_evaluator = process_batch_evaluator(raw_be)

container = []

@functools.wraps(batch_evaluator)
def wrapped_batch_evaluator(*args, **kwargs):
if args:
func = args[0]
else:
func = kwargs["func"]

# find out if func is our internal criterion function
if isinstance(func, partial) and "history_container" in func.keywords:
# partial in None as history container to disable history collection via
# criterion function, which would not work with parallelization anyways
_func = partial(func, history_container=None, return_history_entry=True)

if args:
_args = (_func, *args[1:])
_kwargs = kwargs
else:
_args = args
_kwargs = kwargs.copy()
_kwargs["func"] = _func
return algorithm_with_history_collection

raw_out = batch_evaluator(*_args, **_kwargs)

out = [tup[0] for tup in raw_out]
_hist = [tup[1] for tup in raw_out if tup[1] is not None]
def _get_history_collecting_batch_evaluator(batch_evaluator, container, batch_size):
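"""Wrap the batch evaluator such that history entries are collected.

Entries returned by the internal criterion get a batch number (groups of
``batch_size`` evaluations share one number) and are appended to ``container``.

"""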
@functools.wraps(batch_evaluator)
def history_collecting_batch_evaluator(*args, **kwargs):
if args:
func = args[0]
else:
func = kwargs["func"]

container.extend(_hist)
# find out if func is our internal criterion function. This is
# necessary because an algorithm might use the batch evaluator for
# other functions as well, but for those functions we do not want to
# (and cannot) collect a history.
if isinstance(func, partial) and "history_container" in func.keywords:
# partial in None as history container to disable history collection
# via criterion function, which would not work with parallelization
_func = partial(func, history_container=None, return_history_entry=True)

if args:
_args = (_func, *args[1:])
_kwargs = kwargs
else:
out = batch_evaluator(*args, **kwargs)
_args = args
_kwargs = kwargs.copy()
_kwargs["func"] = _func

return out
raw_out = batch_evaluator(*_args, **_kwargs)
out = [tup[0] for tup in raw_out]
_hist = [tup[1] for tup in raw_out if tup[1] is not None]
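# assign batch numbers: every group of `batch_size` evaluations shares one
# number, continuing from the last batch recorded in the container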
_start_batch = container[-1]["batches"] + 1 if container else 0
_offsets = np.arange(len(_hist)).repeat(batch_size)[: len(_hist)]
_batch_info = _offsets + _start_batch
for batch, hist_entry in zip(_batch_info, _hist):
hist_entry["batches"] = batch

new_kwargs = kwargs.copy()
new_kwargs["batch_evaluator"] = wrapped_batch_evaluator
container.extend(_hist)

out = algorithm(**new_kwargs)

if "history" in out:
out["history"] = out["history"] + container
else:
out["history"] = container
out = batch_evaluator(*args, **kwargs)

return out

return wrapper_add_history_collection_via_batch_evaluator

return history_collecting_batch_evaluator

def _add_history_processing(algorithm, parallelizes):
@functools.wraps(algorithm)
def wrapper_add_history_processing(**kwargs):
out = algorithm(**kwargs)
raw = out["history"]
if parallelizes:
raw = sorted(raw, key=lambda e: e["runtime"])
history = list_of_dicts_to_dict_of_lists(raw)
runtimes = np.array(history["runtime"])
runtimes -= runtimes[0]
history["runtime"] = runtimes.tolist()

out["history"] = history
return out

return wrapper_add_history_processing
def _process_collected_history(raw):
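"""Turn the list of history entries into a dict of lists with relative runtimes."""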
history = list_of_dicts_to_dict_of_lists(raw)
runtimes = np.array(history["runtime"])
runtimes -= runtimes[0]
history["runtime"] = runtimes.tolist()
return history


def _adjust_options_to_algorithm(
7 changes: 7 additions & 0 deletions src/estimagic/optimization/internal_criterion_template.py
@@ -247,6 +247,13 @@ def func(x):
"criterion": scalar_critval,
"runtime": now,
}
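# Direct (non-batched) calls carry a history container and each one opens a new
# batch; calls routed through the batch evaluator (history_container=None) get
# their batch numbers assigned there instead.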
if history_container is not None:
if history_container:
_batch = history_container[-1]["batches"] + 1
else:
_batch = 0

hist_entry["batches"] = _batch
else:
hist_entry = None

1 change: 0 additions & 1 deletion src/estimagic/optimization/tranquilo/tranquilo.py
@@ -49,7 +49,6 @@ def _internal_tranquilo(
accept_candidate,
):
eval_info = {0: n_evals_at_start}

evaluate_criterion(eval_info)

_init_fvec = history.get_fvecs(0).mean(axis=0)
18 changes: 12 additions & 6 deletions src/estimagic/visualization/convergence_plot.py
@@ -35,11 +35,12 @@ def convergence_plot(
improved on the problem. The algorithm converged where its line reaches 0
(if normalize_distance is True) or the horizontal blue line labeled "true solution".

Each plot shows on the x axis the runtime_measure, which can be walltime or number
of evaluations. Each algorithm's convergence is a line in the plot. Convergence can
be measured by the criterion value of the particular time/evaluation. The
convergence can be made monotone (i.e. always taking the bast value so far) or
normalized such that the distance from the start to the true solution is one.
Each plot shows the runtime_measure on the x axis, which can be walltime, number
of evaluations or number of batches. Each algorithm's convergence is a line in the
plot. Convergence can be measured by the criterion value at the particular
time/evaluation. The convergence can be made monotone (i.e. always taking the best
value so far) or normalized such that the distance from the start to the true
solution is one.

Args:
problems (dict): estimagic benchmarking problems dictionary. Keys are the
@@ -64,7 +65,7 @@
between the start value and the optimal value, i.e. 1 means the algorithm
is as far from the solution as the start value and 0 means the algorithm
has reached the solution value.
runtime_measure (str): "n_evaluations" or "walltime".
runtime_measure (str): "n_evaluations", "walltime" or "n_batches".
stopping_criterion (str): "x_and_y", "x_or_y", "x", "y" or None. If None, no
clipping is done.
x_precision (float or None): how close an algorithm must have gotten to the
@@ -138,6 +139,7 @@
x_labels = {
"n_evaluations": "Number of Function Evaluations",
"walltime": "Elapsed Time",
"n_batches": "Number of Batches",
}

# container for individual plots
@@ -150,6 +152,10 @@
for prob_name in remaining_problems:
g_ind = [] # container for data for traces in individual plot
to_plot = df[df["problem"] == prob_name]
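# when plotting against batches, collapse all evaluations of a batch into one
# point, keeping the column-wise minimum (in particular the best criterion
# value reached within that batch)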
if runtime_measure == "n_batches":
to_plot = (
to_plot.groupby(["algorithm", runtime_measure]).min().reset_index()
)

for i, alg in enumerate(to_plot["algorithm"].unique()):
temp = to_plot[to_plot["algorithm"] == alg]
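
With these changes in place, a benchmark can be plotted against the number of batches.
A minimal usage sketch, assuming the estimagic top-level entry points
`get_benchmark_problems`, `run_benchmark` and `convergence_plot`; the benchmark set
name and algorithm list are placeholders:

```python
import estimagic as em

problems = em.get_benchmark_problems("example")
results = em.run_benchmark(
    problems, optimize_options=["scipy_lbfgsb", "scipy_neldermead"]
)

# plot convergence against the number of batches instead of single evaluations
fig = em.convergence_plot(problems, results, runtime_measure="n_batches")
fig.show()
```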