Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support init mode for metric model #278

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 37 additions & 6 deletions pkg/autoscaling/estimator/percentile.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/gocrane/crane/pkg/utils"
)

// callerFormat builds the caller name used for EVPA prediction queries:
// "EVPACaller-<namespace/name>-<uid>" (two verbs: object key and UID).
const callerFormat = "EVPACaller-%s-%s"

type PercentileResourceEstimator struct {
Predictor prediction.Interface
Expand All @@ -29,7 +29,9 @@ type PercentileResourceEstimator struct {
func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler, config map[string]string, containerName string, currRes *corev1.ResourceRequirements) (corev1.ResourceList, error) {
recommendResource := corev1.ResourceList{}

caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
cpuMetricNamer := &metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.ContainerMetricType,
MetricName: corev1.ResourceCPU.String(),
Expand All @@ -43,7 +45,7 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
}

cpuConfig := getCpuConfig(config)
tsList, err := utils.QueryPredictedValues(e.Predictor, fmt.Sprintf(callerFormat, string(evpa.UID), containerName, corev1.ResourceCPU), cpuConfig, cpuMetricNamer)
tsList, err := utils.QueryPredictedValues(e.Predictor, caller, cpuConfig, cpuMetricNamer)
if err != nil {
return nil, err
}
Expand All @@ -56,6 +58,7 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
recommendResource[corev1.ResourceCPU] = *resource.NewMilliQuantity(cpuValue, resource.DecimalSI)

memoryMetricNamer := &metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.ContainerMetricType,
MetricName: corev1.ResourceMemory.String(),
Expand All @@ -69,7 +72,7 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi
}

memConfig := getMemConfig(config)
tsList, err = utils.QueryPredictedValues(e.Predictor, fmt.Sprintf(callerFormat, string(evpa.UID), containerName, corev1.ResourceMemory), memConfig, memoryMetricNamer)
tsList, err = utils.QueryPredictedValues(e.Predictor, caller, memConfig, memoryMetricNamer)
if err != nil {
return nil, err
}
Expand All @@ -86,7 +89,9 @@ func (e *PercentileResourceEstimator) GetResourceEstimation(evpa *autoscalingapi

func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler) {
for _, containerPolicy := range evpa.Spec.ResourcePolicy.ContainerPolicies {
caller := fmt.Sprintf(callerFormat, klog.KObj(evpa), string(evpa.UID))
cpuMetricNamer := &metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.ContainerMetricType,
MetricName: corev1.ResourceCPU.String(),
Expand All @@ -98,12 +103,12 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
},
},
}
err := e.Predictor.DeleteQuery(cpuMetricNamer, fmt.Sprintf(callerFormat, string(evpa.UID), containerPolicy.ContainerName, corev1.ResourceCPU))
err := e.Predictor.DeleteQuery(cpuMetricNamer, caller)
if err != nil {
klog.ErrorS(err, "Failed to delete query.", "queryExpr", cpuMetricNamer.BuildUniqueKey())
}

memoryMetricNamer := &metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.ContainerMetricType,
MetricName: corev1.ResourceMemory.String(),
Expand All @@ -115,7 +120,7 @@ func (e *PercentileResourceEstimator) DeleteEstimation(evpa *autoscalingapi.Effe
},
},
}
err = e.Predictor.DeleteQuery(memoryMetricNamer, fmt.Sprintf(callerFormat, string(evpa.UID), containerPolicy.ContainerName, corev1.ResourceMemory))
err = e.Predictor.DeleteQuery(memoryMetricNamer, caller)
if err != nil {
klog.ErrorS(err, "Failed to delete query.", "queryExpr", memoryMetricNamer.BuildUniqueKey())
}
Expand All @@ -137,9 +142,22 @@ func getCpuConfig(config map[string]string) *predictionconfig.Config {
marginFraction = "0.15"
}

// Model init mode: honor the caller-provided value when the key is present,
// otherwise default to lazy training (accumulate realtime data before predicting).
initModeStr, exists := config["cpu-model-init-mode"]
initMode := predictionconfig.ModelInitModeLazyTraining
if exists {
	// BUG FIX: the original tested !exists, which ignored a user-specified
	// mode and cast an empty string to ModelInitMode when the key was absent.
	initMode = predictionconfig.ModelInitMode(initModeStr)
}

