Class Tune (#1200)
GooseIt authored May 21, 2023
1 parent 3c42bad commit f4b50fe
Showing 7 changed files with 502 additions and 74 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Added
- Notebook `forecast_interpretation.ipynb` with forecast decomposition ([#1220](https://github.com/tinkoff-ai/etna/pull/1220))
-
- Class `Tune` for hyperparameter optimization within an existing pipeline ([#1200](https://github.com/tinkoff-ai/etna/pull/1200))
-
-
### Changed
1 change: 1 addition & 0 deletions etna/auto/__init__.py
@@ -1,2 +1,3 @@
from etna.auto.auto import Auto
from etna.auto.auto import Tune
from etna.auto.pool import Pool
293 changes: 247 additions & 46 deletions etna/auto/auto.py
@@ -7,8 +7,17 @@

import pandas as pd
from hydra_slayer import get_from_params
from optuna.distributions import CategoricalDistribution
from optuna.distributions import DiscreteUniformDistribution
from optuna.distributions import IntLogUniformDistribution
from optuna.distributions import IntUniformDistribution
from optuna.distributions import LogUniformDistribution
from optuna.distributions import UniformDistribution
from optuna.samplers import BaseSampler
from optuna.samplers import TPESampler
from optuna.storages import BaseStorage
from optuna.storages import RDBStorage
from optuna.trial import FrozenTrial
from optuna.trial import Trial
from typing_extensions import Protocol

@@ -26,7 +35,7 @@
from etna.metrics import Sign
from etna.metrics.utils import MetricAggregationStatistics
from etna.metrics.utils import aggregate_metrics_df
from etna.pipeline import Pipeline
from etna.pipeline.base import BasePipeline


class _Callback(Protocol):
@@ -35,7 +44,7 @@ def __call__(self, metrics_df: pd.DataFrame, forecast_df: pd.DataFrame, fold_inf


class _Initializer(Protocol):
def __call__(self, pipeline: Pipeline) -> None:
def __call__(self, pipeline: BasePipeline) -> None:
...


@@ -51,7 +60,7 @@ def fit(
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
**optuna_kwargs,
) -> Pipeline:
) -> BasePipeline:
"""
Start automatic pipeline selection.
@@ -82,47 +91,19 @@ def summary(self) -> pd.DataFrame:
pass

@abstractmethod
def top_k(self, k: int = 5) -> List[Pipeline]:
"""
Get top k pipelines.
Parameters
----------
k:
number of pipelines to return
"""
def _summary(self, study: List[FrozenTrial]) -> List[dict]:
"""Get information from trial summary."""
pass

@staticmethod
@abstractmethod
def objective(
ts: TSDataset,
target_metric: Metric,
metric_aggregation: MetricAggregationStatistics,
metrics: List[Metric],
backtest_params: dict,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
) -> Callable[[Trial], float]:
def top_k(self, k: int = 5) -> List[BasePipeline]:
"""
Optuna objective wrapper.
Get top k pipelines.
Parameters
----------
ts:
tsdataset to fit on
target_metric:
metric to optimize
metric_aggregation:
aggregation method for per-segment metrics
metrics:
list of metrics to compute
backtest_params:
custom parameters for backtest instead of default backtest parameters
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
k:
number of pipelines to return
"""
pass

@@ -193,14 +174,10 @@ def summary(self) -> pd.DataFrame:

study = self._optuna.study.get_trials()

study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
for trial in study
]

study_params = self._summary(study=study)
return pd.DataFrame(study_params)

def top_k(self, k: int = 5) -> List[Pipeline]:
def top_k(self, k: int = 5) -> List[BasePipeline]:
"""
Get top k pipelines.
@@ -227,7 +204,7 @@ def __init__(
metric_aggregation: MetricAggregationStatistics = "mean",
backtest_params: Optional[dict] = None,
experiment_folder: Optional[str] = None,
pool: Union[Pool, List[Pipeline]] = Pool.default,
pool: Union[Pool, List[BasePipeline]] = Pool.default,
runner: Optional[AbstractRunner] = None,
storage: Optional[BaseStorage] = None,
metrics: Optional[List[Metric]] = None,
@@ -276,7 +253,7 @@ def fit(
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
**optuna_kwargs,
) -> Pipeline:
) -> BasePipeline:
"""
Start automatic pipeline selection.
@@ -316,6 +293,14 @@ def fit(

return get_from_params(**self._optuna.study.best_trial.user_attrs["pipeline"])

def _summary(self, study: List[FrozenTrial]) -> List[dict]:
"""Get information from trial summary."""
study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
for trial in study
]
return study_params

@staticmethod
def objective(
ts: TSDataset,
@@ -345,6 +330,11 @@ def objective(
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
Returns
-------
objective:
function that runs specified trial and returns its evaluated score
"""

def _objective(trial: Trial) -> float:
@@ -353,7 +343,7 @@ def _objective(trial: Trial) -> float:
pipeline_config.update(trial.relative_params)
pipeline_config.update(trial.params)

pipeline: Pipeline = get_from_params(**pipeline_config)
pipeline: BasePipeline = get_from_params(**pipeline_config)
if initializer is not None:
initializer(pipeline=pipeline)

@@ -387,3 +377,214 @@ def _init_optuna(self):
sampler=ConfigSampler(configs=pool_),
)
return optuna


class Tune(AutoBase):
"""Automatic tuning of custom pipeline."""

def __init__(
self,
pipeline: BasePipeline,
target_metric: Metric,
horizon: int,
metric_aggregation: MetricAggregationStatistics = "mean",
backtest_params: Optional[dict] = None,
experiment_folder: Optional[str] = None,
runner: Optional[AbstractRunner] = None,
storage: Optional[BaseStorage] = None,
metrics: Optional[List[Metric]] = None,
sampler: Optional[BaseSampler] = None,
):
"""
Initialize Tune class.
Parameters
----------
pipeline:
pipeline to optimize
target_metric:
metric to optimize
horizon:
horizon to forecast for
metric_aggregation:
aggregation method for per-segment metrics
backtest_params:
custom parameters for backtest instead of default backtest parameters
experiment_folder:
folder to store experiment results and name for optuna study
runner:
runner to use for distributed training
storage:
optuna storage to use
metrics:
list of metrics to compute
sampler:
optuna sampler to use
"""
super().__init__(
target_metric=target_metric,
horizon=horizon,
metric_aggregation=metric_aggregation,
backtest_params=backtest_params,
experiment_folder=experiment_folder,
runner=runner,
storage=storage,
metrics=metrics,
)
self.pipeline = pipeline
if sampler is None:
self.sampler: BaseSampler = TPESampler()
else:
self.sampler = sampler
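
For orientation, a minimal instantiation sketch of the new Tune class (not part of this diff); the NaiveModel, SMAPE metric, horizon, and backtest settings below are illustrative assumptions rather than values taken from the commit:

from etna.auto import Tune
from etna.metrics import SMAPE
from etna.models import NaiveModel
from etna.pipeline import Pipeline

# a simple pipeline whose own hyperparameter space will be tuned (illustrative choice of model)
pipeline = Pipeline(model=NaiveModel(lag=1), horizon=14)

# no sampler is passed, so Tune falls back to optuna's TPESampler as in __init__ above
tune = Tune(
    pipeline=pipeline,
    target_metric=SMAPE(),
    horizon=14,
    backtest_params={"n_folds": 3},
)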

def fit(
self,
ts: TSDataset,
timeout: Optional[int] = None,
n_trials: Optional[int] = None,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
**optuna_kwargs,
) -> BasePipeline:
"""
Start automatic pipeline tuning.
Parameters
----------
ts:
tsdataset to fit on
timeout:
timeout for optuna. N.B. this is timeout for each worker
n_trials:
number of trials for optuna. N.B. this is number of trials for each worker
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
optuna_kwargs:
additional kwargs for optuna :py:meth:`optuna.study.Study.optimize`
"""
if self._optuna is None:
self._optuna = self._init_optuna()

self._optuna.tune(
objective=self.objective(
ts=ts,
pipeline=self.pipeline,
target_metric=self.target_metric,
metric_aggregation=self.metric_aggregation,
metrics=self.metrics,
backtest_params=self.backtest_params,
initializer=initializer,
callback=callback,
),
runner=self.runner,
n_trials=n_trials,
timeout=timeout,
**optuna_kwargs,
)

return self.pipeline.set_params(**self._optuna.study.best_trial.params)
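
Continuing the sketch above, a hedged example of running the tuning itself (illustrative, not from the commit); the CSV path and frequency are placeholders for any long-format dataset accepted by TSDataset:

import pandas as pd
from etna.datasets import TSDataset

# long-format frame with "timestamp", "segment" and "target" columns (placeholder path)
df = pd.read_csv("data.csv", parse_dates=["timestamp"])
ts = TSDataset(df=TSDataset.to_dataset(df), freq="D")

# each trial suggests one parameter set, backtests it, and stores aggregated metrics as user attrs
best_pipeline = tune.fit(ts=ts, n_trials=20)

# the helpers inherited from AutoBase summarize the study and rank trials
print(tune.summary().head())
top_pipelines = tune.top_k(k=3)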

def _summary(self, study: List[FrozenTrial]) -> List[dict]:
"""Get information from trial summary."""
study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs), "state": trial.state}
for trial in study
]
return study_params

@staticmethod
def objective(
ts: TSDataset,
pipeline: BasePipeline,
target_metric: Metric,
metric_aggregation: MetricAggregationStatistics,
metrics: List[Metric],
backtest_params: dict,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
) -> Callable[[Trial], float]:
"""
Optuna objective wrapper.
Parameters
----------
ts:
tsdataset to fit on
pipeline:
pipeline to tune
target_metric:
metric to optimize
metric_aggregation:
aggregation method for per-segment metrics
metrics:
list of metrics to compute
backtest_params:
custom parameters for backtest instead of default backtest parameters
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
Returns
-------
objective:
function that runs specified trial and returns its evaluated score
"""
dict_of_distrs = {
UniformDistribution: lambda x: ("suggest_uniform", {"low": x.low, "high": x.high}),
LogUniformDistribution: lambda x: ("suggest_loguniform", {"low": x.low, "high": x.high}),
DiscreteUniformDistribution: lambda x: (
"suggest_discrete_uniform",
{"low": x.low, "high": x.high, "q": x.q},
),
IntUniformDistribution: lambda x: ("suggest_int", {"low": x.low, "high": x.high, "step": x.step}),
IntLogUniformDistribution: lambda x: (
"suggest_int",
{"low": x.low, "high": x.high, "step": x.step},
),
CategoricalDistribution: lambda x: ("suggest_categorical", {"choices": x.choices}),
}

def _objective(trial: Trial) -> float:

params_to_tune = pipeline.params_to_tune()

# map each optuna distribution returned by params_to_tune to the corresponding trial.suggest_* call
params_suggested = {}
for param_name, param_distr in params_to_tune.items():
method_name, method_kwargs = dict_of_distrs[type(param_distr)](param_distr)
method = getattr(trial, method_name)
params_suggested[param_name] = method(param_name, **method_kwargs)

# create pipeline instance with the parameters to try
pipeline_trial_params: BasePipeline = pipeline.set_params(**params_suggested)

if initializer is not None:
initializer(pipeline=pipeline_trial_params)

metrics_df, forecast_df, fold_info_df = pipeline_trial_params.backtest(
ts, metrics=metrics, **backtest_params
)

if callback is not None:
callback(metrics_df=metrics_df, forecast_df=forecast_df, fold_info_df=fold_info_df)

aggregated_metrics = aggregate_metrics_df(metrics_df)

for metric in aggregated_metrics:
trial.set_user_attr(metric, aggregated_metrics[metric])

return aggregated_metrics[f"{target_metric.name}_{metric_aggregation}"]

return _objective
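
To illustrate the mapping table in objective (a hypothetical sketch, not part of the commit): a params_to_tune() result keyed by set_params-style paths is translated into the matching trial.suggest_* calls, roughly as follows:

from optuna.distributions import CategoricalDistribution
from optuna.distributions import LogUniformDistribution

# hypothetical search space, shaped like a pipeline's params_to_tune() output
params_to_tune = {
    "model.learning_rate": LogUniformDistribution(low=1e-4, high=1e-1),
    "model.seasonality_mode": CategoricalDistribution(choices=("additive", "multiplicative")),
}

# inside _objective these entries become, respectively:
#   trial.suggest_loguniform("model.learning_rate", low=1e-4, high=1e-1)
#   trial.suggest_categorical("model.seasonality_mode", choices=("additive", "multiplicative"))
# and the suggested values are applied via pipeline.set_params(**params_suggested)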

def _init_optuna(self):
"""Initialize optuna."""
# the sampler is given no search space here; it optimizes only the hyperparameters suggested inside the objective
optuna = Optuna(
direction="maximize" if self.target_metric.greater_is_better else "minimize",
study_name=self.experiment_folder,
storage=self.storage,
sampler=self.sampler,
)
return optuna