Skip to content

Commit

Permalink
add activation feature for cpu & memory scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Kun Woo Yoo <vbtkdpf148@gmail.com>
  • Loading branch information
kunwooy committed Sep 18, 2024
1 parent f6358d5 commit af57e63
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 56 deletions.
117 changes: 108 additions & 9 deletions pkg/scalers/cpu_memory_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package scalers
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"

"github.com/go-logr/logr"
Expand All @@ -11,24 +13,31 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

type cpuMemoryScaler struct {
metadata *cpuMemoryMetadata
resourceName v1.ResourceName
logger logr.Logger
client client.Client
}

type cpuMemoryMetadata struct {
Type v2.MetricTargetType
AverageValue *resource.Quantity
AverageUtilization *int32
ContainerName string
Type v2.MetricTargetType
AverageValue *resource.Quantity
AverageUtilization *int32
ContainerName string
ActivationAverageValue *resource.Quantity
ActivationAverageUtilization *int32
ScalableObjectName string
ScalableObjectType string
ScalableObjectNamespace string
}

// NewCPUMemoryScaler creates a new cpuMemoryScaler
func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig) (Scaler, error) {
func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig, client client.Client) (Scaler, error) {
logger := InitializeLogger(config, "cpu_memory_scaler")

meta, parseErr := parseResourceMetadata(config, logger)
Expand All @@ -40,12 +49,13 @@ func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.Scal
metadata: meta,
resourceName: resourceName,
logger: logger,
client: client,
}, nil
}

func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*cpuMemoryMetadata, error) {
meta := &cpuMemoryMetadata{}
var value string
var value, activationValue string
var ok bool
value, ok = config.TriggerMetadata["type"]
switch {
Expand All @@ -63,17 +73,31 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge
if value, ok = config.TriggerMetadata["value"]; !ok || value == "" {
return nil, fmt.Errorf("no value given")
}
if activationValue, ok = config.TriggerMetadata["activationValue"]; !ok || activationValue == "" {
activationValue = "0"
}

switch meta.Type {
case v2.AverageValueMetricType:
averageValueQuantity := resource.MustParse(value)
meta.AverageValue = &averageValueQuantity

activationValueQuantity := resource.MustParse(activationValue)
meta.ActivationAverageValue = &activationValueQuantity
case v2.UtilizationMetricType:
valueNum, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return nil, err
}
utilizationNum := int32(valueNum)
meta.AverageUtilization = &utilizationNum

valueNum, err = strconv.ParseInt(activationValue, 10, 32)
if err != nil {
return nil, err
}
activationAverageUtilization := int32(valueNum)
meta.ActivationAverageUtilization = &activationAverageUtilization
default:
return nil, fmt.Errorf("unsupported metric type, allowed values are 'Utilization' or 'AverageValue'")
}
Expand All @@ -82,6 +106,10 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge
meta.ContainerName = value
}

meta.ScalableObjectName = config.ScalableObjectName
meta.ScalableObjectNamespace = config.ScalableObjectNamespace
meta.ScalableObjectType = config.ScalableObjectType

return meta, nil
}

Expand All @@ -90,6 +118,42 @@ func (s *cpuMemoryScaler) Close(context.Context) error {
return nil
}