historyLength, exists := config["cpu-model-history-length"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving the constants to a common file — e.g. `EVPACaller-%s-%s`, `cpu-model-history-length`, `24h`, etc.?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving the constants to a common file — e.g. `EVPACaller-%s-%s`, `cpu-model-history-length`, `24h`, etc.?

Yes, we can do it later

if !exists {
historyLength = "24h"
}

return &predictionconfig.Config{
InitMode: &initMode,
Percentile: &predictionapi.Percentile{
Aggregated: true,
HistoryLength: historyLength,
SampleInterval: sampleInterval,
MarginFraction: marginFraction,
Percentile: percentile,
Expand All @@ -166,9 +184,22 @@ func getMemConfig(props map[string]string) *predictionconfig.Config {
marginFraction = "0.15"
}

initModeStr, exists := props["mem-model-init-mode"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of memory is almost same like cpu, how about a function ?

// Honor the caller-provided init mode when the key was present; otherwise
// default to lazy training. BUG FIX: the original tested !exists, which
// ignored a user-specified mode and cast an empty string when the key was absent.
initMode := predictionconfig.ModelInitModeLazyTraining
if exists {
	initMode = predictionconfig.ModelInitMode(initModeStr)
}

historyLength, exists := props["mem-model-history-length"]
if !exists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why history length of cpu and memory is not same, cpu is 24h but memory is 48h?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why history length of cpu and memory is not same, cpu is 24h but memory is 48h?

This is just an empirical value borrowed from vpa. memory is incompressible resource, so use longer history data is more safe and robust. cpu is compressible resource, and generally it is daily cycle because of people traffic

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

historyLength = "48h"
}

return &predictionconfig.Config{
InitMode: &initMode,
Percentile: &predictionapi.Percentile{
Aggregated: true,
HistoryLength: historyLength,
SampleInterval: sampleInterval,
MarginFraction: marginFraction,
Percentile: percentile,
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/evpa/effective_vpa_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func recordMetric(evpa *autoscalingapi.EffectiveVerticalPodAutoscaler, status *a
"resourceName": fmt.Sprintf("%s/%s", evpa.Namespace, evpa.Spec.TargetRef.Name),
}

if status.Recommendation == nil {
return
}
for _, container := range status.Recommendation.ContainerRecommendations {
resourceRequirement, found := utils.GetResourceByPodTemplate(podTemplate, container.ContainerName)
if !found {
Expand Down
55 changes: 32 additions & 23 deletions pkg/controller/timeseriesprediction/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,21 @@ func (c *MetricContext) GetMetricNamer(conf *predictionapi.PredictionMetric) met
return nil
}
if conf.ExpressionQuery != nil {
namer.Metric = &metricquery.Metric{
Type: metricquery.PromQLMetricType,
MetricName: conf.ResourceIdentifier,
Prom: &metricquery.PromNamerInfo{
QueryExpr: conf.ExpressionQuery.Expression,
Selector: labels.Nothing(),
namer = metricnaming.GeneralMetricNamer{
CallerName: c.GetCaller(),
Metric: &metricquery.Metric{
Type: metricquery.PromQLMetricType,
MetricName: conf.ResourceIdentifier,
Prom: &metricquery.PromNamerInfo{
QueryExpr: conf.ExpressionQuery.Expression,
Selector: labels.Nothing(),
},
},
}
klog.InfoS("GetQueryStr", "tsp", klog.KObj(c.SeriesPrediction), "queryExpr", conf.ExpressionQuery.Expression)
}
if conf.ResourceQuery != nil {
namer = c.ResourceToMetricNamer(conf.ResourceQuery)
namer = c.ResourceToMetricNamer(conf.ResourceQuery, c.GetCaller())
klog.InfoS("GetQueryStr", "tsp", klog.KObj(c.SeriesPrediction), "resourceQuery", conf.ResourceQuery)
}
return &namer
Expand Down Expand Up @@ -139,30 +142,36 @@ func metricSelectorToQueryExpr(m *predictionapi.MetricQuery) string {
return fmt.Sprintf("%s{%s}", m.MetricName, strings.Join(conditions, ","))
}

func (c *MetricContext) ResourceToMetricNamer(resourceName *corev1.ResourceName) metricnaming.GeneralMetricNamer {
func (c *MetricContext) ResourceToMetricNamer(resourceName *corev1.ResourceName, caller string) metricnaming.GeneralMetricNamer {
var namer metricnaming.GeneralMetricNamer

// Node
if strings.ToLower(c.TargetKind) == strings.ToLower(predconf.TargetKindNode) {
namer.Metric = &metricquery.Metric{
Type: metricquery.NodeMetricType,
MetricName: resourceName.String(),
Node: &metricquery.NodeNamerInfo{
Name: c.Name,
Selector: labels.Everything(),
namer = metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.NodeMetricType,
MetricName: resourceName.String(),
Node: &metricquery.NodeNamerInfo{
Name: c.Name,
Selector: labels.Everything(),
},
},
}
} else {
// workload
namer.Metric = &metricquery.Metric{
Type: metricquery.WorkloadMetricType,
MetricName: resourceName.String(),
Workload: &metricquery.WorkloadNamerInfo{
Namespace: c.Namespace,
Kind: c.TargetKind,
APIVersion: c.APIVersion,
Name: c.Name,
Selector: c.Selector,
namer = metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.WorkloadMetricType,
MetricName: resourceName.String(),
Workload: &metricquery.WorkloadNamerInfo{
Namespace: c.Namespace,
Kind: c.TargetKind,
APIVersion: c.APIVersion,
Name: c.Name,
Selector: c.Selector,
},
},
}
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/metricnaming/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,36 @@ import (
"github.com/gocrane/crane/pkg/querybuilder"
)

// MetricNamer is the bridge between the predictor and the different data
// sources and other components; it also carries the identity of the caller.
type MetricNamer interface {
	// QueryBuilder is used by data source providers to build source-specific queries.
	QueryBuilder() querybuilder.QueryBuilder
	// BuildUniqueKey is used by the predictor to key models; it must be unique
	// per (caller, metric) pair.
	BuildUniqueKey() string

	// Validate checks that the underlying metric description is well formed.
	Validate() error

	// Caller names the component using this MetricNamer; different callers may
	// query the same metric.
	Caller() string
}

var _ MetricNamer = &GeneralMetricNamer{}

// GeneralMetricNamer is the default MetricNamer implementation: a metric
// description plus the name of the caller that owns the query.
// NOTE: the diff residue in the original left a duplicated Metric field,
// which would not compile; only one declaration is kept here.
type GeneralMetricNamer struct {
	Metric     *metricquery.Metric
	CallerName string
}

// Caller returns the name of the component that created this namer.
// Different callers may query the same underlying metric, so the caller
// name participates in the unique key (see BuildUniqueKey).
func (gmn *GeneralMetricNamer) Caller() string {
	return gmn.CallerName
}

// QueryBuilder returns a query builder for this namer's metric; data source
// providers use it to translate the metric into a source-specific query.
func (gmn *GeneralMetricNamer) QueryBuilder() querybuilder.QueryBuilder {
	return NewQueryBuilder(gmn.Metric)
}

// BuildUniqueKey builds the key identifying this (caller, metric) pair for the
// predictor. The caller name is prefixed so that different callers querying the
// same metric get distinct model entries. (The diff residue in the original
// left an unreachable duplicated return of the old, caller-less key.)
func (gmn *GeneralMetricNamer) BuildUniqueKey() string {
	return gmn.CallerName + "/" + gmn.Metric.BuildUniqueKey()
}

func (gmn *GeneralMetricNamer) Validate() error {
Expand Down
18 changes: 9 additions & 9 deletions pkg/metricquery/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ const (

var (
NotMatchWorkloadError = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", WorkloadMetricType)
NotMatchContainerError = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", ContainerMetricType)
NotMatchPodError = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", PodMetricType)
NotMatchNodeError = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", NodeMetricType)
NotMatchPromError = fmt.Errorf("metric type %v, but no WorkloadNamerInfo provided", PromQLMetricType)
NotMatchContainerError = fmt.Errorf("metric type %v, but no ContainerNamerInfo provided", ContainerMetricType)
NotMatchPodError = fmt.Errorf("metric type %v, but no PodNamerInfo provided", PodMetricType)
NotMatchNodeError = fmt.Errorf("metric type %v, but no NodeNamerInfo provided", NodeMetricType)
NotMatchPromError = fmt.Errorf("metric type %v, but no PromNamerInfo provided", PromQLMetricType)
)

type Metric struct {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (m *Metric) keyByWorkload() string {
m.Workload.APIVersion,
m.Workload.Namespace,
m.Workload.Name,
selectorStr}, "-")
selectorStr}, "_")
}

func (m *Metric) keyByContainer() string {
Expand All @@ -168,7 +168,7 @@ func (m *Metric) keyByContainer() string {
m.Container.WorkloadName,
m.Container.PodName,
m.Container.ContainerName,
selectorStr}, "-")
selectorStr}, "_")
}

func (m *Metric) keyByPod() string {
Expand All @@ -181,7 +181,7 @@ func (m *Metric) keyByPod() string {
strings.ToLower(m.MetricName),
m.Pod.Namespace,
m.Pod.Name,
selectorStr}, "-")
selectorStr}, "_")
}
func (m *Metric) keyByNode() string {
selectorStr := ""
Expand All @@ -192,7 +192,7 @@ func (m *Metric) keyByNode() string {
string(m.Type),
strings.ToLower(m.MetricName),
m.Node.Name,
selectorStr}, "-")
selectorStr}, "_")
}

func (m *Metric) keyByPromQL() string {
Expand All @@ -205,7 +205,7 @@ func (m *Metric) keyByPromQL() string {
m.Prom.Namespace,
strings.ToLower(m.MetricName),
m.Prom.QueryExpr,
selectorStr}, "-")
selectorStr}, "_")
}

