From 8a1b7b4357f6ffc0b7d9e903a4110618c38ea291 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Thu, 18 Nov 2021 14:56:37 +0100 Subject: [PATCH 01/23] adds an elasticsearch scaler based on search template Signed-off-by: Nicolas L --- go.mod | 1 + go.sum | 2 + pkg/scalers/elasticsearch_scaler.go | 276 ++++++++++++++++ pkg/scalers/elasticsearch_scaler_test.go | 398 +++++++++++++++++++++++ pkg/scalers/metrics_api_scaler.go | 2 +- pkg/scaling/scale_handler.go | 2 + tests/scalers/elasticsearch.test.ts | 329 +++++++++++++++++++ 7 files changed, 1009 insertions(+), 1 deletion(-) create mode 100644 pkg/scalers/elasticsearch_scaler.go create mode 100644 pkg/scalers/elasticsearch_scaler_test.go create mode 100644 tests/scalers/elasticsearch.test.ts diff --git a/go.mod b/go.mod index 3b1f2e8a1c6..60793db6db5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/Shopify/sarama v1.30.0 github.com/aws/aws-sdk-go v1.42.3 github.com/denisenkom/go-mssqldb v0.11.0 + github.com/elastic/go-elasticsearch/v7 v7.15.1 github.com/go-logr/logr v0.4.0 github.com/go-playground/assert/v2 v2.0.1 github.com/go-redis/redis/v8 v8.11.4 diff --git a/go.sum b/go.sum index a14105f5414..1bb81244dcc 100644 --- a/go.sum +++ b/go.sum @@ -254,6 +254,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elastic/go-elasticsearch/v7 v7.15.1 h1:Wd8RLHb5D8xPBU8vGlnLXyflkso9G+rCmsXjqH8LLQQ= +github.com/elastic/go-elasticsearch/v7 v7.15.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go new file mode 100644 index 00000000000..2b6308c1fef --- /dev/null +++ b/pkg/scalers/elasticsearch_scaler.go @@ -0,0 +1,276 @@ +package scalers + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "github.com/elastic/go-elasticsearch/v7" + kedautil "github.com/kedacore/keda/v2/pkg/util" + "github.com/tidwall/gjson" + "io/ioutil" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + "net/http" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "strconv" + "strings" +) + +type elasticsearchScaler struct { + metadata *elasticsearchMetadata + esClient *elasticsearch.Client +} + +type elasticsearchMetadata struct { + addresses []string + unsafeSsl bool + username string + password string + indexes []string + searchTemplateName string + params []string + valueLocation string + targetValue int +} + +var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler") + +// NewElasticsearchScaler creates a new elasticsearch scaler +func NewElasticsearchScaler(config *ScalerConfig) (*elasticsearchScaler, error) { + meta, err := parseElasticsearchMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err) + } + + esClient, err := newElasticsearchClient(meta) + if err != nil { + return nil, fmt.Errorf("error getting elasticsearch client: %s", err) + } + return &elasticsearchScaler{ + metadata: meta, + esClient: esClient, + }, nil +} + +const defaultUnsafeSsl = false + +func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, error) { + meta := elasticsearchMetadata{} + + var err error + addresses, err := GetFromAuthOrMeta(config, "addresses") + if err != nil { + return nil, err + } + meta.addresses = splitAndTrimBySep(addresses, ",") + + if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { + meta.unsafeSsl, err = strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("error parsing unsafeSsL: %s", err) + } + } else { + meta.unsafeSsl = defaultUnsafeSsl + } + + if val, ok := config.AuthParams["username"]; ok { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok { + meta.username = val + } + + if config.AuthParams["password"] != "" { + meta.password = config.AuthParams["password"] + } else if config.TriggerMetadata["passwordFromEnv"] != "" { + meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]] + } + + index, err := GetFromAuthOrMeta(config, "index") + if err != nil { + return nil, err + } + meta.indexes = splitAndTrimBySep(index, ";") + + meta.searchTemplateName, err = GetFromAuthOrMeta(config, "searchTemplateName") + if err != nil { + return nil, err + } + + if val, ok := config.TriggerMetadata["params"]; ok { + meta.params = splitAndTrimBySep(val, ";") + } + + meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation") + if err != nil { + return nil, err + } + + targetValue, err := GetFromAuthOrMeta(config, "targetValue") + if err != nil { + return nil, err + } + meta.targetValue, err = strconv.Atoi(targetValue) + if err != nil { + return nil, fmt.Errorf("targetValue parsing error %s", err.Error()) + } + + return &meta, nil +} + +// newElasticsearchClient creates elasticsearch db connection +func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, error) { + config := elasticsearch.Config{Addresses: meta.addresses} + if meta.username != "" { + config.Username = meta.username + } + if meta.password != "" { + config.Password = meta.password + } + + if meta.unsafeSsl { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + config.Transport = tr + } + + esClient, err := elasticsearch.NewClient(config) + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Found error when creating client: %s", err)) + return nil, err + } + + _, err = esClient.Info() + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Found error when pinging search engine: %s", err)) + return nil, err + } + return esClient, nil +} + +func (s *elasticsearchScaler) Close(ctx context.Context) error { + return nil +} + +// IsActive returns true if there are pending messages to be processed +func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) { + messages, err := s.getQueryResult() + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err)) + return false, err + } + return messages > 0, nil +} + +// getQueryResult returns result of the scaler query +func (s *elasticsearchScaler) getQueryResult() (int, error) { + // Build the request body. + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil { + elasticsearchLog.Error(err, "Error encoding query: %s", err) + } + + // Run the templated search + res, err := s.esClient.SearchTemplate( + &body, + s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...), + ) + if err != nil { + elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) + return 0, err + } + + defer res.Body.Close() + b, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, err + } + v, err := getValueFromSearch(b, s.metadata.valueLocation) + if err != nil { + return 0, err + } + return v, nil +} + +func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} { + params := map[string]interface{}{} + for _, p := range metadata.params { + if p != "" { + kv := splitAndTrimBySep(p, ":") + params[kv[0]] = kv[1] + } + } + query := map[string]interface{}{ + "id": metadata.searchTemplateName, + } + if len(params) > 0 { + query["params"] = params + } + return query +} + +func getValueFromSearch(body []byte, valueLocation string) (int, error) { + r := gjson.GetBytes(body, valueLocation) + errorMsg := "valueLocation must point to value of type number but got: '%s'" + if r.Type == gjson.String { + q, err := strconv.Atoi(r.String()) + if err != nil { + return 0, fmt.Errorf(errorMsg, r.String()) + } + return q, nil + } + if r.Type != gjson.Number { + return 0, fmt.Errorf(errorMsg, r.Type.String()) + } + return int(r.Num), nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) + metricName := kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", s.metadata.searchTemplateName)) + + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.targetValue, metricName), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric +func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + num, err := s.getQueryResult() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(num), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// Splits a string separated by comma and trims space from all the elements. +func splitAndTrimBySep(s string, sep string) []string { + x := strings.Split(s, sep) + for i := range x { + x[i] = strings.Trim(x[i], " ") + } + return x +} diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go new file mode 100644 index 00000000000..130c641fe36 --- /dev/null +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -0,0 +1,398 @@ +package scalers + +import ( + "errors" + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +type parseElasticsearchMetadataTestData struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + expectedMetadata *elasticsearchMetadata + expectedError error +} + +type paramsTestData struct { + name string + metadata map[string]string + resolvedEnv map[string]string + authParams map[string]string + expectedQuery map[string]interface{} +} + +var testCases = []parseElasticsearchMetadataTestData{ + { + name: "no addresses given", + metadata: map[string]string{}, + authParams: map[string]string{}, + expectedError: errors.New("no addresses given"), + }, + { + name: "no index given", + metadata: map[string]string{"addresses": "http://localhost:9200"}, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no index given"), + }, + { + name: "no searchTemplateName given", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + }, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no searchTemplateName given"), + }, + { + name: "no valueLocation given", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "searchTemplateName", + }, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no valueLocation given"), + }, + { + name: "no targetValue given", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "searchTemplateName", + "valueLocation": "toto", + }, + authParams: map[string]string{"username": "admin"}, + expectedError: errors.New("no targetValue given"), + }, + { + name: "all fields ok", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "unsafeSsl": "true", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: true, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + }, + { + name: "multi indexes", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "unsafeSsl": "false", + "index": "index1;index2", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: false, + indexes: []string{"index1", "index2"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + }, + { + name: "multi indexes trimmed", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "unsafeSsl": "false", + "index": "index1 ; index2", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: false, + indexes: []string{"index1", "index2"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + }, + { + name: "multi addresses", + metadata: map[string]string{ + "addresses": "http://localhost:9200,http://localhost:9201", + "unsafeSsl": "false", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200", "http://localhost:9201"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + }, + { + name: "multi addresses trimmed", + metadata: map[string]string{ + "addresses": "http://localhost:9200 , http://localhost:9201", + "unsafeSsl": "false", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200", "http://localhost:9201"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + }, + { + name: "password from env", + metadata: map[string]string{ + "addresses": "http://localhost:9200,http://localhost:9201", + "unsafeSsl": "false", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + "passwordFromEnv": "ELASTICSEARCH_PASSWORD", + }, + authParams: map[string]string{ + "username": "admin", + }, + resolvedEnv: map[string]string{ + "ELASTICSEARCH_PASSWORD": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200", "http://localhost:9201"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + }, +} + +func TestParseElasticsearchMetadata(t *testing.T) { + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + metadata, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: tc.metadata, + AuthParams: tc.authParams, + ResolvedEnv: tc.resolvedEnv, + }) + if tc.expectedError != nil { + assert.Contains(t, err.Error(), tc.expectedError.Error()) + } else if err != nil { + assert.NoError(t, err) + } else { + assert.NoError(t, err) + fmt.Println(tc.name) + assert.Equal(t, tc.expectedMetadata, metadata) + } + }) + } +} + +func TestUnsafeSslDefaultValue(t *testing.T) { + tc := &parseElasticsearchMetadataTestData{ + name: "all fields ok", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedMetadata: &elasticsearchMetadata{ + addresses: []string{"http://localhost:9200"}, + unsafeSsl: false, + indexes: []string{"index1"}, + username: "admin", + password: "password", + searchTemplateName: "myAwesomeSearch", + params: []string{"param1:value1"}, + valueLocation: "hits.hits[0]._source.value", + targetValue: 12, + }, + expectedError: nil, + } + metadata, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: tc.metadata, + AuthParams: tc.authParams, + }) + assert.NoError(t, err) + assert.Equal(t, tc.expectedMetadata, metadata) +} + +func TestBuildQuery(t *testing.T) { + + var testCases = []paramsTestData{ + { + name: "no params", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + }, + }, + { + name: "one param", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + "params": map[string]interface{}{ + "param1": "value1", + }, + }, + }, + { + name: "two params", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1:value1;param2:value2", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + "params": map[string]interface{}{ + "param1": "value1", + "param2": "value2", + }, + }, + }, + { + name: "params are trimmed", + metadata: map[string]string{ + "addresses": "http://localhost:9200", + "index": "index1", + "searchTemplateName": "myAwesomeSearch", + "params": "param1 : value1 ; param2 : value2 ", + "valueLocation": "hits.hits[0]._source.value", + "targetValue": "12", + }, + authParams: map[string]string{ + "username": "admin", + "password": "password", + }, + expectedQuery: map[string]interface{}{ + "id": "myAwesomeSearch", + "params": map[string]interface{}{ + "param1": "value1", + "param2": "value2", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + metadata, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: tc.metadata, + AuthParams: tc.authParams, + }) + assert.NoError(t, err) + assert.Equal(t, tc.expectedQuery, buildQuery(metadata)) + }) + } +} diff --git a/pkg/scalers/metrics_api_scaler.go b/pkg/scalers/metrics_api_scaler.go index 901b234435b..fd62a5bc773 100644 --- a/pkg/scalers/metrics_api_scaler.go +++ b/pkg/scalers/metrics_api_scaler.go @@ -186,7 +186,7 @@ func parseMetricsAPIMetadata(config *ScalerConfig) (*metricsAPIScalerMetadata, e // GetValueFromResponse uses provided valueLocation to access the numeric value in provided body func GetValueFromResponse(body []byte, valueLocation string) (*resource.Quantity, error) { r := gjson.GetBytes(body, valueLocation) - errorMsg := "valueLocation must point to value of type number or a string representing a Quanitity got: '%s'" + errorMsg := "valueLocation must point to value of type number or a string representing a Quantity got: '%s'" if r.Type == gjson.String { q, err := resource.ParseQuantity(r.String()) if err != nil { diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 68be3bcadb9..d03ab966582 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -357,6 +357,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewCronScaler(config) case "external": return scalers.NewExternalScaler(config) + case "elasticsearch": + return scalers.NewElasticsearchScaler(config) case "external-push": return scalers.NewExternalPushScaler(config) case "gcp-pubsub": diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts new file mode 100644 index 00000000000..954b7271b91 --- /dev/null +++ b/tests/scalers/elasticsearch.test.ts @@ -0,0 +1,329 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from "./helpers"; + +const testNamespace = 'elasticsearch-test' +const elasticsearchNamespace = 'elasticsearch' +const deploymentName = 'podinfo' +const indexName = 'keda' +const searchTemplateName = 'keda-search-template' +const elasticPassword = 'passw0rd!' +const kubectlExecCurl = `kubectl exec -n ${elasticsearchNamespace} elasticsearch-0 -- curl -sS -H "content-type: application/json" -u "elastic:${elasticPassword}"` + +test.before(t => { + // install elasticsearch + sh.exec(`kubectl create namespace ${elasticsearchNamespace}`) + const elasticsearchTmpFile = tmp.fileSync() + fs.writeFileSync(elasticsearchTmpFile.name, elasticsearchStatefulsetYaml.replace('{{ELASTIC_PASSWORD}}', elasticPassword)) + + t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') + t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace)) + + // Create the index and the search template + sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) + sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/_scripts/${searchTemplateName} -d '${elasticsearchSearchTemplate}'`) + + + sh.exec(`kubectl create namespace ${testNamespace}`) + + // deploy dummy app and scaled object + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, deployYaml.replace(/{{DEPLOYMENT_NAME}}/g, deploymentName) + .replace('{{ELASTICSEARCH_NAMESPACE}}', elasticsearchNamespace) + .replace('{{SEARCH_TEMPLATE_NAME}}', searchTemplateName) + .replace('{{INDEX_NAME}}', indexName) + .replace('{{ELASTIC_PASSWORD_BASE64}}', Buffer.from(elasticPassword).toString('base64')) + ) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec( + `kubectl get deployment.apps/${deploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial(`Deployment should scale to 5 (the max) then back to 0`, t => { + + for (let i = 0; i < 5; i++) { + let doc = elasticsearchDummyDoc.replace("{{TIMESTAMP}}", new Date().toISOString()) + sh.exec(`${kubectlExecCurl} -XPOST http://localhost:9200/${indexName}/_doc -d '${doc}'`) + } + + let replicaCount = '0' + + const maxReplicaCount = '5' + + for (let i = 0; i < 90 && replicaCount !== maxReplicaCount; i++) { + replicaCount = sh.exec( + `kubectl get deployment.apps/${deploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== maxReplicaCount) { + sh.exec('sleep 2s') + } + } + + t.is(maxReplicaCount, replicaCount, `Replica count should be ${maxReplicaCount} after 60 seconds`) + + for (let i = 0; i < 36 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deployment.apps/${deploymentName} --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') +}) + +test.after.always.cb('clean up elasticsearch deployment', t => { + sh.exec(`kubectl delete namespace ${testNamespace}`) + + // uninstall elasticsearch + sh.exec(`kubectl delete --namespace ${elasticsearchNamespace} sts/elasticsearch`) + sh.exec(`kubectl delete namespace ${elasticsearchNamespace}`) + + t.end() +}) + +const deployYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: {{DEPLOYMENT_NAME}} + name: {{DEPLOYMENT_NAME}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{DEPLOYMENT_NAME}} + template: + metadata: + labels: + app: {{DEPLOYMENT_NAME}} + spec: + containers: + - image: stefanprodan/podinfo + name: {{DEPLOYMENT_NAME}} +--- +apiVersion: v1 +kind: Secret +metadata: + name: elasticsearch-secrets +type: Opaque +data: + password: {{ELASTIC_PASSWORD_BASE64}} +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-elasticsearch-secret +spec: + secretTargetRef: + - parameter: password + name: elasticsearch-secrets + key: password +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: elasticsearch-scaledobject +spec: + minReplicaCount: 0 + maxReplicaCount: 5 + pollingInterval: 3 + cooldownPeriod: 5 + scaleTargetRef: + name: {{DEPLOYMENT_NAME}} + triggers: + - type: elasticsearch + metadata: + addresses: "http://elasticsearch-svc.{{ELASTICSEARCH_NAMESPACE}}.svc.cluster.local:9200" + username: "elastic" + index: {{INDEX_NAME}} + searchTemplateName: {{SEARCH_TEMPLATE_NAME}} + valueLocation: "hits.total.value" + targetValue: "1" + params: "dummy_value:1;dumb_value:oOooo" + authenticationRef: + name: keda-trigger-auth-elasticsearch-secret +` + +const elasticsearchStatefulsetYaml = ` +kind: Service +apiVersion: v1 +metadata: + name: elasticsearch-svc +spec: + type: ClusterIP + ports: + - name: http + port: 9200 + targetPort: 9200 + protocol: TCP + selector: + name: elasticsearch +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: elasticsearch +spec: + replicas: 1 + selector: + matchLabels: + name: elasticsearch + template: + metadata: + labels: + name: elasticsearch + spec: + containers: + - name: elasticsearch + image: docker.elastic.co/elasticsearch/elasticsearch:7.15.1 + imagePullPolicy: IfNotPresent + env: + - name: POD_IP + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + - name: NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + - name: ES_JAVA_OPTS + value: -Xms256m -Xmx256m + - name: cluster.name + value: elasticsearch-keda + - name: cluster.initial_master_nodes + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: status.podIP + - name: node.data + value: "true" + - name: node.ml + value: "false" + - name: node.ingest + value: "false" + - name: node.master + value: "true" + - name: node.remote_cluster_client + value: "false" + - name: node.transform + value: "false" + - name: ELASTIC_PASSWORD + value: "{{ELASTIC_PASSWORD}}" + - name: xpack.security.enabled + value: "true" + ports: + - containerPort: 9200 + name: http + protocol: TCP + - containerPort: 9300 + name: transport + protocol: TCP + resources: + requests: + cpu: 100m + memory: 1Gi + limits: + memory: 1Gi + readinessProbe: + exec: + command: + - /usr/bin/curl + - -sS + - -u "elastic:{{ELASTIC_PASSWORD}}" + - http://localhost:9200 + failureThreshold: 3 + initialDelaySeconds: 10 + periodSeconds: 5 + successThreshold: 1 + timeoutSeconds: 5 + + + serviceName: elasticsearch-svc +` + +const elastisearchCreateIndex = ` +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "dummy": { + "type": "integer" + }, + "dumb": { + "type": "keyword" + } + } + }, + "settings": { + "number_of_replicas": 0, + "number_of_shards": 1 + } +}` + +const elasticsearchDummyDoc = ` +{ + "@timestamp": "{{TIMESTAMP}}", + "dummy": 1, + "dumb": "oOooo" +}` + +const elasticsearchSearchTemplate = ` +{ + "script": { + "lang": "mustache", + "source": { + "query": { + "bool": { + "filter": [ + { + "range": { + "@timestamp": { + "gte": "now-1m/m", + "lte": "now/m" + } + } + }, + { + "term": { + "dummy": "{{dummy_value}}" + } + }, + { + "term": { + "dumb": "{{dumb_value}}" + } + } + ] + } + } + } + } +}` From 44ba5558ab6bf86806d662c49a182a8d146f1d15 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 09:56:58 +0100 Subject: [PATCH 02/23] chore: proper indent Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 4 +-- pkg/scalers/elasticsearch_scaler_test.go | 42 ++++++++++++------------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 2b6308c1fef..2a1951b61b1 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -35,7 +35,7 @@ type elasticsearchMetadata struct { searchTemplateName string params []string valueLocation string - targetValue int + targetValue int } var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler") @@ -206,7 +206,7 @@ func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} { } } query := map[string]interface{}{ - "id": metadata.searchTemplateName, + "id": metadata.searchTemplateName, } if len(params) > 0 { query["params"] = params diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 130c641fe36..09052257d91 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -8,9 +8,9 @@ import ( ) type parseElasticsearchMetadataTestData struct { - name string - metadata map[string]string - resolvedEnv map[string]string + name string + metadata map[string]string + resolvedEnv map[string]string authParams map[string]string expectedMetadata *elasticsearchMetadata expectedError error @@ -76,7 +76,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -91,7 +91,7 @@ var testCases = []parseElasticsearchMetadataTestData{ searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, }, @@ -104,7 +104,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -119,7 +119,7 @@ var testCases = []parseElasticsearchMetadataTestData{ searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, }, @@ -132,7 +132,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -147,7 +147,7 @@ var testCases = []parseElasticsearchMetadataTestData{ searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, }, @@ -160,7 +160,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -175,7 +175,7 @@ var testCases = []parseElasticsearchMetadataTestData{ searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, }, @@ -188,7 +188,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -203,7 +203,7 @@ var testCases = []parseElasticsearchMetadataTestData{ searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, }, @@ -216,7 +216,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", "passwordFromEnv": "ELASTICSEARCH_PASSWORD", }, authParams: map[string]string{ @@ -234,7 +234,7 @@ var testCases = []parseElasticsearchMetadataTestData{ searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, }, @@ -271,7 +271,7 @@ func TestUnsafeSslDefaultValue(t *testing.T) { "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -286,7 +286,7 @@ func TestUnsafeSslDefaultValue(t *testing.T) { searchTemplateName: "myAwesomeSearch", params: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", - targetValue: 12, + targetValue: 12, }, expectedError: nil, } @@ -309,7 +309,7 @@ func TestBuildQuery(t *testing.T) { "searchTemplateName": "myAwesomeSearch", "params": "", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -327,7 +327,7 @@ func TestBuildQuery(t *testing.T) { "searchTemplateName": "myAwesomeSearch", "params": "param1:value1", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -348,7 +348,7 @@ func TestBuildQuery(t *testing.T) { "searchTemplateName": "myAwesomeSearch", "params": "param1:value1;param2:value2", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", @@ -370,7 +370,7 @@ func TestBuildQuery(t *testing.T) { "searchTemplateName": "myAwesomeSearch", "params": "param1 : value1 ; param2 : value2 ", "valueLocation": "hits.hits[0]._source.value", - "targetValue": "12", + "targetValue": "12", }, authParams: map[string]string{ "username": "admin", From cc4c9f1507eae530f9d92a2f1b3f842391356ac6 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 10:04:09 +0100 Subject: [PATCH 03/23] chore: fixes lints Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 2 +- pkg/scalers/elasticsearch_scaler_test.go | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 2a1951b61b1..d9eb2199cfa 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -41,7 +41,7 @@ type elasticsearchMetadata struct { var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler") // NewElasticsearchScaler creates a new elasticsearch scaler -func NewElasticsearchScaler(config *ScalerConfig) (*elasticsearchScaler, error) { +func NewElasticsearchScaler(config *ScalerConfig) (Scaler, error) { meta, err := parseElasticsearchMetadata(config) if err != nil { return nil, fmt.Errorf("error parsing elasticsearch metadata: %s", err) diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 09052257d91..dced6f0eed8 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -19,7 +19,6 @@ type parseElasticsearchMetadataTestData struct { type paramsTestData struct { name string metadata map[string]string - resolvedEnv map[string]string authParams map[string]string expectedQuery map[string]interface{} } @@ -241,7 +240,6 @@ var testCases = []parseElasticsearchMetadataTestData{ } func TestParseElasticsearchMetadata(t *testing.T) { - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { metadata, err := parseElasticsearchMetadata(&ScalerConfig{ @@ -299,7 +297,6 @@ func TestUnsafeSslDefaultValue(t *testing.T) { } func TestBuildQuery(t *testing.T) { - var testCases = []paramsTestData{ { name: "no params", From e8c641562503dc5bca22f9a89be0cf97482459ea Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 10:31:00 +0100 Subject: [PATCH 04/23] chore: add changelog entry Signed-off-by: Nicolas L --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 050667a147a..36f7072849c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) - Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225)) - Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187)) +- Add an elasticsearch scaler based on search template ([#2304](https://github.com/kedacore/keda/pull/2304)) ### Improvements From 9d703c259d792bc2f1c4f96632bb06c5b2c4ab58 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 10:38:37 +0100 Subject: [PATCH 05/23] chore: rename params to parameters Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 16 +++++------ pkg/scalers/elasticsearch_scaler_test.go | 36 ++++++++++++------------ tests/scalers/elasticsearch.test.ts | 2 +- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index d9eb2199cfa..f190468f7e2 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -33,7 +33,7 @@ type elasticsearchMetadata struct { password string indexes []string searchTemplateName string - params []string + parameters []string valueLocation string targetValue int } @@ -101,8 +101,8 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e return nil, err } - if val, ok := config.TriggerMetadata["params"]; ok { - meta.params = splitAndTrimBySep(val, ";") + if val, ok := config.TriggerMetadata["parameters"]; ok { + meta.parameters = splitAndTrimBySep(val, ";") } meta.valueLocation, err = GetFromAuthOrMeta(config, "valueLocation") @@ -198,18 +198,18 @@ func (s *elasticsearchScaler) getQueryResult() (int, error) { } func buildQuery(metadata *elasticsearchMetadata) map[string]interface{} { - params := map[string]interface{}{} - for _, p := range metadata.params { + parameters := map[string]interface{}{} + for _, p := range metadata.parameters { if p != "" { kv := splitAndTrimBySep(p, ":") - params[kv[0]] = kv[1] + parameters[kv[0]] = kv[1] } } query := map[string]interface{}{ "id": metadata.searchTemplateName, } - if len(params) > 0 { - query["params"] = params + if len(parameters) > 0 { + query["params"] = parameters } return query } diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index dced6f0eed8..163828b0c8c 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -73,7 +73,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "unsafeSsl": "true", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -88,7 +88,7 @@ var testCases = []parseElasticsearchMetadataTestData{ username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -101,7 +101,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "unsafeSsl": "false", "index": "index1;index2", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -116,7 +116,7 @@ var testCases = []parseElasticsearchMetadataTestData{ username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -129,7 +129,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "unsafeSsl": "false", "index": "index1 ; index2", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -144,7 +144,7 @@ var testCases = []parseElasticsearchMetadataTestData{ username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -157,7 +157,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "unsafeSsl": "false", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -172,7 +172,7 @@ var testCases = []parseElasticsearchMetadataTestData{ username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -185,7 +185,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "unsafeSsl": "false", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -200,7 +200,7 @@ var testCases = []parseElasticsearchMetadataTestData{ username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -213,7 +213,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "unsafeSsl": "false", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", "passwordFromEnv": "ELASTICSEARCH_PASSWORD", @@ -231,7 +231,7 @@ var testCases = []parseElasticsearchMetadataTestData{ username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -267,7 +267,7 @@ func TestUnsafeSslDefaultValue(t *testing.T) { "addresses": "http://localhost:9200", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -282,7 +282,7 @@ func TestUnsafeSslDefaultValue(t *testing.T) { username: "admin", password: "password", searchTemplateName: "myAwesomeSearch", - params: []string{"param1:value1"}, + parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, }, @@ -304,7 +304,7 @@ func TestBuildQuery(t *testing.T) { "addresses": "http://localhost:9200", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "", + "parameters": "", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -322,7 +322,7 @@ func TestBuildQuery(t *testing.T) { "addresses": "http://localhost:9200", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1", + "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -343,7 +343,7 @@ func TestBuildQuery(t *testing.T) { "addresses": "http://localhost:9200", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1:value1;param2:value2", + "parameters": "param1:value1;param2:value2", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, @@ -365,7 +365,7 @@ func TestBuildQuery(t *testing.T) { "addresses": "http://localhost:9200", "index": "index1", "searchTemplateName": "myAwesomeSearch", - "params": "param1 : value1 ; param2 : value2 ", + "parameters": "param1 : value1 ; param2 : value2 ", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", }, diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index 954b7271b91..f4324b0c4bb 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -153,7 +153,7 @@ spec: searchTemplateName: {{SEARCH_TEMPLATE_NAME}} valueLocation: "hits.total.value" targetValue: "1" - params: "dummy_value:1;dumb_value:oOooo" + parameters: "dummy_value:1;dumb_value:oOooo" authenticationRef: name: keda-trigger-auth-elasticsearch-secret ` From 7b408fd06c2feaba175abc92c910801646d5b4d2 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 10:46:21 +0100 Subject: [PATCH 06/23] chore: fix comment Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index f190468f7e2..f339aa54431 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -266,7 +266,7 @@ func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, return append([]external_metrics.ExternalMetricValue{}, metric), nil } -// Splits a string separated by comma and trims space from all the elements. +// Splits a string separated by a specified separator and trims space from all the elements. func splitAndTrimBySep(s string, sep string) []string { x := strings.Split(s, sep) for i := range x { From 6afb6caa30acace1e41e60a674fba023d3b85283 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 11:10:09 +0100 Subject: [PATCH 07/23] :chore: trim Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index f4324b0c4bb..4c5401ff708 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -172,7 +172,7 @@ spec: protocol: TCP selector: name: elasticsearch ---- +--- apiVersion: apps/v1 kind: StatefulSet metadata: From bef268d62c5dba1846e2424c06892735894ef2f2 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 11:15:38 +0100 Subject: [PATCH 08/23] chore: fix imports order Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 9 +++++---- pkg/scalers/elasticsearch_scaler_test.go | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index f339aa54431..f7edc07a5df 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -6,19 +6,20 @@ import ( "crypto/tls" "encoding/json" "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "github.com/elastic/go-elasticsearch/v7" kedautil "github.com/kedacore/keda/v2/pkg/util" "github.com/tidwall/gjson" - "io/ioutil" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" - "net/http" logf "sigs.k8s.io/controller-runtime/pkg/log" - "strconv" - "strings" ) type elasticsearchScaler struct { diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 163828b0c8c..82e8f490189 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -3,8 +3,9 @@ package scalers import ( "errors" "fmt" - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) type parseElasticsearchMetadataTestData struct { From 4664f216581173ace9da5aa9a0d6568409bc65b1 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 11:18:32 +0100 Subject: [PATCH 09/23] chore: fix bad if / else / else if Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 82e8f490189..fcf7e990550 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -250,8 +250,6 @@ func TestParseElasticsearchMetadata(t *testing.T) { }) if tc.expectedError != nil { assert.Contains(t, err.Error(), tc.expectedError.Error()) - } else if err != nil { - assert.NoError(t, err) } else { assert.NoError(t, err) fmt.Println(tc.name) From 0d5042f148f9331c4171d209ca80034d51e713f9 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 11:28:33 +0100 Subject: [PATCH 10/23] chore: fix sort in scale_handler Signed-off-by: Nicolas L --- pkg/scaling/scale_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index d03ab966582..4f815dfc59f 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -355,10 +355,10 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewCPUMemoryScaler(corev1.ResourceCPU, config) case "cron": return scalers.NewCronScaler(config) - case "external": - return scalers.NewExternalScaler(config) case "elasticsearch": return scalers.NewElasticsearchScaler(config) + case "external": + return scalers.NewExternalScaler(config) case "external-push": return scalers.NewExternalPushScaler(config) case "gcp-pubsub": From 6f3558962288bd2de54caf07c8c7054157d009a4 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 11:28:53 +0100 Subject: [PATCH 11/23] chore: fix name for sort-scalers hook Signed-off-by: Nicolas L --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b31a24da3ba..1430e5c745f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: entry: "(?i)(black|white)[_-]?(list|List)" pass_filenames: true - id: sort-scalers - name: Check if scalers are sorted in scaler_handler.go + name: Check if scalers are sorted in scale_handler.go language: system entry: "bash tools/sort_scalers.sh" files: .*scale_handler\.go$ From 4d835d39022b31cce8fa301efff5b70df4e4e3c1 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Fri, 19 Nov 2021 11:30:28 +0100 Subject: [PATCH 12/23] chore: trim Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index 4c5401ff708..ba37873b954 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -172,7 +172,7 @@ spec: protocol: TCP selector: name: elasticsearch ---- +--- apiVersion: apps/v1 kind: StatefulSet metadata: From 310815207cb7c5ece8822f9cb6b408cc99b92351 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Mon, 22 Nov 2021 09:08:56 +0100 Subject: [PATCH 13/23] chore: fixes after code review Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 23 +++++++++++------------ pkg/scalers/elasticsearch_scaler_test.go | 7 +++++++ tests/scalers/elasticsearch.test.ts | 2 +- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index f7edc07a5df..bc08b135ada 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -37,6 +37,7 @@ type elasticsearchMetadata struct { parameters []string valueLocation string targetValue int + metricName string } var elasticsearchLog = logf.Log.WithName("elasticsearch_scaler") @@ -73,7 +74,7 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { meta.unsafeSsl, err = strconv.ParseBool(val) if err != nil { - return nil, fmt.Errorf("error parsing unsafeSsL: %s", err) + return nil, fmt.Errorf("error parsing unsafeSsl: %s", err) } } else { meta.unsafeSsl = defaultUnsafeSsl @@ -120,6 +121,7 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e return nil, fmt.Errorf("targetValue parsing error %s", err.Error()) } + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", meta.searchTemplateName))) return &meta, nil } @@ -133,12 +135,9 @@ func newElasticsearchClient(meta *elasticsearchMetadata) (*elasticsearch.Client, config.Password = meta.password } - if meta.unsafeSsl { - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - config.Transport = tr - } + transport := http.DefaultTransport.(*http.Transport) + transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: meta.unsafeSsl} + config.Transport = transport esClient, err := elasticsearch.NewClient(config) if err != nil { @@ -160,7 +159,7 @@ func (s *elasticsearchScaler) Close(ctx context.Context) error { // IsActive returns true if there are pending messages to be processed func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) { - messages, err := s.getQueryResult() + messages, err := s.getQueryResult(ctx) if err != nil { elasticsearchLog.Error(err, fmt.Sprintf("Error inspecting elasticsearch: %s", err)) return false, err @@ -169,7 +168,7 @@ func (s *elasticsearchScaler) IsActive(ctx context.Context) (bool, error) { } // getQueryResult returns result of the scaler query -func (s *elasticsearchScaler) getQueryResult() (int, error) { +func (s *elasticsearchScaler) getQueryResult(ctx context.Context) (int, error) { // Build the request body. var body bytes.Buffer if err := json.NewEncoder(&body).Encode(buildQuery(s.metadata)); err != nil { @@ -180,6 +179,7 @@ func (s *elasticsearchScaler) getQueryResult() (int, error) { res, err := s.esClient.SearchTemplate( &body, s.esClient.SearchTemplate.WithIndex(s.metadata.indexes...), + s.esClient.SearchTemplate.WithContext(ctx), ) if err != nil { elasticsearchLog.Error(err, fmt.Sprintf("Could not query elasticsearch: %s", err)) @@ -234,11 +234,10 @@ func getValueFromSearch(body []byte, valueLocation string) (int, error) { // GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { targetValue := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) - metricName := kedautil.NormalizeString(fmt.Sprintf("elasticsearch-%s", s.metadata.searchTemplateName)) externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.targetValue, metricName), + Name: GenerateMetricNameWithIndex(s.metadata.targetValue, s.metadata.metricName), }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, @@ -253,7 +252,7 @@ func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2 // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *elasticsearchScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - num, err := s.getQueryResult() + num, err := s.getQueryResult(ctx) if err != nil { return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting elasticsearch: %s", err) } diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index fcf7e990550..5b923e7e5ed 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -92,6 +92,7 @@ var testCases = []parseElasticsearchMetadataTestData{ parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, }, @@ -120,6 +121,7 @@ var testCases = []parseElasticsearchMetadataTestData{ parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, }, @@ -148,6 +150,7 @@ var testCases = []parseElasticsearchMetadataTestData{ parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, }, @@ -176,6 +179,7 @@ var testCases = []parseElasticsearchMetadataTestData{ parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, }, @@ -204,6 +208,7 @@ var testCases = []parseElasticsearchMetadataTestData{ parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, }, @@ -235,6 +240,7 @@ var testCases = []parseElasticsearchMetadataTestData{ parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, }, @@ -284,6 +290,7 @@ func TestUnsafeSslDefaultValue(t *testing.T) { parameters: []string{"param1:value1"}, valueLocation: "hits.hits[0]._source.value", targetValue: 12, + metricName: "s0-elasticsearch-myAwesomeSearch", }, expectedError: nil, } diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index ba37873b954..1cd5bed44cf 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -19,7 +19,7 @@ test.before(t => { fs.writeFileSync(elasticsearchTmpFile.name, elasticsearchStatefulsetYaml.replace('{{ELASTIC_PASSWORD}}', elasticPassword)) t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') - t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace)) + t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 300)) // Create the index and the search template sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) From 8e3b4798aca04b90286f7e1ffa8e9b4e47fb604c Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Mon, 22 Nov 2021 09:58:35 +0100 Subject: [PATCH 14/23] chore: fix PR number Signed-off-by: Nicolas L --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36f7072849c..e1f696eb844 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) - Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225)) - Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187)) -- Add an elasticsearch scaler based on search template ([#2304](https://github.com/kedacore/keda/pull/2304)) +- Add an elasticsearch scaler based on search template ([#2311](https://github.com/kedacore/keda/pull/2311)) ### Improvements From ca423cbf0ed87d2a7c574c0d7ce16dd1b4a8d714 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Mon, 22 Nov 2021 10:24:00 +0100 Subject: [PATCH 15/23] proper handling of metricName + TU Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler.go | 2 +- pkg/scalers/elasticsearch_scaler_test.go | 30 ++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index bc08b135ada..42e7de2dc6f 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -237,7 +237,7 @@ func (s *elasticsearchScaler) GetMetricSpecForScaling(context.Context) []v2beta2 externalMetric := &v2beta2.ExternalMetricSource{ Metric: v2beta2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.targetValue, s.metadata.metricName), + Name: s.metadata.metricName, }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index 5b923e7e5ed..b85e865c2ad 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -1,6 +1,7 @@ package scalers import ( + "context" "errors" "fmt" "testing" @@ -24,6 +25,12 @@ type paramsTestData struct { expectedQuery map[string]interface{} } +type elasticsearchMetricIdentifier struct { + metadataTestData *parseElasticsearchMetadataTestData + scalerIndex int + name string +} + var testCases = []parseElasticsearchMetadataTestData{ { name: "no addresses given", @@ -77,6 +84,7 @@ var testCases = []parseElasticsearchMetadataTestData{ "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", + "scalerIndex": "0", }, authParams: map[string]string{ "username": "admin", @@ -399,3 +407,25 @@ func TestBuildQuery(t *testing.T) { }) } } + +func TestElasticsearchGetMetricSpecForScaling(t *testing.T) { + var elasticsearchMetricIdentifiers = []elasticsearchMetricIdentifier{ + {&testCases[5], 0, "s0-elasticsearch-myAwesomeSearch"}, + {&testCases[6], 1, "s1-elasticsearch-myAwesomeSearch"}, + } + + for _, testData := range elasticsearchMetricIdentifiers { + ctx := context.Background() + meta, err := parseElasticsearchMetadata(&ScalerConfig{ + TriggerMetadata: testData.metadataTestData.metadata, + AuthParams: testData.metadataTestData.authParams, + ScalerIndex: testData.scalerIndex, + }) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + elasticsearchScaler := elasticsearchScaler{metadata: meta, esClient: nil} + metricSpec := elasticsearchScaler.GetMetricSpecForScaling(ctx) + assert.Equal(t, metricSpec[0].External.Metric.Name, testData.name) + } +} From 9d5a921e78fae1c466dd79901c912c198c42e2a4 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Mon, 22 Nov 2021 10:28:03 +0100 Subject: [PATCH 16/23] chore: remove useless metadata Signed-off-by: Nicolas L --- pkg/scalers/elasticsearch_scaler_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/scalers/elasticsearch_scaler_test.go b/pkg/scalers/elasticsearch_scaler_test.go index b85e865c2ad..c219dbba801 100644 --- a/pkg/scalers/elasticsearch_scaler_test.go +++ b/pkg/scalers/elasticsearch_scaler_test.go @@ -84,7 +84,6 @@ var testCases = []parseElasticsearchMetadataTestData{ "parameters": "param1:value1", "valueLocation": "hits.hits[0]._source.value", "targetValue": "12", - "scalerIndex": "0", }, authParams: map[string]string{ "username": "admin", From 5dd78180909e5f425f563aec0cdbca52084a0537 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Mon, 22 Nov 2021 11:08:03 +0100 Subject: [PATCH 17/23] chore: increase timeout again Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index 1cd5bed44cf..76ea6a407c7 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -19,7 +19,7 @@ test.before(t => { fs.writeFileSync(elasticsearchTmpFile.name, elasticsearchStatefulsetYaml.replace('{{ELASTIC_PASSWORD}}', elasticPassword)) t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') - t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 300)) + t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 600)) // Create the index and the search template sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) From 62512f2c753dcaf8d6e6b810d3740f0eb1b85e8c Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Mon, 22 Nov 2021 11:38:00 +0100 Subject: [PATCH 18/23] chore: add debug output Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index 76ea6a407c7..d57d13a0ff3 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -20,6 +20,7 @@ test.before(t => { t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 600)) + sh.exec(`kubectl describe --namespace ${elasticsearchNamespace} statefulset/elasticsearch `) // Create the index and the search template sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) From 18b093b71bb1c68efc108c0adc891dd937cd7979 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Tue, 23 Nov 2021 08:42:39 +0100 Subject: [PATCH 19/23] chore: add debug output Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index d57d13a0ff3..274e192a9a9 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -20,7 +20,9 @@ test.before(t => { t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 600)) - sh.exec(`kubectl describe --namespace ${elasticsearchNamespace} statefulset/elasticsearch `) + sh.exec(`kubectl --namespace ${elasticsearchNamespace} describe statefulset/elasticsearch `) + sh.exec(`kubectl --namespace ${elasticsearchNamespace} describe pods`) + sh.exec(`kubectl --namespace ${elasticsearchNamespace} logs elasticsearch-0`) // Create the index and the search template sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) From 8c6f8360ff1ec9600981a2c31b3431ecaae3dc0e Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Tue, 23 Nov 2021 09:34:02 +0100 Subject: [PATCH 20/23] fix: disable mmap on elasticsearch statefulset Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index 274e192a9a9..af7c352daaa 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -240,6 +240,8 @@ spec: value: "{{ELASTIC_PASSWORD}}" - name: xpack.security.enabled value: "true" + - name: node.store.allow_mmap + value: "false" ports: - containerPort: 9200 name: http From 75cdc8fa8eddf6a0436dbe47d58e24c166333ace Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Tue, 23 Nov 2021 10:00:45 +0100 Subject: [PATCH 21/23] chore: cleanup debug output Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index af7c352daaa..a3d55ea8579 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -20,9 +20,6 @@ test.before(t => { t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 600)) - sh.exec(`kubectl --namespace ${elasticsearchNamespace} describe statefulset/elasticsearch `) - sh.exec(`kubectl --namespace ${elasticsearchNamespace} describe pods`) - sh.exec(`kubectl --namespace ${elasticsearchNamespace} logs elasticsearch-0`) // Create the index and the search template sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) From c8993986c8143627678e411eaa17f5c71e362436 Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Tue, 23 Nov 2021 12:16:11 +0100 Subject: [PATCH 22/23] chore: remove debug timeout Signed-off-by: Nicolas L --- tests/scalers/elasticsearch.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scalers/elasticsearch.test.ts b/tests/scalers/elasticsearch.test.ts index a3d55ea8579..3b04013d3c5 100644 --- a/tests/scalers/elasticsearch.test.ts +++ b/tests/scalers/elasticsearch.test.ts @@ -19,7 +19,7 @@ test.before(t => { fs.writeFileSync(elasticsearchTmpFile.name, elasticsearchStatefulsetYaml.replace('{{ELASTIC_PASSWORD}}', elasticPassword)) t.is(0, sh.exec(`kubectl apply --namespace ${elasticsearchNamespace} -f ${elasticsearchTmpFile.name}`).code, 'creating an elasticsearch statefulset should work.') - t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace, 600)) + t.is(0, waitForRollout('statefulset', "elasticsearch", elasticsearchNamespace)) // Create the index and the search template sh.exec(`${kubectlExecCurl} -XPUT http://localhost:9200/${indexName} -d '${elastisearchCreateIndex}'`) From 3dec64ddd6d9bccb59d664386247f69d2fab94bf Mon Sep 17 00:00:00 2001 From: Nicolas L Date: Tue, 23 Nov 2021 14:40:26 +0100 Subject: [PATCH 23/23] chore: fix typo in changelog and organize imports Signed-off-by: Nicolas L --- CHANGELOG.md | 2 +- pkg/scalers/elasticsearch_scaler.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc3363a7618..af9fe71f316 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ - Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181)) - Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225)) - Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187)) -- Add an elasticsearch scaler based on search template ([#2311](https://github.com/kedacore/keda/pull/2311)) +- Add Elasticsearch Scaler based on search template ([#2311](https://github.com/kedacore/keda/pull/2311)) - Cache metric names provided by KEDA Metrics Server ([#2279](https://github.com/kedacore/keda/pull/2279)) ### Improvements diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 42e7de2dc6f..cf5330112bc 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -12,7 +12,6 @@ import ( "strings" "github.com/elastic/go-elasticsearch/v7" - kedautil "github.com/kedacore/keda/v2/pkg/util" "github.com/tidwall/gjson" "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -20,6 +19,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" ) type elasticsearchScaler struct {