
Class Tune #1200

Merged 18 commits on May 21, 2023
Changes from 16 commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Added
- Notebook `forecast_interpretation.ipynb` with forecast decomposition ([#1220](https://github.com/tinkoff-ai/etna/pull/1220))
-
- Class `Tune` for hyperparameter optimization within existing `Pipeline` ([#1200](https://github.com/tinkoff-ai/etna/pull/1200))
-
-
### Changed
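A minimal usage sketch for the new `Tune` changelog entry, assuming the API introduced in this diff (the dataset path, model, and transform choices below are illustrative, not from this PR):

```python
import pandas as pd

from etna.auto import Tune
from etna.datasets import TSDataset
from etna.metrics import SMAPE
from etna.models import LinearPerSegmentModel
from etna.pipeline import Pipeline
from etna.transforms import LagTransform

# Illustrative data loading: any long-format frame with timestamp/segment/target columns
df = TSDataset.to_dataset(pd.read_csv("example_dataset.csv"))
ts = TSDataset(df=df, freq="D")

pipeline = Pipeline(
    model=LinearPerSegmentModel(),
    transforms=[LagTransform(in_column="target", lags=list(range(7, 15)))],
    horizon=7,
)

# Tune searches over pipeline.params_to_tune(), with TPESampler by default
tune = Tune(pipeline=pipeline, target_metric=SMAPE(), horizon=7)
best_pipeline = tune.fit(ts=ts, n_trials=20, timeout=600)  # limits are per worker
print(tune.summary().head())
```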
1 change: 1 addition & 0 deletions etna/auto/__init__.py
@@ -1,2 +1,3 @@
from etna.auto.auto import Auto
from etna.auto.auto import Tune
from etna.auto.pool import Pool
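With this re-export, `Tune` becomes importable from the package root alongside `Auto` (a minimal sketch, assuming this branch is installed):

```python
# Both entry points now live under etna.auto
from etna.auto import Auto
from etna.auto import Tune
```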
277 changes: 239 additions & 38 deletions etna/auto/auto.py
@@ -7,8 +7,17 @@

import pandas as pd
from hydra_slayer import get_from_params
from optuna.distributions import CategoricalDistribution
from optuna.distributions import DiscreteUniformDistribution
from optuna.distributions import IntLogUniformDistribution
from optuna.distributions import IntUniformDistribution
from optuna.distributions import LogUniformDistribution
from optuna.distributions import UniformDistribution
from optuna.samplers import BaseSampler
from optuna.samplers import TPESampler
from optuna.storages import BaseStorage
from optuna.storages import RDBStorage
from optuna.trial import FrozenTrial
from optuna.trial import Trial
from typing_extensions import Protocol

@@ -81,6 +90,11 @@ def summary(self) -> pd.DataFrame:
"""Get Auto trials summary."""
pass

@abstractmethod
def _summary(self, study: List[FrozenTrial]) -> List[dict]:
"""Get information from trial summary."""
pass

@abstractmethod
def top_k(self, k: int = 5) -> List[Pipeline]:
"""
@@ -93,39 +107,6 @@ def top_k(self, k: int = 5) -> List[Pipeline]:
"""
pass

@staticmethod
@abstractmethod
def objective(
ts: TSDataset,
target_metric: Metric,
metric_aggregation: MetricAggregationStatistics,
metrics: List[Metric],
backtest_params: dict,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
) -> Callable[[Trial], float]:
"""
Optuna objective wrapper.

Parameters
----------
ts:
tsdataset to fit on
target_metric:
metric to optimize
metric_aggregation:
aggregation method for per-segment metrics
metrics:
list of metrics to compute
backtest_params:
custom parameters for backtest instead of default backtest parameters
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
"""
pass


class AutoBase(AutoAbstract):
"""Base Class for ``Auto`` and ``Tune``, implementing core logic behind these classes."""
@@ -193,11 +174,7 @@ def summary(self) -> pd.DataFrame:

study = self._optuna.study.get_trials()

study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
for trial in study
]

study_params = self._summary(study=study)
return pd.DataFrame(study_params)

def top_k(self, k: int = 5) -> List[Pipeline]:
@@ -316,6 +293,14 @@ def fit(

return get_from_params(**self._optuna.study.best_trial.user_attrs["pipeline"])

def _summary(self, study: List[FrozenTrial]) -> List[dict]:
"""Get information from trial summary."""
study_params = [
{**trial.user_attrs, "pipeline": get_from_params(**trial.user_attrs["pipeline"]), "state": trial.state}
for trial in study
]
return study_params

@staticmethod
def objective(
ts: TSDataset,
@@ -345,6 +330,11 @@ def objective(
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics

Returns
-------
objective:
function that runs specified trial and returns its evaluated score
"""

def _objective(trial: Trial) -> float:
@@ -387,3 +377,214 @@ def _init_optuna(self):
sampler=ConfigSampler(configs=pool_),
)
return optuna


class Tune(AutoBase):
"""Automatic tuning of custom pipeline."""

def __init__(
self,
pipeline: Pipeline,
Contributor: We can try to replace `Pipeline` here with `AbstractPipeline`, since this class isn't specific to `Pipeline`. Other methods should also be updated to work with `AbstractPipeline`.

target_metric: Metric,
horizon: int,
metric_aggregation: MetricAggregationStatistics = "mean",
backtest_params: Optional[dict] = None,
experiment_folder: Optional[str] = None,
runner: Optional[AbstractRunner] = None,
storage: Optional[BaseStorage] = None,
metrics: Optional[List[Metric]] = None,
sampler: Optional[BaseSampler] = None,
):
"""
Initialize Tune class.

Parameters
----------
pipeline:
pipeline to optimize
target_metric:
metric to optimize
horizon:
horizon to forecast for
metric_aggregation:
aggregation method for per-segment metrics
backtest_params:
custom parameters for backtest instead of default backtest parameters
experiment_folder:
folder to store experiment results and name for optuna study
runner:
runner to use for distributed training
storage:
optuna storage to use
metrics:
list of metrics to compute
sampler:
optuna sampler to use, ``TPESampler`` by default
"""
super().__init__(
target_metric=target_metric,
horizon=horizon,
metric_aggregation=metric_aggregation,
backtest_params=backtest_params,
experiment_folder=experiment_folder,
runner=runner,
storage=storage,
metrics=metrics,
)
self.pipeline = pipeline
if sampler is None:
self.sampler: BaseSampler = TPESampler()
else:
self.sampler = sampler

def fit(
self,
ts: TSDataset,
timeout: Optional[int] = None,
n_trials: Optional[int] = None,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
**optuna_kwargs,
) -> Pipeline:
"""
Start automatic pipeline tuning.

Parameters
----------
ts:
tsdataset to fit on
timeout:
timeout for optuna. N.B. this is timeout for each worker
n_trials:
number of trials for optuna. N.B. this is number of trials for each worker
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics
optuna_kwargs:
additional kwargs for optuna :py:meth:`optuna.study.Study.optimize`
"""
if self._optuna is None:
self._optuna = self._init_optuna()

self._optuna.tune(
objective=self.objective(
ts=ts,
pipeline=self.pipeline,
target_metric=self.target_metric,
metric_aggregation=self.metric_aggregation,
metrics=self.metrics,
backtest_params=self.backtest_params,
initializer=initializer,
callback=callback,
),
runner=self.runner,
n_trials=n_trials,
timeout=timeout,
**optuna_kwargs,
)

return self.pipeline.set_params(**self._optuna.study.best_trial.params)

def _summary(self, study: List[FrozenTrial]) -> List[dict]:
"""Get information from trial summary."""
study_params = [
{**trial.user_attrs, "pipeline": self.pipeline.set_params(**trial.params), "state": trial.state}
for trial in study
]
return study_params

@staticmethod
def objective(
ts: TSDataset,
pipeline: Pipeline,
target_metric: Metric,
metric_aggregation: MetricAggregationStatistics,
metrics: List[Metric],
backtest_params: dict,
initializer: Optional[_Initializer] = None,
callback: Optional[_Callback] = None,
) -> Callable[[Trial], float]:
"""
Optuna objective wrapper.

Parameters
----------
ts:
tsdataset to fit on
pipeline:
pipeline to tune
target_metric:
metric to optimize
metric_aggregation:
aggregation method for per-segment metrics
metrics:
list of metrics to compute
backtest_params:
custom parameters for backtest instead of default backtest parameters
initializer:
is called before each pipeline backtest, can be used to initialize loggers
callback:
is called after each pipeline backtest, can be used to log extra metrics

Returns
-------
objective:
function that runs specified trial and returns its evaluated score
"""
dict_of_distrs = {
UniformDistribution: lambda x: ("suggest_uniform", {"low": x.low, "high": x.high}),
LogUniformDistribution: lambda x: ("suggest_loguniform", {"low": x.low, "high": x.high}),
DiscreteUniformDistribution: lambda x: (
"suggest_discrete_uniform",
{"low": x.low, "high": x.high, "q": x.q},
),
IntUniformDistribution: lambda x: ("suggest_int", {"low": x.low, "high": x.high, "step": x.step}),
IntLogUniformDistribution: lambda x: (
"suggest_int",
{"low": x.low, "high": x.high, "log": True},  # sample integers on a log scale
),
CategoricalDistribution: lambda x: ("suggest_categorical", {"choices": x.choices}),
}

def _objective(trial: Trial) -> float:

params_to_tune = pipeline.params_to_tune()

# using received optuna.distribution objects to call corresponding trial.suggest_xxx
params_suggested = {}
for param_name, param_distr in params_to_tune.items():
method_name, method_kwargs = dict_of_distrs[type(param_distr)](param_distr)
method = getattr(trial, method_name)
params_suggested[param_name] = method(param_name, **method_kwargs)

# create pipeline instance with the parameters to try
pipeline_trial_params: Pipeline = pipeline.set_params(**params_suggested)

if initializer is not None:
initializer(pipeline=pipeline_trial_params)

metrics_df, forecast_df, fold_info_df = pipeline_trial_params.backtest(
ts, metrics=metrics, **backtest_params
)

if callback is not None:
callback(metrics_df=metrics_df, forecast_df=forecast_df, fold_info_df=fold_info_df)

aggregated_metrics = aggregate_metrics_df(metrics_df)

for metric in aggregated_metrics:
trial.set_user_attr(metric, aggregated_metrics[metric])

return aggregated_metrics[f"{target_metric.name}_{metric_aggregation}"]

return _objective

def _init_optuna(self):
"""Initialize optuna."""
# sampler receives no hyperparameters here and optimizes only the hyperparameters suggested in objective
optuna = Optuna(
direction="maximize" if self.target_metric.greater_is_better else "minimize",
study_name=self.experiment_folder,
storage=self.storage,
sampler=self.sampler,
)
return optuna
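For clarity, a standalone sketch of the distribution-translation pattern from `Tune.objective`: each optuna distribution coming out of `params_to_tune()` is mapped onto the matching `trial.suggest_*` call. The parameter names and the toy score below are illustrative stand-ins for a real pipeline backtest:

```python
import optuna
from optuna.distributions import IntUniformDistribution
from optuna.distributions import LogUniformDistribution

# Same mapping idea as dict_of_distrs above, trimmed to two distribution types
dict_of_distrs = {
    LogUniformDistribution: lambda x: ("suggest_loguniform", {"low": x.low, "high": x.high}),
    IntUniformDistribution: lambda x: ("suggest_int", {"low": x.low, "high": x.high, "step": x.step}),
}

# Stand-in for pipeline.params_to_tune(); dotted names follow etna's nested-parameter syntax
params_to_tune = {
    "model.alpha": LogUniformDistribution(low=1e-4, high=10.0),
    "transforms.0.window": IntUniformDistribution(low=3, high=21),
}

def _objective(trial: optuna.trial.Trial) -> float:
    params_suggested = {}
    for param_name, param_distr in params_to_tune.items():
        method_name, method_kwargs = dict_of_distrs[type(param_distr)](param_distr)
        params_suggested[param_name] = getattr(trial, method_name)(param_name, **method_kwargs)
    # Toy score in place of the aggregated backtest metric
    return params_suggested["model.alpha"] * params_suggested["transforms.0.window"]

study = optuna.create_study(direction="minimize")
study.optimize(_objective, n_trials=5)
print(study.best_trial.params)
```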
5 changes: 4 additions & 1 deletion etna/core/mixins.py
@@ -11,11 +11,14 @@
from typing import Dict
from typing import Sequence
from typing import Tuple
from typing import TypeVar
from typing import cast

import hydra_slayer
from sklearn.base import BaseEstimator

TMixin = TypeVar("TMixin", bound="BaseMixin")


class BaseMixin:
"""Base mixin for etna classes."""
@@ -134,7 +137,7 @@ def _update_nested_structure(cls, structure: Any, keys: Sequence[str], value: An

return new_structure

def set_params(self, **params: dict) -> "BaseMixin":
def set_params(self: TMixin, **params: dict) -> TMixin:
"""Return new object instance with modified parameters.

Method also allows to change parameters of nested objects within the current object.
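The `TypeVar` change above is what lets `Tune` round-trip pipelines through `set_params`: the method now returns the caller's concrete type instead of widening to `BaseMixin`. A small sketch (the dotted key follows etna's nested-parameter syntax; the model choice is illustrative):

```python
from etna.models import NaiveModel
from etna.pipeline import Pipeline

pipeline = Pipeline(model=NaiveModel(lag=1), horizon=7)

# Returns a new object; with the TypeVar, type checkers see Pipeline rather than BaseMixin
updated = pipeline.set_params(**{"model.lag": 7})

print(type(updated).__name__)  # Pipeline
print(updated.model.lag)       # 7
```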
27 changes: 27 additions & 0 deletions tests/test_auto/conftest.py
@@ -0,0 +1,27 @@
from os import unlink

import pytest
from optuna.storages import RDBStorage
from typing_extensions import Literal
from typing_extensions import NamedTuple

from etna.models import NaiveModel
from etna.pipeline import Pipeline


@pytest.fixture()
def optuna_storage():
yield RDBStorage("sqlite:///test.db")
unlink("test.db")


@pytest.fixture()
def trials():
class Trial(NamedTuple):
user_attrs: dict
state: Literal["COMPLETE", "RUNNING", "PENDING"] = "COMPLETE"

return [
Trial(user_attrs={"pipeline": pipeline.to_dict(), "SMAPE_median": i})
for i, pipeline in enumerate((Pipeline(NaiveModel(j), horizon=7) for j in range(10)))
]
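A hypothetical test sketch showing how these fixtures could drive `_summary` (the test name and assertions are illustrative, not part of this PR):

```python
from etna.auto import Auto
from etna.metrics import SMAPE

def test_summary_rows_from_trials(trials):
    # AutoBase._summary reconstructs a pipeline object and keeps the metric user_attrs
    auto = Auto(target_metric=SMAPE(), horizon=7)
    rows = auto._summary(study=trials)
    assert len(rows) == len(trials)
    assert all("pipeline" in row and "state" in row for row in rows)
```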