// Query is used to do query for different data source. you can extends it with your data source query
Expand Down
13 changes: 13 additions & 0 deletions pkg/prediction/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,20 @@ type AlgorithmModelConfig struct {
UpdateInterval time.Duration
}

// ModelInitMode specifies how a prediction algorithm model is initialized or
// recovered. When the caller does not specify a mode, the original (history)
// behavior applies.
type ModelInitMode string

const (
	// ModelInitModeHistory recovers or inits the algorithm model directly from
	// the history datasource. This may block, because fetching the data and
	// generating the model is time consuming.
	ModelInitModeHistory ModelInitMode = "history"
	// ModelInitModeLazyTraining recovers or inits the algorithm model from the
	// realtime datasource asynchronously; the predictor cannot predict until the
	// accumulated data covers the configured window length. This is safer for
	// data accumulation and makes the prediction more robust.
	ModelInitModeLazyTraining ModelInitMode = "lazytraining"
	// ModelInitModeCheckpoint recovers or inits the model from a checkpoint, so
	// it can be restored directly and used for prediction immediately.
	ModelInitModeCheckpoint ModelInitMode = "checkpoint"
)

// Config carries the per-algorithm prediction configuration.
type Config struct {
	// InitMode controls how the model is initialized (history / lazytraining /
	// checkpoint). NOTE(review): presumably nil falls back to the original
	// history behavior — confirm with the predictor implementations.
	InitMode *ModelInitMode
	// DSP holds the digital-signal-processing algorithm settings, if that
	// algorithm is selected.
	DSP *v1alpha1.DSP
	// Percentile holds the percentile-estimation algorithm settings, if that
	// algorithm is selected.
	Percentile *v1alpha1.Percentile
}
4 changes: 4 additions & 0 deletions pkg/prediction/dsp/prediction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type periodicSignalPrediction struct {
modelConfig config.AlgorithmModelConfig
}

