From ffeb399684876fca5826d1804af8ddcd2d9753ad Mon Sep 17 00:00:00 2001
From: kitianFresh <1549722424@qq.com>
Date: Thu, 21 Apr 2022 19:38:07 +0800
Subject: [PATCH] support init mode for the metric model; init from the realtime provider is async until enough data has accumulated
---
 pkg/autoscaling/estimator/percentile.go       |  43 ++++++-
 .../evpa/effective_vpa_controller.go          |   3 +
 pkg/controller/timeseriesprediction/config.go |  55 +++++----
 pkg/metricnaming/naming.go                    |  16 ++-
 pkg/metricquery/type.go                       |  18 +--
 pkg/prediction/config/types.go                |  13 +++
 pkg/prediction/dsp/prediction.go              |   4 +
 pkg/prediction/generic.go                     |   4 +
 pkg/prediction/interface.go                   |   3 +
 pkg/prediction/percentile/aggregate_signal.go |   7 ++
 .../percentile/aggregate_signals.go           |  57 ++++++++-
 pkg/prediction/percentile/config.go           |  19 ++-
 pkg/prediction/percentile/prediction.go       | 110 ++++++++++--------
 pkg/providers/prom/prometheus.go              |   4 +-
 pkg/recommend/advisor/ehpa.go                 |  13 ++-
 pkg/recommend/advisor/resource_request.go     |  27 +++--
 16 files changed, 286 insertions(+), 110 deletions(-)

diff --git a/pkg/autoscaling/estimator/percentile.go b/pkg/autoscaling/estimator/percentile.go
index cd202599b..1751ba4b8 100644
--- a/pkg/autoscaling/estimator/percentile.go
+++ b/pkg/autoscaling/estimator/percentile.go
@@ -19,7 +19,7 @@ import (
 	"github.com/gocrane/crane/pkg/utils"
 )

-const callerFormat = "EVPACaller-%s-%s-%s"
+const callerFormat = "EVPACaller-%s-%s"

 type PercentileResourceEstimator struct {
 	Predictor prediction.Interface
@@ -29,7 +29,9 @@ type PercentileResourceEstimator struct {
 func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler, config map[string]string, containerName string, currRes *corev1.ResourceRequirements) (corev1.ResourceList, error) {
 	recommendResource := corev1.ResourceList{}

+	caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
 	cpuMetricNamer := &metricnaming.GeneralMetricNamer{
+		CallerName: caller,
 		Metric: &metricquery.Metric{
 			Type:       metricquery.ContainerMetricType,
 			MetricName: corev1.ResourceCPU.String(),
@@ -43,7 +45,7 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
 	}

 	cpuConfig := getCpuConfig(config)
-	tsList, err := utils.QueryPredictedValues(e.Predictor, fmt.Sprintf(callerFormat, string(evpa.UID), containerName, corev1.ResourceCPU), cpuConfig, cpuMetricNamer)
+	tsList, err := utils.QueryPredictedValues(e.Predictor, caller, cpuConfig, cpuMetricNamer)
 	if err != nil {
 		return nil, err
 	}
@@ -56,6 +58,7 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
 	recommendResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(cpuValue, resource.DecimalSI)

 	memoryMetricNamer := &metricnaming.GeneralMetricNamer{
+		CallerName: caller,
 		Metric: &metricquery.Metric{
 			Type:       metricquery.ContainerMetricType,
 			MetricName: corev1.ResourceMemory.String(),
@@ -69,7 +72,7 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
 	}

 	memConfig := getMemConfig(config)
-	tsList, err = utils.QueryPredictedValues(e.Predictor, fmt.Sprintf(callerFormat, string(evpa.UID), containerName, corev1.ResourceMemory), memConfig, memoryMetricNamer)
+	tsList, err = utils.QueryPredictedValues(e.Predictor, caller, memConfig, memoryMetricNamer)
 	if err != nil {
 		return nil, err
 	}
@@ -86,7 +89,9 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi

 func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler) {
 	for _, containerPolicy := range evpa.Spec.ResourcePolicy.ContainerPolicies {
+		caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
 		cpuMetricNamer := &metricnaming.GeneralMetricNamer{
+			CallerName: caller,
 			Metric: &metricquery.Metric{
 				Type:       metricquery.ContainerMetricType,
 				MetricName: corev1.ResourceCPU.String(),
@@ -98,12 +103,12 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
 				},
 			},
 		}
-		err := e.Predictor.DeleteQuery(cpuMetricNamer, fmt.Sprintf(callerFormat, string(evpa.UID), containerPolicy.ContainerName, corev1.ResourceCPU))
+		err := e.Predictor.DeleteQuery(cpuMetricNamer, caller)
 		if err != nil {
 			klog.ErrorS(err, "Failed to delete query.", "queryExpr", cpuMetricNamer.BuildUniqueKey())
 		}

 		memoryMetricNamer := &metricnaming.GeneralMetricNamer{
+			CallerName: caller,
 			Metric: &metricquery.Metric{
 				Type:       metricquery.ContainerMetricType,
 				MetricName: corev1.ResourceMemory.String(),
@@ -115,7 +120,7 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
 				},
 			},
 		}
-		err = e.Predictor.DeleteQuery(memoryMetricNamer, fmt.Sprintf(callerFormat, string(evpa.UID), containerPolicy.ContainerName, corev1.ResourceMemory))
+		err = e.Predictor.DeleteQuery(memoryMetricNamer, caller)
 		if err != nil {
 			klog.ErrorS(err, "Failed to delete query.", "queryExpr", memoryMetricNamer.BuildUniqueKey())
 		}
@@ -137,9 +142,22 @@ func getCpuConfig(config map[string]string) *predictionconfig.Config {
 		marginFraction = "0.15"
 	}

+	initModeStr, exists := config["cpu-model-init-mode"]
+	initMode := predictionconfig.ModelInitModeLazyTraining
+	if exists {
+		initMode = predictionconfig.ModelInitMode(initModeStr)
+	}
+
+	historyLength, exists := config["cpu-model-history-length"]
+	if !exists {
+		historyLength = "24h"
+	}
+
 	return &predictionconfig.Config{
+		InitMode: &initMode,
 		Percentile: &predictionapi.Percentile{
 			Aggregated:     true,
+			HistoryLength:  historyLength,
 			SampleInterval: sampleInterval,
 			MarginFraction: marginFraction,
 			Percentile:     percentile,
@@ -166,9 +184,22 @@ func getMemConfig(props map[string]string) *predictionconfig.Config {
 		marginFraction = "0.15"
 	}

+	initModeStr, exists := props["mem-model-init-mode"]
+	initMode := predictionconfig.ModelInitModeLazyTraining
+	if exists {
+		initMode = predictionconfig.ModelInitMode(initModeStr)
+	}
+
+	historyLength, exists := props["mem-model-history-length"]
+	if !exists {
+		historyLength = "48h"
+	}
+
 	return &predictionconfig.Config{
+		InitMode: &initMode,
 		Percentile: &predictionapi.Percentile{
 			Aggregated:     true,
+			HistoryLength:  historyLength,
 			SampleInterval: sampleInterval,
 			MarginFraction: marginFraction,
 			Percentile:     percentile,
diff --git a/pkg/controller/evpa/effective_vpa_controller.go b/pkg/controller/evpa/effective_vpa_controller.go
index 35fb709be..4640efd4a 100644
--- a/pkg/controller/evpa/effective_vpa_controller.go
+++ b/pkg/controller/evpa/effective_vpa_controller.go
@@ -158,6 +158,9 @@ func recordMetric(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler, status *a
 		"resourceName": fmt.Sprintf("%s/%s", evpa.Namespace, evpa.Spec.TargetRef.Name),
 	}

+	if status.Recommendation == nil {
+		return
+	}
 	for _, container := range status.Recommendation.ContainerRecommendations {
 		resourceRequirement, found := utils.GetResourceByPodTemplate(podTemplate, container.ContainerName)
 		if !found {
diff --git a/pkg/controller/timeseriesprediction/config.go b/pkg/controller/timeseriesprediction/config.go
index bcfdd0a8f..1aa8fc2f3 100644
--- a/pkg/controller/timeseriesprediction/config.go
+++ b/pkg/controller/timeseriesprediction/config.go
@@ -63,18 +63,21 @@ func (c *MetricContext) GetMetricNamer(conf *predictionapi.PredictionMetric) met
 		return nil
 	}
 	if conf.ExpressionQuery != nil {
-		namer.Metric = &metricquery.Metric{
-			Type:       metricquery.PromQLMetricType,
-			MetricName: conf.ResourceIdentifier,
-			Prom: &metricquery.PromNamerInfo{
-				QueryExpr: conf.ExpressionQuery.Expression,
-				Selector:  labels.Nothing(),
+		namer = metricnaming.GeneralMetricNamer{
+			CallerName: c.GetCaller(),
+			Metric: &metricquery.Metric{
+				Type:       metricquery.PromQLMetricType,
+				MetricName: conf.ResourceIdentifier,
+				Prom: &metricquery.PromNamerInfo{
+					QueryExpr: conf.ExpressionQuery.Expression,
+					Selector:  labels.Nothing(),
+				},
 			},
 		}
 		klog.InfoS("GetQueryStr", "tsp", klog.KObj(c.SeriesPrediction), "queryExpr", conf.ExpressionQuery.Expression)
 	}
 	if conf.ResourceQuery != nil {
-		namer = c.ResourceToMetricNamer(conf.ResourceQuery)
+		namer = c.ResourceToMetricNamer(conf.ResourceQuery, c.GetCaller())
 		klog.InfoS("GetQueryStr", "tsp", klog.KObj(c.SeriesPrediction), "resourceQuery", conf.ResourceQuery)
 	}
 	return &namer
@@ -139,30 +142,36 @@ func metricSelectorToQueryExpr(m *predictionapi.MetricQuery) string {
 	return fmt.Sprintf("%s{%s}", m.MetricName, strings.Join(conditions, ","))
 }

-func (c *MetricContext) ResourceToMetricNamer(resourceName *corev1.ResourceName) metricnaming.GeneralMetricNamer {
+func (c *MetricContext) ResourceToMetricNamer(resourceName *corev1.ResourceName, caller string) metricnaming.GeneralMetricNamer {
 	var namer metricnaming.GeneralMetricNamer

 	// Node
 	if strings.ToLower(c.TargetKind) == strings.ToLower(predconf.TargetKindNode) {
-		namer.Metric = &metricquery.Metric{
-			Type:       metricquery.NodeMetricType,
-			MetricName: resourceName.String(),
-			Node: &metricquery.NodeNamerInfo{
-				Name:     c.Name,
-				Selector: labels.Everything(),
+		namer = metricnaming.GeneralMetricNamer{
+			CallerName: caller,
+			Metric: &metricquery.Metric{
+				Type:       metricquery.NodeMetricType,
+				MetricName: resourceName.String(),
+				Node: &metricquery.NodeNamerInfo{
+					Name:     c.Name,
+					Selector: labels.Everything(),
+				},
 			},
 		}
 	} else { // workload
-		namer.Metric = &metricquery.Metric{
-			Type:       metricquery.WorkloadMetricType,
-			MetricName: resourceName.String(),
-			Workload: &metricquery.WorkloadNamerInfo{
-				Namespace:  c.Namespace,
-				Kind:       c.TargetKind,
-				APIVersion: c.APIVersion,
-				Name:       c.Name,
-				Selector:   c.Selector,
+		namer = metricnaming.GeneralMetricNamer{
+			CallerName: caller,
+			Metric: &metricquery.Metric{
+				Type:       metricquery.WorkloadMetricType,
+				MetricName: resourceName.String(),
+				Workload: &metricquery.WorkloadNamerInfo{
+					Namespace:  c.Namespace,
+					Kind:       c.TargetKind,
+					APIVersion: c.APIVersion,
+					Name:       c.Name,
+					Selector:   c.Selector,
+				},
 			},
 		}
 	}
diff --git a/pkg/metricnaming/naming.go b/pkg/metricnaming/naming.go
index 38dc19492..a93165811 100644
--- a/pkg/metricnaming/naming.go
+++ b/pkg/metricnaming/naming.go
@@ -5,7 +5,7 @@ import (
 	"github.com/gocrane/crane/pkg/querybuilder"
 )

-// MetricNamer is an interface. it is the bridge between predictor and different data sources and other component.
+// MetricNamer is an interface. It is the bridge between the predictor, the different data sources, and other components such as the caller.
 type MetricNamer interface {
 	// Used for datasource provider, data source provider call QueryBuilder
 	QueryBuilder() querybuilder.QueryBuilder
@@ -13,10 +13,20 @@ type MetricNamer interface {
 	BuildUniqueKey() string

 	Validate() error
+
+	// Caller returns the caller of this MetricNamer; different callers may use the same metric.
+	Caller() string
 }

+var _ MetricNamer = &GeneralMetricNamer{}
+
 type GeneralMetricNamer struct {
-	Metric *metricquery.Metric
+	Metric     *metricquery.Metric
+	CallerName string
+}
+
+func (gmn *GeneralMetricNamer) Caller() string {
+	return gmn.CallerName
 }

 func (gmn *GeneralMetricNamer) QueryBuilder() querybuilder.QueryBuilder {
@@ -24,7 +34,7 @@ func (gmn *GeneralMetricNamer) QueryBuilder() querybuilder.QueryBuilder {
 }

 func (gmn *GeneralMetricNamer) BuildUniqueKey() string {
-	return gmn.Metric.BuildUniqueKey()
+	return gmn.CallerName + "/" + gmn.Metric.BuildUniqueKey()
 }

 func (gmn *GeneralMetricNamer) Validate() error {
diff --git a/pkg/metricquery/type.go b/pkg/metricquery/type.go
index acbbae3c6..e5f24adae 100644
--- a/pkg/metricquery/type.go
+++ b/pkg/metricquery/type.go
@@ -27,10 +27,10 @@ const (

 var (
 	NotMatchWorkloadError  = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", WorkloadMetricType)
-	NotMatchContainerError = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", ContainerMetricType)
-	NotMatchPodError       = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", PodMetricType)
-	NotMatchNodeError      = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", NodeMetricType)
-	NotMatchPromError      = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", PromQLMetricType)
+	NotMatchContainerError = fmt.Errorf("metric type %v, but no ContainerNamerInfo provided", ContainerMetricType)
+	NotMatchPodError       = fmt.Errorf("metric type %v, but no PodNamerInfo provided", PodMetricType)
+	NotMatchNodeError      = fmt.Errorf("metric type %v, but no NodeNamerInfo provided", NodeMetricType)
+	NotMatchPromError      = fmt.Errorf("metric type %v, but no PromNamerInfo provided", PromQLMetricType)
 )

 type Metric struct {
@@ -153,7 +153,7 @@ func (m *Metric) keyByWorkload() string {
 		m.Workload.APIVersion,
 		m.Workload.Namespace,
 		m.Workload.Name,
-		selectorStr}, "-")
+		selectorStr}, "_")
 }

 func (m *Metric) keyByContainer() string {
@@ -168,7 +168,7 @@ func (m *Metric) keyByContainer() string {
 		m.Container.WorkloadName,
 		m.Container.PodName,
 		m.Container.ContainerName,
-		selectorStr}, "-")
+		selectorStr}, "_")
 }

 func (m *Metric) keyByPod() string {
@@ -181,7 +181,7 @@ func (m *Metric) keyByPod() string {
 		strings.ToLower(m.MetricName),
 		m.Pod.Namespace,
 		m.Pod.Name,
-		selectorStr}, "-")
+		selectorStr}, "_")
 }

 func (m *Metric) keyByNode() string {
 	selectorStr := ""
@@ -192,7 +192,7 @@ func (m *Metric) keyByNode() string {
 		string(m.Type),
 		strings.ToLower(m.MetricName),
 		m.Node.Name,
-		selectorStr}, "-")
+		selectorStr}, "_")
 }

 func (m *Metric) keyByPromQL() string {
@@ -205,7 +205,7 @@ func (m *Metric) keyByPromQL() string {
 		m.Prom.Namespace,
 		strings.ToLower(m.MetricName),
 		m.Prom.QueryExpr,
-		selectorStr}, "-")
+		selectorStr}, "_")
 }

 // Query is used to query different data sources. You can extend it with your own data source query.
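[Note] With CallerName folded into BuildUniqueKey (and the key separators switched from "-" to "_"), two callers watching the same metric no longer collide on one signal. A minimal sketch of the effect; the CallerName values below are illustrative only:

	m := &metricquery.Metric{Type: metricquery.ContainerMetricType, MetricName: "cpu" /* container fields elided */}
	evpaNamer := &metricnaming.GeneralMetricNamer{CallerName: "EVPACaller-default/web-abc123", Metric: m}
	recNamer := &metricnaming.GeneralMetricNamer{CallerName: "RecommendationCaller-default/web-def456", Metric: m}
	// evpaNamer.BuildUniqueKey() != recNamer.BuildUniqueKey(), so the percentile
	// predictor tracks a separate aggregate signal, status and config per caller.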
diff --git a/pkg/prediction/config/types.go b/pkg/prediction/config/types.go
index dc2e4c825..2782bd87a 100644
--- a/pkg/prediction/config/types.go
+++ b/pkg/prediction/config/types.go
@@ -10,7 +10,20 @@ type AlgorithmModelConfig struct {
 	UpdateInterval time.Duration
 }

+type ModelInitMode string
+
+const (
+	// ModelInitModeHistory means the algorithm model is recovered or initialized directly from the history datasource; this process may block, because fetching the data and generating the model are time consuming.
+	ModelInitModeHistory ModelInitMode = "history"
+	// ModelInitModeLazyTraining means the algorithm model is recovered or initialized from the realtime datasource asynchronously; the predictor cannot predict until the accumulated data reaches the window length.
+	// This is safer, since accumulating data first makes the prediction more robust.
+	ModelInitModeLazyTraining ModelInitMode = "lazytraining"
+	// ModelInitModeCheckpoint means the model is recovered from a checkpoint; it can be restored directly and start predicting immediately.
+	ModelInitModeCheckpoint ModelInitMode = "checkpoint"
+)
+
 type Config struct {
+	InitMode   *ModelInitMode
 	DSP        *v1alpha1.DSP
 	Percentile *v1alpha1.Percentile
 }
diff --git a/pkg/prediction/dsp/prediction.go b/pkg/prediction/dsp/prediction.go
index bb72c259e..f4d9085b9 100644
--- a/pkg/prediction/dsp/prediction.go
+++ b/pkg/prediction/dsp/prediction.go
@@ -35,6 +35,10 @@ type periodicSignalPrediction struct {
 	modelConfig config.AlgorithmModelConfig
 }

+func (p *periodicSignalPrediction) QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) {
+	panic("implement me")
+}
+
 func NewPrediction(realtimeProvider providers.RealTime, historyProvider providers.History, mc config.AlgorithmModelConfig) prediction.Interface {
 	withCh, delCh := make(chan prediction.QueryExprWithCaller), make(chan prediction.QueryExprWithCaller)
 	return &periodicSignalPrediction{
diff --git a/pkg/prediction/generic.go b/pkg/prediction/generic.go
index 5169f12f3..79e420efe 100644
--- a/pkg/prediction/generic.go
+++ b/pkg/prediction/generic.go
@@ -26,6 +26,10 @@ const (
 	StatusNotStarted Status = "NotStarted"
 	StatusUnknown    Status = "Unknown"
 	StatusDeleted    Status = "Deleted"
+	// StatusInitializing means the prediction model is still accumulating data until it satisfies the user-specified time window (such as 12h, 3d or 1w) when a realtime data provider is used.
+	// If recovery from a checkpoint is supported, reaching ready may be faster.
+	StatusInitializing Status = "Initializing"
+	StatusExpired      Status = "Expired"
 )

 type WithMetricEvent struct {
diff --git a/pkg/prediction/interface.go b/pkg/prediction/interface.go
index f2b724007..01ac99ebe 100644
--- a/pkg/prediction/interface.go
+++ b/pkg/prediction/interface.go
@@ -19,6 +19,9 @@ type Interface interface {

 	DeleteQuery(metricNamer metricnaming.MetricNamer, caller string) error

+	// QueryPredictionStatus returns the prediction status of the metricNamer; it is predictable only when the status is ready.
+	QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (Status, error)
+
 	// QueryRealtimePredictedValues returns predicted values based on the specified query expression
 	QueryRealtimePredictedValues(ctx context.Context, metricNamer metricnaming.MetricNamer) ([]*common.TimeSeries, error)

diff --git a/pkg/prediction/percentile/aggregate_signal.go b/pkg/prediction/percentile/aggregate_signal.go
index f58856cd8..8e3427b51 100644
--- a/pkg/prediction/percentile/aggregate_signal.go
+++ b/pkg/prediction/percentile/aggregate_signal.go
@@ -15,6 +15,7 @@ type aggregateSignal struct {
 	lastSampleTime     time.Time
 	minSampleWeight    float64
 	totalSamplesCount  int
+	sampleInterval     time.Duration
 	creationTime       time.Time
 	labels             []common.Label
 }
@@ -30,10 +31,16 @@ func (a *aggregateSignal) addSample(sampleTime time.Time, sampleValue float64) {
 	a.totalSamplesCount++
 }

+// The largest time.Duration is about 290 years, so this cannot overflow.
+func (a *aggregateSignal) GetAggregationWindowLength() time.Duration {
+	return time.Duration(a.totalSamplesCount) * a.sampleInterval
+}
+
 func newAggregateSignal(c *internalConfig) *aggregateSignal {
 	return &aggregateSignal{
 		histogram:       vpa.NewHistogram(c.histogramOptions),
 		minSampleWeight: c.minSampleWeight,
 		creationTime:    time.Now(),
+		sampleInterval:  c.sampleInterval,
 	}
 }
diff --git a/pkg/prediction/percentile/aggregate_signals.go b/pkg/prediction/percentile/aggregate_signals.go
index 545360a78..e0a50ad97 100644
--- a/pkg/prediction/percentile/aggregate_signals.go
+++ b/pkg/prediction/percentile/aggregate_signals.go
@@ -13,6 +13,11 @@ type aggregateSignals struct {
 	callerMap map[string] /*expr*/ map[string] /*caller*/ struct{}
 	signalMap map[string] /*expr*/ map[string] /*key*/ *aggregateSignal
 	statusMap map[string] /*expr*/ prediction.Status
+	/**
+	todo: later we should split the predictor into a separate common service, maybe an AI-like system.
+	Different callers need different configs; this is inevitable because we provide different features in one craned, all built on the underlying prediction,
+	and right now we cannot control the params of the different callers.
+	If we used only one config, then evpa & tsp & recommendation would interfere with and override each other.
+	*/
 	configMap map[string] /*expr*/ *internalConfig
 }

@@ -32,7 +37,7 @@ func (a *aggregateSignals) Add(qc prediction.QueryExprWithCaller) bool {
 	QueryExpr := qc.MetricNamer.BuildUniqueKey()

 	if qc.Config.Percentile != nil {
-		cfg, err := makeInternalConfig(qc.Config.Percentile)
+		cfg, err := makeInternalConfig(qc.Config.Percentile, qc.Config.InitMode)
 		if err != nil {
 			klog.ErrorS(err, "Failed to make internal config.", "queryExpr", QueryExpr)
 		} else {
@@ -44,7 +49,7 @@ func (a *aggregateSignals) Add(qc prediction.QueryExprWithCaller) bool {
 		a.callerMap[QueryExpr] = map[string]struct{}{}
 	}

-	if status, exists := a.statusMap[QueryExpr]; !exists || status == prediction.StatusDeleted {
+	if _, exists := a.statusMap[QueryExpr]; !exists {
 		a.statusMap[QueryExpr] = prediction.StatusNotStarted
 	}

@@ -79,7 +84,7 @@ func (a *aggregateSignals) Delete(qc prediction.QueryExprWithCaller) bool /*need
 	delete(a.callerMap, QueryExpr)
 	delete(a.signalMap, QueryExpr)
 	delete(a.configMap, QueryExpr)
-	a.statusMap[QueryExpr] = prediction.StatusDeleted
+	delete(a.statusMap, QueryExpr)

 	return true
 }
@@ -89,9 +94,6 @@ func (a *aggregateSignals) GetConfig(queryExpr string) *internalConfig {
 	if a.configMap[queryExpr] != nil {
 		return a.configMap[queryExpr]
 	}
-	if a.statusMap[queryExpr] != prediction.StatusReady {
-		return nil
-	}

 	return &defaultInternalConfig
 }
@@ -107,6 +109,28 @@ func (a *aggregateSignals) SetSignal(queryExpr string, key string, signal *aggre
 	a.statusMap[queryExpr] = prediction.StatusReady
 }

+func (a *aggregateSignals) SetSignalWithStatus(id, key string, signal *aggregateSignal, status prediction.Status) {
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+
+	if _, exists := a.signalMap[id]; !exists {
+		return
+	}
+
+	a.signalMap[id][key] = signal
+	a.statusMap[id] = status
+}
+
+func (a *aggregateSignals) SetSignalStatus(id, key string, status prediction.Status) {
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+
+	if _, exists := a.signalMap[id]; !exists {
+		return
+	}
+	a.statusMap[id] = status
+}
+
 func (a *aggregateSignals) GetSignal(queryExpr string, key string) *aggregateSignal {
 	a.mutex.RLock()
 	defer a.mutex.RUnlock()
@@ -146,6 +170,27 @@ func (a *aggregateSignals) SetSignals(queryExpr string, signals map[string]*aggr
 	a.statusMap[queryExpr] = prediction.StatusReady
 }

+func (a *aggregateSignals) SetSignalsWithStatus(queryExpr string, signals map[string]*aggregateSignal, status prediction.Status) {
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	if _, exists := a.signalMap[queryExpr]; !exists {
+		return
+	}
+	for k, v := range signals {
+		a.signalMap[queryExpr][k] = v
+	}
+	a.statusMap[queryExpr] = status
+}
+
+func (a *aggregateSignals) SetSignalsStatus(queryExpr string, status prediction.Status) {
+	a.mutex.Lock()
+	defer a.mutex.Unlock()
+	if _, exists := a.signalMap[queryExpr]; !exists {
+		return
+	}
+	a.statusMap[queryExpr] = status
+}
+
 func (a *aggregateSignals) GetSignals(queryExpr string) (map[string]*aggregateSignal, prediction.Status) {
 	a.mutex.RLock()
 	defer a.mutex.RUnlock()
diff --git a/pkg/prediction/percentile/config.go b/pkg/prediction/percentile/config.go
index cf86aa428..591fc333a 100644
--- a/pkg/prediction/percentile/config.go
+++ b/pkg/prediction/percentile/config.go
@@ -9,6 +9,7 @@ import (

 	"github.com/gocrane/api/prediction/v1alpha1"

+	"github.com/gocrane/crane/pkg/prediction/config"
 	"github.com/gocrane/crane/pkg/utils"
 )

@@ -25,6 +26,7 @@ var defaultInternalConfig = internalConfig{
 	marginFraction:   defaultMarginFraction,
 	percentile:       defaultPercentile,
 	histogramOptions: defaultHistogramOptions,
+	historyLength:    time.Hour * 24 * 7,
 }

 type internalConfig struct {
@@ -36,6 +38,7 @@ type internalConfig struct {
 	minSampleWeight        float64
 	marginFraction         float64
 	percentile             float64
+	initMode               config.ModelInitMode
 }

 func (c *internalConfig) String() string {
@@ -43,11 +46,17 @@ func (c *internalConfig) String() string {
 		c.aggregated, c.historyLength, c.sampleInterval, c.histogramDecayHalfLife, c.minSampleWeight, c.marginFraction, c.percentile)
 }

-func makeInternalConfig(p *v1alpha1.Percentile) (*internalConfig, error) {
+// todo: later it would be better to refine the algorithm params into a map rather than a struct, for extensibility;
+// otherwise adding a param is difficult, because it means changing the crane API.
+func makeInternalConfig(p *v1alpha1.Percentile, initMode *config.ModelInitMode) (*internalConfig, error) {
 	sampleInterval, err := utils.ParseDuration(p.SampleInterval)
 	if err != nil {
 		return nil, err
 	}
+	historyLength, err := utils.ParseDuration(p.HistoryLength)
+	if err != nil {
+		return nil, err
+	}

 	halfLife, err := utils.ParseDuration(p.Histogram.HalfLife)
 	if err != nil {
@@ -122,9 +131,15 @@ func makeInternalConfig(p *v1alpha1.Percentile) (*internalConfig, error) {
 		return nil, err
 	}

+	// default to history
+	mode := config.ModelInitModeHistory
+	if initMode != nil {
+		mode = *initMode
+	}
 	c := &internalConfig{
+		initMode:               mode,
 		aggregated:             p.Aggregated,
-		historyLength:          time.Hour * 24 * 7,
+		historyLength:          historyLength,
 		sampleInterval:         sampleInterval,
 		histogramOptions:       options,
 		histogramDecayHalfLife: halfLife,
diff --git a/pkg/prediction/percentile/prediction.go b/pkg/prediction/percentile/prediction.go
index 9e5119b5b..c4062fb41 100644
--- a/pkg/prediction/percentile/prediction.go
+++ b/pkg/prediction/percentile/prediction.go
@@ -23,6 +23,11 @@ type percentilePrediction struct {
 	stopChMap sync.Map
 }

+func (p *percentilePrediction) QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) {
+	_, status := p.a.GetSignals(metricNamer.BuildUniqueKey())
+	return status, nil
+}
+
 func (p *percentilePrediction) QueryPredictedTimeSeries(ctx context.Context, namer metricnaming.MetricNamer, startTime time.Time, endTime time.Time) ([]*common.TimeSeries, error) {
 	var predictedTimeSeriesList []*common.TimeSeries
 	queryExpr := namer.BuildUniqueKey()
@@ -59,8 +64,8 @@ func (p *percentilePrediction) getPredictedValues(ctx context.Context, namer met
 	queryExpr := namer.BuildUniqueKey()
 	for {
 		signals, status := p.a.GetSignals(queryExpr)
-		if status == prediction.StatusDeleted {
-			klog.V(4).InfoS("Aggregated has been deleted.", "queryExpr", queryExpr)
+		if status == prediction.StatusUnknown {
+			klog.V(4).InfoS("Aggregate signals are unknown (possibly deleted).", "queryExpr", queryExpr)
 			return predictedTimeSeriesList
 		}
 		if signals != nil && status == prediction.StatusReady {
@@ -112,6 +117,11 @@ func (p *percentilePrediction) getPredictedValues(ctx context.Context, namer met
 }

 func (p *percentilePrediction) QueryRealtimePredictedValues(ctx context.Context, namer metricnaming.MetricNamer) ([]*common.TimeSeries, error) {
+	queryExpr := namer.BuildUniqueKey()
+	_, status := p.a.GetSignals(queryExpr)
+	if status != prediction.StatusReady {
+		return nil, fmt.Errorf("metric %v model status is %v, must be ready", queryExpr, status)
+	}
 	return p.getPredictedValues(ctx, namer), nil
 }
@@ -175,7 +185,7 @@ func (p *percentilePrediction) process(namer metricnaming.MetricNamer, config co
 	var historyTimeSeriesList []*common.TimeSeries
 	var err error
 	queryExpr := namer.BuildUniqueKey()
-	cfg, err := makeInternalConfig(config.Percentile)
+	cfg, err := makeInternalConfig(config.Percentile, config.InitMode)
 	if err != nil {
 		return nil, err
 	}
@@ -276,14 +286,31 @@ func (p *percentilePrediction) Run(stopCh <-chan struct{}) {
 			// todo: do not block this management goroutine with time-consuming operations.
 			// We just init the signal and set the status;
 			// the realtime model updating starts directly, but each metricNamer needs a window of time, set in its algorithm config, before it can reach ready status.
-			if err := p.init(qc.MetricNamer); err != nil {
-				klog.ErrorS(err, "Failed to init percentilePrediction.")
+			c := p.a.GetConfig(QueryExpr)
+			if c == nil {
+				c = &defaultInternalConfig
+			}
+
+			var initError error
+			switch c.initMode {
+			case config.ModelInitModeLazyTraining:
+				p.initByRealTimeProvider(qc.MetricNamer)
+			case config.ModelInitModeCheckpoint:
+				initError = p.initByCheckPoint(qc.MetricNamer)
+			case config.ModelInitModeHistory:
+				fallthrough
+			default:
+				// blocking
+				initError = p.initFromHistory(qc.MetricNamer)
+			}
+
+			if initError != nil {
+				klog.ErrorS(initError, "Failed to init percentilePrediction.")
 				continue
 			}

-			// bug here:
-			// 1. first, judge the metric is already started, if so, we do not start the updating model routine again.
-			// 2. same query, but different config means different series analysis, but here GetConfig always return same config.
+			// note: the same query with a different config would mean a different series analysis, but GetConfig always returns the same config.
+			// This is our default policy: one metric has only one config at a time.
 			go func(namer metricnaming.MetricNamer) {
 				queryExpr := namer.BuildUniqueKey()
 				if c := p.a.GetConfig(queryExpr); c != nil {
@@ -353,51 +380,26 @@ func (p *percentilePrediction) queryHistoryTimeSeries(namer metricnaming.MetricN
 // So we can set a waiting time for the model to finish training, since percentile is only used for resource requests & resource estimation.
 // In these scenarios it does not have to produce a result quickly after service start; we can tolerate a delay of some days while more data is collected.
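[Note] Because lazy training returns immediately and leaves the model in StatusInitializing, a caller that needs a guaranteed answer has to poll before querying; QueryRealtimePredictedValues now fails on non-ready models. A hypothetical caller-side helper (not part of this patch), built only on the interface methods added above and assuming the context and time packages are imported:

	func waitUntilReady(ctx context.Context, p prediction.Interface, namer metricnaming.MetricNamer) error {
		for {
			status, err := p.QueryPredictionStatus(ctx, namer)
			if err != nil {
				return err
			}
			if status == prediction.StatusReady {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(time.Minute): // the polling interval is arbitrary here
			}
		}
	}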
 // nolint:unused
-func (p *percentilePrediction) initByRealTimeProvider(namer metricnaming.MetricNamer) error {
+func (p *percentilePrediction) initByRealTimeProvider(namer metricnaming.MetricNamer) {
 	queryExpr := namer.BuildUniqueKey()
 	cfg := p.a.GetConfig(queryExpr)
-	latestTimeSeriesList, err := p.GetRealtimeProvider().QueryLatestTimeSeries(namer)
-	if err != nil {
-		klog.ErrorS(err, "Failed to get latest time series.")
-		return err
-	}
 	if cfg.aggregated {
 		signal := newAggregateSignal(cfg)
-		for _, ts := range latestTimeSeriesList {
-			for _, s := range ts.Samples {
-				t := time.Unix(s.Timestamp, 0)
-				signal.addSample(t, s.Value)
-			}
-		}
-		p.a.SetSignal(queryExpr, "__all__", signal)
+		p.a.SetSignalWithStatus(queryExpr, "__all__", signal, prediction.StatusInitializing)
 	} else {
 		signals := map[string]*aggregateSignal{}
-		for _, ts := range latestTimeSeriesList {
-			if len(ts.Samples) < 1 {
-				continue
-			}
-			key := prediction.AggregateSignalKey(ts.Labels)
-			signal := newAggregateSignal(cfg)
-			for _, s := range ts.Samples {
-				t := time.Unix(s.Timestamp, 0)
-				signal.addSample(t, s.Value)
-			}
-			signal.labels = ts.Labels
-			signals[key] = signal
-		}
-		p.a.SetSignals(queryExpr, signals)
+		p.a.SetSignalsWithStatus(queryExpr, signals, prediction.StatusInitializing)
 	}
-
-	return nil
 }

 // todo:
 // nolint:unused
 func (p *percentilePrediction) initByCheckPoint(namer metricnaming.MetricNamer) error {
-	return nil
+	return fmt.Errorf("checkpoint is not supported yet")
 }

-func (p *percentilePrediction) init(namer metricnaming.MetricNamer) error {
+func (p *percentilePrediction) initFromHistory(namer metricnaming.MetricNamer) error {
 	queryExpr := namer.BuildUniqueKey()
 	cfg := p.a.GetConfig(queryExpr)
 	// Query history data for prediction
@@ -449,11 +451,6 @@ func (p *percentilePrediction) addSamples(namer metricnaming.MetricNamer) {
 	queryExpr := namer.BuildUniqueKey()
 	c := p.a.GetConfig(queryExpr)

-	if _, status := p.a.GetSignals(queryExpr); status != prediction.StatusReady {
-		klog.InfoS("Aggregate signals not ready.", "queryExpr", queryExpr, "status", status)
-		return
-	}
-
 	if c.aggregated {
 		key := "__all__"
 		signal := p.a.GetSignal(queryExpr, key)
@@ -468,7 +465,17 @@ func (p *percentilePrediction) addSamples(namer metricnaming.MetricNamer) {
 		sample := ts.Samples[len(ts.Samples)-1]
 		sampleTime := time.Unix(sample.Timestamp, 0)
 		signal.addSample(sampleTime, sample.Value)
-		klog.V(6).InfoS("Sample added.", "sampleValue", sample.Value, "sampleTime", sampleTime, "queryExpr", queryExpr)
+
+		// Once the aggregation window reaches the percentile history length, enough data has accumulated and the model is ready to predict.
+		// Every init mode needs the aggregation window to accumulate enough data:
+		// History: a newly created container may not have enough history data, so we keep accumulating to make the confidence more reliable.
+		// LazyTraining: accumulate data directly from the realtime metric provider until there is enough.
+		// Checkpoint: recover the model directly from a checkpoint, then keep updating it until enough data has accumulated.
+		if signal.GetAggregationWindowLength() >= c.historyLength {
+			p.a.SetSignalStatus(queryExpr, key, prediction.StatusReady)
+		}
+
+		klog.V(6).InfoS("Sample added.", "sampleValue", sample.Value, "sampleTime", sampleTime, "queryExpr", queryExpr, "history", c.historyLength, "aggregationWindowLength", signal.GetAggregationWindowLength())
 	} else {
 		// todo: find a way to remove the labels key, although we do not really use it now.
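[Note] The readiness check added in both branches is plain arithmetic over the sample counter; a worked example (assuming a 1m sampleInterval, which is illustrative only):

	// GetAggregationWindowLength() = totalSamplesCount * sampleInterval
	// cpu: historyLength "24h", sampleInterval "1m"  =>  Ready after ~1440 samples
	// mem: historyLength "48h", sampleInterval "1m"  =>  Ready after ~2880 samples

Since addSamples only ingests the latest sample of each series per update tick, the wall-clock time to Ready stretches whenever updates arrive more slowly than sampleInterval.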
@@ -485,11 +492,22 @@ func (p *percentilePrediction) addSamples(namer metricnaming.MetricNamer) {
 			}
 			sample := ts.Samples[len(ts.Samples)-1]
 			sampleTime := time.Unix(sample.Timestamp, 0)
+			signal.addSample(sampleTime, sample.Value)
 			if len(signal.labels) == 0 {
 				signal.labels = ts.Labels
 			}
-			klog.V(6).InfoS("Sample added.", "sampleValue", sample.Value, "sampleTime", sampleTime, "queryExpr", queryExpr, "key", key)
+
+			// Once the aggregation window reaches the percentile history length, enough data has accumulated and the model is ready to predict.
+			// Every init mode needs the aggregation window to accumulate enough data:
+			// History: a newly created container may not have enough history data, so we keep accumulating to make the confidence more reliable.
+			// LazyTraining: accumulate data directly from the realtime metric provider until there is enough.
+			// Checkpoint: recover the model directly from a checkpoint, then keep updating it until enough data has accumulated.
+			if signal.GetAggregationWindowLength() >= c.historyLength {
+				p.a.SetSignalStatus(queryExpr, key, prediction.StatusReady)
+			}
+
+			klog.V(6).InfoS("Sample added.", "sampleValue", sample.Value, "sampleTime", sampleTime, "queryExpr", queryExpr, "key", key, "history", c.historyLength, "aggregationWindowLength", signal.GetAggregationWindowLength())
 		}
 	}
 }
diff --git a/pkg/providers/prom/prometheus.go b/pkg/providers/prom/prometheus.go
index 00f0f72ca..7a34720d3 100644
--- a/pkg/providers/prom/prometheus.go
+++ b/pkg/providers/prom/prometheus.go
@@ -37,7 +37,7 @@ func (p *prom) QueryTimeSeries(namer metricnaming.MetricNamer, startTime time.Ti
 		klog.Errorf("Failed to BuildQuery: %v", err)
 		return nil, err
 	}
-	klog.V(6).Infof("QueryTimeSeries metricNamer %v, timeout: %v", namer.BuildUniqueKey(), p.config.Timeout)
+	klog.V(6).Infof("QueryTimeSeries metricNamer %v, timeout: %v, query: %v", namer.BuildUniqueKey(), p.config.Timeout, promQuery.Prometheus.Query)
 	timeoutCtx, cancelFunc := gocontext.WithTimeout(gocontext.Background(), p.config.Timeout)
 	defer cancelFunc()
 	timeSeries, err := p.ctx.QueryRangeSync(timeoutCtx, promQuery.Prometheus.Query, startTime, endTime, step)
@@ -59,7 +59,7 @@ func (p *prom) QueryLatestTimeSeries(namer metricnaming.MetricNamer) ([]*common.
 	//end := time.Now()
 	// avoid missing the latest data; multiply the step by 2
 	//start := end.Add(-step * 2)
-	klog.V(6).Infof("QueryLatestTimeSeries metricNamer %v, timeout: %v", namer.BuildUniqueKey(), p.config.Timeout)
+	klog.V(6).Infof("QueryLatestTimeSeries metricNamer %v, timeout: %v, query: %v", namer.BuildUniqueKey(), p.config.Timeout, promQuery.Prometheus.Query)
 	timeoutCtx, cancelFunc := gocontext.WithTimeout(gocontext.Background(), p.config.Timeout)
 	defer cancelFunc()
 	timeSeries, err := p.ctx.QuerySync(timeoutCtx, promQuery.Prometheus.Query)
diff --git a/pkg/recommend/advisor/ehpa.go b/pkg/recommend/advisor/ehpa.go
index d686bc7d7..0d90aa37d 100644
--- a/pkg/recommend/advisor/ehpa.go
+++ b/pkg/recommend/advisor/ehpa.go
@@ -47,7 +47,8 @@ func (a *EHPAAdvisor) Advise(proposed *types.ProposedRecommendation) error {
 	if err != nil {
 		return err
 	}
-	metricNamer := ResourceToWorkloadMetricNamer(target, &resourceCpu, labelSelector)
+	caller := fmt.Sprintf(callerFormat, klog.KObj(a.Recommendation), a.Recommendation.UID)
+	metricNamer := ResourceToWorkloadMetricNamer(target, &resourceCpu, labelSelector, caller)
 	if err := metricNamer.Validate(); err != nil {
 		return err
 	}
@@ -62,7 +63,7 @@ func (a *EHPAAdvisor) Advise(proposed *types.ProposedRecommendation) error {
 	}

 	cpuConfig := getPredictionCpuConfig()
-	tsListPrediction, err := utils.QueryPredictedTimeSeriesOnce(p, fmt.Sprintf(callerFormat, a.Recommendation.UID),
+	tsListPrediction, err := utils.QueryPredictedTimeSeriesOnce(p, caller,
 		getPredictionCpuConfig(),
 		metricNamer,
 		timeNow,
@@ -265,11 +266,12 @@ func (a *EHPAAdvisor) proposeTargetUtilization() (int32, int64, error) {
 	var cpuUsage float64
 	// use the percentile algorithm to get the 99th percentile cpu usage of this target
 	for _, container := range a.PodTemplate.Spec.Containers {
-		metricNamer := ResourceToContainerMetricNamer(a.Recommendation.Spec.TargetRef.Namespace, a.Recommendation.Spec.TargetRef.Name, container.Name, corev1.ResourceCPU)
+		caller := fmt.Sprintf(callerFormat, klog.KObj(a.Recommendation), a.Recommendation.UID)
+		metricNamer := ResourceToContainerMetricNamer(a.Recommendation.Spec.TargetRef.Namespace, a.Recommendation.Spec.TargetRef.Name, container.Name, corev1.ResourceCPU, caller)
 		cpuConfig := makeCpuConfig(a.ConfigProperties)
 		tsList, err := utils.QueryPredictedValuesOnce(a.Recommendation,
 			percentilePredictor,
-			fmt.Sprintf(callerFormat, a.Recommendation.UID),
+			caller,
 			cpuConfig,
 			metricNamer)
 		if err != nil {
@@ -389,9 +391,10 @@ func GetTargetLabelSelector(target *corev1.ObjectReference, scale *v1.Scale, ds
 	}
 }

-func ResourceToWorkloadMetricNamer(target *corev1.ObjectReference, resourceName *corev1.ResourceName, workloadLabelSelector labels.Selector) metricnaming.MetricNamer {
+func ResourceToWorkloadMetricNamer(target *corev1.ObjectReference, resourceName *corev1.ResourceName, workloadLabelSelector labels.Selector, caller string) metricnaming.MetricNamer {
 	// workload
 	return &metricnaming.GeneralMetricNamer{
+		CallerName: caller,
 		Metric: &metricquery.Metric{
 			Type:       metricquery.WorkloadMetricType,
 			MetricName: resourceName.String(),
diff --git a/pkg/recommend/advisor/resource_request.go b/pkg/recommend/advisor/resource_request.go
index 24265dacf..c5022b418 100644
--- a/pkg/recommend/advisor/resource_request.go
+++ b/pkg/recommend/advisor/resource_request.go
@@ -17,7 +17,7 @@ import (
 	"github.com/gocrane/crane/pkg/utils"
 )

-const callerFormat = "RecommendationCaller-%s"
+const callerFormat = "RecommendationCaller-%s-%s"

 const (
 	DefaultNamespace = "default"
@@ -41,9 +41,14 @@ func makeCpuConfig(props map[string]string) *config.Config {
 		marginFraction = "0.15"
 	}

+	historyLength, exists := props["resource.cpu-model-history-length"]
+	if !exists {
+		historyLength = "168h"
+	}
 	return &config.Config{
 		Percentile: &predictionapi.Percentile{
 			Aggregated:     true,
+			HistoryLength:  historyLength,
 			SampleInterval: sampleInterval,
 			MarginFraction: marginFraction,
 			Percentile:     percentile,
@@ -70,9 +75,15 @@ func makeMemConfig(props map[string]string) *config.Config {
 		marginFraction = "0.15"
 	}

+	historyLength, exists := props["resource.mem-model-history-length"]
+	if !exists {
+		historyLength = "168h"
+	}
+
 	return &config.Config{
 		Percentile: &predictionapi.Percentile{
 			Aggregated:     true,
+			HistoryLength:  historyLength,
 			SampleInterval: sampleInterval,
 			MarginFraction: marginFraction,
 			Percentile:     percentile,
@@ -106,11 +117,11 @@ func (a *ResourceRequestAdvisor) Advise(proposed *types.ProposedRecommendation)
 			Target: map[corev1.ResourceName]string{},
 		}

-		metricNamer := ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.Name, c.Name, corev1.ResourceCPU)
+		caller := fmt.Sprintf(callerFormat, klog.KObj(a.Recommendation), a.Recommendation.UID)
+		metricNamer := ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.Name, c.Name, corev1.ResourceCPU, caller)
 		klog.V(6).Infof("CPU query for resource request recommendation: %s", metricNamer.BuildUniqueKey())
 		cpuConfig := makeCpuConfig(a.ConfigProperties)
-		tsList, err := utils.QueryPredictedValuesOnce(a.Recommendation, p,
-			fmt.Sprintf(callerFormat, a.Recommendation.UID), cpuConfig, metricNamer)
+		tsList, err := utils.QueryPredictedValuesOnce(a.Recommendation, p, caller, cpuConfig, metricNamer)
 		if err != nil {
 			return err
 		}
@@ -120,11 +131,10 @@ func (a *ResourceRequestAdvisor) Advise(proposed *types.ProposedRecommendation)
 		v := int64(tsList[0].Samples[0].Value * 1000)
 		cr.Target[corev1.ResourceCPU] = resource.NewMilliQuantity(v, resource.DecimalSI).String()

-		metricNamer = ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.Name, c.Name, corev1.ResourceMemory)
+		metricNamer = ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.Name, c.Name, corev1.ResourceMemory, caller)
 		klog.V(6).Infof("Memory query for resource request recommendation: %s", metricNamer.BuildUniqueKey())
 		memConfig := makeMemConfig(a.ConfigProperties)
-		tsList, err = utils.QueryPredictedValuesOnce(a.Recommendation, p,
-			fmt.Sprintf(callerFormat, a.Recommendation.UID), memConfig, metricNamer)
+		tsList, err = utils.QueryPredictedValuesOnce(a.Recommendation, p, caller, memConfig, metricNamer)
 		if err != nil {
 			return err
 		}
@@ -145,9 +155,10 @@ func (a *ResourceRequestAdvisor) Name() string {
 	return "ResourceRequestAdvisor"
 }

-func ResourceToContainerMetricNamer(namespace, workloadname, containername string, resourceName corev1.ResourceName) metricnaming.MetricNamer {
+func ResourceToContainerMetricNamer(namespace, workloadname, containername string, resourceName corev1.ResourceName, caller string) metricnaming.MetricNamer {
 	// container
 	return &metricnaming.GeneralMetricNamer{
+		CallerName: caller,
 		Metric: &metricquery.Metric{
 			Type:       metricquery.ContainerMetricType,
 			MetricName: resourceName.String(),
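[Note] Taken together, the new knobs are driven from the estimator's config map; a minimal sketch using the key names added in getCpuConfig/getMemConfig above (the values are illustrative):

	config := map[string]string{
		"cpu-model-init-mode":      string(predictionconfig.ModelInitModeLazyTraining),
		"cpu-model-history-length": "24h",
		"mem-model-init-mode":      string(predictionconfig.ModelInitModeHistory),
		"mem-model-history-length": "48h",
	}
	cpuConfig := getCpuConfig(config) // async: Ready only after ~24h of accumulated samples
	memConfig := getMemConfig(config) // blocking init from the history provider

When the init-mode keys are omitted, getCpuConfig/getMemConfig default to lazytraining, while makeInternalConfig defaults to history when Config.InitMode is nil; worth keeping in mind when wiring new callers.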