Add new Datadog scaler #2354

Merged
merged 16 commits into from Jan 12, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@

- Add New Relic Scaler ([#2387](https://github.com/kedacore/keda/pull/2387))
- Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305))
- Add New Datadog Scaler ([#2354](https://github.com/kedacore/keda/pull/2354))

### Improvements

1 change: 1 addition & 0 deletions go.mod
@@ -13,6 +13,7 @@ require (
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-autorest/autorest v0.11.22
github.com/Azure/go-autorest/autorest/azure/auth v0.5.9
github.com/DataDog/datadog-api-client-go v1.6.0 // indirect
github.com/Huawei/gophercloud v1.0.21
github.com/Shopify/sarama v1.30.0
github.com/aws/aws-sdk-go v1.42.16
2 changes: 2 additions & 0 deletions go.sum
@@ -123,6 +123,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-api-client-go v1.6.0 h1:ccMzM4vw37/8ww9VKKydWMrI+xEs0uE13O5mkG9Ny/8=
github.com/DataDog/datadog-api-client-go v1.6.0/go.mod h1:QzaQF1cDO1/BIQG1fz14VrY+6RECUGkiwzDCtVbfP5c=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/Huawei/gophercloud v1.0.21 h1:HhtzZzRGZiVmLypqHlXrGAcdC1TJW99FLewfPSVktpY=
github.com/Huawei/gophercloud v1.0.21/go.mod h1:TUtAO2PE+Nj7/QdfUXbhi5Xu0uFKVccyukPA7UCxD9w=
307 changes: 307 additions & 0 deletions pkg/scalers/datadog_scaler.go
@@ -0,0 +1,307 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"
"time"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

datadog "github.com/DataDog/datadog-api-client-go/api/v1/datadog"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type datadogScaler struct {
metadata *datadogMetadata
apiClient *datadog.APIClient
}

type valueType int

const (
average = iota
global
)

type datadogMetadata struct {
apiKey string
appKey string
datadogSite string
query string
queryValue int
vType valueType
metricName string
age int
}

var datadogLog = logf.Log.WithName("datadog_scaler")

// NewDatadogScaler creates a new Datadog scaler
func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
meta, err := parseDatadogMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing Datadog metadata: %s", err)
}

apiClient, err := newDatadogConnection(ctx, meta, config)
if err != nil {
return nil, fmt.Errorf("error establishing Datadog connection: %s", err)
}
return &datadogScaler{
metadata: meta,
apiClient: apiClient,
}, nil
}

func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
meta := datadogMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("queryValue parsing error %s", err.Error())
}
meta.queryValue = queryValue
} else {
return nil, fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["age"]; ok {
age, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("age parsing error %s", err.Error())
}
meta.age = age
} else {
meta.age = 90 // Default window 90 seconds
}

// For all the points in a given window, we take the rollup to the window size
rollup := fmt.Sprintf(".rollup(avg, %d)", meta.age)
meta.query += rollup

if val, ok := config.TriggerMetadata["type"]; ok {
val = strings.ToLower(val)
switch val {
case "average":
meta.vType = average
case "global":
meta.vType = global
default:
return nil, fmt.Errorf("type has to be global or average")
}
} else {
meta.vType = average // Default to average between pods
}

if val, ok := config.AuthParams["apiKey"]; ok {
meta.apiKey = val
} else {
return nil, fmt.Errorf("no api key given")
}

if val, ok := config.AuthParams["appKey"]; ok {
meta.appKey = val
} else {
return nil, fmt.Errorf("no app key given")
}

siteVal := "datadoghq.com"

if val, ok := config.AuthParams["datadogSite"]; ok && val != "" {
siteVal = val
}

meta.datadogSite = siteVal

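// Derive the metric name from the query prefix (everything before the first "{")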
metricName := meta.query[0:strings.Index(meta.query, "{")]
meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("datadog-%s", metricName)))

return &meta, nil
}

// newDatadogConnection tests a connection to the Datadog API
func newDatadogConnection(ctx context.Context, meta *datadogMetadata, config *ScalerConfig) (*datadog.APIClient, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: meta.apiKey,
},
"appKeyAuth": {
Key: meta.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": meta.datadogSite,
})

configuration := datadog.NewConfiguration()
configuration.HTTPClient = kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)
apiClient := datadog.NewAPIClient(configuration)

_, _, err := apiClient.AuthenticationApi.Validate(ctx)
if err != nil {
return nil, fmt.Errorf("error connecting to Datadog API endpoint: %v", err)
}

return apiClient, nil
}

// No need to close connections
func (s *datadogScaler) Close(context.Context) error {
return nil
}

// IsActive returns true if we are able to get metrics from Datadog
func (s *datadogScaler) IsActive(ctx context.Context) (bool, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.metadata.apiKey,
},
"appKeyAuth": {
Key: s.metadata.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)

if err != nil {
return false, err
}

series := resp.GetSeries()

if len(series) == 0 {
return false, nil
}

points := series[0].GetPointlist()

if len(points) == 0 {
return false, nil
}

return true, nil
}

// getQueryResult returns result of the scaler query
func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.metadata.apiKey,
},
"appKeyAuth": {
Key: s.metadata.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

series := resp.GetSeries()

if len(series) == 0 {
return 0, fmt.Errorf("no Datadog metrics returned")
}

points := series[0].GetPointlist()

if len(points) == 0 || len(points[0]) < 2 {
return 0, fmt.Errorf("no Datadog metrics returned")
}

return int(*points[0][1]), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *datadogScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
var externalMetric *v2beta2.ExternalMetricSource

targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI)

switch s.metadata.vType {
case average:
externalMetric = &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
case global:
externalMetric = &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.ValueMetricType,
Value: targetQueryValue,
},
}
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult(ctx)
if err != nil {
datadogLog.Error(err, "error getting metrics from Datadog")
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error getting metrics from Datadog: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: s.metadata.metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return []external_metrics.ExternalMetricValue{metric}, nil
}
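
For context, here is a minimal sketch of how this scaler could be exercised from a Go test in the same package. It assumes KEDA's ScalerConfig exposes the TriggerMetadata, AuthParams, ScalerIndex and GlobalHTTPTimeout fields used above; the query and key values are placeholders, and NewDatadogScaler only succeeds if the credentials pass the Datadog validation call.

package scalers

import (
	"context"
	"testing"
	"time"
)

func TestNewDatadogScalerFromConfig(t *testing.T) {
	// Placeholder trigger metadata mirroring what parseDatadogMetadata expects;
	// the ".rollup(avg, <age>)" suffix is appended automatically from "age".
	config := &ScalerConfig{
		TriggerMetadata: map[string]string{
			"query":      "avg:system.cpu.user{app:web}",
			"queryValue": "80",
			"age":        "120",
			"type":       "average",
		},
		AuthParams: map[string]string{
			"apiKey":      "<DATADOG_API_KEY>", // placeholder, real key required
			"appKey":      "<DATADOG_APP_KEY>", // placeholder, real key required
			"datadogSite": "datadoghq.com",
		},
		ScalerIndex:       0,
		GlobalHTTPTimeout: 10 * time.Second,
	}

	scaler, err := NewDatadogScaler(context.Background(), config)
	if err != nil {
		t.Skipf("could not create Datadog scaler (valid credentials required): %s", err)
	}
	defer scaler.Close(context.Background())

	// With a working connection, the scaler reports activity based on whether
	// the query returned any points in the configured time window.
	active, err := scaler.IsActive(context.Background())
	if err != nil {
		t.Fatalf("error checking activity: %s", err)
	}
	t.Logf("scaler active: %t", active)
}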