func (s *cpuMemoryScaler) getHPA(ctx context.Context) (*v2.HorizontalPodAutoscaler, error) {
if s.metadata.ScalableObjectType == "ScaledObject" {
scaledObject := &kedav1alpha1.ScaledObject{}
err := s.client.Get(ctx, types.NamespacedName{
Name: s.metadata.ScalableObjectName,
Namespace: s.metadata.ScalableObjectNamespace,
}, scaledObject)

if err != nil {
return nil, err
}

hpa := &v2.HorizontalPodAutoscaler{}
err = s.client.Get(ctx, types.NamespacedName{
Name: scaledObject.Status.HpaName,
Namespace: s.metadata.ScalableObjectNamespace,
}, hpa)

if err != nil {
return nil, err
}

return hpa, nil
} else if s.metadata.ScalableObjectType == "ScaledJob" {
scaledJob := &kedav1alpha1.ScaledJob{}
err := s.client.Get(ctx, types.NamespacedName{
Name: s.metadata.ScalableObjectName,
Namespace: s.metadata.ScalableObjectNamespace,
}, scaledJob)

return nil, err
}

return nil, fmt.Errorf("invalid scalable object type: %s", s.metadata.ScalableObjectType)
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *cpuMemoryScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricSpec v2.MetricSpec
Expand Down Expand Up @@ -120,7 +184,42 @@ func (s *cpuMemoryScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSp
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity no need for cpu/memory scaler and always active for cpu/memory scaler
func (s *cpuMemoryScaler) GetMetricsAndActivity(_ context.Context, _ string) ([]external_metrics.ExternalMetricValue, bool, error) {
return nil, true, nil
// GetMetricsAndActivity only returns the activity of the cpu/memory scaler
func (s *cpuMemoryScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
hpa, err := s.getHPA(ctx)
if err != nil {
return nil, false, err
}

if hpa == nil {
return nil, false, fmt.Errorf("HPA not found")
}

for _, metric := range hpa.Status.CurrentMetrics {
if metric.Resource == nil {
continue
}

if string(metric.Resource.Name) != metricName {
continue
}

if s.metadata.Type == v2.AverageValueMetricType {
averageValue := metric.Resource.Current.AverageValue
if averageValue == nil {
return nil, false, fmt.Errorf("HPA has no average value")
}

return nil, averageValue.Cmp(*s.metadata.ActivationAverageValue) == 1, nil
} else if s.metadata.Type == v2.UtilizationMetricType {
averageUtilization := metric.Resource.Current.AverageUtilization
if averageUtilization == nil {
return nil, false, fmt.Errorf("HPA has no average utilization")
}

return nil, *averageUtilization > *s.metadata.ActivationAverageUtilization, nil
}
}

return nil, false, fmt.Errorf("no matching resource metric found for %s", s.resourceName)
}
104 changes: 59 additions & 45 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package scaling

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -748,60 +749,73 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,
}

for _, spec := range metricSpecs {
if spec.External == nil {
continue
}
if spec.Resource != nil {
metricName := spec.Resource.Name.String()
_, isMetricActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
if err != nil {
result.Err = err
logger.Error(err, "error getting metric source", "source", result.TriggerName, "metricName", metricName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error())
continue
}

metricName := spec.External.Metric.Name
metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err)
metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive)
result.IsActive = isMetricActive
} else if spec.External != nil {
metricName := spec.External.Metric.Name

var latency time.Duration
metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err)
if latency != -1 {
metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency)
}
result.Metrics = append(result.Metrics, metrics...)
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)

if scalerConfig.TriggerUseCachedMetrics {
result.Records[metricName] = metricscache.MetricsRecord{
IsActive: isMetricActive,
Metric: metrics,
ScalerError: err,
var latency time.Duration
metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
metricscollector.RecordScalerError(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, err)
if latency != -1 {
metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, latency)
}
result.Metrics = append(result.Metrics, metrics...)
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", result.TriggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)

if scalerConfig.TriggerUseCachedMetrics {
result.Records[metricName] = metricscache.MetricsRecord{
IsActive: isMetricActive,
Metric: metrics,
ScalerError: err,
}
}
}

if err != nil {
result.Err = err
if scaledObject.IsUsingModifiers() {
logger.Error(err, "error getting metric source", "source", result.TriggerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error())
if err != nil {
result.Err = err
if scaledObject.IsUsingModifiers() {
logger.Error(err, "error getting metric source", "source", result.TriggerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error())
} else {
logger.Error(err, "error getting scale decision", "scaler", result.TriggerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
} else {
logger.Error(err, "error getting scale decision", "scaler", result.TriggerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
}
} else {
result.IsActive = isMetricActive
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metric.MetricName, true, metricValue)
}
if !scaledObject.IsUsingModifiers() {
if isMetricActive {
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", metricName)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", spec.Resource.Name)
result.IsActive = isMetricActive
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metric.MetricName, true, metricValue)
}
if !scaledObject.IsUsingModifiers() {
if isMetricActive {
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", metricName)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", result.TriggerName, "metricName", spec.Resource.Name)
}
}
metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive)
}
metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, result.TriggerName, triggerIndex, metricName, true, isMetricActive)
}
}

result.Pairs, err = modifiers.GetPairTriggerAndMetric(scaledObject, metricName, scalerConfig.TriggerName)
if err != nil {
logger.Error(err, "error pairing triggers & metrics for compositeScaler")
result.Pairs, err = modifiers.GetPairTriggerAndMetric(scaledObject, metricName, scalerConfig.TriggerName)
if err != nil {
logger.Error(err, "error pairing triggers & metrics for compositeScaler")
}
} else {
logger.Error(errors.New("error parsing metric for the scaler"), "both resource and external metrics are nil", "scaler", result.TriggerName)
}
}
return result
Expand Down
4 changes: 2 additions & 2 deletions pkg/scaling/scalers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
case "couchdb":
return scalers.NewCouchDBScaler(ctx, config)
case "cpu":
return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config)
return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config, client)
case "cron":
return scalers.NewCronScaler(config)
case "datadog":
Expand Down Expand Up @@ -202,7 +202,7 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
case "loki":
return scalers.NewLokiScaler(config)
case "memory":
return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config)
return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config, client)
case "metrics-api":
return scalers.NewMetricsAPIScaler(config)
case "mongodb":
Expand Down

0 comments on commit af57e63

Please sign in to comment.