Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide Graphite scaler #1749

Merged
merged 7 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions pkg/scalers/graphite_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package scalers

import (
"context"
"fmt"
"io/ioutil"
"net/http"
url_pkg "net/url"
"strconv"

"github.com/tidwall/gjson"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
grapServerAddress = "serverAddress"
grapMetricName = "metricName"
grapQuery = "query"
grapThreshold = "threshold"
grapqueryTime = "queryTime"
)

type graphiteScaler struct {
metadata *graphiteMetadata
}

type graphiteMetadata struct {
serverAddress string
metricName string
query string
threshold int
from string
}

var graphiteLog = logf.Log.WithName("graphite_scaler")

// NewGraphiteScaler creates a new graphiteScaler
func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseGraphiteMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing graphite metadata: %s", err)
}

return &graphiteScaler{
metadata: meta,
}, nil
}

func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
meta := graphiteMetadata{}

if val, ok := config.TriggerMetadata[grapServerAddress]; ok && val != "" {
meta.serverAddress = val
} else {
return nil, fmt.Errorf("no %s given", grapServerAddress)
}

if val, ok := config.TriggerMetadata[grapQuery]; ok && val != "" {
meta.query = val
} else {
return nil, fmt.Errorf("no %s given", grapQuery)
}

if val, ok := config.TriggerMetadata[grapMetricName]; ok && val != "" {
meta.metricName = val
} else {
return nil, fmt.Errorf("no %s given", grapMetricName)
}

if val, ok := config.TriggerMetadata[grapqueryTime]; ok && val != "" {
meta.from = val
} else {
return nil, fmt.Errorf("no %s given", grapqueryTime)
}

if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" {
t, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", grapThreshold, err)
}

meta.threshold = t
}

return &meta, nil
}

func (s *graphiteScaler) IsActive(ctx context.Context) (bool, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return false, err
}

return val > 0, nil
}

func (s *graphiteScaler) Close() error {
return nil
}

func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "graphite", s.metadata.serverAddress, s.metadata.metricName)),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/render?target=%s&format=json", s.metadata.serverAddress, queryEscaped)
r, err := http.Get(url)
if err != nil {
return -1, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
}
r.Body.Close()

result := gjson.GetBytes(b, "0.datapoints.#.0")
var v float64 = -1
for _, valur := range result.Array() {
if valur.String() != "" {
if float64(valur.Int()) > v {
v = float64(valur.Int())
}
}
}
return v, nil
}

func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
val, err := s.ExecuteGrapQuery()
if err != nil {
graphiteLog.Error(err, "error executing graphite query")
return []external_metrics.ExternalMetricValue{}, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(val), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
67 changes: 67 additions & 0 deletions pkg/scalers/graphite_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package scalers

import (
"testing"
)

type parseGraphiteMetadataTestData struct {
metadata map[string]string
isError bool
}

type graphiteMetricIdentifier struct {
metadataTestData *parseGraphiteMetadataTestData
name string
}

var testGrapMetadata = []parseGraphiteMetadataTestData{
{map[string]string{}, true},
// all properly formed
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, false},
// missing serverAddress
{map[string]string{"serverAddress": "", "grapmetricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing metricName
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// malformed threshold
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "one", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing query
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "", "queryTime": "-30Seconds", "disableScaleToZero": "true"}, true},
// missing queryTime
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "query": "stats.counters.http.hello-world.request.count.count", "queryTime": "", "disableScaleToZero": "true"}, true},
// all properly formed, default disableScaleToZero
{map[string]string{"serverAddress": "http://localhost:81", "metricName": "request-count", "threshold": "100", "queryTime": "-30Seconds", "query": "stats.counters.http.hello-world.request.count.count"}, false},
}

var graphiteMetricIdentifiers = []graphiteMetricIdentifier{
{&testGrapMetadata[1], "graphite-http---localhost-81-request-count"},
}

func TestGraphiteParseMetadata(t *testing.T) {
for _, testData := range testGrapMetadata {
_, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadata})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGraphiteGetMetricSpecForScaling(t *testing.T) {
for _, testData := range graphiteMetricIdentifiers {
meta, err := parseGraphiteMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGraphiteScaler := graphiteScaler{
metadata: meta,
}

metricSpec := mockGraphiteScaler.GetMetricSpecForScaling()
metricName := metricSpec[0].External.Metric.Name
if metricName != testData.name {
t.Error("Wrong External metric source name:", metricName)
}
}
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewExternalPushScaler(config)
case "gcp-pubsub":
return scalers.NewPubSubScaler(config)
case "graphite":
return scalers.NewGraphiteScaler(config)
case "huawei-cloudeye":
return scalers.NewHuaweiCloudeyeScaler(config)
case "ibmmq":
Expand Down