Skip to content

Commit

Permalink
Refactor etcd scaler metadata
Browse files Browse the repository at this point in the history
Signed-off-by: wangrushen <wrs369@88.com>
  • Loading branch information
dovics committed Sep 30, 2024
1 parent 69a9cb9 commit f53de4f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 114 deletions.
140 changes: 48 additions & 92 deletions pkg/scalers/etcd_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -38,18 +37,39 @@ type etcdScaler struct {
}

type etcdMetadata struct {
endpoints []string
watchKey string
value float64
activationValue float64
watchProgressNotifyInterval int
triggerIndex int
triggerIndex int

Endpoints []string `keda:"name=endpoints, order=triggerMetadata"`
WatchKey string `keda:"name=watchKey, order=triggerMetadata"`
Value float64 `keda:"name=value, order=triggerMetadata"`
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"`
WatchProgressNotifyInterval int `keda:"name=watchProgressNotifyInterval, order=triggerMetadata, optional, default=600"`

// TLS
enableTLS bool
cert string
key string
keyPassword string
ca string
EnableTLS string `keda:"name=tls, order=authParams, optional, default=disable"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
Ca string `keda:"name=ca, order=authParams, optional"`
}

func (meta *etcdMetadata) Validate() error {
if meta.WatchProgressNotifyInterval <= 0 {
return errors.New("watchProgressNotifyInterval must be greater than 0")
}

if meta.EnableTLS == etcdTLSEnable {
if meta.Cert == "" && meta.Key != "" {
return errors.New("cert must be provided with key")
}
if meta.Key == "" && meta.Cert != "" {
return errors.New("key must be provided with cert")
}
} else if meta.EnableTLS != etcdTLSDisable {
return fmt.Errorf("incorrect value for TLS given: %s", meta.EnableTLS)
}

return nil
}

// NewEtcdScaler creates a new etcdScaler
Expand All @@ -76,75 +96,11 @@ func NewEtcdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parseEtcdAuthParams(config *scalersconfig.ScalerConfig, meta *etcdMetadata) error {
meta.enableTLS = false
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if val == etcdTLSEnable {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
} else if val != etcdTLSDisable {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
}
}

return nil
}

func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error) {
meta := &etcdMetadata{}
var err error
meta.endpoints = strings.Split(config.TriggerMetadata[endpoints], ",")
if len(meta.endpoints) == 0 || meta.endpoints[0] == "" {
return nil, fmt.Errorf("endpoints required")
}

meta.watchKey = config.TriggerMetadata[watchKey]
if len(meta.watchKey) == 0 {
return nil, fmt.Errorf("watchKey required")
}

value, err := strconv.ParseFloat(config.TriggerMetadata[value], 64)
if err != nil || value <= 0 {
return nil, fmt.Errorf("value must be a float greater than 0")
}
meta.value = value

meta.activationValue = 0
if val, ok := config.TriggerMetadata[activationValue]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue must be a float")
}
meta.activationValue = activationValue
}

meta.watchProgressNotifyInterval = defaultWatchProgressNotifyInterval
if val, ok := config.TriggerMetadata[watchProgressNotifyInterval]; ok {
interval, err := strconv.Atoi(val)
if err != nil || interval <= 0 {
return nil, fmt.Errorf("watchProgressNotifyInterval must be a int greater than 0")
}
meta.watchProgressNotifyInterval = interval
}

if err = parseEtcdAuthParams(config, meta); err != nil {
return meta, err
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %w", err)
}

meta.triggerIndex = config.TriggerIndex
Expand All @@ -154,15 +110,15 @@ func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error
func getEtcdClients(metadata *etcdMetadata) (*clientv3.Client, error) {
var tlsConfig *tls.Config
var err error
if metadata.enableTLS {
tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false)
if metadata.EnableTLS == etcdTLSEnable {
tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.Ca, false)
if err != nil {
return nil, err
}
}

cli, err := clientv3.New(clientv3.Config{
Endpoints: metadata.endpoints,
Endpoints: metadata.Endpoints,
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
Expand All @@ -189,16 +145,16 @@ func (s *etcdScaler) GetMetricsAndActivity(ctx context.Context, metricName strin
}

metric := GenerateMetricInMili(metricName, v)
return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.activationValue, nil
return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.ActivationValue, nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA.
func (s *etcdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.watchKey))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.WatchKey))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: etcdMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -209,16 +165,16 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {

// It's possible for the watch to get terminated anytime, we need to run this in a retry loop
runWithWatch := func() {
s.logger.Info("run watch", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints)
s.logger.Info("run watch", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints)
subCtx, cancel := context.WithCancel(ctx)
subCtx = clientv3.WithRequireLeader(subCtx)
rch := s.client.Watch(subCtx, s.metadata.watchKey, clientv3.WithProgressNotify())
rch := s.client.Watch(subCtx, s.metadata.WatchKey, clientv3.WithProgressNotify())

// rewatch to another etcd server when the network is isolated from the current etcd server.
progress := make(chan bool)
defer close(progress)
go func() {
delayDuration := time.Duration(s.metadata.watchProgressNotifyInterval) * 2 * time.Second
delayDuration := time.Duration(s.metadata.WatchProgressNotifyInterval) * 2 * time.Second
delay := time.NewTimer(delayDuration)
defer delay.Stop()
for {
Expand All @@ -228,7 +184,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {
case <-subCtx.Done():
return
case <-delay.C:
s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints)
s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints)
cancel()
return
}
Expand All @@ -240,7 +196,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {

// rewatch to another etcd server when there is an error form the current etcd server, such as 'no leader','required revision has been compacted'
if wresp.Err() != nil {
s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints)
s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints)
cancel()
return
}
Expand All @@ -251,7 +207,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {
s.logger.Error(err, "etcdValue invalid will be treated as 0")
v = 0
}
active <- v > s.metadata.activationValue
active <- v > s.metadata.ActivationValue
}
}
}
Expand Down Expand Up @@ -288,12 +244,12 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {
func (s *etcdScaler) getMetricValue(ctx context.Context) (float64, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
resp, err := s.client.Get(ctx, s.metadata.watchKey)
resp, err := s.client.Get(ctx, s.metadata.WatchKey)
if err != nil {
return 0, err
}
if resp.Kvs == nil {
return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.watchKey)
return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.WatchKey)
}
v, err := strconv.ParseFloat(string(resp.Kvs[0].Value), 64)
if err != nil {
Expand Down
44 changes: 22 additions & 22 deletions pkg/scalers/etcd_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type parseEtcdMetadataTestData struct {
type parseEtcdAuthParamsTestData struct {
authParams map[string]string
isError bool
enableTLS bool
enableTLS string
}

type etcdMetricIdentifier struct {
Expand Down Expand Up @@ -56,19 +56,19 @@ var parseEtcdMetadataTestDataset = []parseEtcdMetadataTestData{

var parseEtcdAuthParamsTestDataset = []parseEtcdAuthParamsTestData{
// success, TLS only
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable},
// success, TLS cert/key and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true},
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable},
// success, TLS cert/key + key password and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, etcdTLSEnable},
// success, TLS CA only
{map[string]string{"tls": "enable", "ca": "caaa"}, false, true},
{map[string]string{"tls": "enable", "ca": "caaa"}, false, etcdTLSEnable},
// failure, TLS missing cert
{map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, false},
{map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, etcdTLSDisable},
// failure, TLS missing key
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false},
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, etcdTLSDisable},
// failure, TLS invalid
{map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
{map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, etcdTLSDisable},
}

var etcdMetricIdentifiers = []etcdMetricIdentifier{
Expand All @@ -83,10 +83,10 @@ func TestParseEtcdMetadata(t *testing.T) {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
t.Errorf("Expected error but got success %v", testData)
}
if err == nil && !reflect.DeepEqual(meta.endpoints, testData.endpoints) {
t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.endpoints)
if err == nil && !reflect.DeepEqual(meta.Endpoints, testData.endpoints) {
t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.Endpoints)
}
}
}
Expand All @@ -101,21 +101,21 @@ func TestParseEtcdAuthParams(t *testing.T) {
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
if meta != nil && meta.EnableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.EnableTLS)
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS)
if meta != nil && meta.EnableTLS == etcdTLSEnable {
if meta.Ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.EnableTLS)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert)
if meta.Cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.Cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
if meta.Key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.Key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key)
if meta.KeyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.Key)
}
}
}
Expand Down

0 comments on commit f53de4f

Please sign in to comment.