diff --git a/CHANGELOG.md b/CHANGELOG.md index 040523c9e67..bf513d02ccf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ - Add `unsafeSsl` parameter in InfluxDB scaler ([#2157](https://github.com/kedacore/keda/pull/2157)) - Improve metric name creation to be unique using scaler index inside the scaler ([#2161](https://github.com/kedacore/keda/pull/2161)) - Improve error message if `IdleReplicaCount` are equal to `MinReplicaCount` to be the same as the check ([#2212](https://github.com/kedacore/keda/pull/2212)) +- Improve Cloudwatch Scaler metric exporting logic ([#2243](https://github.com/kedacore/keda/pull/2243)) ### Breaking Changes diff --git a/pkg/scalers/aws_cloudwatch_scaler.go b/pkg/scalers/aws_cloudwatch_scaler.go index 5f1d9fc3da7..d8b3100d784 100644 --- a/pkg/scalers/aws_cloudwatch_scaler.go +++ b/pkg/scalers/aws_cloudwatch_scaler.go @@ -27,6 +27,7 @@ const ( defaultMetricCollectionTime = 300 defaultMetricStat = "Average" defaultMetricStatPeriod = 300 + defaultMetricEndTimeOffset = 0 ) type awsCloudwatchScaler struct { @@ -44,7 +45,9 @@ type awsCloudwatchMetadata struct { metricCollectionTime int64 metricStat string + metricUnit string metricStatPeriod int64 + metricEndTimeOffset int64 awsRegion string @@ -67,44 +70,41 @@ func NewAwsCloudwatchScaler(config *ScalerConfig) (Scaler, error) { }, nil } -func parseMetricValues(config *ScalerConfig) (*awsCloudwatchMetadata, error) { - metricsMeta := awsCloudwatchMetadata{} - - if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" { - if n, ok := strconv.ParseInt(val, 10, 64); ok == nil { - metricsMeta.metricCollectionTime = n - } else { - return nil, fmt.Errorf("metricCollectionTime not a valid number") +func getIntMetadataValue(metadata map[string]string, key string, required bool, defaultValue int64) (int64, error) { + if val, ok := metadata[key]; ok && val != "" { + value, err := strconv.Atoi(val) + if err != nil { + return 0, fmt.Errorf("error parsing %s metadata: %v", key, err) } - } else { - metricsMeta.metricCollectionTime = defaultMetricCollectionTime + return int64(value), nil + } + + if required { + return 0, fmt.Errorf("metadata %s not given", key) } - if val, ok := config.TriggerMetadata["metricStatPeriod"]; ok && val != "" { - if n, ok := strconv.ParseInt(val, 10, 64); ok == nil { - metricsMeta.metricStatPeriod = n - } else { - return nil, fmt.Errorf("metricStatPeriod not a valid number") + return defaultValue, nil +} + +func getFloatMetadataValue(metadata map[string]string, key string, required bool, defaultValue float64) (float64, error) { + if val, ok := metadata[key]; ok && val != "" { + value, err := strconv.ParseFloat(val, 64) + if err != nil { + return 0, fmt.Errorf("error parsing %s metadata: %v", key, err) } - } else { - metricsMeta.metricStatPeriod = defaultMetricStatPeriod + return value, nil } - if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" { - metricsMeta.metricStat = val - } else { - metricsMeta.metricStat = defaultMetricStat + if required { + return 0, fmt.Errorf("metadata %s not given", key) } - return &metricsMeta, nil + return defaultValue, nil } func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, error) { - meta, err := parseMetricValues(config) - - if err != nil { - return nil, fmt.Errorf("an error occurred when the scaler tried to get the metrics values") - } + var err error + meta := awsCloudwatchMetadata{} if val, ok := config.TriggerMetadata["namespace"]; ok && val != "" { meta.namespace = val @@ -134,48 +134,50 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e return nil, fmt.Errorf("dimensionName and dimensionValue are not matching in size") } - if val, ok := config.TriggerMetadata["targetMetricValue"]; ok && val != "" { - targetMetricValue, err := strconv.ParseFloat(val, 64) - if err != nil { - cloudwatchLog.Error(err, "Error parsing targetMetricValue metadata") - } else { - meta.targetMetricValue = targetMetricValue - } - } else { - return nil, fmt.Errorf("target Metric Value not given") - } - - if val, ok := config.TriggerMetadata["minMetricValue"]; ok && val != "" { - minMetricValue, err := strconv.ParseFloat(val, 64) - if err != nil { - cloudwatchLog.Error(err, "Error parsing minMetricValue metadata") - } else { - meta.minMetricValue = minMetricValue - } - } else { - return nil, fmt.Errorf("min metric value not given") + meta.targetMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "targetMetricValue", true, 0) + if err != nil { + return nil, err } - if val, ok := config.TriggerMetadata["metricCollectionTime"]; ok && val != "" { - metricCollectionTime, err := strconv.Atoi(val) - if err != nil { - cloudwatchLog.Error(err, "Error parsing metricCollectionTime metadata") - } else { - meta.metricCollectionTime = int64(metricCollectionTime) - } + meta.minMetricValue, err = getFloatMetadataValue(config.TriggerMetadata, "minMetricValue", true, 0) + if err != nil { + return nil, err } + meta.metricStat = defaultMetricStat if val, ok := config.TriggerMetadata["metricStat"]; ok && val != "" { meta.metricStat = val } + if err = checkMetricStat(meta.metricStat); err != nil { + return nil, err + } - if val, ok := config.TriggerMetadata["metricStatPeriod"]; ok && val != "" { - metricStatPeriod, err := strconv.Atoi(val) - if err != nil { - cloudwatchLog.Error(err, "Error parsing metricStatPeriod metadata") - } else { - meta.metricStatPeriod = int64(metricStatPeriod) - } + meta.metricStatPeriod, err = getIntMetadataValue(config.TriggerMetadata, "metricStatPeriod", false, defaultMetricStatPeriod) + if err != nil { + return nil, err + } + + if err = checkMetricStatPeriod(meta.metricStatPeriod); err != nil { + return nil, err + } + + meta.metricCollectionTime, err = getIntMetadataValue(config.TriggerMetadata, "metricCollectionTime", false, defaultMetricCollectionTime) + if err != nil { + return nil, err + } + + if meta.metricCollectionTime < 0 || meta.metricCollectionTime%meta.metricStatPeriod != 0 { + return nil, fmt.Errorf("metricCollectionTime must be greater than 0 and a multiple of metricStatPeriod(%d), %d is given", meta.metricStatPeriod, meta.metricCollectionTime) + } + + meta.metricEndTimeOffset, err = getIntMetadataValue(config.TriggerMetadata, "metricEndTimeOffset", false, defaultMetricEndTimeOffset) + if err != nil { + return nil, err + } + + meta.metricUnit = config.TriggerMetadata["metricUnit"] + if err = checkMetricUnit(meta.metricUnit); err != nil { + return nil, err } if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { @@ -184,16 +186,54 @@ func parseAwsCloudwatchMetadata(config *ScalerConfig) (*awsCloudwatchMetadata, e return nil, fmt.Errorf("no awsRegion given") } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) + meta.awsAuthorization, err = getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) if err != nil { return nil, err } - meta.awsAuthorization = auth - meta.scalerIndex = config.ScalerIndex - return meta, nil + return &meta, nil +} + +func checkMetricStat(stat string) error { + for _, s := range cloudwatch.Statistic_Values() { + if stat == s { + return nil + } + } + return fmt.Errorf("metricStat '%s' is not one of %v", stat, cloudwatch.Statistic_Values()) +} + +func checkMetricUnit(unit string) error { + if unit == "" { + return nil + } + for _, u := range cloudwatch.StandardUnit_Values() { + if unit == u { + return nil + } + } + return fmt.Errorf("metricUnit '%s' is not one of %v", unit, cloudwatch.StandardUnit_Values()) +} + +func checkMetricStatPeriod(period int64) error { + if period < 1 { + return fmt.Errorf("metricStatPeriod can not be smaller than 1, however, %d is provided", period) + } else if period <= 60 { + switch period { + case 1, 5, 10, 30, 60: + return nil + default: + return fmt.Errorf("metricStatPeriod < 60 has to be one of [1, 5, 10, 30], however, %d is provided", period) + } + } + + if period%60 != 0 { + return fmt.Errorf("metricStatPeriod >= 60 has to be a multiple of 60, however, %d is provided", period) + } + + return nil } func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { @@ -273,9 +313,18 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) { }) } + endTime := time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricEndTimeOffset)).Truncate(time.Duration(c.metadata.metricStatPeriod) * time.Second) + startTime := endTime.Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime)) + + var metricUnit *string + if c.metadata.metricUnit != "" { + metricUnit = aws.String(c.metadata.metricUnit) + } + input := cloudwatch.GetMetricDataInput{ - StartTime: aws.Time(time.Now().Add(time.Second * -1 * time.Duration(c.metadata.metricCollectionTime))), - EndTime: aws.Time(time.Now()), + StartTime: aws.Time(startTime), + EndTime: aws.Time(endTime), + ScanBy: aws.String(cloudwatch.ScanByTimestampDescending), MetricDataQueries: []*cloudwatch.MetricDataQuery{ { Id: aws.String("c1"), @@ -287,6 +336,7 @@ func (c *awsCloudwatchScaler) GetCloudwatchMetrics() (float64, error) { }, Period: aws.Int64(c.metadata.metricStatPeriod), Stat: aws.String(c.metadata.metricStat), + Unit: metricUnit, }, ReturnData: aws.Bool(true), }, diff --git a/pkg/scalers/aws_cloudwatch_test.go b/pkg/scalers/aws_cloudwatch_test.go index 2bfb1f2adaa..81f40a369d1 100644 --- a/pkg/scalers/aws_cloudwatch_test.go +++ b/pkg/scalers/aws_cloudwatch_test.go @@ -232,6 +232,81 @@ var testAWSCloudwatchMetadata = []parseAWSCloudwatchMetadataTestData{ "awsRegion": "eu-west-1"}, testAWSAuthentication, false, "Missing metricStatPeriod not generate error because will get the default value"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricStat": "Average", + "metricUnit": "Count", + "metricEndTimeOffset": "60", + "awsRegion": "eu-west-1"}, + testAWSAuthentication, false, + "set a supported metricUnit"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricCollectionTime": "300", + "metricStat": "SomeStat", + "awsRegion": "eu-west-1"}, + testAWSAuthentication, true, + "metricStat is not supported"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricStatPeriod": "300", + "metricCollectionTime": "100", + "metricStat": "Average", + "awsRegion": "eu-west-1"}, + testAWSAuthentication, true, + "metricCollectionTime smaller than metricStatPeriod"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricStatPeriod": "250", + "metricStat": "Average", + "awsRegion": "eu-west-1"}, + testAWSAuthentication, true, + "unsupported metricStatPeriod"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricStatPeriod": "25", + "metricStat": "Average", + "awsRegion": "eu-west-1"}, + testAWSAuthentication, true, + "unsupported metricStatPeriod"}, + {map[string]string{ + "namespace": "AWS/SQS", + "dimensionName": "QueueName", + "dimensionValue": "keda", + "metricName": "ApproximateNumberOfMessagesVisible", + "targetMetricValue": "2", + "minMetricValue": "0", + "metricStatPeriod": "25", + "metricStat": "Average", + "metricUnit": "Hour", + "awsRegion": "eu-west-1"}, + testAWSAuthentication, true, + "unsupported metricUnit"}, } var awsCloudwatchMetricIdentifiers = []awsCloudwatchMetricIdentifier{