diff --git a/pkg/scalers/graphite_scaler.go b/pkg/scalers/graphite_scaler.go new file mode 100644 index 00000000000..ea5471edb3c --- /dev/null +++ b/pkg/scalers/graphite_scaler.go @@ -0,0 +1,206 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net/http" + url_pkg "net/url" + "strconv" + "strings" + + "github.com/tidwall/gjson" + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/kedacore/keda/v2/pkg/scalers/authentication" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + grapServerAddress = "serverAddress" + grapMetricName = "metricName" + grapQuery = "query" + grapThreshold = "threshold" + grapqueryTime = "queryTime" +) + +type graphiteScaler struct { + metadata *graphiteMetadata +} + +type graphiteMetadata struct { + serverAddress string + metricName string + query string + threshold int + from string + + // basic auth + enableBasicAuth bool + username string + password string // +optional +} + +var graphiteLog = logf.Log.WithName("graphite_scaler") + +// NewGraphiteScaler creates a new graphiteScaler +func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseGraphiteMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing graphite metadata: %s", err) + } + return &graphiteScaler{ + metadata: meta, + }, nil +} + +func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) { + meta := graphiteMetadata{} + + if val, ok := config.TriggerMetadata[grapServerAddress]; ok && val != "" { + meta.serverAddress = val + } else { + return nil, fmt.Errorf("no %s given", grapServerAddress) + } + + if val, ok := config.TriggerMetadata[grapQuery]; ok && val != "" { + meta.query = val + } else { + return nil, fmt.Errorf("no %s given", grapQuery) + } + + if val, ok := config.TriggerMetadata[grapMetricName]; ok && val != "" { + meta.metricName = val + } else { + return nil, fmt.Errorf("no %s given", grapMetricName) + } + + if val, ok := config.TriggerMetadata[grapqueryTime]; ok && val != "" { + meta.from = val + } else { + return nil, fmt.Errorf("no %s given", grapqueryTime) + } + + if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" { + t, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("error parsing %s: %s", grapThreshold, err) + } + + meta.threshold = t + } + + authModes, ok := config.TriggerMetadata["authModes"] + // no authMode specified + if !ok { + return &meta, nil + } + + authTypes := strings.Split(authModes, ",") + for _, t := range authTypes { + authType := authentication.Type(strings.TrimSpace(t)) + switch authType { + case authentication.BasicAuthType: + if len(config.AuthParams["username"]) == 0 { + return nil, errors.New("no username given") + } + + meta.username = config.AuthParams["username"] + // password is optional. For convenience, many application implement basic auth with + // username as apikey and password as empty + meta.password = config.AuthParams["password"] + meta.enableBasicAuth = true + default: + return nil, fmt.Errorf("err incorrect value for authMode is given: %s", t) + } + } + + return &meta, nil +} + +func (s *graphiteScaler) IsActive(ctx context.Context) (bool, error) { + val, err := s.ExecuteGrapQuery() + if err != nil { + graphiteLog.Error(err, "error executing graphite query") + return false, err + } + + return val > 0, nil +} + +func (s *graphiteScaler) Close() error { + return nil +} + +func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "graphite", s.metadata.serverAddress, s.metadata.metricName)), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) { + client := &http.Client{} + queryEscaped := url_pkg.QueryEscape(s.metadata.query) + url := fmt.Sprintf("%s/render?from=%s&target=%s&format=json", s.metadata.serverAddress, s.metadata.from, queryEscaped) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return -1, err + } + if s.metadata.enableBasicAuth { + req.SetBasicAuth(s.metadata.username, s.metadata.password) + } + r, err := client.Do(req) + if err != nil { + return -1, err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return -1, err + } + r.Body.Close() + + result := gjson.GetBytes(b, "0.datapoints.#.0") + var v float64 = -1 + for _, valur := range result.Array() { + if valur.String() != "" { + if float64(valur.Int()) > v { + v = float64(valur.Int()) + } + } + } + return v, nil +} + +func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + val, err := s.ExecuteGrapQuery() + if err != nil { + graphiteLog.Error(err, "error executing graphite query") + return []external_metrics.ExternalMetricValue{}, err + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(val), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} diff --git a/pkg/scalers/graphite_scaler_test.go b/pkg/scalers/graphite_scaler_test.go new file mode 100644 index 00000000000..b51de1cb05a --- /dev/null +++ b/pkg/scalers/graphite_scaler_test.go @@ -0,0 +1,67 @@ +package scalers + +import ( + "testing" +) + +type parseGraphiteMetadataTestData struct { + metadata map[string]string + isError bool +} + +type graphiteMetricIdentifier struct { + metadataTestData *parseGraphiteMetadataTestData + name string +} + +var testGrapMetadata = []parseGraphiteMetadataTestData{ + {map[string]string{}, true}, + // all properly formed + {map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, false}, + // missing serverAddress + {map[string]string{"serverAddress": "", "grapmetricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true}, + // missing metricName + {map[string]string{"serverAddress": "http://localhost:81", "metricName": "", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true}, + // malformed threshold + {map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "one", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true}, + // missing query + {map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true}, + // missing queryTime + {map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "", "disableScaleToZero": "true"}, true}, + // all properly formed, default disableScaleToZero + {map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "queryTime": "-30Seconds", "query": "stats.counters.http.hello-world.request.count.count"}, false}, +} + +var graphiteMetricIdentifiers = []graphiteMetricIdentifier{ + {&testGrapMetadata[1], "graphite-http---localhost-81-request-count"}, +} + +func TestGraphiteParseMetadata(t *testing.T) { + for _, testData := range testGrapMetadata { + _, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGraphiteGetMetricSpecForScaling(t *testing.T) { + for _, testData := range graphiteMetricIdentifiers { + meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGraphiteScaler := graphiteScaler{ + metadata: meta, + } + + metricSpec := mockGraphiteScaler.GetMetricSpecForScaling() + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 1ddb78bf543..d14c5f7f089 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -436,6 +436,8 @@ func buildScaler(client client.Client, triggerType string, config *scalers.Scale return scalers.NewExternalPushScaler(config) case "gcp-pubsub": return scalers.NewPubSubScaler(config) + case "graphite": + return scalers.NewGraphiteScaler(config) case "huawei-cloudeye": return scalers.NewHuaweiCloudeyeScaler(config) case "ibmmq":