From eb8bd0f89594497f7660e13c3c4be3a8259e1001 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Melisa=20Tanr=C4=B1verdi?= Date: Mon, 3 Jan 2022 13:06:10 +0300 Subject: [PATCH 1/6] Add ActiveMQ scaler (#2305) Signed-off-by: melisatanrverdi --- pkg/scalers/activemq_scaler.go | 278 ++++++++++++++++ pkg/scalers/activemq_scaler_test.go | 273 ++++++++++++++++ pkg/scaling/scale_handler.go | 2 + tests/scalers/activemq.test.ts | 486 ++++++++++++++++++++++++++++ 4 files changed, 1039 insertions(+) create mode 100644 pkg/scalers/activemq_scaler.go create mode 100644 pkg/scalers/activemq_scaler_test.go create mode 100644 tests/scalers/activemq.test.ts diff --git a/pkg/scalers/activemq_scaler.go b/pkg/scalers/activemq_scaler.go new file mode 100644 index 00000000000..5e854ad26f2 --- /dev/null +++ b/pkg/scalers/activemq_scaler.go @@ -0,0 +1,278 @@ +package scalers + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "text/template" + + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type activeMQScaler struct { + metadata *activeMQMetadata + httpClient *http.Client +} + +type activeMQMetadata struct { + managementEndpoint string + destinationName string + brokerName string + username string + password string + restAPITemplate string + targetQueueSize int + metricName string + scalerIndex int +} + +type activeMQMonitoring struct { + MsgCount int `json:"value"` + Status int `json:"status"` + Timestamp int64 `json:"timestamp"` +} + +const ( + defaultTargetQueueSize = 10 + defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize" +) + +var activeMQLog = logf.Log.WithName("activeMQ_scaler") + +// NewActiveMQScaler creates a new activeMQ Scaler +func NewActiveMQScaler(config *ScalerConfig) (Scaler, error) { + meta, err := parseActiveMQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing ActiveMQ metadata: %s", err) + } + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + + return &activeMQScaler{ + metadata: meta, + httpClient: httpClient, + }, nil +} + +func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) { + meta := activeMQMetadata{} + + if val, ok := config.TriggerMetadata["restAPITemplate"]; ok && val != "" { + meta.restAPITemplate = config.TriggerMetadata["restAPITemplate"] + var err error + if meta, err = getRestAPIParameters(meta); err != nil { + return nil, fmt.Errorf("can't parse restAPITemplate : %s ", err) + } + } else { + meta.restAPITemplate = defaultActiveMQRestAPITemplate + if config.TriggerMetadata["managementEndpoint"] == "" { + return nil, errors.New("no management endpoint given") + } + meta.managementEndpoint = config.TriggerMetadata["managementEndpoint"] + + if config.TriggerMetadata["destinationName"] == "" { + return nil, errors.New("no destination name given") + } + meta.destinationName = config.TriggerMetadata["destinationName"] + + if config.TriggerMetadata["brokerName"] == "" { + return nil, errors.New("no broker name given") + } + meta.brokerName = config.TriggerMetadata["brokerName"] + } + + if val, ok := 
config.TriggerMetadata["targetQueueSize"]; ok { + queueSize, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("invalid targetQueueSize - must be an integer") + } + + meta.targetQueueSize = queueSize + } else { + meta.targetQueueSize = defaultTargetQueueSize + } + + if val, ok := config.AuthParams["username"]; ok && val != "" { + meta.username = val + } else if val, ok := config.TriggerMetadata["username"]; ok && val != "" { + username := val + + if val, ok := config.ResolvedEnv[username]; ok && val != "" { + meta.username = val + } else { + meta.username = username + } + } + + if meta.username == "" { + return nil, fmt.Errorf("username cannot be empty") + } + + if val, ok := config.AuthParams["password"]; ok && val != "" { + meta.password = val + } else if val, ok := config.TriggerMetadata["password"]; ok && val != "" { + password := val + + if val, ok := config.ResolvedEnv[password]; ok && val != "" { + meta.password = val + } else { + meta.password = password + } + } + + if meta.password == "" { + return nil, fmt.Errorf("password cannot be empty") + } + + meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, kedautil.NormalizeString(fmt.Sprintf("activemq-%s", meta.destinationName))) + + meta.scalerIndex = config.ScalerIndex + + return &meta, nil +} + +func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) { + queueSize, err := s.getQueueMessageCount(ctx) + if err != nil { + activeMQLog.Error(err, "Unable to access activeMQ management endpoint", "managementEndpoint", s.metadata.managementEndpoint) + return false, err + } + + return queueSize > 0, nil +} + +// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName +func getRestAPIParameters(meta activeMQMetadata) (activeMQMetadata, error) { + u, err := url.ParseRequestURI(meta.restAPITemplate) + if err != nil { + return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %s", err) + } + + meta.managementEndpoint = u.Host + splitURL := strings.Split(strings.Split(u.Path, ":")[1], "/")[0] // This returns : type=Broker,brokerName=<>,destinationType=Queue,destinationName=<> + replacer := strings.NewReplacer(",", "&") + v, err := url.ParseQuery(replacer.Replace(splitURL)) // This returns a map with key: string types and element type [] string. 
: map[brokerName:[<>] destinationName:[<>] destinationType:[Queue] type:[Broker]] + if err != nil { + return meta, fmt.Errorf("unable to parse ActiveMQ restAPITemplate: %s", err) + } + + if len(v["destinationName"][0]) == 0 { + return meta, errors.New("no destinationName is given") + } + meta.destinationName = v["destinationName"][0] + + if len(v["brokerName"][0]) == 0 { + return meta, fmt.Errorf("no brokerName given: %s", meta.restAPITemplate) + } + meta.brokerName = v["brokerName"][0] + + return meta, nil +} + +func (s *activeMQScaler) getMonitoringEndpoint() (string, error) { + var buf bytes.Buffer + endpoint := map[string]string{ + "ManagementEndpoint": s.metadata.managementEndpoint, + "BrokerName": s.metadata.brokerName, + "DestinationName": s.metadata.destinationName, + } + template, err := template.New("monitoring_endpoint").Parse(defaultActiveMQRestAPITemplate) + if err != nil { + return "", fmt.Errorf("error parsing template: %s", err) + } + err = template.Execute(&buf, endpoint) + if err != nil { + return "", fmt.Errorf("error executing template: %s", err) + } + monitoringEndpoint := buf.String() + return monitoringEndpoint, nil +} + +func (s *activeMQScaler) getQueueMessageCount(ctx context.Context) (int, error) { + var monitoringInfo *activeMQMonitoring + var queueMessageCount int + + client := s.httpClient + url, err := s.getMonitoringEndpoint() + if err != nil { + return -1, err + } + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return -1, err + } + + // Add HTTP Auth and Headers + req.SetBasicAuth(s.metadata.username, s.metadata.password) + req.Header.Set("Content-Type", "application/json") + + resp, err := client.Do(req) + if err != nil { + return -1, err + } + + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&monitoringInfo); err != nil { + return -1, err + } + if resp.StatusCode == 200 && monitoringInfo.Status == 200 { + queueMessageCount = monitoringInfo.MsgCount + } else { + return -1, fmt.Errorf("ActiveMQ management endpoint response error code : %d %d", resp.StatusCode, monitoringInfo.Status) + } + + activeMQLog.V(1).Info(fmt.Sprintf("ActiveMQ scaler: Providing metrics based on current queue size %d queue size limit %d", queueMessageCount, s.metadata.targetQueueSize)) + + return queueMessageCount, nil +} + +// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler +func (s *activeMQScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.targetQueueSize), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +func (s *activeMQScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + queueSize, err := s.getQueueMessageCount(ctx) + if err != nil { + return nil, fmt.Errorf("error inspecting ActiveMQ queue size: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(queueSize), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return []external_metrics.ExternalMetricValue{metric}, nil +} + +func (s 
*activeMQScaler) Close(context.Context) error { + return nil +} diff --git a/pkg/scalers/activemq_scaler_test.go b/pkg/scalers/activemq_scaler_test.go new file mode 100644 index 00000000000..46cbabbddee --- /dev/null +++ b/pkg/scalers/activemq_scaler_test.go @@ -0,0 +1,273 @@ +package scalers + +import ( + "context" + "fmt" + "net/http" + "testing" +) + +const ( + testInvalidRestAPITemplate = "testInvalidRestAPITemplate" +) + +type parseActiveMQMetadataTestData struct { + name string + metadata map[string]string + authParams map[string]string + isError bool +} + +type activeMQMetricIdentifier struct { + metadataTestData *parseActiveMQMetadataTestData + scalerIndex int + name string +} + +// Setting metric identifier mock name +var activeMQMetricIdentifiers = []activeMQMetricIdentifier{ + {&testActiveMQMetadata[1], 0, "s0-activemq-testQueue"}, + {&testActiveMQMetadata[9], 1, "s1-activemq-testQueue"}, +} + +var testActiveMQMetadata = []parseActiveMQMetadataTestData{ + { + name: "nothing passed", + metadata: map[string]string{}, + authParams: map[string]string{}, + isError: true, + }, + { + name: "properly formed metadata", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "no metricName passed, metricName is generated from destinationName", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "Invalid targetQueueSize using a string", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "AA", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing management endpoint should fail", + metadata: map[string]string{ + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing destination name, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing broker name, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing username, should fail", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "password": "pass123", + }, + isError: true, + }, + { + name: "missing password, should fail", + metadata: 
map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + }, + isError: true, + }, + { + name: "properly formed metadata with restAPITemplate", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, + { + name: "invalid restAPITemplate, should fail", + metadata: map[string]string{ + "restAPITemplate": testInvalidRestAPITemplate, + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: true, + }, + { + name: "missing username, should fail", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "password": "pass123", + }, + isError: true, + }, + { + name: "missing password, should fail", + metadata: map[string]string{ + "restAPITemplate": "http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=testQueue/QueueSize", + "targetQueueSize": "10", + "metricName": "testMetricName", + }, + authParams: map[string]string{ + "username": "testUsername", + }, + isError: true, + }, +} + +func TestParseActiveMQMetadata(t *testing.T) { + for _, testData := range testActiveMQMetadata { + t.Run(testData.name, func(t *testing.T) { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + if metadata != nil && metadata.password != "" && metadata.password != testData.authParams["password"] { + t.Error("Expected password from configuration but found something else: ", metadata.password) + fmt.Println(testData) + } + }) + } +} + +var testDefaultTargetQueueSize = []parseActiveMQMetadataTestData{ + { + name: "properly formed metadata", + metadata: map[string]string{ + "managementEndpoint": "localhost:8161", + "destinationName": "testQueue", + "brokerName": "localhost", + }, + authParams: map[string]string{ + "username": "testUsername", + "password": "pass123", + }, + isError: false, + }, +} + +func TestParseDefaultTargetQueueSize(t *testing.T) { + for _, testData := range testDefaultTargetQueueSize { + t.Run(testData.name, func(t *testing.T) { + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + switch { + case err != nil && !testData.isError: + t.Error("Expected success but got error", err) + case testData.isError && err == nil: + t.Error("Expected error but got success") + case metadata.targetQueueSize != defaultTargetQueueSize: + t.Error("Expected default targetQueueSize =", defaultTargetQueueSize, "but got", metadata.targetQueueSize) + } + }) + } 
+} + +func TestActiveMQGetMetricSpecForScaling(t *testing.T) { + for _, testData := range activeMQMetricIdentifiers { + ctx := context.Background() + metadata, err := parseActiveMQMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockActiveMQScaler := activeMQScaler{ + metadata: metadata, + httpClient: http.DefaultClient, + } + + metricSpec := mockActiveMQScaler.GetMetricSpecForScaling(ctx) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Errorf("Wrong External metric source name: %s, expected: %s", metricName, testData.name) + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 12bf73ead4f..182dbdb9a33 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -327,6 +327,8 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alp func buildScaler(ctx context.Context, client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { + case "activemq": + return scalers.NewActiveMQScaler(config) case "artemis-queue": return scalers.NewArtemisQueueScaler(config) case "aws-cloudwatch": diff --git a/tests/scalers/activemq.test.ts b/tests/scalers/activemq.test.ts new file mode 100644 index 00000000000..a0a4391ec1a --- /dev/null +++ b/tests/scalers/activemq.test.ts @@ -0,0 +1,486 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForRollout} from './helpers' + +const activeMQNamespace = 'activemq-test' +const activemqConf = '/opt/apache-activemq-5.16.3/conf' +const activemqHome = '/opt/apache-activemq-5.16.3' +const activeMQPath = 'bin/activemq' +const activeMQUsername = 'admin' +const activeMQPassword = 'admin' +const destinationName = 'testQ' +const nginxDeploymentName = 'nginx-deployment' + +test.before(t => { + // install ActiveMQ + sh.exec(`kubectl create namespace ${activeMQNamespace}`) + const activeMQTmpFile = tmp.fileSync() + fs.writeFileSync(activeMQTmpFile.name, activeMQDeployYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${activeMQTmpFile.name}`).code, 'creating ActiveMQ deployment should work.') + t.is(0, waitForRollout('deployment', "activemq", activeMQNamespace)) + + const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout + + // ActiveMQ ready check + let activeMQReady + for (let i = 0; i < 30; i++) { + activeMQReady = sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- curl -u ${activeMQUsername}:${activeMQPassword} -s http://localhost:8161/api/jolokia/exec/org.apache.activemq:type=Broker,brokerName=localhost,service=Health/healthStatus | sed -e 's/[{}]/''/g' | awk -v RS=',"' -F: '/^status/ {print $2}'`) + if (activeMQReady != 200) { + sh.exec('sleep 5s') + } + else { + break + } + } + + // deploy Nginx, scaledobject etc. 
+ const nginxTmpFile = tmp.fileSync() + fs.writeFileSync(nginxTmpFile.name, nginxDeployYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${activeMQNamespace} -f ${nginxTmpFile.name}`).code, 'creating Nginx deployment should work.') + t.is(0, waitForRollout('deployment', "nginx-deployment", activeMQNamespace)) +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec(`kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial('Deployment should scale to 5 (the max) with 1000 messages on the queue then back to 0', t => { + const activeMQPod = sh.exec(`kubectl get pods --selector=app=activemq-app -n ${activeMQNamespace} -o jsonpath='{.items[0].metadata.name'}`).stdout + + // produce 1000 messages to ActiveMQ + t.is( + 0, + sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- ${activeMQPath} producer --destination ${destinationName} --messageCount 1000`).code, + 'produce 1000 message to the ActiveMQ queue' + ) + + let replicaCount = '0' + const maxReplicaCount = '5' + + for (let i = 0; i < 30 && replicaCount !== maxReplicaCount; i++) { + replicaCount = sh.exec(`kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (replicaCount !== maxReplicaCount) { + sh.exec('sleep 2s') + } + } + t.is(maxReplicaCount, replicaCount, `Replica count should be ${maxReplicaCount} after 60 seconds`) + sh.exec('sleep 30s') + + // consume all messages from ActiveMQ + t.is( + 0, + sh.exec(`kubectl exec -n ${activeMQNamespace} ${activeMQPod} -- ${activeMQPath} consumer --destination ${destinationName} --messageCount 1000`).code, + 'consume all messages' + ) + + for (let i = 0; i < 50 && replicaCount !== '0'; i++) { + replicaCount = sh.exec( + `kubectl get deploy/${nginxDeploymentName} --namespace ${activeMQNamespace} -o jsonpath="{.spec.replicas}"`).stdout + if (replicaCount !== '0') { + sh.exec('sleep 5s') + } + } + t.is('0', replicaCount, 'Replica count should be 0 after 3 minutes') + +}) + +test.after.always((t) => { + t.is(0, sh.exec(`kubectl delete namespace ${activeMQNamespace}`).code, 'Should delete ActiveMQ namespace') +}) + +const activeMQDeployYaml = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: activemq-app + name: activemq +spec: + replicas: 1 + revisionHistoryLimit: 10 + selector: + matchLabels: + app: activemq-app + template: + metadata: + labels: + app: activemq-app + spec: + containers: + - image: symptoma/activemq:5.16.3 + imagePullPolicy: IfNotPresent + name: activemq + ports: + - containerPort: 61616 + name: jmx + protocol: TCP + - containerPort: 8161 + name: ui + protocol: TCP + - containerPort: 61616 + name: openwire + protocol: TCP + - containerPort: 5672 + name: amqp + protocol: TCP + - containerPort: 61613 + name: stomp + protocol: TCP + - containerPort: 1883 + name: mqtt + protocol: TCP + resources: + requests: + memory: 500Mi + cpu: 200m + limits: + memory: 1000Mi + cpu: 400m + volumeMounts: + - name: activemq-config + mountPath: /opt/apache-activemq-5.16.3/webapps/api/WEB-INF/classes/jolokia-access.xml + subPath: jolokia-access.xml + - name: remote-access-cm + mountPath: /opt/apache-activemq-5.16.3/conf/jetty.xml + subPath: jetty.xml + volumes: + - name: activemq-config + configMap: + name: activemq-config + items: + - key: jolokia-access.xml + path: jolokia-access.xml + - name: remote-access-cm + configMap: 
+            name: remote-access-cm
+            items:
+              - key: jetty.xml
+                path: jetty.xml
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: activemq
+spec:
+  type: ClusterIP
+  selector:
+    app: activemq-app
+  ports:
+  - name: dashboard
+    port: 8161
+    targetPort: 8161
+    protocol: TCP
+  - name: openwire
+    port: 61616
+    targetPort: 61616
+    protocol: TCP
+  - name: amqp
+    port: 5672
+    targetPort: 5672
+    protocol: TCP
+  - name: stomp
+    port: 61613
+    targetPort: 61613
+    protocol: TCP
+  - name: mqtt
+    port: 1883
+    targetPort: 1883
+    protocol: TCP
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: activemq-config
+data:
+  jolokia-access.xml: |
+    <?xml version="1.0" encoding="UTF-8"?>
+    <restrict>
+      <remote>
+        <host>0.0.0.0/0</host>
+      </remote>
+      <deny>
+        <mbean>
+          <name>com.sun.management:type=DiagnosticCommand</name>
+          <attribute>*</attribute>
+          <operation>*</operation>
+        </mbean>
+        <mbean>
+          <name>com.sun.management:type=HotSpotDiagnostic</name>
+          <attribute>*</attribute>
+          <operation>*</operation>
+        </mbean>
+      </deny>
+    </restrict>
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: remote-access-cm
+data:
+  jetty.xml: |
+    <!-- ActiveMQ 5.16.3 jetty.xml, overridden so the web console and Jolokia API accept remote connections; full Jetty XML omitted -->
+`
+const nginxDeployYaml = `
+apiVersion: v1
+kind: Secret
+metadata:
+  name: activemq-secret
+type: Opaque
+data:
+  activemq-password: YWRtaW4=
+  activemq-username: YWRtaW4=
+---
+apiVersion: keda.sh/v1alpha1
+kind: TriggerAuthentication
+metadata:
+  name: trigger-auth-activemq
+spec:
+  secretTargetRef:
+    - parameter: username
+      name: activemq-secret
+      key: activemq-username
+    - parameter: password
+      name: activemq-secret
+      key: activemq-password
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app: nginx
+  name: ${nginxDeploymentName}
+spec:
+  replicas: 0
+  selector:
+    matchLabels:
+      app: nginx
+  template:
+    metadata:
+      labels:
+        app: nginx
+    spec:
+      containers:
+      - image: nginx
+        name: nginx
+        ports:
+        - containerPort: 80
+---
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: activemq-scaledobject
+  labels:
+    deploymentName: ${nginxDeploymentName}
+spec:
+  scaleTargetRef:
+    name: ${nginxDeploymentName}
+  pollingInterval: 5
+  cooldownPeriod: 5
+  minReplicaCount: 0
+  maxReplicaCount: 5
+  triggers:
+  - type: activemq
+    metadata:
+      managementEndpoint: "activemq.${activeMQNamespace}:8161"
+      destinationName: "testQ"
+      brokerName: "localhost"
+    authenticationRef:
+      name: trigger-auth-activemq
+`

From 80a491e5ff973ab2e71c3c95187783e6824896bc Mon Sep 17 00:00:00 2001
From: Jorge Turrado Ferrero
Date: Mon, 3 Jan 2022 11:40:22 +0100
Subject: [PATCH 2/6] Azure EventHub Scaler: don't expose connection string in
 metricName (#2404)

Signed-off-by: jorturfer
Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
---
 pkg/scalers/azure_eventhub_scaler.go      | 2 +-
 pkg/scalers/azure_eventhub_scaler_test.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go
index 9db01737565..93f4a885f67 100644
--- a/pkg/scalers/azure_eventhub_scaler.go
+++ b/pkg/scalers/azure_eventhub_scaler.go
@@ -255,7 +255,7 @@ func (scaler *azureEventHubScaler) GetMetricSpecForScaling(context.Context) []v2
 	targetMetricVal := resource.NewQuantity(scaler.metadata.threshold, resource.DecimalSI)
 	externalMetric := &v2beta2.ExternalMetricSource{
 		Metric: v2beta2.MetricIdentifier{
-			Name: GenerateMetricNameWithIndex(scaler.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-eventhub-%s",
scaler.metadata.eventHubInfo.EventHubConnection))), + Name: GenerateMetricNameWithIndex(scaler.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-eventhub-%s", scaler.metadata.eventHubInfo.EventHubConsumerGroup))), }, Target: v2beta2.MetricTarget{ Type: v2beta2.AverageValueMetricType, diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 636f7f7ffdc..21eb02982a0 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -66,8 +66,8 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat } var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ - {&parseEventHubMetadataDataset[1], 0, "s0-azure-eventhub-none"}, - {&parseEventHubMetadataDataset[1], 1, "s1-azure-eventhub-none"}, + {&parseEventHubMetadataDataset[1], 0, "s0-azure-eventhub-testEventHubConsumerGroup"}, + {&parseEventHubMetadataDataset[1], 1, "s1-azure-eventhub-testEventHubConsumerGroup"}, } var testEventHubScaler = azureEventHubScaler{ From a36ce5607f41c05df3a017d727faaa70e5484998 Mon Sep 17 00:00:00 2001 From: Bert Verstraete Date: Mon, 3 Jan 2022 11:42:08 +0100 Subject: [PATCH 3/6] Kafka scaler: concurrent offset fetches (#2405) Signed-off-by: VerstraeteBert --- pkg/scalers/kafka_scaler.go | 88 +++++++++++++++++++++++++++---------- tests/README.md | 2 +- tests/scalers/kafka.test.ts | 10 ++--- 3 files changed, 72 insertions(+), 28 deletions(-) diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 0007da4fc9d..b23f7a569be 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -6,6 +6,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/Shopify/sarama" v2beta2 "k8s.io/api/autoscaling/v2beta2" @@ -214,18 +215,13 @@ func (s *kafkaScaler) IsActive(ctx context.Context) (bool, error) { return false, err } - offsets, err := s.getOffsets(partitions) - if err != nil { - return false, err - } - - topicOffsets, err := s.getTopicOffsets(partitions) + consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions) if err != nil { return false, err } for _, partition := range partitions { - lag, err := s.getLagForPartition(partition, offsets, topicOffsets) + lag, err := s.getLagForPartition(partition, consumerOffsets, producerOffsets) if err != nil && lag == invalidOffset { return true, nil } @@ -307,7 +303,7 @@ func (s *kafkaScaler) getPartitions() ([]int32, error) { return partitions, nil } -func (s *kafkaScaler) getOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) { +func (s *kafkaScaler) getConsumerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, error) { offsets, err := s.admin.ListConsumerGroupOffsets(s.metadata.group, map[string][]int32{ s.metadata.topic: partitions, }) @@ -364,6 +360,42 @@ func (s *kafkaScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricS return []v2beta2.MetricSpec{metricSpec} } +type consumerOffsetResult struct { + consumerOffsets *sarama.OffsetFetchResponse + err error +} + +type producerOffsetResult struct { + producerOffsets map[int32]int64 + err error +} + +func (s *kafkaScaler) getConsumerAndProducerOffsets(partitions []int32) (*sarama.OffsetFetchResponse, map[int32]int64, error) { + consumerChan := make(chan consumerOffsetResult, 1) + go func() { + consumerOffsets, err := s.getConsumerOffsets(partitions) + consumerChan <- consumerOffsetResult{consumerOffsets, err} + }() + + producerChan := make(chan producerOffsetResult, 1) + go func() { + producerOffsets, 
err := s.getProducerOffsets(partitions) + producerChan <- producerOffsetResult{producerOffsets, err} + }() + + consumerRes := <-consumerChan + if consumerRes.err != nil { + return nil, nil, consumerRes.err + } + + producerRes := <-producerChan + if producerRes.err != nil { + return nil, nil, producerRes.err + } + + return consumerRes.consumerOffsets, producerRes.producerOffsets, nil +} + // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { partitions, err := s.getPartitions() @@ -371,19 +403,14 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS return []external_metrics.ExternalMetricValue{}, err } - offsets, err := s.getOffsets(partitions) - if err != nil { - return []external_metrics.ExternalMetricValue{}, err - } - - topicOffsets, err := s.getTopicOffsets(partitions) + consumerOffsets, producerOffsets, err := s.getConsumerAndProducerOffsets(partitions) if err != nil { return []external_metrics.ExternalMetricValue{}, err } totalLag := int64(0) for _, partition := range partitions { - lag, _ := s.getLagForPartition(partition, offsets, topicOffsets) + lag, _ := s.getLagForPartition(partition, consumerOffsets, producerOffsets) totalLag += lag } @@ -406,7 +433,12 @@ func (s *kafkaScaler) GetMetrics(ctx context.Context, metricName string, metricS return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, error) { +type brokerOffsetResult struct { + offsetResp *sarama.OffsetResponse + err error +} + +func (s *kafkaScaler) getProducerOffsets(partitions []int32) (map[int32]int64, error) { version := int16(0) if s.client.Config().Version.IsAtLeast(sarama.V0_10_1_0) { version = 1 @@ -430,17 +462,29 @@ func (s *kafkaScaler) getTopicOffsets(partitions []int32) (map[int32]int64, erro request.AddBlock(s.metadata.topic, partitionID, sarama.OffsetNewest, 1) } - offsets := make(map[int32]int64) - // Step 2: send requests, one per broker, and collect offsets + resultCh := make(chan brokerOffsetResult, len(requests)) + var wg sync.WaitGroup + wg.Add(len(requests)) for broker, request := range requests { - response, err := broker.GetAvailableOffsets(request) + go func(brCopy *sarama.Broker, reqCopy *sarama.OffsetRequest) { + defer wg.Done() + response, err := brCopy.GetAvailableOffsets(reqCopy) + resultCh <- brokerOffsetResult{response, err} + }(broker, request) + } - if err != nil { - return nil, err + wg.Wait() + close(resultCh) + + offsets := make(map[int32]int64) + + for brokerOffsetRes := range resultCh { + if brokerOffsetRes.err != nil { + return nil, brokerOffsetRes.err } - for _, blocks := range response.Blocks { + for _, blocks := range brokerOffsetRes.offsetResp.Blocks { for partitionID, block := range blocks { if block.Err != sarama.ErrNoError { return nil, block.Err diff --git a/tests/README.md b/tests/README.md index f976dbb6c13..963fc4026ea 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,4 +1,4 @@ -## Prerequisits +## Prerequisites - [node](https://nodejs.org/en/) - `kubectl` logged into a Kubernetes cluster. 
diff --git a/tests/scalers/kafka.test.ts b/tests/scalers/kafka.test.ts index 61413775ab9..2e5932990bd 100644 --- a/tests/scalers/kafka.test.ts +++ b/tests/scalers/kafka.test.ts @@ -11,7 +11,7 @@ const defaultKafkaClient = 'kafka-client' const strimziOperatorVersion = '0.18.0' const commandToCheckReplicas = `kubectl get deployments/kafka-consumer --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"` -const strimziOperatroYamlFile = tmp.fileSync() +const strimziOperatorYamlFile = tmp.fileSync() const kafkaClusterYamlFile = tmp.fileSync() const kafkaTopicYamlFile = tmp.fileSync() const kafkaClientYamlFile = tmp.fileSync() @@ -25,10 +25,10 @@ test.before('Set up, create necessary resources.', t => { sh.exec(`kubectl create namespace ${defaultNamespace}`) const strimziOperatorYaml = sh.exec(`curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/${strimziOperatorVersion}/strimzi-cluster-operator-${strimziOperatorVersion}.yaml`).stdout - fs.writeFileSync(strimziOperatroYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) + fs.writeFileSync(strimziOperatorYamlFile.name, strimziOperatorYaml.replace(/myproject/g, `${defaultNamespace}`)) t.is( 0, - sh.exec(`kubectl apply -f ${strimziOperatroYamlFile.name} --namespace ${defaultNamespace}`).code, + sh.exec(`kubectl apply -f ${strimziOperatorYamlFile.name} --namespace ${defaultNamespace}`).code, 'Deploying Strimzi operator should work.' ) @@ -195,7 +195,7 @@ test.after.always('Clean up, delete created resources.', t => { `${kafkaClientYamlFile.name}`, `${kafkaTopicYamlFile.name}`, `${kafkaClusterYamlFile.name}`, - `${strimziOperatroYamlFile}` + `${strimziOperatorYamlFile}` ] for (const resource of resources) { @@ -212,7 +212,7 @@ metadata: spec: kafka: version: 2.5.0 - replicas: 1 + replicas: 3 listeners: plain: {} tls: {} From 7b041575ca680795e65e323d611e331afe2f4047 Mon Sep 17 00:00:00 2001 From: Jorge Turrado Ferrero Date: Mon, 3 Jan 2022 11:44:45 +0100 Subject: [PATCH 4/6] Delete the cache entry when a ScaledObject is deleted (#2408) Signed-off-by: jorturfer Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> --- apis/keda/v1alpha1/withtriggers_types.go | 3 +- .../keda/scaledobject_controller_test.go | 75 +++++++++++++++++++ go.mod | 1 + go.sum | 1 + pkg/mock/mock_client/mock_interfaces.go | 2 +- pkg/mock/mock_scale/mock_interfaces.go | 2 +- pkg/scaling/scale_handler.go | 3 +- 7 files changed, 83 insertions(+), 4 deletions(-) diff --git a/apis/keda/v1alpha1/withtriggers_types.go b/apis/keda/v1alpha1/withtriggers_types.go index 7dcc410ea90..00b9821cff7 100644 --- a/apis/keda/v1alpha1/withtriggers_types.go +++ b/apis/keda/v1alpha1/withtriggers_types.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "fmt" + "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -90,5 +91,5 @@ func (t *WithTriggers) GetPollingInterval() time.Duration { // GenerateIdenitifier returns identifier for the object in for "kind.namespace.name" func (t *WithTriggers) GenerateIdenitifier() string { - return fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name) + return strings.ToLower(fmt.Sprintf("%s.%s.%s", t.Kind, t.Namespace, t.Name)) } diff --git a/controllers/keda/scaledobject_controller_test.go b/controllers/keda/scaledobject_controller_test.go index a8052bcbe4d..bf40980a5d3 100644 --- a/controllers/keda/scaledobject_controller_test.go +++ b/controllers/keda/scaledobject_controller_test.go @@ -292,6 +292,81 @@ var _ = Describe("ScaledObjectController", func() { 
Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-UTC-0xxxx-1xxxx")) }) + //https://github.com/kedacore/keda/issues/2407 + It("cache is correctly recreated if SO is deleted and created", func() { + // Create the scaling target. + err := k8sClient.Create(context.Background(), generateDeployment("cache-regenerate")) + Expect(err).ToNot(HaveOccurred()) + + // Create the ScaledObject with one trigger. + so := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{Name: "cache-regenerate", Namespace: "default"}, + Spec: kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + Name: "cache-regenerate", + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + err = k8sClient.Create(context.Background(), so) + Expect(err).ToNot(HaveOccurred()) + + // Get and confirm the HPA. + hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} + Eventually(func() error { + return k8sClient.Get(context.Background(), types.NamespacedName{Name: "keda-hpa-cache-regenerate", Namespace: "default"}, hpa) + }).ShouldNot(HaveOccurred()) + Expect(hpa.Spec.Metrics).To(HaveLen(1)) + Expect(hpa.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-UTC-0xxxx-1xxxx")) + + // Delete the ScaledObject + err = k8sClient.Delete(context.Background(), so) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(30 * time.Second) + + // Create the same ScaledObject with a change in the trigger. + so = &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{Name: "cache-regenerate", Namespace: "default"}, + Spec: kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + Name: "cache-regenerate", + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "CET", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + err = k8sClient.Create(context.Background(), so) + Expect(err).ToNot(HaveOccurred()) + time.Sleep(30 * time.Second) + + // Get and confirm the HPA. 
+ hpa2 := &autoscalingv2beta2.HorizontalPodAutoscaler{} + Eventually(func() error { + return k8sClient.Get(context.Background(), types.NamespacedName{Name: "keda-hpa-cache-regenerate", Namespace: "default"}, hpa2) + }).ShouldNot(HaveOccurred()) + Expect(hpa2.Spec.Metrics).To(HaveLen(1)) + Expect(hpa2.Spec.Metrics[0].External.Metric.Name).To(Equal("s0-cron-CET-0xxxx-1xxxx")) + }) + It("deploys ScaledObject and creates HPA, when IdleReplicaCount, MinReplicaCount and MaxReplicaCount is defined", func() { deploymentName := "idleminmax" diff --git a/go.mod b/go.mod index 59db30a8f31..f8c4d9bfbad 100644 --- a/go.mod +++ b/go.mod @@ -111,6 +111,7 @@ require ( github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.15 // indirect github.com/go-stack/stack v1.8.0 // indirect + github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect diff --git a/go.sum b/go.sum index 4406f76bd1c..3ba4ffdc45a 100644 --- a/go.sum +++ b/go.sum @@ -364,6 +364,7 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.2 h1:onZX1rnHT3Wv6cqNgYyFOOlgVKJrksuCMCRvJStbMYw= github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= diff --git a/pkg/mock/mock_client/mock_interfaces.go b/pkg/mock/mock_client/mock_interfaces.go index b33495dcdf2..0dae7ff258a 100644 --- a/pkg/mock/mock_client/mock_interfaces.go +++ b/pkg/mock/mock_client/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /go/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.2/pkg/client/interfaces.go +// Source: /go/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.3/pkg/client/interfaces.go // Package mock_client is a generated GoMock package. package mock_client diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go index 48c6afb71bd..f889afc77b6 100644 --- a/pkg/mock/mock_scale/mock_interfaces.go +++ b/pkg/mock/mock_scale/mock_interfaces.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: /home/ahmed/go/pkg/mod/k8s.io/client-go@v0.22.2/scale/interfaces.go +// Source: /go/pkg/mod/k8s.io/client-go@v0.22.4/scale/interfaces.go // Package mock_scale is a generated GoMock package. 
package mock_scale diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 182dbdb9a33..45bbf788779 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -126,6 +126,7 @@ func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject cancel() } h.scaleLoopContexts.Delete(key) + delete(h.scalerCaches, key) h.recorder.Event(withTriggers, corev1.EventTypeNormal, eventreason.KEDAScalersStopped, "Stopped scalers watch") } else { h.logger.V(1).Info("ScaleObject was not found in controller cache", "key", key) @@ -163,7 +164,7 @@ func (h *scaleHandler) GetScalersCache(ctx context.Context, scalableObject inter return nil, err } - key := strings.ToLower(fmt.Sprintf("%s.%s.%s", withTriggers.Kind, withTriggers.Name, withTriggers.Namespace)) + key := withTriggers.GenerateIdenitifier() h.lock.RLock() if cache, ok := h.scalerCaches[key]; ok && cache.Generation == withTriggers.Generation { From 09250e5173beb732118ff72a65057d8e05025084 Mon Sep 17 00:00:00 2001 From: Tomas P Date: Mon, 20 Dec 2021 05:19:09 -0300 Subject: [PATCH 5/6] fix: add list and watch access for service accounts to cluster role (#2406) Signed-off-by: tomasspi --- config/rbac/role.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 259a76019e2..cc67337efde 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -25,6 +25,13 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - serviceaccounts + verbs: + - list + - watch - apiGroups: - '*' resources: From 74f10aff6e84ddc088bc7ae11798eab151daa1ec Mon Sep 17 00:00:00 2001 From: Jorge Turrado Ferrero Date: Mon, 20 Dec 2021 10:47:02 +0100 Subject: [PATCH 6/6] fix: add list and watch access for service accounts to cluster role (#2410) * fix: add list and watch access for service accounts to cluster role Signed-off-by: Jorge Turrado * Update changelog Signed-off-by: Jorge Turrado --- controllers/keda/scaledobject_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index a631ce29c9c..fb28d3e1ea2 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -57,6 +57,7 @@ import ( // +kubebuilder:rbac:groups="",resources=configmaps;configmaps/status;events,verbs="*" // +kubebuilder:rbac:groups="",resources=pods;services;services;secrets;external,verbs=get;list;watch // +kubebuilder:rbac:groups="*",resources="*/scale",verbs="*" +// +kubebuilder:rbac:groups="",resources="serviceaccounts",verbs=list;watch // +kubebuilder:rbac:groups="*",resources="*",verbs=get // +kubebuilder:rbac:groups="apps",resources=deployments;statefulsets,verbs=list;watch // +kubebuilder:rbac:groups="coordination.k8s.io",resources=leases,verbs="*"