diff --git a/pkg/scalers/etcd_scaler.go b/pkg/scalers/etcd_scaler.go index 83835bed93e..2d563d67aa0 100644 --- a/pkg/scalers/etcd_scaler.go +++ b/pkg/scalers/etcd_scaler.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "strconv" - "strings" "time" "github.com/go-logr/logr" @@ -38,18 +37,39 @@ type etcdScaler struct { } type etcdMetadata struct { - endpoints []string - watchKey string - value float64 - activationValue float64 - watchProgressNotifyInterval int - triggerIndex int + triggerIndex int + + Endpoints []string `keda:"name=endpoints, order=triggerMetadata"` + WatchKey string `keda:"name=watchKey, order=triggerMetadata"` + Value float64 `keda:"name=value, order=triggerMetadata"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"` + WatchProgressNotifyInterval int `keda:"name=watchProgressNotifyInterval, order=triggerMetadata, optional, default=600"` + // TLS - enableTLS bool - cert string - key string - keyPassword string - ca string + EnableTLS string `keda:"name=tls, order=authParams, optional, default=disable"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + Ca string `keda:"name=ca, order=authParams, optional"` +} + +func (meta *etcdMetadata) Validate() error { + if meta.WatchProgressNotifyInterval <= 0 { + return errors.New("watchProgressNotifyInterval must be greater than 0") + } + + if meta.EnableTLS == etcdTLSEnable { + if meta.Cert == "" && meta.Key != "" { + return errors.New("cert must be provided with key") + } + if meta.Key == "" && meta.Cert != "" { + return errors.New("key must be provided with cert") + } + } else if meta.EnableTLS != etcdTLSDisable { + return fmt.Errorf("incorrect value for TLS given: %s", meta.EnableTLS) + } + + return nil } // NewEtcdScaler creates a new etcdScaler @@ -76,75 +96,11 @@ func NewEtcdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { }, nil } -func parseEtcdAuthParams(config *scalersconfig.ScalerConfig, meta *etcdMetadata) error { - meta.enableTLS = false - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if val == etcdTLSEnable { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return errors.New("cert must be provided with key") - } - meta.ca = config.AuthParams["ca"] - meta.cert = config.AuthParams["cert"] - meta.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - meta.keyPassword = value - } else { - meta.keyPassword = "" - } - meta.enableTLS = true - } else if val != etcdTLSDisable { - return fmt.Errorf("err incorrect value for TLS given: %s", val) - } - } - - return nil -} - func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error) { meta := &etcdMetadata{} - var err error - meta.endpoints = strings.Split(config.TriggerMetadata[endpoints], ",") - if len(meta.endpoints) == 0 || meta.endpoints[0] == "" { - return nil, fmt.Errorf("endpoints required") - } - - meta.watchKey = config.TriggerMetadata[watchKey] - if len(meta.watchKey) == 0 { - return nil, fmt.Errorf("watchKey required") - } - - value, err := strconv.ParseFloat(config.TriggerMetadata[value], 64) - if err != nil || value <= 0 { - return nil, fmt.Errorf("value must be a float greater than 0") - } - meta.value = value - - meta.activationValue = 0 - if val, ok := config.TriggerMetadata[activationValue]; ok { - activationValue, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationValue must be a float") - } - meta.activationValue = activationValue - } - - meta.watchProgressNotifyInterval = defaultWatchProgressNotifyInterval - if val, ok := config.TriggerMetadata[watchProgressNotifyInterval]; ok { - interval, err := strconv.Atoi(val) - if err != nil || interval <= 0 { - return nil, fmt.Errorf("watchProgressNotifyInterval must be a int greater than 0") - } - meta.watchProgressNotifyInterval = interval - } - if err = parseEtcdAuthParams(config, meta); err != nil { - return meta, err + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %w", err) } meta.triggerIndex = config.TriggerIndex @@ -154,15 +110,15 @@ func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error func getEtcdClients(metadata *etcdMetadata) (*clientv3.Client, error) { var tlsConfig *tls.Config var err error - if metadata.enableTLS { - tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false) + if metadata.EnableTLS == etcdTLSEnable { + tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.Ca, false) if err != nil { return nil, err } } cli, err := clientv3.New(clientv3.Config{ - Endpoints: metadata.endpoints, + Endpoints: metadata.Endpoints, DialTimeout: 5 * time.Second, TLS: tlsConfig, }) @@ -189,16 +145,16 @@ func (s *etcdScaler) GetMetricsAndActivity(ctx context.Context, metricName strin } metric := GenerateMetricInMili(metricName, v) - return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.activationValue, nil + return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.ActivationValue, nil } // GetMetricSpecForScaling returns the metric spec for the HPA. func (s *etcdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.watchKey))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.WatchKey))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.value), + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), } metricSpec := v2.MetricSpec{External: externalMetric, Type: etcdMetricType} return []v2.MetricSpec{metricSpec} @@ -209,16 +165,16 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { // It's possible for the watch to get terminated anytime, we need to run this in a retry loop runWithWatch := func() { - s.logger.Info("run watch", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Info("run watch", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) subCtx, cancel := context.WithCancel(ctx) subCtx = clientv3.WithRequireLeader(subCtx) - rch := s.client.Watch(subCtx, s.metadata.watchKey, clientv3.WithProgressNotify()) + rch := s.client.Watch(subCtx, s.metadata.WatchKey, clientv3.WithProgressNotify()) // rewatch to another etcd server when the network is isolated from the current etcd server. progress := make(chan bool) defer close(progress) go func() { - delayDuration := time.Duration(s.metadata.watchProgressNotifyInterval) * 2 * time.Second + delayDuration := time.Duration(s.metadata.WatchProgressNotifyInterval) * 2 * time.Second delay := time.NewTimer(delayDuration) defer delay.Stop() for { @@ -228,7 +184,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { case <-subCtx.Done(): return case <-delay.C: - s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) cancel() return } @@ -240,7 +196,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { // rewatch to another etcd server when there is an error form the current etcd server, such as 'no leader','required revision has been compacted' if wresp.Err() != nil { - s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints) + s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints) cancel() return } @@ -251,7 +207,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { s.logger.Error(err, "etcdValue invalid will be treated as 0") v = 0 } - active <- v > s.metadata.activationValue + active <- v > s.metadata.ActivationValue } } } @@ -288,12 +244,12 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) { func (s *etcdScaler) getMetricValue(ctx context.Context) (float64, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() - resp, err := s.client.Get(ctx, s.metadata.watchKey) + resp, err := s.client.Get(ctx, s.metadata.WatchKey) if err != nil { return 0, err } if resp.Kvs == nil { - return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.watchKey) + return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.WatchKey) } v, err := strconv.ParseFloat(string(resp.Kvs[0].Value), 64) if err != nil { diff --git a/pkg/scalers/etcd_scaler_test.go b/pkg/scalers/etcd_scaler_test.go index 9ecdb5b602a..6902c6a307b 100644 --- a/pkg/scalers/etcd_scaler_test.go +++ b/pkg/scalers/etcd_scaler_test.go @@ -19,7 +19,7 @@ type parseEtcdMetadataTestData struct { type parseEtcdAuthParamsTestData struct { authParams map[string]string isError bool - enableTLS bool + enableTLS string } type etcdMetricIdentifier struct { @@ -56,19 +56,19 @@ var parseEtcdMetadataTestDataset = []parseEtcdMetadataTestData{ var parseEtcdAuthParamsTestDataset = []parseEtcdAuthParamsTestData{ // success, TLS only - {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable}, // success, TLS cert/key and assumed public CA - {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable}, // success, TLS cert/key + key password and assumed public CA - {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true}, + {map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, etcdTLSEnable}, // success, TLS CA only - {map[string]string{"tls": "enable", "ca": "caaa"}, false, true}, + {map[string]string{"tls": "enable", "ca": "caaa"}, false, etcdTLSEnable}, // failure, TLS missing cert - {map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, false}, + {map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, etcdTLSDisable}, // failure, TLS missing key - {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false}, + {map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, etcdTLSDisable}, // failure, TLS invalid - {map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false}, + {map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, etcdTLSDisable}, } var etcdMetricIdentifiers = []etcdMetricIdentifier{ @@ -83,10 +83,10 @@ func TestParseEtcdMetadata(t *testing.T) { t.Error("Expected success but got error", err) } if testData.isError && err == nil { - t.Error("Expected error but got success") + t.Errorf("Expected error but got success %v", testData) } - if err == nil && !reflect.DeepEqual(meta.endpoints, testData.endpoints) { - t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.endpoints) + if err == nil && !reflect.DeepEqual(meta.Endpoints, testData.endpoints) { + t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.Endpoints) } } } @@ -101,21 +101,21 @@ func TestParseEtcdAuthParams(t *testing.T) { if testData.isError && err == nil { t.Error("Expected error but got success") } - if meta.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS) + if meta != nil && meta.EnableTLS != testData.enableTLS { + t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.EnableTLS) } - if meta.enableTLS { - if meta.ca != testData.authParams["ca"] { - t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS) + if meta != nil && meta.EnableTLS == etcdTLSEnable { + if meta.Ca != testData.authParams["ca"] { + t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.EnableTLS) } - if meta.cert != testData.authParams["cert"] { - t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert) + if meta.Cert != testData.authParams["cert"] { + t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.Cert) } - if meta.key != testData.authParams["key"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key) + if meta.Key != testData.authParams["key"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.Key) } - if meta.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key) + if meta.KeyPassword != testData.authParams["keyPassword"] { + t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.Key) } } } diff --git a/tests/scalers/etcd/etcd_cluster/etcd_cluster_test.go b/tests/scalers/etcd/etcd_cluster/etcd_cluster_test.go index bf86aa3a304..fc3b103c498 100644 --- a/tests/scalers/etcd/etcd_cluster/etcd_cluster_test.go +++ b/tests/scalers/etcd/etcd_cluster/etcd_cluster_test.go @@ -101,6 +101,7 @@ spec: containers: - name: {{.EtcdClientName}} image: gcr.io/etcd-development/etcd:v3.4.10 + imagePullPolicy: Never command: - sh - -c