diff --git a/CHANGELOG.md b/CHANGELOG.md index 47a139e13da..d278c433fc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) - ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016)) +- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092)) ### Improvements diff --git a/CREATE-NEW-SCALER.md b/CREATE-NEW-SCALER.md index 0383438f54b..b3eb1841bf6 100644 --- a/CREATE-NEW-SCALER.md +++ b/CREATE-NEW-SCALER.md @@ -69,3 +69,9 @@ The constructor should have the following parameters: ## Lifecycle of a scaler The scaler is created and closed everytime KEDA or HPA wants to call `GetMetrics`, and everytime a new ScaledObject is created or updated that has a trigger for that scaler. Thus, a developer of a scaler should not assume that the scaler will maintain any state between these calls. + +## Note +The scaler code is embedded into the two separate binaries comprising KEDA, the operator and the custom metrics server component. The metrics server must be occasionally rebuilt published and deployed to k8s for it to have the same code as your operator. + +GetMetricSpecForScaling() is executed by the operator for the purposes of scaling up to and down to 0 replicas. +GetMetrics() is executed by the custom metrics server in response to a calls against the external metrics api, whether by the HPA loop or by curl diff --git a/pkg/scalers/graphite_scaler.go b/pkg/scalers/graphite_scaler.go index ea5471edb3c..58bc52c671e 100644 --- a/pkg/scalers/graphite_scaler.go +++ b/pkg/scalers/graphite_scaler.go @@ -2,15 +2,14 @@ package scalers import ( "context" + "encoding/json" "errors" "fmt" "io/ioutil" "net/http" url_pkg "net/url" "strconv" - "strings" - "github.com/tidwall/gjson" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,7 +17,6 @@ import ( "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" - "github.com/kedacore/keda/v2/pkg/scalers/authentication" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -27,11 +25,12 @@ const ( grapMetricName = "metricName" grapQuery = "query" grapThreshold = "threshold" - grapqueryTime = "queryTime" + grapQueryTime = "queryTime" ) type graphiteScaler struct { - metadata *graphiteMetadata + metadata *graphiteMetadata + httpClient *http.Client } type graphiteMetadata struct { @@ -47,6 +46,12 @@ type graphiteMetadata struct { password string // +optional } +type grapQueryResult []struct { + Target string `json:"target"` + Tags map[string]interface{} `json:"tags"` + Datapoints [][]float64 `json:"datapoints"` +} + var graphiteLog = logf.Log.WithName("graphite_scaler") // NewGraphiteScaler creates a new graphiteScaler @@ -55,8 +60,12 @@ func NewGraphiteScaler(config *ScalerConfig) (Scaler, error) { if err != nil { return nil, fmt.Errorf("error parsing graphite metadata: %s", err) } + + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout) + return &graphiteScaler{ - metadata: meta, + metadata: meta, + httpClient: httpClient, }, nil } @@ -81,10 +90,10 @@ func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) { return nil, fmt.Errorf("no %s given", grapMetricName) } - if val, ok := config.TriggerMetadata[grapqueryTime]; ok && val != "" { + if val, ok := config.TriggerMetadata[grapQueryTime]; ok && val != "" { meta.from = val } else { - return nil, fmt.Errorf("no %s given", grapqueryTime) + return nil, fmt.Errorf("no %s given", grapQueryTime) } if val, ok := config.TriggerMetadata[grapThreshold]; ok && val != "" { @@ -96,31 +105,22 @@ func parseGraphiteMetadata(config *ScalerConfig) (*graphiteMetadata, error) { meta.threshold = t } - authModes, ok := config.TriggerMetadata["authModes"] + _, ok := config.TriggerMetadata["authMode"] // no authMode specified if !ok { return &meta, nil } - authTypes := strings.Split(authModes, ",") - for _, t := range authTypes { - authType := authentication.Type(strings.TrimSpace(t)) - switch authType { - case authentication.BasicAuthType: - if len(config.AuthParams["username"]) == 0 { - return nil, errors.New("no username given") - } - - meta.username = config.AuthParams["username"] - // password is optional. For convenience, many application implement basic auth with - // username as apikey and password as empty - meta.password = config.AuthParams["password"] - meta.enableBasicAuth = true - default: - return nil, fmt.Errorf("err incorrect value for authMode is given: %s", t) - } + if len(config.AuthParams["username"]) == 0 { + return nil, errors.New("no username given") } + meta.username = string(config.AuthParams["username"]) + // password is optional. For convenience, many application implement basic auth with + // username as apikey and password as empty + meta.password = string(config.AuthParams["password"]) + meta.enableBasicAuth = true + return &meta, nil } @@ -156,7 +156,6 @@ func (s *graphiteScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { } func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) { - client := &http.Client{} queryEscaped := url_pkg.QueryEscape(s.metadata.query) url := fmt.Sprintf("%s/render?from=%s&target=%s&format=json", s.metadata.serverAddress, s.metadata.from, queryEscaped) req, err := http.NewRequest("GET", url, nil) @@ -166,7 +165,7 @@ func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) { if s.metadata.enableBasicAuth { req.SetBasicAuth(s.metadata.username, s.metadata.password) } - r, err := client.Do(req) + r, err := s.httpClient.Do(req) if err != nil { return -1, err } @@ -177,16 +176,22 @@ func (s *graphiteScaler) ExecuteGrapQuery() (float64, error) { } r.Body.Close() - result := gjson.GetBytes(b, "0.datapoints.#.0") - var v float64 = -1 - for _, valur := range result.Array() { - if valur.String() != "" { - if float64(valur.Int()) > v { - v = float64(valur.Int()) - } - } + var result grapQueryResult + err = json.Unmarshal(b, &result) + if err != nil { + return -1, err + } + + if len(result) == 0 { + return 0, nil + } else if len(result) > 1 { + return -1, fmt.Errorf("graphite query %s returned multiple series", s.metadata.query) } - return v, nil + + // https://graphite-api.readthedocs.io/en/latest/api.html#json + datapoint := result[0].Datapoints[0][0] + + return datapoint, nil } func (s *graphiteScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { diff --git a/tests/scalers/graphite-deployment.yaml b/tests/scalers/graphite-deployment.yaml new file mode 100644 index 00000000000..d0de94ea9d1 --- /dev/null +++ b/tests/scalers/graphite-deployment.yaml @@ -0,0 +1,359 @@ +--- +# Source: graphite/templates/configmap-statsd.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: graphite-statsd-configmap + labels: + app.kubernetes.io/name: graphite + helm.sh/chart: graphite-0.7.2 + app.kubernetes.io/instance: RELEASE-NAME + app.kubernetes.io/managed-by: Helm +data: + config_tcp.js: |- + { + "graphiteHost": "127.0.0.1", + "graphitePort": 2003, + "port": 8125, + "flushInterval": 10000, + "servers": [{ + "server": "./servers/tcp", + "address": "0.0.0.0", + "port": 8125 + }] + } + config_udp.js: |- + { + "graphiteHost": "127.0.0.1", + "graphitePort": 2003, + "port": 8125, + "flushInterval": 10000, + "servers": [{ + "server": "./servers/udp", + "address": "0.0.0.0", + "port": 8125 + }] + } +--- +# Source: graphite/templates/configmap.yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: graphite-configmap + labels: + app: graphite + chart: graphite-0.7.2 + release: RELEASE-NAME + heritage: Helm +data: + aggregation-rules.conf: |- + + carbon.conf: |- + [cache] + DATABASE = whisper + ENABLE_LOGROTATION = True + USER = + MAX_CACHE_SIZE = inf + MAX_UPDATES_PER_SECOND = 500 + MAX_CREATES_PER_MINUTE = 50 + MIN_TIMESTAMP_RESOLUTION = 1 + LINE_RECEIVER_INTERFACE = 0.0.0.0 + LINE_RECEIVER_PORT = 2003 + ENABLE_UDP_LISTENER = False + UDP_RECEIVER_INTERFACE = 0.0.0.0 + UDP_RECEIVER_PORT = 2003 + PICKLE_RECEIVER_INTERFACE = 0.0.0.0 + PICKLE_RECEIVER_PORT = 2004 + USE_INSECURE_UNPICKLER = False + CACHE_QUERY_INTERFACE = 0.0.0.0 + CACHE_QUERY_PORT = 7002 + USE_FLOW_CONTROL = True + LOG_UPDATES = False + LOG_CREATES = False + LOG_CACHE_HITS = False + LOG_CACHE_QUEUE_SORTS = False + CACHE_WRITE_STRATEGY = sorted + WHISPER_AUTOFLUSH = False + WHISPER_FALLOCATE_CREATE = True + CARBON_METRIC_INTERVAL = 10 + + GRAPHITE_URL = http://127.0.0.1:8080 + + [relay] + LINE_RECEIVER_INTERFACE = 0.0.0.0 + LINE_RECEIVER_PORT = 2013 + PICKLE_RECEIVER_INTERFACE = 0.0.0.0 + PICKLE_RECEIVER_PORT = 2014 + + RELAY_METHOD = rules + REPLICATION_FACTOR = 1 + DESTINATIONS = 127.0.0.1:2004 + MAX_QUEUE_SIZE = 10000 + MAX_DATAPOINTS_PER_MESSAGE = 500 + QUEUE_LOW_WATERMARK_PCT = 0.8 + TIME_TO_DEFER_SENDING = 0.0001 + USE_FLOW_CONTROL = True + CARBON_METRIC_INTERVAL = 10 + USE_RATIO_RESET=False + MIN_RESET_STAT_FLOW=1000 + MIN_RESET_RATIO=0.9 + MIN_RESET_INTERVAL=121 + + [aggregator] + LINE_RECEIVER_INTERFACE = 0.0.0.0 + LINE_RECEIVER_PORT = 2023 + + PICKLE_RECEIVER_INTERFACE = 0.0.0.0 + PICKLE_RECEIVER_PORT = 2024 + + # If set true, metric received will be forwarded to DESTINATIONS in addition to + # the output of the aggregation rules. If set false the carbon-aggregator will + # only ever send the output of aggregation. + FORWARD_ALL = True + DESTINATIONS = 127.0.0.1:2004 + REPLICATION_FACTOR = 1 + MAX_QUEUE_SIZE = 10000 + USE_FLOW_CONTROL = True + MAX_DATAPOINTS_PER_MESSAGE = 500 + MAX_AGGREGATION_INTERVALS = 5 + CARBON_METRIC_INTERVAL = 10 + dashboard.conf: |- + # This configuration file controls the behavior of the Dashboard UI, available + # at http://my-graphite-server/dashboard/. + # + # This file must contain a [ui] section that defines values for all of the + # following settings. + [ui] + default_graph_width = 400 + default_graph_height = 250 + automatic_variants = true + refresh_interval = 60 + autocomplete_delay = 375 + merge_hover_delay = 750 + + # You can set this 'default', 'white', or a custom theme name. + # To create a custom theme, copy the dashboard-default.css file + # to dashboard-myThemeName.css in the content/css directory and + # modify it to your liking. + theme = default + + [keyboard-shortcuts] + toggle_toolbar = ctrl-z + toggle_metrics_panel = ctrl-space + erase_all_graphs = alt-x + save_dashboard = alt-s + completer_add_metrics = alt-enter + completer_del_metrics = alt-backspace + give_completer_focus = shift-space + graphTemplates.conf: |- + [default] + background = black + foreground = white + majorLine = white + minorLine = grey + lineColors = blue,green,red,purple,brown,yellow,aqua,grey,magenta,pink,gold,rose + fontName = Sans + fontSize = 10 + fontBold = False + fontItalic = False + + [noc] + background = black + foreground = white + majorLine = white + minorLine = grey + lineColors = blue,green,red,yellow,purple,brown,aqua,grey,magenta,pink,gold,rose + fontName = Sans + fontSize = 10 + fontBold = False + fontItalic = False + + [plain] + background = white + foreground = black + minorLine = grey + majorLine = rose + + [summary] + background = black + lineColors = #6666ff, #66ff66, #ff6666 + + [alphas] + background = white + foreground = black + majorLine = grey + minorLine = rose + lineColors = 00ff00aa,ff000077,00337799 + graphite.wsgi.example: |- + import sys + sys.path.append('/opt/graphite/webapp') + + from graphite.wsgi import application + relay-rules.conf: |- + [default] + default = true + destinations = 0.0.0.0:2004 + rewrite-rules.conf: |- + + storage-aggregation.conf: |- + [min] + pattern = \.lower$ + xFilesFactor = 0.1 + aggregationMethod = min + + [max] + pattern = \.upper(_\d+)?$ + xFilesFactor = 0.1 + aggregationMethod = max + + [sum] + pattern = \.sum$ + xFilesFactor = 0 + aggregationMethod = sum + + [count] + pattern = \.count$ + xFilesFactor = 0 + aggregationMethod = sum + + [count_legacy] + pattern = ^stats_counts.* + xFilesFactor = 0 + aggregationMethod = sum + + [default_average] + pattern = .* + xFilesFactor = 0.3 + aggregationMethod = average + storage-schemas.conf: |- + [carbon] + pattern = ^carbon\. + retentions = 10s:6h,1m:90d + + [default_1min_for_1day] + pattern = .* + retentions = 10s:6h,1m:6d,10m:1800d + +--- +# Source: graphite/templates/service.yaml +apiVersion: v1 +kind: Service +metadata: + name: graphite + labels: + app.kubernetes.io/name: graphite + helm.sh/chart: graphite-0.7.2 + app.kubernetes.io/instance: RELEASE-NAME + app.kubernetes.io/managed-by: Helm +spec: + type: ClusterIP + ports: + - name: graphite-pickle + port: 2004 + protocol: TCP + - name: graphite-plain + port: 2003 + protocol: TCP + - name: graphite-udp + port: 2003 + protocol: UDP + - name: graphite-gui + port: 8080 + protocol: TCP + - name: aggregate-plain + port: 2023 + protocol: TCP + - name: aggregate-pickl + port: 2024 + protocol: TCP + - name: statsd + port: 8125 + protocol: UDP + - name: statsd-admin + port: 8126 + protocol: TCP + selector: + app.kubernetes.io/name: graphite + app.kubernetes.io/instance: RELEASE-NAME +--- +# Source: graphite/templates/statefulset.yaml +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: graphite + labels: + app.kubernetes.io/name: graphite + helm.sh/chart: graphite-0.7.2 + app.kubernetes.io/instance: RELEASE-NAME + app.kubernetes.io/managed-by: Helm +spec: + updateStrategy: + type: RollingUpdate + selector: + matchLabels: + app.kubernetes.io/name: graphite + app.kubernetes.io/instance: RELEASE-NAME + serviceName: graphite + template: + metadata: + labels: + app.kubernetes.io/name: graphite + app.kubernetes.io/instance: RELEASE-NAME + spec: + containers: + - image: graphiteapp/graphite-statsd:1.1.7-6 + name: graphite + ports: + - name: graphite-gui + containerPort: 8080 + - name: graphite-plain + containerPort: 2003 + - name: graphite-udp + containerPort: 2003 + protocol: UDP + - name: graphite-pickle + containerPort: 2004 + - name: aggregate-plain + containerPort: 2023 + - name: aggregate-pickl + containerPort: 2024 + - name: statsd + protocol: UDP + containerPort: 8125 + - name: statsd-admin + containerPort: 8126 + env: + - name: "STATSD_INTERFACE" + value: udp + - name: "GRAPHITE_TIME_ZONE" + value: Etc/UTC + livenessProbe: + httpGet: + path: / + port: graphite-gui + readinessProbe: + httpGet: + path: / + port: graphite-gui + resources: + {} + volumeMounts: + - name: graphite-configmap + mountPath: /opt/graphite/conf/ + - name: graphite-statsd-configmap + subPath: config_tcp.js + mountPath: /opt/statsd/config/tcp.js + - name: graphite-statsd-configmap + subPath: config_udp.js + mountPath: /opt/statsd/config/udp.js + - name: graphite-pvc + mountPath: /opt/graphite/storage/ + volumes: + - name: graphite-configmap + configMap: + name: graphite-configmap + - name: graphite-statsd-configmap + configMap: + name: graphite-statsd-configmap + - name: graphite-pvc + emptyDir: {} diff --git a/tests/scalers/graphite.test.ts b/tests/scalers/graphite.test.ts new file mode 100644 index 00000000000..1ee7b13a9d6 --- /dev/null +++ b/tests/scalers/graphite.test.ts @@ -0,0 +1,155 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from "./helpers"; + +const graphiteNamespace = 'graphite' +const graphiteDeploymentFile = 'scalers/graphite-deployment.yaml' + +test.before(t => { + // install graphite + sh.exec(`kubectl create namespace ${graphiteNamespace}`) + t.is(0, + sh.exec(`kubectl apply --namespace ${graphiteNamespace} -f ${graphiteDeploymentFile}`).code, + 'creating a Graphite deployment should work.' + ) + // wait for graphite to load + t.is(0, waitForRollout('statefulset', "graphite", graphiteNamespace)) + + sh.config.silent = true + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, deployYaml.replace('{{GRAPHITE_NAMESPACE}}', graphiteNamespace)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${graphiteNamespace}`).code, + 'creating graphite scaling test deployment should work.' + ) + for (let i = 0; i < 10; i++) { + const readyReplicaCount = sh.exec(`kubectl get deployment php-apache-graphite --namespace ${graphiteNamespace} -o jsonpath="{.status.readyReplicas}`).stdout + if (readyReplicaCount != '1') { + sh.exec('sleep 2s') + } + } +}) + +test.serial('Deployment should have 0 replica on start', t => { + const replicaCount = sh.exec( + `kubectl get deployment php-apache-graphite --namespace ${graphiteNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial(`Deployment should scale to 5 (the max) with HTTP Requests exceeding in the rate then back to 0`, t => { + const tmpFile = tmp.fileSync() + t.log(tmpFile.name) + + fs.writeFileSync(tmpFile.name, generateRequestsYaml.replace('{{GRAPHITE_NAMESPACE}}', graphiteNamespace)) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${graphiteNamespace}`).code, + 'creating job should work.' + ) + + // keda based deployment should start scaling up with http requests issued + let replicaCount = '0' + for (let i = 0; i < 60 && replicaCount !== '5'; i++) { + t.log(`Waited ${5 * i} seconds for graphite-based deployments to scale up`) + const jobLogs = sh.exec(`kubectl logs -l job-name=generate-graphite-metrics -n ${graphiteNamespace}`).stdout + t.log(`Logs from the generate requests: ${jobLogs}`) + + replicaCount = sh.exec( + `kubectl get deployment php-apache-graphite --namespace ${graphiteNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== '5') { + sh.exec('sleep 5s') + } + } + + t.is('5', replicaCount, 'Replica count should be maxed at 5') + + for (let i = 0; i < 50 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment php-apache-graphite --namespace ${graphiteNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') +}) + +test.after.always.cb('clean up graphite deployment', t => { + const resources = [ + 'scaledobject.keda.sh/graphite-scaledobject', + 'deployment.apps/php-apache-graphite', + 'job/generate-graphite-metrics', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${graphiteNamespace}`) + } + + // uninstall graphite + sh.exec(`kubectl delete --namespace ${graphiteNamespace} -f ${graphiteDeploymentFile}`) + sh.exec(`kubectl delete namespace ${graphiteNamespace}`) + + t.end() +}) + +const deployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: php-apache-graphite +spec: + selector: + matchLabels: + run: php-apache-graphite + replicas: 0 + template: + metadata: + labels: + run: php-apache-graphite + spec: + containers: + - name: php-apache-graphite + image: k8s.gcr.io/hpa-example + ports: + - containerPort: 80 +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: graphite-scaledobject +spec: + scaleTargetRef: + name: php-apache-graphite + minReplicaCount: 0 + maxReplicaCount: 5 + pollingInterval: 5 + cooldownPeriod: 10 + triggers: + - type: graphite + metadata: + serverAddress: http://graphite.{{GRAPHITE_NAMESPACE}}.svc:8080 + metricName: https_metric + threshold: '100' + query: "https_metric" + queryTime: '-10Seconds'` + +const generateRequestsYaml = `apiVersion: batch/v1 +kind: Job +metadata: + name: generate-graphite-metrics +spec: + template: + spec: + containers: + - image: busybox + name: generate-graphite-metrics + command: ["/bin/sh"] + args: ["-c", "for i in $(seq 1 60);do echo $i; echo \\"https_metric 1000 $(date +%s)\\" | nc graphite.{{GRAPHITE_NAMESPACE}}.svc 2003; echo 'data sent :)'; sleep 1; done"] + restartPolicy: Never + activeDeadlineSeconds: 120 + backoffLimit: 2`