Skip to content

Commit

Permalink
e2e tests for graphite scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Brandon Pinske <brandon@pinske.info>
Signed-off-by: Brandon Pinske <brandon.pinske@crowdstrike.com>
  • Loading branch information
bpinske authored and Brandon Pinske committed Sep 13, 2021
1 parent ef60e92 commit c28f28f
Show file tree
Hide file tree
Showing 5 changed files with 563 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

### Improvements

Expand Down
6 changes: 6 additions & 0 deletions CREATE-NEW-SCALER.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,9 @@ The constructor should have the following parameters:
## Lifecycle of a scaler

The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls.

## Note
The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator.

GetMetricSpecForScaling() is executed by the operator for the purposes of scaling up to and down to 0 replicas.
GetMetrics() is executed by the custom metrics server in response to a calls against the external metrics api, whether by the HPA loop or by curl
79 changes: 42 additions & 37 deletions pkg/scalers/graphite_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package scalers

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
url_pkg "net/url"
"strconv"
"strings"

"github.com/tidwall/gjson"
v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/kedacore/keda/v2/pkg/scalers/authentication"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

Expand All @@ -27,11 +25,12 @@ const (
grapMetricName = "metricName"
grapQuery = "query"
grapThreshold = "threshold"
grapqueryTime = "queryTime"
grapQueryTime = "queryTime"
)

type graphiteScaler struct {
metadata *graphiteMetadata
metadata *graphiteMetadata
httpClient *http.Client
}

type graphiteMetadata struct {
Expand All @@ -47,6 +46,12 @@ type graphiteMetadata struct {
password string // +optional
}

type grapQueryResult []struct {
Target string `json:"target"`
Tags map[string]interface{} `json:"tags"`
Datapoints [][]float64 `json:"datapoints"`
}

var graphiteLog = logf.Log.WithName("graphite_scaler")

// NewGraphiteScaler creates a new graphiteScaler
Expand All @@ -55,8 +60,12 @@ func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) {
if err != nil {
return nil, fmt.Errorf("error parsing graphite metadata: %s", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)

return &graphiteScaler{
metadata: meta,
metadata: meta,
httpClient: httpClient,
}, nil
}

Expand All @@ -81,10 +90,10 @@ func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
return nil, fmt.Errorf("no %s given", grapMetricName)
}

if val, ok := config.TriggerMetadata[grapqueryTime]; ok && val != "" {
if val, ok := config.TriggerMetadata[grapQueryTime]; ok && val != "" {
meta.from = val
} else {
return nil, fmt.Errorf("no %s given", grapqueryTime)
return nil, fmt.Errorf("no %s given", grapQueryTime)
}

if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" {
Expand All @@ -96,31 +105,22 @@ func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) {
meta.threshold = t
}

authModes, ok := config.TriggerMetadata["authModes"]
_, ok := config.TriggerMetadata["authMode"]
// no authMode specified
if !ok {
return &meta, nil
}

authTypes := strings.Split(authModes, ",")
for _, t := range authTypes {
authType := authentication.Type(strings.TrimSpace(t))
switch authType {
case authentication.BasicAuthType:
if len(config.AuthParams["username"]) == 0 {
return nil, errors.New("no username given")
}

meta.username = config.AuthParams["username"]
// password is optional. For convenience, many application implement basic auth with
// username as apikey and password as empty
meta.password = config.AuthParams["password"]
meta.enableBasicAuth = true
default:
return nil, fmt.Errorf("err incorrect value for authMode is given: %s", t)
}
if len(config.AuthParams["username"]) == 0 {
return nil, errors.New("no username given")
}

meta.username = string(config.AuthParams["username"])
// password is optional. For convenience, many application implement basic auth with
// username as apikey and password as empty
meta.password = string(config.AuthParams["password"])
meta.enableBasicAuth = true

return &meta, nil
}

Expand Down Expand Up @@ -156,7 +156,6 @@ func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
}

func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
client := &http.Client{}
queryEscaped := url_pkg.QueryEscape(s.metadata.query)
url := fmt.Sprintf("%s/render?from=%s&target=%s&format=json", s.metadata.serverAddress, s.metadata.from, queryEscaped)
req, err := http.NewRequest("GET", url, nil)
Expand All @@ -166,7 +165,7 @@ func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
if s.metadata.enableBasicAuth {
req.SetBasicAuth(s.metadata.username, s.metadata.password)
}
r, err := client.Do(req)
r, err := s.httpClient.Do(req)
if err != nil {
return -1, err
}
Expand All @@ -177,16 +176,22 @@ func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) {
}
r.Body.Close()

result := gjson.GetBytes(b, "0.datapoints.#.0")
var v float64 = -1
for _, valur := range result.Array() {
if valur.String() != "" {
if float64(valur.Int()) > v {
v = float64(valur.Int())
}
}
var result grapQueryResult
err = json.Unmarshal(b, &result)
if err != nil {
return -1, err
}

if len(result) == 0 {
return 0, nil
} else if len(result) > 1 {
return -1, fmt.Errorf("graphite query %s returned multiple series", s.metadata.query)
}
return v, nil

// https://graphite-api.readthedocs.io/en/latest/api.html#json
datapoint := result[0].Datapoints[0][0]

return datapoint, nil
}

func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
Expand Down
Loading

0 comments on commit c28f28f

Please sign in to comment.