// QueryPredictionStatus is not implemented for the DSP (periodic signal)
// predictor yet; calling it panics. NOTE(review): consider returning an
// explicit "not implemented" error instead of panicking once callers use this.
func (p *periodicSignalPrediction) QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) {
	panic("implement me")
}

func NewPrediction(realtimeProvider providers.RealTime, historyProvider providers.History, mc config.AlgorithmModelConfig) prediction.Interface {
withCh, delCh := make(chan prediction.QueryExprWithCaller), make(chan prediction.QueryExprWithCaller)
return &periodicSignalPrediction{
Expand Down
4 changes: 4 additions & 0 deletions pkg/prediction/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
StatusNotStarted Status = "NotStarted"
StatusUnknown Status = "Unknown"
StatusDeleted Status = "Deleted"
// StatusInitializing means the prediction model is accumulating data until it satisfy the user specified time window such as 12h or 3d or 1w when use some real time data provider
// if support recover from checkpoint, then it maybe faster
StatusInitializing Status = "Initializing"
StatusExpired Status = "Expired"
)

type WithMetricEvent struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/prediction/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Interface interface {

DeleteQuery(metricNamer metricnaming.MetricNamer, caller string) error

// QueryPredictionStatus return the metricNamer prediction status. it is predictable only when it is ready
QueryPredictionStatus(ctx context.Context, metricNamer metricnaming.MetricNamer) (Status, error)

// QueryRealtimePredictedValues returns predicted values based on the specified query expression
QueryRealtimePredictedValues(ctx context.Context, metricNamer metricnaming.MetricNamer) ([]*common.TimeSeries, error)

Expand Down
7 changes: 7 additions & 0 deletions pkg/prediction/percentile/aggregate_signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type aggregateSignal struct {
lastSampleTime time.Time
minSampleWeight float64
totalSamplesCount int
sampleInterval time.Duration
creationTime time.Time
labels []common.Label
}
Expand All @@ -30,10 +31,16 @@ func (a *aggregateSignal) addSample(sampleTime time.Time, sampleValue float64) {
a.totalSamplesCount++
}

// GetAggregationWindowLength returns the total time span covered by the
// samples accumulated so far (sample count multiplied by the sample interval).
// time.Duration tops out around 290 years, so the product cannot overflow.
func (a *aggregateSignal) GetAggregationWindowLength() time.Duration {
	window := a.sampleInterval * time.Duration(a.totalSamplesCount)
	return window
}

// newAggregateSignal builds an empty aggregate signal from the internal
// percentile configuration, stamping its creation time so the accumulated
// aggregation window can be tracked later.
func newAggregateSignal(c *internalConfig) *aggregateSignal {
	signal := &aggregateSignal{
		histogram:       vpa.NewHistogram(c.histogramOptions),
		minSampleWeight: c.minSampleWeight,
		sampleInterval:  c.sampleInterval,
	}
	signal.creationTime = time.Now()
	return signal
}
Loading