diff --git a/CHANGELOG.md b/CHANGELOG.md index d7fedce792a..a2779c2bc79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ Here is an overview of all new **experimental** features: ### Fixes - **General**: Hashicorp Vault PKI doesn't fail with due to KeyPair mismatch ([#6028](https://github.com/kedacore/keda/issues/6028)) +- **JetStream**: Consumer leader change is correctly detected ([#6042](https://github.com/kedacore/keda/issues/6042)) ### Deprecations diff --git a/pkg/scalers/nats_jetstream_scaler.go b/pkg/scalers/nats_jetstream_scaler.go index 52f8043b2bd..47a0bbe1f36 100644 --- a/pkg/scalers/nats_jetstream_scaler.go +++ b/pkg/scalers/nats_jetstream_scaler.go @@ -218,20 +218,6 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context } if s.metadata.clusterSize > 1 { - // we know who the consumer leader and its monitoring url is, query it directly - if s.metadata.consumerLeader != "" && s.metadata.monitoringLeaderURL != "" { - natsJetStreamMonitoringLeaderURL := s.metadata.monitoringLeaderURL - - jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL) - if err != nil { - return err - } - - s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringLeaderURL) - return nil - } - - // we haven't found the consumer yet, grab the list of hosts and try each one natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL("") if err != nil { return err diff --git a/pkg/scalers/nats_jetstream_scaler_test.go b/pkg/scalers/nats_jetstream_scaler_test.go index 10312d3873c..c77811011f7 100644 --- a/pkg/scalers/nats_jetstream_scaler_test.go +++ b/pkg/scalers/nats_jetstream_scaler_test.go @@ -440,22 +440,6 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) { } } -func TestInvalidateNATSJetStreamCachedMonitoringData(t *testing.T) { - meta, err := parseNATSJetStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}) - if err != nil { - t.Fatal("Could not parse metadata:", err) - } - - mockJetStreamScaler := natsJetStreamScaler{ - stream: nil, - metadata: meta, - httpClient: http.DefaultClient, - logger: InitializeLogger(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}, "nats_jetstream_scaler"), - } - - mockJetStreamScaler.invalidateNATSJetStreamCachedMonitoringData() -} - func TestNATSJetStreamClose(t *testing.T) { mockJetStreamScaler, err := NewNATSJetStreamScaler(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}) if err != nil { diff --git a/tests/scalers/nats_jetstream/helper/nats_helper.go b/tests/scalers/nats_jetstream/helper/nats_helper.go index bbce0aca4e4..bc056d535ba 100644 --- a/tests/scalers/nats_jetstream/helper/nats_helper.go +++ b/tests/scalers/nats_jetstream/helper/nats_helper.go @@ -100,6 +100,28 @@ spec: ] restartPolicy: OnFailure backoffLimit: 4 + ` + + StepDownConsumer = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: step-down + namespace: {{.TestNamespace}} +spec: + ttlSecondsAfterFinished: 15 + template: + spec: + containers: + - name: stepdown + image: "natsio/nats-box:0.13.2" + imagePullPolicy: Always + command: [ + 'sh', '-c', 'nats context save local --server {{.NatsAddress}} --select && + nats consumer cluster step-down {{.NatsStream}} {{.NatsConsumer}}' + ] + restartPolicy: OnFailure + backoffLimit: 4 ` DeploymentTemplate = ` diff --git a/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go b/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go index 7fff0d49638..61e631780ca 100644 --- a/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go +++ b/tests/scalers/nats_jetstream/nats_jetstream_cluster/nats_jetstream_cluster_test.go @@ -234,6 +234,10 @@ func testActivation(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeployme func testScaleOut(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeploymentTemplateData) { t.Log("--- testing scale out ---") + // We force the change of consumer leader to ensure that KEDA detects the change and + // handles it properly + KubectlApplyWithTemplate(t, data, "stepDownTemplate", nats.StepDownConsumer) + KubectlApplyWithTemplate(t, data, "publishJobTemplate", nats.PublishJobTemplate) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),