Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add predicted data samples when time series cycle duration is Hour #137

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions pkg/prediction/dsp/prediction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dsp

import (
"encoding/json"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -210,7 +211,10 @@ func (p *periodicSignalPrediction) updateAggregateSignals(id string, tsList []*c
var predictedTimeSeriesList []*common.TimeSeries

for _, ts := range tsList {
klog.V(4).InfoS("Got time series.", "queryExpr", id, "samples", ts.Samples, "labels", ts.Labels)
if klog.V(6).Enabled() {
sampleData, err := json.Marshal(ts.Samples)
klog.V(6).Infof("Got time series, queryExpr: %v, samples: %v, labels: %v, err: %v", id, string(sampleData), ts.Labels, err)
}
var chosenEstimator Estimator
var signal *Signal
var nCycles int
Expand All @@ -232,7 +236,7 @@ func (p *periodicSignalPrediction) updateAggregateSignals(id string, tsList []*c
signal = SamplesToSignal(ts.Samples, config.historyResolution)
signal, nCycles = signal.Truncate(cycleDuration)
if nCycles >= 2 {
chosenEstimator = bestEstimator(config.estimators, signal, nCycles, cycleDuration)
chosenEstimator = bestEstimator(id, config.estimators, signal, nCycles, cycleDuration)
}
}

Expand All @@ -241,13 +245,26 @@ func (p *periodicSignalPrediction) updateAggregateSignals(id string, tsList []*c
intervalSeconds := int64(config.historyResolution.Seconds())
nextTimestamp := ts.Samples[len(ts.Samples)-1].Timestamp + intervalSeconds

samples := make([]common.Sample, len(estimatedSignal.Samples))
for i := range estimatedSignal.Samples {
samples[i] = common.Sample{
Value: estimatedSignal.Samples[i],
Timestamp: nextTimestamp,
// Hack(temporary):
// Because the dsp predict only append one cycle points, when the cycle is hour, then estimate signal samples only contains at most one hour points
// This leads to tsp predictWindowSeconds more than 3600 will be always out of date. because the predicted data end point timestamp is always ts.Samples[len(ts.Samples)-1].Timestamp+ Hour in one model update interval loop
// If its cycle is hour, then we append 24 hour points to avoid the out of dated predicted data
// Alternative option 1: Reduce predictWindowSeconds in tsp less than one hour and model update interval to one hour too.
// Alternative option 2. Do not support hour cycle any more, because it is too short in production env. now the dsp can not handle hour cycle well.
cycles := 1
if cycleDuration == Hour {
cycles = 24
}
n := len(estimatedSignal.Samples)
samples := make([]common.Sample, n*cycles)
for c := 0; c < cycles; c++ {
for i := range estimatedSignal.Samples {
samples[i+c*n] = common.Sample{
Value: estimatedSignal.Samples[i],
Timestamp: nextTimestamp,
}
nextTimestamp += intervalSeconds
}
nextTimestamp += intervalSeconds
}

predictedTimeSeriesList = append(predictedTimeSeriesList, &common.TimeSeries{
Expand All @@ -268,7 +285,7 @@ func (p *periodicSignalPrediction) updateAggregateSignals(id string, tsList []*c
}
}

func bestEstimator(estimators []Estimator, signal *Signal, totalCycles int, cycleDuration time.Duration) Estimator {
func bestEstimator(id string, estimators []Estimator, signal *Signal, totalCycles int, cycleDuration time.Duration) Estimator {
samplesPerCycle := len(signal.Samples) / totalCycles

history := &Signal{
Expand All @@ -287,15 +304,15 @@ func bestEstimator(estimators []Estimator, signal *Signal, totalCycles int, cycl
estimated := estimators[i].GetEstimation(history, cycleDuration)
if estimated != nil {
pe, err := accuracy.PredictionError(actual.Samples, estimated.Samples)
klog.InfoS("Testing estimators ...", "estimator", estimators[i].String(), "error", pe)
klog.V(6).InfoS("Testing estimators ...", "key", id, "estimator", estimators[i].String(), "pe", pe, "error", err)
if err == nil && pe < minPE {
minPE = pe
bestEstimator = estimators[i]
}
}
}

klog.InfoS("Got the best estimator.", "estimator", bestEstimator.String(), "error", minPE)
klog.V(4).InfoS("Got the best estimator.", "key", id, "estimator", bestEstimator.String(), "minPE", minPE, "totalCycles", totalCycles)
return bestEstimator
}

Expand Down