POC: duplicated runs fold caching for backtest and stackingensemble via joblib.Memory #655

Draft · wants to merge 4 commits into base: master
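For reviewers who want to try the POC locally: the diff below gates the fold cache behind an `ETNA_CACHE` environment variable that is checked once, at import time of `etna.pipeline.base`. The snippet below is an illustrative way to exercise it and is not part of this PR; the dataset, model, and backtest arguments are placeholders, and the exact etna API names assume the version this PR targets.

```python
import os

# Opt in to the POC cache before etna.pipeline is imported, since the
# ETNA_CACHE check runs at module import time (assumption: only the
# presence of the variable matters, not its value).
os.environ["ETNA_CACHE"] = "1"

from etna.datasets import TSDataset, generate_ar_df
from etna.metrics import MAE
from etna.models import NaiveModel
from etna.pipeline import Pipeline

# Toy dataset and pipeline, purely for illustration.
df = generate_ar_df(periods=100, start_time="2021-01-01", n_segments=2)
ts = TSDataset(TSDataset.to_dataset(df), freq="D")
pipeline = Pipeline(model=NaiveModel(), horizon=7)

# The second backtest should be served from ./etna-cachedir instead of
# refitting every fold, because the fold inputs hash to the same cache keys.
metrics_df, forecast_df, fold_info_df = pipeline.backtest(ts=ts, metrics=[MAE()], n_folds=2)
metrics_df, forecast_df, fold_info_df = pipeline.backtest(ts=ts, metrics=[MAE()], n_folds=2)
```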
69 changes: 50 additions & 19 deletions etna/pipeline/base.py
@@ -384,26 +384,8 @@ def _run_fold(
         metrics: List[Metric],
         forecast_params: Dict[str, Any],
     ) -> Dict[str, Any]:
         """Run fit-forecast pipeline of model for one fold."""
-        tslogger.start_experiment(job_type="crossval", group=str(fold_number))
-
-        pipeline = deepcopy(self)
-        pipeline.fit(ts=train)
-        forecast = pipeline.forecast(**forecast_params)
-        fold: Dict[str, Any] = {}
-        for stage_name, stage_df in zip(("train", "test"), (train, test)):
-            fold[f"{stage_name}_timerange"] = {}
-            fold[f"{stage_name}_timerange"]["start"] = stage_df.index.min()
-            fold[f"{stage_name}_timerange"]["end"] = stage_df.index.max()
-
-        forecast.df = forecast.df.loc[mask.target_timestamps]
-        test.df = test.df.loc[mask.target_timestamps]
-
-        fold["forecast"] = forecast
-        fold["metrics"] = deepcopy(self._compute_metrics(metrics=metrics, y_true=test, y_pred=forecast))
-
-        tslogger.log_backtest_run(pd.DataFrame(fold["metrics"]), forecast.to_pandas(), test.to_pandas())
-        tslogger.finish_experiment()
+        fold = run_fold_helper(self, train, test, fold_number, mask, metrics, forecast_params)
 
         return fold

@@ -542,3 +524,52 @@ def backtest(
         tslogger.finish_experiment()
 
         return metrics_df, forecast_df, fold_info_df
+
+def _run_fold_helper(
+    self,
+    train: TSDataset,
+    test: TSDataset,
+    fold_number: int,
+    mask: FoldMask,
+    metrics: List[Metric],
+    forecast_params: Dict[str, Any],
+) -> Dict[str, Any]:
+    """Run fit-forecast pipeline of model for one fold."""
+    import shelve  # POC instrumentation: counts how often the helper actually runs (i.e. cache misses)
+    with shelve.open('counter') as db:
+        db['counter'] = db.get('counter', 0) + 1
+    tslogger.start_experiment(job_type="crossval", group=str(fold_number))
+
+    pipeline = deepcopy(self)
+    pipeline.fit(ts=train)
+    forecast = pipeline.forecast(**forecast_params)
+    fold: Dict[str, Any] = {}
+    for stage_name, stage_df in zip(("train", "test"), (train, test)):
+        fold[f"{stage_name}_timerange"] = {}
+        fold[f"{stage_name}_timerange"]["start"] = stage_df.index.min()
+        fold[f"{stage_name}_timerange"]["end"] = stage_df.index.max()
+
+    forecast.df = forecast.df.loc[mask.target_timestamps]
+    test.df = test.df.loc[mask.target_timestamps]
+
+    fold["forecast"] = forecast
+    fold["metrics"] = deepcopy(self._compute_metrics(metrics=metrics, y_true=test, y_pred=forecast))
+
+    tslogger.log_backtest_run(pd.DataFrame(fold["metrics"]), forecast.to_pandas(), test.to_pandas())
+    tslogger.finish_experiment()
+
+    return fold
+
+import os
+if os.environ.get("ETNA_CACHE") is not None:
+    from joblib import Memory
+    import atexit
+
+    location = './etna-cachedir'
+    memory = Memory(location, verbose=0)  # joblib disk cache backing the fold helper
+    atexit.register(lambda: memory.clear(False))  # wipe the cache directory on interpreter exit
+
+    run_fold_helper = memory.cache(_run_fold_helper, verbose=0, ignore=["train", "test", "fold_number"])
+else:
+    run_fold_helper = _run_fold_helper
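For reviewers unfamiliar with joblib.Memory, here is a standalone sketch of the behaviour this POC relies on; nothing in it is etna-specific, and `slow_fit_forecast` is a made-up stand-in for the per-fold fit/forecast work. `memory.cache` memoizes a function on a hash of its arguments, and the `ignore` list removes the named arguments from that hash, so repeated calls that differ only in ignored arguments are served from the cache.

```python
import time
from joblib import Memory

memory = Memory("./demo-cachedir", verbose=0)


def slow_fit_forecast(params: str, fold_number: int) -> str:
    """Hypothetical stand-in for pipeline.fit + pipeline.forecast on one fold."""
    time.sleep(2)
    return f"forecast-with-{params}"


# Mirrors ignore=["train", "test", "fold_number"] in the diff above:
# fold_number is dropped from the cache key.
cached = memory.cache(slow_fit_forecast, ignore=["fold_number"])

print(cached("naive", fold_number=0))  # slow: executes and stores the result on disk
print(cached("naive", fold_number=1))  # fast: same key once fold_number is ignored, served from cache

memory.clear(warn=False)  # the PR does the equivalent via atexit.register
```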