Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the Datadog scaler, including a new optional parameter metricUnavailableValue to fill data when no Datadog metric was returned #2694

Merged
merged 18 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

- **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613))
- **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
- **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
- **GCP Pubsub Scaler** Adding e2e test for GCP PubSub scaler ([#1528](https://github.com/kedacore/keda/issues/1528))
- **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **RabbitMQ Scaler:** Include `vhost` for RabbitMQ when retrieving queue info with `useRegex` ([#2498](https://github.com/kedacore/keda/issues/2498))
Expand Down
155 changes: 101 additions & 54 deletions pkg/scalers/datadog_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -39,10 +40,20 @@ type datadogMetadata struct {
vType valueType
metricName string
age int
useFiller bool
fillValue float64
}

var datadogLog = logf.Log.WithName("datadog_scaler")

var aggregator, filter, rollup *regexp.Regexp

func init() {
aggregator = regexp.MustCompile(`^(avg|sum|min|max):.*`)
filter = regexp.MustCompile(`.*\{.*\}.*`)
rollup = regexp.MustCompile(`.*\.rollup\(([a-z]+,)?\s*(.+)\)`)
}

// NewDatadogScaler creates a new Datadog scaler
func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
meta, err := parseDatadogMetadata(config)
Expand All @@ -60,17 +71,65 @@ func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error)
}, nil
}

// parseAndTransformDatadogQuery checks correctness of the user query and adds rollup if not available
func parseAndTransformDatadogQuery(q string, age int) (string, error) {
// Queries should start with a valid aggregator. If not found, prepend avg as default
if !aggregator.MatchString(q) {
q = "avg:" + q
}

// Wellformed Datadog queries require a filter (between curly brackets)
if !filter.MatchString(q) {
return "", fmt.Errorf("malformed Datadog query")
}

// Queries can contain rollup functions.
match := rollup.FindStringSubmatch(q)
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
if match != nil {
// If added, check that the number of seconds is an int
rollupAgg, err := strconv.Atoi(match[2])
if err != nil {
return "", fmt.Errorf("malformed Datadog query")
}

if rollupAgg > age {
return "", fmt.Errorf("rollup cannot be bigger than time window")
}
} else { // If not added, use a default rollup based on the time window size
s := fmt.Sprintf(".rollup(avg, %d)", age/5)
q += s
}

return q, nil
}

func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
meta := datadogMetadata{}

if val, ok := config.TriggerMetadata["age"]; ok {
age, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("age parsing error %s", err.Error())
}
meta.age = age

if age < 60 {
datadogLog.Info("selecting a window smaller than 60 seconds can cause Datadog not finding a metric value for the query")
}
} else {
meta.age = 90 // Default window 90 seconds
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
query, err := parseAndTransformDatadogQuery(val, meta.age)

if err != nil {
return nil, fmt.Errorf("error in query: %s", err.Error())
}
meta.query = query
} else {
return nil, fmt.Errorf("no query given")
}
if !strings.Contains(meta.query, "{") {
return nil, fmt.Errorf("expecting query to contain `{`, got %s", meta.query)
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
Expand All @@ -82,20 +141,15 @@ func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
return nil, fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["age"]; ok {
age, err := strconv.Atoi(val)
if val, ok := config.TriggerMetadata["metricUnavailableValue"]; ok {
fillValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("age parsing error %s", err.Error())
return nil, fmt.Errorf("metricUnavailableValue parsing error %s", err.Error())
}
meta.age = age
} else {
meta.age = 90 // Default window 90 seconds
meta.fillValue = fillValue
meta.useFiller = true
}

// For all the points in a given window, we take the rollup to the window size
rollup := fmt.Sprintf(".rollup(avg, %d)", meta.age)
meta.query += rollup

if val, ok := config.TriggerMetadata["type"]; ok {
val = strings.ToLower(val)
switch val {
Expand Down Expand Up @@ -174,50 +228,19 @@ func (s *datadogScaler) Close(context.Context) error {
return nil
}

// IsActive returns true if we are able to get metrics from Datadog
// IsActive checks whether the value returned by Datadog is higher than the target value
func (s *datadogScaler) IsActive(ctx context.Context) (bool, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.metadata.apiKey,
},
"appKeyAuth": {
Key: s.metadata.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
num, err := s.getQueryResult(ctx)

if err != nil {
return false, err
}

series := resp.GetSeries()

if len(series) == 0 {
return false, nil
}

points := series[0].GetPointlist()

if len(points) == 0 {
return false, nil
}

return true, nil
return num > float64(s.metadata.queryValue), nil
}

// getQueryResult returns result of the scaler query
func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) {
func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
Expand All @@ -237,24 +260,48 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) {
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)

if r.StatusCode == 429 {
rateLimit := r.Header.Get("X-Ratelimit-Limit")
rateLimitReset := r.Header.Get("X-Ratelimit-Reset")

return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds: %s", rateLimit, rateLimitReset, err)
}

if r.StatusCode != 200 {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

series := resp.GetSeries()

if len(series) > 1 {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series")
}

if len(series) == 0 {
return 0, fmt.Errorf("no Datadog metrics returned")
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}

points := series[0].GetPointlist()

if len(points) == 0 || len(points[0]) < 2 {
return 0, fmt.Errorf("no Datadog metrics returned")
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}

return int(*points[0][1]), nil
// Return the last point from the series
index := len(points) - 1
return *points[index][1], nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
Expand Down Expand Up @@ -301,7 +348,7 @@ func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metri

metric := external_metrics.ExternalMetricValue{
MetricName: s.metadata.metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Value: *resource.NewMilliQuantity(int64(num*1000), resource.DecimalSI),
Timestamp: metav1.Now(),
}

Expand Down
50 changes: 49 additions & 1 deletion pkg/scalers/datadog_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import (
"testing"
)

type datadogQueries struct {
input string
age int
output string
isError bool
}

type datadogMetricIdentifier struct {
metadataTestData *datadogAuthMetadataTestData
scalerIndex int
Expand All @@ -17,11 +24,48 @@ type datadogAuthMetadataTestData struct {
isError bool
}

var testParseQueries = []datadogQueries{
{"", 0, "", true},
// All properly formed
{"avg:system.cpu.user{*}.rollup(sum, 30)", 120, "avg:system.cpu.user{*}.rollup(sum, 30)", false},
{"sum:system.cpu.user{*}.rollup(30)", 30, "sum:system.cpu.user{*}.rollup(30)", false},
{"avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", 120, "avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", false},

// Default aggregator
{"system.cpu.user{*}.rollup(sum, 30)", 120, "avg:system.cpu.user{*}.rollup(sum, 30)", false},

// Default rollup
{"min:system.cpu.user{*}", 120, "min:system.cpu.user{*}.rollup(avg, 24)", false},

// Missing filter
{"min:system.cpu.user", 120, "", true},

// Malformed rollup
{"min:system.cpu.user{*}.rollup(avg)", 120, "", true},
}

func TestDatadogScalerParseQueries(t *testing.T) {
for _, testData := range testParseQueries {
output, err := parseAndTransformDatadogQuery(testData.input, testData.age)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}

if output != testData.output {
t.Errorf("Expected %s, got %s", testData.output, output)
}
}
}

var testDatadogMetadata = []datadogAuthMetadataTestData{
{map[string]string{}, map[string]string{}, true},

// all properly formed
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default age
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default type
Expand All @@ -32,6 +76,10 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong query value type
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// malformed query
{map[string]string{"query": "sum:trace.redis.command.hits", "queryValue": "7", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong unavailableMetricValue type
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "notafloat", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},

// success api/app keys
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
Expand Down
9 changes: 5 additions & 4 deletions tests/scalers/datadog.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many
// keda based deployment should start scaling up with http requests issued
let replicaCount = '1'
for (let i = 0; i < 60 && replicaCount !== '3'; i++) {
t.log(`Waited ${5 * i} seconds for nginx deployment to scale up`)
t.log(`Waited ${15 * i} seconds for nginx deployment to scale up`)

replicaCount = sh.exec(
`kubectl get deployment nginx --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
Expand All @@ -151,11 +151,12 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many
)

for (let i = 0; i < 50 && replicaCount !== '1'; i++) {
t.log(`Waited ${15 * i} seconds for nginx deployment to scale down`)
replicaCount = sh.exec(
`kubectl get deployment nginx --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
).stdout
if (replicaCount !== '1') {
sh.exec('sleep 5')
sh.exec('sleep 15')
}
}

Expand All @@ -164,6 +165,7 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many
})

test.after.always.cb('clean up datadog resources', t => {
sh.exec(`kubectl delete scaledobject -n ${testNamespace} --all`)
sh.exec(`helm repo rm datadog`)
sh.exec(`kubectl delete namespace ${datadogNamespace} --force`)
sh.exec(`kubectl delete namespace ${testNamespace} --force`)
Expand Down Expand Up @@ -282,7 +284,6 @@ spec:
name: nginx
minReplicaCount: 1
maxReplicaCount: 3
pollingInterval: 5
cooldownPeriod: 10
advanced:
horizontalPodAutoscalerConfig:
Expand All @@ -295,7 +296,7 @@ spec:
query: "avg:nginx.net.request_per_s{cluster_name:keda-datadog-cluster}"
queryValue: "2"
type: "global"
age: "60"
age: "120"
authenticationRef:
name: keda-trigger-auth-datadog-secret
`