From cddc9ce06da2694488bac08bc1aa1529d422f447 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Thu, 26 Sep 2024 21:00:03 +0800 Subject: [PATCH] Refactor redis scaler config (#5997) Signed-off-by: SpiritZhou --- pkg/scalers/aws_dynamodb_scaler_test.go | 10 +- pkg/scalers/redis_scaler.go | 452 ++----- pkg/scalers/redis_scaler_test.go | 440 +++--- pkg/scalers/redis_streams_scaler.go | 190 ++- pkg/scalers/redis_streams_scaler_test.go | 1176 ++++++++--------- pkg/scalers/scalersconfig/typed_config.go | 41 +- .../scalersconfig/typed_config_test.go | 42 +- 7 files changed, 1060 insertions(+), 1291 deletions(-) diff --git a/pkg/scalers/aws_dynamodb_scaler_test.go b/pkg/scalers/aws_dynamodb_scaler_test.go index f6ec68314e8..869e0b97178 100644 --- a/pkg/scalers/aws_dynamodb_scaler_test.go +++ b/pkg/scalers/aws_dynamodb_scaler_test.go @@ -39,13 +39,13 @@ type parseDynamoDBMetadataTestData struct { var ( // ErrAwsDynamoNoTableName is returned when "tableName" is missing from the config. - ErrAwsDynamoNoTableName = errors.New("missing required parameter \"tableName\"") + ErrAwsDynamoNoTableName = errors.New("missing required parameter [\"tableName\"]") // ErrAwsDynamoNoAwsRegion is returned when "awsRegion" is missing from the config. - ErrAwsDynamoNoAwsRegion = errors.New("missing required parameter \"awsRegion\"") + ErrAwsDynamoNoAwsRegion = errors.New("missing required parameter [\"awsRegion\"]") // ErrAwsDynamoNoKeyConditionExpression is returned when "keyConditionExpression" is missing from the config. - ErrAwsDynamoNoKeyConditionExpression = errors.New("missing required parameter \"keyConditionExpression\"") + ErrAwsDynamoNoKeyConditionExpression = errors.New("missing required parameter [\"keyConditionExpression\"]") ) var dynamoTestCases = []parseDynamoDBMetadataTestData{ @@ -114,7 +114,7 @@ var dynamoTestCases = []parseDynamoDBMetadataTestData{ "targetValue": "no-valid", }, authParams: map[string]string{}, - expectedError: errors.New("error parsing DynamoDb metadata: unable to set param \"targetValue\" value"), + expectedError: errors.New("error parsing DynamoDb metadata: unable to set param [\"targetValue\"] value"), }, { name: "invalid activationTargetValue given", @@ -128,7 +128,7 @@ var dynamoTestCases = []parseDynamoDBMetadataTestData{ "activationTargetValue": "no-valid", }, authParams: map[string]string{}, - expectedError: errors.New("unable to set param \"activationTargetValue\""), + expectedError: errors.New("unable to set param [\"activationTargetValue\"]"), }, { name: "malformed expressionAttributeNames", diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 81c318facaa..3746735dce6 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "strconv" - "strings" "github.com/go-logr/logr" "github.com/redis/go-redis/v9" @@ -33,9 +32,10 @@ var ( // ErrRedisUnequalHostsAndPorts is returned when the number of hosts and ports are unequal. ErrRedisUnequalHostsAndPorts = errors.New("not enough hosts or ports given. number of hosts should be equal to the number of ports") -) -type redisAddressParser func(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) + // ErrRedisParse is returned when "listName" is missing from the config. + ErrRedisParse = errors.New("error parsing redis metadata") +) type redisScaler struct { metricType v2.MetricTargetType @@ -46,31 +46,78 @@ type redisScaler struct { } type redisConnectionInfo struct { - addresses []string - username string - password string - sentinelUsername string - sentinelPassword string - sentinelMaster string - hosts []string - ports []string - enableTLS bool - unsafeSsl bool - cert string - key string - keyPassword string - ca string + Addresses []string `keda:"name=address;addresses, order=triggerMetadata;authParams;resolvedEnv"` + Username string `keda:"name=username, order=triggerMetadata;resolvedEnv;authParams"` + Password string `keda:"name=password, order=triggerMetadata;resolvedEnv;authParams"` + SentinelUsername string `keda:"name=sentinelUsername, order=triggerMetadata;authParams;resolvedEnv"` + SentinelPassword string `keda:"name=sentinelPassword, order=triggerMetadata;authParams;resolvedEnv"` + SentinelMaster string `keda:"name=sentinelMaster, order=triggerMetadata;authParams;resolvedEnv"` + Hosts []string `keda:"name=host;hosts, order=triggerMetadata;resolvedEnv;authParams"` + Ports []string `keda:"name=port;ports, order=triggerMetadata;resolvedEnv;authParams"` + EnableTLS bool + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional, default=false"` + Cert string `keda:"name=Cert;cert, order=authParams"` + Key string `keda:"name=key, order=authParams"` + KeyPassword string `keda:"name=keyPassword, order=authParams"` + Ca string `keda:"name=ca, order=authParams"` } type redisMetadata struct { - listLength int64 - activationListLength int64 - listName string - databaseIndex int - connectionInfo redisConnectionInfo + ListLength int64 `keda:"name=listLength, order=triggerMetadata, optional, default=5"` + ActivationListLength int64 `keda:"name=activationListLength, order=triggerMetadata, optional"` + ListName string `keda:"name=listName, order=triggerMetadata"` + DatabaseIndex int `keda:"name=databaseIndex, order=triggerMetadata, optional"` + MetadataEnableTLS string `keda:"name=enableTLS, order=triggerMetadata, optional"` + AuthParamEnableTLS string `keda:"name=tls, order=authParams, optional"` + ConnectionInfo redisConnectionInfo `keda:"optional"` triggerIndex int } +func (rci *redisConnectionInfo) SetEnableTLS(metadataEnableTLS string, authParamEnableTLS string) error { + EnableTLS := defaultEnableTLS + + if metadataEnableTLS != "" && authParamEnableTLS != "" { + return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") + } + + if metadataEnableTLS != "" { + tls, err := strconv.ParseBool(metadataEnableTLS) + if err != nil { + return fmt.Errorf("EnableTLS parsing error %w", err) + } + EnableTLS = tls + } + + // parse tls config defined in auth params + if authParamEnableTLS != "" { + switch authParamEnableTLS { + case stringEnable: + EnableTLS = true + case stringDisable: + EnableTLS = false + default: + return fmt.Errorf("error incorrect TLS value given, got %s", authParamEnableTLS) + } + } + rci.EnableTLS = EnableTLS + return nil +} + +func (r *redisMetadata) Validate() error { + err := validateRedisAddress(&r.ConnectionInfo) + + if err != nil { + return err + } + + err = r.ConnectionInfo.SetEnableTLS(r.MetadataEnableTLS, r.AuthParamEnableTLS) + if err == nil { + r.MetadataEnableTLS, r.AuthParamEnableTLS = "", "" + } + + return err +} + // NewRedisScaler creates a new redisScaler func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *scalersconfig.ScalerConfig) (Scaler, error) { luaScript := ` @@ -94,30 +141,21 @@ func NewRedisScaler(ctx context.Context, isClustered, isSentinel bool, config *s logger := InitializeLogger(config, "redis_scaler") + meta, err := parseRedisMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %w", err) + } + if isClustered { - meta, err := parseRedisMetadata(config, parseRedisClusterAddress) - if err != nil { - return nil, fmt.Errorf("error parsing redis metadata: %w", err) - } return createClusteredRedisScaler(ctx, meta, luaScript, metricType, logger) } else if isSentinel { - meta, err := parseRedisMetadata(config, parseRedisSentinelAddress) - if err != nil { - return nil, fmt.Errorf("error parsing redis metadata: %w", err) - } return createSentinelRedisScaler(ctx, meta, luaScript, metricType, logger) } - - meta, err := parseRedisMetadata(config, parseRedisAddress) - if err != nil { - return nil, fmt.Errorf("error parsing redis metadata: %w", err) - } - return createRedisScaler(ctx, meta, luaScript, metricType, logger) } func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { - client, err := getRedisClusterClient(ctx, meta.connectionInfo) + client, err := getRedisClusterClient(ctx, meta.ConnectionInfo) if err != nil { return nil, fmt.Errorf("connection to redis cluster failed: %w", err) } @@ -131,7 +169,7 @@ func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script } listLengthFn := func(ctx context.Context) (int64, error) { - cmd := client.Eval(ctx, script, []string{meta.listName}) + cmd := client.Eval(ctx, script, []string{meta.ListName}) if cmd.Err() != nil { return -1, cmd.Err() } @@ -149,7 +187,7 @@ func createClusteredRedisScaler(ctx context.Context, meta *redisMetadata, script } func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { - client, err := getRedisSentinelClient(ctx, meta.connectionInfo, meta.databaseIndex) + client, err := getRedisSentinelClient(ctx, meta.ConnectionInfo, meta.DatabaseIndex) if err != nil { return nil, fmt.Errorf("connection to redis sentinel failed: %w", err) } @@ -158,7 +196,7 @@ func createSentinelRedisScaler(ctx context.Context, meta *redisMetadata, script } func createRedisScaler(ctx context.Context, meta *redisMetadata, script string, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { - client, err := getRedisClient(ctx, meta.connectionInfo, meta.databaseIndex) + client, err := getRedisClient(ctx, meta.ConnectionInfo, meta.DatabaseIndex) if err != nil { return nil, fmt.Errorf("connection to redis failed: %w", err) } @@ -176,7 +214,7 @@ func createRedisScalerWithClient(client *redis.Client, meta *redisMetadata, scri } listLengthFn := func(ctx context.Context) (int64, error) { - cmd := client.Eval(ctx, script, []string{meta.listName}) + cmd := client.Eval(ctx, script, []string{meta.ListName}) if cmd.Err() != nil { return -1, cmd.Err() } @@ -193,110 +231,14 @@ func createRedisScalerWithClient(client *redis.Client, meta *redisMetadata, scri } } -func parseTLSConfigIntoConnectionInfo(config *scalersconfig.ScalerConfig, connInfo *redisConnectionInfo) error { - enableTLS := defaultEnableTLS - if val, ok := config.TriggerMetadata["enableTLS"]; ok { - tls, err := strconv.ParseBool(val) - if err != nil { - return fmt.Errorf("enableTLS parsing error %w", err) - } - enableTLS = tls - } - - connInfo.unsafeSsl = false - if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { - parsedVal, err := strconv.ParseBool(val) - if err != nil { - return fmt.Errorf("error parsing unsafeSsl: %w", err) - } - connInfo.unsafeSsl = parsedVal - } - - // parse tls config defined in auth params - if val, ok := config.AuthParams["tls"]; ok { - val = strings.TrimSpace(val) - if enableTLS { - return errors.New("unable to set `tls` in both ScaledObject and TriggerAuthentication together") - } - switch val { - case stringEnable: - enableTLS = true - case stringDisable: - enableTLS = false - default: - return fmt.Errorf("error incorrect TLS value given, got %s", val) - } - } - if enableTLS { - certGiven := config.AuthParams["cert"] != "" - keyGiven := config.AuthParams["key"] != "" - if certGiven && !keyGiven { - return errors.New("key must be provided with cert") - } - if keyGiven && !certGiven { - return errors.New("cert must be provided with key") - } - connInfo.ca = config.AuthParams["ca"] - connInfo.cert = config.AuthParams["cert"] - connInfo.key = config.AuthParams["key"] - if value, found := config.AuthParams["keyPassword"]; found { - connInfo.keyPassword = value - } else { - connInfo.keyPassword = "" - } - } - connInfo.enableTLS = enableTLS - return nil -} - -func parseRedisMetadata(config *scalersconfig.ScalerConfig, parserFn redisAddressParser) (*redisMetadata, error) { - connInfo, err := parserFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams) - if err != nil { - return nil, err - } - meta := redisMetadata{ - connectionInfo: connInfo, - } - - err = parseTLSConfigIntoConnectionInfo(config, &meta.connectionInfo) - if err != nil { - return nil, err - } - - meta.listLength = defaultListLength - if val, ok := config.TriggerMetadata["listLength"]; ok { - listLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("list length parsing error: %w", err) - } - meta.listLength = listLength - } - - meta.activationListLength = defaultActivationListLength - if val, ok := config.TriggerMetadata["activationListLength"]; ok { - activationListLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("activationListLength parsing error %w", err) - } - meta.activationListLength = activationListLength - } - - if val, ok := config.TriggerMetadata["listName"]; ok { - meta.listName = val - } else { - return nil, ErrRedisNoListName +func parseRedisMetadata(config *scalersconfig.ScalerConfig) (*redisMetadata, error) { + meta := &redisMetadata{} + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing redis metadata: %w", err) } - meta.databaseIndex = defaultDBIdx - if val, ok := config.TriggerMetadata["databaseIndex"]; ok { - dbIndex, err := strconv.ParseInt(val, 10, 32) - if err != nil { - return nil, fmt.Errorf("databaseIndex: parsing error %w", err) - } - meta.databaseIndex = int(dbIndex) - } meta.triggerIndex = config.TriggerIndex - return &meta, nil + return meta, nil } func (s *redisScaler) Close(context.Context) error { @@ -305,12 +247,12 @@ func (s *redisScaler) Close(context.Context) error { // GetMetricSpecForScaling returns the metric spec for the HPA func (s *redisScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { - metricName := util.NormalizeString(fmt.Sprintf("redis-%s", s.metadata.listName)) + metricName := util.NormalizeString(fmt.Sprintf("redis-%s", s.metadata.ListName)) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), }, - Target: GetMetricTarget(s.metricType, s.metadata.listLength), + Target: GetMetricTarget(s.metricType, s.metadata.ListLength), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: externalMetricType, @@ -329,189 +271,34 @@ func (s *redisScaler) GetMetricsAndActivity(ctx context.Context, metricName stri metric := GenerateMetricInMili(metricName, float64(listLen)) - return []external_metrics.ExternalMetricValue{metric}, listLen > s.metadata.activationListLength, nil -} - -func parseRedisAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { - info := redisConnectionInfo{} - switch { - case authParams["address"] != "": - info.addresses = append(info.addresses, authParams["address"]) - case metadata["address"] != "": - info.addresses = append(info.addresses, metadata["address"]) - case metadata["addressFromEnv"] != "": - info.addresses = append(info.addresses, resolvedEnv[metadata["addressFromEnv"]]) - default: - switch { - case authParams["host"] != "": - info.hosts = append(info.hosts, authParams["host"]) - case metadata["host"] != "": - info.hosts = append(info.hosts, metadata["host"]) - case metadata["hostFromEnv"] != "": - info.hosts = append(info.hosts, resolvedEnv[metadata["hostFromEnv"]]) - } - - switch { - case authParams["port"] != "": - info.ports = append(info.ports, authParams["port"]) - case metadata["port"] != "": - info.ports = append(info.ports, metadata["port"]) - case metadata["portFromEnv"] != "": - info.ports = append(info.ports, resolvedEnv[metadata["portFromEnv"]]) - } - - if len(info.hosts) != 0 && len(info.ports) != 0 { - info.addresses = append(info.addresses, net.JoinHostPort(info.hosts[0], info.ports[0])) - } - } - - if len(info.addresses) == 0 || len(info.addresses[0]) == 0 { - return info, fmt.Errorf("no address or host given. address should be in the format of host:port or you should set the host/port values") - } - - switch { - case authParams["username"] != "": - info.username = authParams["username"] - case metadata["username"] != "": - info.username = metadata["username"] - case metadata["usernameFromEnv"] != "": - info.username = resolvedEnv[metadata["usernameFromEnv"]] - } - - if authParams["password"] != "" { - info.password = authParams["password"] - } else if metadata["passwordFromEnv"] != "" { - info.password = resolvedEnv[metadata["passwordFromEnv"]] - } - - return info, nil + return []external_metrics.ExternalMetricValue{metric}, listLen > s.metadata.ActivationListLength, nil } -func parseRedisMultipleAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { - info := redisConnectionInfo{} - switch { - case authParams["addresses"] != "": - info.addresses = splitAndTrim(authParams["addresses"]) - case metadata["addresses"] != "": - info.addresses = splitAndTrim(metadata["addresses"]) - case metadata["addressesFromEnv"] != "": - info.addresses = splitAndTrim(resolvedEnv[metadata["addressesFromEnv"]]) - default: - switch { - case authParams["hosts"] != "": - info.hosts = splitAndTrim(authParams["hosts"]) - case metadata["hosts"] != "": - info.hosts = splitAndTrim(metadata["hosts"]) - case metadata["hostsFromEnv"] != "": - info.hosts = splitAndTrim(resolvedEnv[metadata["hostsFromEnv"]]) +func validateRedisAddress(c *redisConnectionInfo) error { + if len(c.Hosts) != 0 && len(c.Ports) != 0 { + if len(c.Hosts) != len(c.Ports) { + return ErrRedisUnequalHostsAndPorts } - - switch { - case authParams["ports"] != "": - info.ports = splitAndTrim(authParams["ports"]) - case metadata["ports"] != "": - info.ports = splitAndTrim(metadata["ports"]) - case metadata["portsFromEnv"] != "": - info.ports = splitAndTrim(resolvedEnv[metadata["portsFromEnv"]]) - } - - if len(info.hosts) != 0 && len(info.ports) != 0 { - if len(info.hosts) != len(info.ports) { - return info, ErrRedisUnequalHostsAndPorts - } - for i := range info.hosts { - info.addresses = append(info.addresses, net.JoinHostPort(info.hosts[i], info.ports[i])) - } + for i := range c.Hosts { + c.Addresses = append(c.Addresses, net.JoinHostPort(c.Hosts[i], c.Ports[i])) } } + // } - if len(info.addresses) == 0 { - return info, ErrRedisNoAddresses - } - - return info, nil -} - -func parseRedisClusterAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { - info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams) - if err != nil { - return redisConnectionInfo{}, err - } - - switch { - case authParams["username"] != "": - info.username = authParams["username"] - case metadata["username"] != "": - info.username = metadata["username"] - case metadata["usernameFromEnv"] != "": - info.username = resolvedEnv[metadata["usernameFromEnv"]] - } - - if authParams["password"] != "" { - info.password = authParams["password"] - } else if metadata["passwordFromEnv"] != "" { - info.password = resolvedEnv[metadata["passwordFromEnv"]] - } - - return info, nil -} - -func parseRedisSentinelAddress(metadata, resolvedEnv, authParams map[string]string) (redisConnectionInfo, error) { - info, err := parseRedisMultipleAddress(metadata, resolvedEnv, authParams) - if err != nil { - return redisConnectionInfo{}, err - } - - switch { - case authParams["username"] != "": - info.username = authParams["username"] - case metadata["username"] != "": - info.username = metadata["username"] - case metadata["usernameFromEnv"] != "": - info.username = resolvedEnv[metadata["usernameFromEnv"]] - } - - if authParams["password"] != "" { - info.password = authParams["password"] - } else if metadata["passwordFromEnv"] != "" { - info.password = resolvedEnv[metadata["passwordFromEnv"]] - } - - switch { - case authParams["sentinelUsername"] != "": - info.sentinelUsername = authParams["sentinelUsername"] - case metadata["sentinelUsername"] != "": - info.sentinelUsername = metadata["sentinelUsername"] - case metadata["sentinelUsernameFromEnv"] != "": - info.sentinelUsername = resolvedEnv[metadata["sentinelUsernameFromEnv"]] - } - - if authParams["sentinelPassword"] != "" { - info.sentinelPassword = authParams["sentinelPassword"] - } else if metadata["sentinelPasswordFromEnv"] != "" { - info.sentinelPassword = resolvedEnv[metadata["sentinelPasswordFromEnv"]] - } - - switch { - case authParams["sentinelMaster"] != "": - info.sentinelMaster = authParams["sentinelMaster"] - case metadata["sentinelMaster"] != "": - info.sentinelMaster = metadata["sentinelMaster"] - case metadata["sentinelMasterFromEnv"] != "": - info.sentinelMaster = resolvedEnv[metadata["sentinelMasterFromEnv"]] + if len(c.Addresses) == 0 || len(c.Addresses[0]) == 0 { + return ErrRedisNoAddresses } - - return info, nil + return nil } func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redis.ClusterClient, error) { options := &redis.ClusterOptions{ - Addrs: info.addresses, - Username: info.username, - Password: info.password, + Addrs: info.Addresses, + Username: info.Username, + Password: info.Password, } - if info.enableTLS { - tlsConfig, err := util.NewTLSConfigWithPassword(info.cert, info.key, info.keyPassword, info.ca, info.unsafeSsl) + if info.EnableTLS { + tlsConfig, err := util.NewTLSConfigWithPassword(info.Cert, info.Key, info.KeyPassword, info.Ca, info.UnsafeSsl) if err != nil { return nil, err } @@ -528,16 +315,16 @@ func getRedisClusterClient(ctx context.Context, info redisConnectionInfo) (*redi func getRedisSentinelClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) { options := &redis.FailoverOptions{ - Username: info.username, - Password: info.password, + Username: info.Username, + Password: info.Password, DB: dbIndex, - SentinelAddrs: info.addresses, - SentinelUsername: info.sentinelUsername, - SentinelPassword: info.sentinelPassword, - MasterName: info.sentinelMaster, + SentinelAddrs: info.Addresses, + SentinelUsername: info.SentinelUsername, + SentinelPassword: info.SentinelPassword, + MasterName: info.SentinelMaster, } - if info.enableTLS { - tlsConfig, err := util.NewTLSConfigWithPassword(info.cert, info.key, info.keyPassword, info.ca, info.unsafeSsl) + if info.EnableTLS { + tlsConfig, err := util.NewTLSConfigWithPassword(info.Cert, info.Key, info.KeyPassword, info.Ca, info.UnsafeSsl) if err != nil { return nil, err } @@ -554,13 +341,13 @@ func getRedisSentinelClient(ctx context.Context, info redisConnectionInfo, dbInd func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int) (*redis.Client, error) { options := &redis.Options{ - Addr: info.addresses[0], - Username: info.username, - Password: info.password, + Addr: info.Addresses[0], + Username: info.Username, + Password: info.Password, DB: dbIndex, } - if info.enableTLS { - tlsConfig, err := util.NewTLSConfigWithPassword(info.cert, info.key, info.keyPassword, info.ca, info.unsafeSsl) + if info.EnableTLS { + tlsConfig, err := util.NewTLSConfigWithPassword(info.Cert, info.Key, info.KeyPassword, info.Ca, info.UnsafeSsl) if err != nil { return nil, err } @@ -575,12 +362,3 @@ func getRedisClient(ctx context.Context, info redisConnectionInfo, dbIndex int) } return c, nil } - -// Splits a string separated by comma and trims space from all the elements. -func splitAndTrim(s string) []string { - x := strings.Split(s, ",") - for i := range x { - x[i] = strings.Trim(x[i], " ") - } - return x -} diff --git a/pkg/scalers/redis_scaler_test.go b/pkg/scalers/redis_scaler_test.go index 9ce7a062e70..734a1c359a0 100644 --- a/pkg/scalers/redis_scaler_test.go +++ b/pkg/scalers/redis_scaler_test.go @@ -2,7 +2,6 @@ package scalers import ( "context" - "strconv" "testing" "github.com/go-logr/logr" @@ -25,7 +24,7 @@ type parseRedisMetadataTestData struct { metadata map[string]string isError bool authParams map[string]string - enableTLS bool + EnableTLS bool } type redisMetricIdentifier struct { @@ -83,7 +82,7 @@ func TestRedisParseMetadata(t *testing.T) { testCaseNum := 0 for _, testData := range testRedisMetadata { testCaseNum++ - meta, err := parseRedisMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.authParams}, parseRedisAddress) + meta, err := parseRedisMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.authParams}) if err != nil && !testData.isError { t.Errorf("Expected success but got error for unit test # %v", testCaseNum) } @@ -93,21 +92,21 @@ func TestRedisParseMetadata(t *testing.T) { if testData.isError { continue } - if meta.connectionInfo.enableTLS != testData.enableTLS { - t.Errorf("Expected enableTLS to be set to %v but got %v for unit test #%v\n", testData.enableTLS, meta.connectionInfo.enableTLS, testCaseNum) + if meta.ConnectionInfo.EnableTLS != testData.EnableTLS { + t.Errorf("Expected EnableTLS to be set to %v but got %v for unit test #%v\n", testData.EnableTLS, meta.ConnectionInfo.EnableTLS, testCaseNum) } - if meta.connectionInfo.enableTLS { - if meta.connectionInfo.ca != testData.authParams["ca"] { - t.Errorf("Expected ca to be set to %v but got %v for unit test #%v\n", testData.authParams["ca"], meta.connectionInfo.enableTLS, testCaseNum) + if meta.ConnectionInfo.EnableTLS { + if meta.ConnectionInfo.Ca != testData.authParams["ca"] { + t.Errorf("Expected ca to be set to %v but got %v for unit test #%v\n", testData.authParams["ca"], meta.ConnectionInfo.EnableTLS, testCaseNum) } - if meta.connectionInfo.cert != testData.authParams["cert"] { - t.Errorf("Expected cert to be set to %v but got %v for unit test #%v\n", testData.authParams["cert"], meta.connectionInfo.cert, testCaseNum) + if meta.ConnectionInfo.Cert != testData.authParams["cert"] { + t.Errorf("Expected Cert to be set to %v but got %v for unit test #%v\n", testData.authParams["cert"], meta.ConnectionInfo.Cert, testCaseNum) } - if meta.connectionInfo.key != testData.authParams["key"] { - t.Errorf("Expected key to be set to %v but got %v for unit test #%v\n", testData.authParams["key"], meta.connectionInfo.key, testCaseNum) + if meta.ConnectionInfo.Key != testData.authParams["key"] { + t.Errorf("Expected key to be set to %v but got %v for unit test #%v\n", testData.authParams["key"], meta.ConnectionInfo.Key, testCaseNum) } - if meta.connectionInfo.keyPassword != testData.authParams["keyPassword"] { - t.Errorf("Expected key to be set to %v but got %v for unit test #%v\n", testData.authParams["keyPassword"], meta.connectionInfo.key, testCaseNum) + if meta.ConnectionInfo.KeyPassword != testData.authParams["keyPassword"] { + t.Errorf("Expected key to be set to %v but got %v for unit test #%v\n", testData.authParams["keyPassword"], meta.ConnectionInfo.Key, testCaseNum) } } } @@ -115,7 +114,7 @@ func TestRedisParseMetadata(t *testing.T) { func TestRedisGetMetricSpecForScaling(t *testing.T) { for _, testData := range redisMetricIdentifiers { - meta, err := parseRedisMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, parseRedisAddress) + meta, err := parseRedisMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testRedisResolvedEnv, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } @@ -152,10 +151,11 @@ func TestParseRedisClusterMetadata(t *testing.T) { wantErr: ErrRedisNoAddresses, }, { - name: "unequal number of hosts/ports", + name: "unequal number of Hosts/Ports", metadata: map[string]string{ - "hosts": "a, b, c", - "ports": "1, 2", + "listName": "mylist", + "hosts": "a, b, c", + "ports": "1, 2", }, wantMeta: nil, wantErr: ErrRedisUnequalHostsAndPorts, @@ -168,7 +168,7 @@ func TestParseRedisClusterMetadata(t *testing.T) { "listLength": "5", }, wantMeta: nil, - wantErr: ErrRedisNoListName, + wantErr: ErrRedisParse, }, { name: "invalid list length", @@ -179,7 +179,7 @@ func TestParseRedisClusterMetadata(t *testing.T) { "listLength": "invalid", }, wantMeta: nil, - wantErr: strconv.ErrSyntax, + wantErr: ErrRedisParse, }, { name: "address is defined in auth params", @@ -190,16 +190,16 @@ func TestParseRedisClusterMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, }, wantErr: nil, }, { - name: "hosts and ports given in auth params", + name: "Hosts and Ports given in auth params", metadata: map[string]string{ "listName": "mylist", }, @@ -208,18 +208,18 @@ func TestParseRedisClusterMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, }, wantErr: nil, }, { - name: "username given in authParams", + name: "Username given in authParams", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -229,19 +229,19 @@ func TestParseRedisClusterMetadata(t *testing.T) { "username": "username", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, }, wantErr: nil, }, { - name: "username given in metadata", + name: "Username given in metadata", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -250,19 +250,19 @@ func TestParseRedisClusterMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, }, wantErr: nil, }, { - name: "username given in metadata from env", + name: "Username given in metadata from env", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -272,19 +272,19 @@ func TestParseRedisClusterMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "none", }, }, wantErr: nil, }, { - name: "password given in authParams", + name: "Password given in authParams", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -294,19 +294,19 @@ func TestParseRedisClusterMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", }, }, wantErr: nil, }, { - name: "password given in metadata from env", + name: "Password given in metadata from env", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -316,19 +316,19 @@ func TestParseRedisClusterMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "none", }, }, wantErr: nil, }, { - name: "tls enabled without setting unsafeSsl", + name: "tls enabled without setting UnsafeSsl", metadata: map[string]string{ "listName": "mylist", "enableTLS": "true", @@ -337,18 +337,18 @@ func TestParseRedisClusterMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, - enableTLS: true, - unsafeSsl: false, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, + EnableTLS: true, + UnsafeSsl: false, }, }, wantErr: nil, }, { - name: "tls enabled with unsafeSsl true", + name: "tls enabled with UnsafeSsl true", metadata: map[string]string{ "listName": "mylist", "enableTLS": "true", @@ -358,12 +358,12 @@ func TestParseRedisClusterMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, - enableTLS: true, - unsafeSsl: true, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, + EnableTLS: true, + UnsafeSsl: true, }, }, wantErr: nil, @@ -378,9 +378,9 @@ func TestParseRedisClusterMetadata(t *testing.T) { ResolvedEnv: c.resolvedEnv, AuthParams: c.authParams, } - meta, err := parseRedisMetadata(config, parseRedisClusterAddress) + meta, err := parseRedisMetadata(config) if c.wantErr != nil { - assert.ErrorIs(t, err, c.wantErr) + assert.ErrorContains(t, err, c.wantErr.Error()) } else { assert.NoError(t, err) } @@ -404,7 +404,7 @@ func TestParseRedisSentinelMetadata(t *testing.T) { wantErr: ErrRedisNoAddresses, }, { - name: "unequal number of hosts/ports", + name: "unequal number of Hosts/Ports", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2", @@ -420,7 +420,7 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "listLength": "5", }, wantMeta: nil, - wantErr: ErrRedisNoListName, + wantErr: ErrRedisParse, }, { name: "invalid list length", @@ -431,7 +431,7 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "listLength": "invalid", }, wantMeta: nil, - wantErr: strconv.ErrSyntax, + wantErr: ErrRedisParse, }, { name: "address is defined in auth params", @@ -442,16 +442,16 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, }, wantErr: nil, }, { - name: "hosts and ports given in auth params", + name: "Hosts and Ports given in auth params", metadata: map[string]string{ "listName": "mylist", }, @@ -460,18 +460,18 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, }, wantErr: nil, }, { - name: "hosts and ports given in auth params", + name: "Hosts and Ports given in auth params", metadata: map[string]string{ "listName": "mylist", }, @@ -480,18 +480,18 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, }, wantErr: nil, }, { - name: "username given in authParams", + name: "Username given in authParams", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -501,19 +501,19 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "username": "username", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, }, wantErr: nil, }, { - name: "username given in metadata", + name: "Username given in metadata", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -522,19 +522,19 @@ func TestParseRedisSentinelMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, }, wantErr: nil, }, { - name: "username given in metadata from env", + name: "Username given in metadata from env", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -544,19 +544,19 @@ func TestParseRedisSentinelMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "none", }, }, wantErr: nil, }, { - name: "password given in authParams", + name: "Password given in authParams", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -566,19 +566,19 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", }, }, wantErr: nil, }, { - name: "password given in metadata from env", + name: "Password given in metadata from env", metadata: map[string]string{ "hosts": "a, b, c", "ports": "1, 2, 3", @@ -588,13 +588,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "none", }, }, wantErr: nil, @@ -610,13 +610,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "sentinelUsername": "sentinelUsername", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "sentinelUsername", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "sentinelUsername", }, }, wantErr: nil, @@ -631,13 +631,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "sentinelUsername", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "sentinelUsername", }, }, wantErr: nil, @@ -653,13 +653,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "none", }, }, wantErr: nil, @@ -675,13 +675,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "sentinelPassword": "sentinelPassword", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelPassword: "sentinelPassword", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelPassword: "sentinelPassword", }, }, wantErr: nil, @@ -697,13 +697,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelPassword: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelPassword: "none", }, }, wantErr: nil, @@ -719,13 +719,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "sentinelMaster": "sentinelMaster", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "sentinelMaster", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "sentinelMaster", }, }, wantErr: nil, @@ -740,13 +740,13 @@ func TestParseRedisSentinelMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "sentinelMaster", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "sentinelMaster", }, }, wantErr: nil, @@ -762,19 +762,19 @@ func TestParseRedisSentinelMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "none", + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "none", }, }, wantErr: nil, }, { - name: "tls enabled without setting unsafeSsl", + name: "tls enabled without setting UnsafeSsl", metadata: map[string]string{ "listName": "mylist", "enableTLS": "true", @@ -783,18 +783,18 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, - enableTLS: true, - unsafeSsl: false, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, + EnableTLS: true, + UnsafeSsl: false, }, }, wantErr: nil, }, { - name: "tls enabled with unsafeSsl true", + name: "tls enabled with UnsafeSsl true", metadata: map[string]string{ "listName": "mylist", "enableTLS": "true", @@ -804,12 +804,12 @@ func TestParseRedisSentinelMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisMetadata{ - listLength: 5, - listName: "mylist", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, - enableTLS: true, - unsafeSsl: true, + ListLength: 5, + ListName: "mylist", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, + EnableTLS: true, + UnsafeSsl: true, }, }, wantErr: nil, @@ -824,9 +824,9 @@ func TestParseRedisSentinelMetadata(t *testing.T) { ResolvedEnv: c.resolvedEnv, AuthParams: c.authParams, } - meta, err := parseRedisMetadata(config, parseRedisSentinelAddress) + meta, err := parseRedisMetadata(config) if c.wantErr != nil { - assert.ErrorIs(t, err, c.wantErr) + assert.ErrorContains(t, err, c.wantErr.Error()) } else { assert.NoError(t, err) } diff --git a/pkg/scalers/redis_streams_scaler.go b/pkg/scalers/redis_streams_scaler.go index fdb3f7bc085..9418ee5f85b 100644 --- a/pkg/scalers/redis_streams_scaler.go +++ b/pkg/scalers/redis_streams_scaler.go @@ -25,12 +25,6 @@ const ( ) const ( - // defaults - defaultDBIndex = 0 - defaultTargetEntries = 5 - defaultTargetLag = 5 - defaultActivationLagCount = 0 - // metadata names lagMetadata = "lagCount" pendingEntriesCountMetadata = "pendingEntriesCount" @@ -54,15 +48,54 @@ type redisStreamsScaler struct { type redisStreamsMetadata struct { scaleFactor scaleFactor - targetPendingEntriesCount int64 - targetStreamLength int64 - targetLag int64 - streamName string - consumerGroupName string - databaseIndex int - connectionInfo redisConnectionInfo triggerIndex int - activationLagCount int64 + TargetPendingEntriesCount int64 `keda:"name=pendingEntriesCount, order=triggerMetadata, optional, default=5"` + TargetStreamLength int64 `keda:"name=streamLength, order=triggerMetadata, optional, default=5"` + TargetLag int64 `keda:"name=lagCount, order=triggerMetadata, optional"` + StreamName string `keda:"name=stream, order=triggerMetadata"` + ConsumerGroupName string `keda:"name=consumerGroup, order=triggerMetadata, optional"` + DatabaseIndex int `keda:"name=databaseIndex, order=triggerMetadata, optional"` + ConnectionInfo redisConnectionInfo `keda:"optional"` + ActivationLagCount int64 `keda:"name=activationLagCount, order=triggerMetadata, optional"` + MetadataEnableTLS string `keda:"name=enableTLS, order=triggerMetadata, optional"` + AuthParamEnableTLS string `keda:"name=tls, order=authParams, optional"` +} + +func (r *redisStreamsMetadata) Validate() error { + err := validateRedisAddress(&r.ConnectionInfo) + if err != nil { + return err + } + + err = r.ConnectionInfo.SetEnableTLS(r.MetadataEnableTLS, r.AuthParamEnableTLS) + if err != nil { + return err + } + r.MetadataEnableTLS, r.AuthParamEnableTLS = "", "" + + if r.StreamName == "" { + return ErrRedisMissingStreamName + } + + if r.ConsumerGroupName != "" { + r.TargetStreamLength = 0 + if r.TargetLag != 0 { + r.scaleFactor = lagFactor + r.TargetPendingEntriesCount = 0 + + if r.ActivationLagCount == 0 { + err := errors.New("activationLagCount required for Redis lag") + return err + } + } else { + r.scaleFactor = xPendingFactor + } + } else { + r.scaleFactor = xLengthFactor + r.TargetPendingEntriesCount = 0 + } + + return nil } // NewRedisStreamsScaler creates a new redisStreamsScaler @@ -74,28 +107,21 @@ func NewRedisStreamsScaler(ctx context.Context, isClustered, isSentinel bool, co logger := InitializeLogger(config, "redis_streams_scaler") + meta, err := parseRedisStreamsMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing redis streams metadata: %w", err) + } + if isClustered { - meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) - if err != nil { - return nil, fmt.Errorf("error parsing redis streams metadata: %w", err) - } return createClusteredRedisStreamsScaler(ctx, meta, metricType, logger) } else if isSentinel { - meta, err := parseRedisStreamsMetadata(config, parseRedisSentinelAddress) - if err != nil { - return nil, fmt.Errorf("error parsing redis streams metadata: %w", err) - } return createSentinelRedisStreamsScaler(ctx, meta, metricType, logger) } - meta, err := parseRedisStreamsMetadata(config, parseRedisAddress) - if err != nil { - return nil, fmt.Errorf("error parsing redis streams metadata: %w", err) - } return createRedisStreamsScaler(ctx, meta, metricType, logger) } func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { - client, err := getRedisClusterClient(ctx, meta.connectionInfo) + client, err := getRedisClusterClient(ctx, meta.ConnectionInfo) if err != nil { return nil, fmt.Errorf("connection to redis cluster failed: %w", err) @@ -121,7 +147,7 @@ func createClusteredRedisStreamsScaler(ctx context.Context, meta *redisStreamsMe } func createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { - client, err := getRedisSentinelClient(ctx, meta.connectionInfo, meta.databaseIndex) + client, err := getRedisSentinelClient(ctx, meta.ConnectionInfo, meta.DatabaseIndex) if err != nil { return nil, fmt.Errorf("connection to redis sentinel failed: %w", err) } @@ -130,7 +156,7 @@ func createSentinelRedisStreamsScaler(ctx context.Context, meta *redisStreamsMet } func createRedisStreamsScaler(ctx context.Context, meta *redisStreamsMetadata, metricType v2.MetricTargetType, logger logr.Logger) (Scaler, error) { - client, err := getRedisClient(ctx, meta.connectionInfo, meta.databaseIndex) + client, err := getRedisClient(ctx, meta.ConnectionInfo, meta.DatabaseIndex) if err != nil { return nil, fmt.Errorf("connection to redis failed: %w", err) } @@ -162,7 +188,7 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent switch meta.scaleFactor { case xPendingFactor: entriesCountFn = func(ctx context.Context) (int64, error) { - pendingEntries, err := client.XPending(ctx, meta.streamName, meta.consumerGroupName).Result() + pendingEntries, err := client.XPending(ctx, meta.StreamName, meta.ConsumerGroupName).Result() if err != nil { return -1, err } @@ -170,7 +196,7 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent } case xLengthFactor: entriesCountFn = func(ctx context.Context) (int64, error) { - entriesLength, err := client.XLen(ctx, meta.streamName).Result() + entriesLength, err := client.XLen(ctx, meta.StreamName).Result() if err != nil { return -1, err } @@ -212,7 +238,7 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent err := errors.New("could not find Redis version number") return -1, err } - groups, err := client.XInfoGroups(ctx, meta.streamName).Result() + groups, err := client.XInfoGroups(ctx, meta.StreamName).Result() // If XINFO GROUPS can't find the stream key, it hasn't been created // yet. In that case, we return a lag of 0. @@ -225,13 +251,13 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent numGroups := len(groups) for i := 0; i < numGroups; i++ { group := groups[i] - if group.Name == meta.consumerGroupName { + if group.Name == meta.ConsumerGroupName { return group.Lag, nil } } // There is an edge case where the Redis producer has set up the - // stream [meta.streamName], but the consumer group [meta.consumerGroupName] + // stream [meta.StreamName], but the consumer group [meta.ConsumerGroupName] // for that stream isn't registered with Redis. In other words, the // producer has created messages for the stream, but the consumer group // hasn't yet registered itself on Redis because scaling starts with 0 @@ -240,7 +266,7 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent // it's not possible to obtain the lag for a nonexistent consumer // group. From here, the consumer group gets instantiated, and scaling // again occurs according to XINFO GROUP lag. - entriesLength, err := client.XLen(ctx, meta.streamName).Result() + entriesLength, err := client.XLen(ctx, meta.StreamName).Result() if err != nil { return -1, err } @@ -255,84 +281,18 @@ func createEntriesCountFn(client redis.Cmdable, meta *redisStreamsMetadata) (ent var ( // ErrRedisMissingStreamName is returned when "stream" is missing. ErrRedisMissingStreamName = errors.New("missing redis stream name") -) - -func parseRedisStreamsMetadata(config *scalersconfig.ScalerConfig, parseFn redisAddressParser) (*redisStreamsMetadata, error) { - connInfo, err := parseFn(config.TriggerMetadata, config.ResolvedEnv, config.AuthParams) - if err != nil { - return nil, err - } - meta := redisStreamsMetadata{ - connectionInfo: connInfo, - } - - err = parseTLSConfigIntoConnectionInfo(config, &meta.connectionInfo) - if err != nil { - return nil, err - } - - if val, ok := config.TriggerMetadata[streamNameMetadata]; ok { - meta.streamName = val - } else { - return nil, ErrRedisMissingStreamName - } - - meta.activationLagCount = defaultActivationLagCount - - if val, ok := config.TriggerMetadata[consumerGroupNameMetadata]; ok { - meta.consumerGroupName = val - if val, ok := config.TriggerMetadata[lagMetadata]; ok { - meta.scaleFactor = lagFactor - lag, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing lag: %w", err) - } - meta.targetLag = lag - - if val, ok := config.TriggerMetadata[activationValueTriggerConfigName]; ok { - activationVal, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, errors.New("error while parsing activation lag value") - } - meta.activationLagCount = activationVal - } else { - err := errors.New("activationLagCount required for Redis lag") - return nil, err - } - } else { - meta.scaleFactor = xPendingFactor - meta.targetPendingEntriesCount = defaultTargetEntries - if val, ok := config.TriggerMetadata[pendingEntriesCountMetadata]; ok { - pendingEntriesCount, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing pending entries count: %w", err) - } - meta.targetPendingEntriesCount = pendingEntriesCount - } - } - } else { - meta.scaleFactor = xLengthFactor - meta.targetStreamLength = defaultTargetEntries - if val, ok := config.TriggerMetadata[streamLengthMetadata]; ok { - streamLength, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing stream length: %w", err) - } - meta.targetStreamLength = streamLength - } - } - meta.databaseIndex = defaultDBIndex - if val, ok := config.TriggerMetadata[databaseIndexMetadata]; ok { - dbIndex, err := strconv.ParseInt(val, 10, 32) - if err != nil { - return nil, fmt.Errorf("error parsing redis database index %w", err) - } - meta.databaseIndex = int(dbIndex) - } + // ErrRedisStreamParse is returned when missing parameters or parsing parameters error. + ErrRedisStreamParse = errors.New("error parsing redis stream metadata") +) +func parseRedisStreamsMetadata(config *scalersconfig.ScalerConfig) (*redisStreamsMetadata, error) { + meta := &redisStreamsMetadata{} meta.triggerIndex = config.TriggerIndex - return &meta, nil + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing redis stream metadata: %w", err) + } + return meta, nil } func (s *redisStreamsScaler) Close(context.Context) error { @@ -345,16 +305,16 @@ func (s *redisStreamsScaler) GetMetricSpecForScaling(context.Context) []v2.Metri switch s.metadata.scaleFactor { case xPendingFactor: - metricValue = s.metadata.targetPendingEntriesCount + metricValue = s.metadata.TargetPendingEntriesCount case xLengthFactor: - metricValue = s.metadata.targetStreamLength + metricValue = s.metadata.TargetStreamLength case lagFactor: - metricValue = s.metadata.targetLag + metricValue = s.metadata.TargetLag } externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("redis-streams-%s", s.metadata.streamName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("redis-streams-%s", s.metadata.StreamName))), }, Target: GetMetricTarget(s.metricType, metricValue), } @@ -372,5 +332,5 @@ func (s *redisStreamsScaler) GetMetricsAndActivity(ctx context.Context, metricNa } metric := GenerateMetricInMili(metricName, float64(metricCount)) - return []external_metrics.ExternalMetricValue{metric}, metricCount > s.metadata.activationLagCount, nil + return []external_metrics.ExternalMetricValue{metric}, metricCount > s.metadata.ActivationLagCount, nil } diff --git a/pkg/scalers/redis_streams_scaler_test.go b/pkg/scalers/redis_streams_scaler_test.go index 0accba44f0b..205775b542b 100644 --- a/pkg/scalers/redis_streams_scaler_test.go +++ b/pkg/scalers/redis_streams_scaler_test.go @@ -49,25 +49,25 @@ func TestParseRedisStreamsMetadata(t *testing.T) { for _, tc := range testCasesPending { tc := tc t.Run(tc.name, func(te *testing.T) { - m, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: tc.authParams}, parseRedisAddress) + m, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: tc.authParams}) assert.Nil(t, err) - assert.Equal(t, tc.metadata[streamNameMetadata], m.streamName) - assert.Equal(t, tc.metadata[consumerGroupNameMetadata], m.consumerGroupName) - assert.Equal(t, tc.metadata[pendingEntriesCountMetadata], strconv.FormatInt(m.targetPendingEntriesCount, 10)) + assert.Equal(t, tc.metadata[streamNameMetadata], m.StreamName) + assert.Equal(t, tc.metadata[consumerGroupNameMetadata], m.ConsumerGroupName) + assert.Equal(t, tc.metadata[pendingEntriesCountMetadata], strconv.FormatInt(m.TargetPendingEntriesCount, 10)) if authParams != nil { // if authParam is used - assert.Equal(t, authParams[usernameMetadata], m.connectionInfo.username) - assert.Equal(t, authParams[passwordMetadata], m.connectionInfo.password) + assert.Equal(t, authParams[usernameMetadata], m.ConnectionInfo.Username) + assert.Equal(t, authParams[passwordMetadata], m.ConnectionInfo.Password) } else { // if metadata is used to pass credentials' env var names - assert.Equal(t, tc.resolvedEnv[tc.metadata[usernameMetadata]], m.connectionInfo.username) - assert.Equal(t, tc.resolvedEnv[tc.metadata[passwordMetadata]], m.connectionInfo.password) + assert.Equal(t, tc.resolvedEnv[tc.metadata[usernameMetadata]], m.ConnectionInfo.Username) + assert.Equal(t, tc.resolvedEnv[tc.metadata[passwordMetadata]], m.ConnectionInfo.Password) } - assert.Equal(t, tc.metadata[databaseIndexMetadata], strconv.Itoa(m.databaseIndex)) + assert.Equal(t, tc.metadata[databaseIndexMetadata], strconv.Itoa(m.DatabaseIndex)) b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata]) assert.Nil(t, err) - assert.Equal(t, b, m.connectionInfo.enableTLS) + assert.Equal(t, b, m.ConnectionInfo.EnableTLS) }) } @@ -99,25 +99,25 @@ func TestParseRedisStreamsMetadata(t *testing.T) { for _, tc := range testCasesLag { tc := tc t.Run(tc.name, func(te *testing.T) { - m, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: tc.authParams}, parseRedisAddress) + m, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: tc.authParams}) assert.Nil(t, err) - assert.Equal(t, m.streamName, tc.metadata[streamNameMetadata]) - assert.Equal(t, m.consumerGroupName, tc.metadata[consumerGroupNameMetadata]) - assert.Equal(t, strconv.FormatInt(m.targetLag, 10), tc.metadata[lagMetadata]) + assert.Equal(t, m.StreamName, tc.metadata[streamNameMetadata]) + assert.Equal(t, m.ConsumerGroupName, tc.metadata[consumerGroupNameMetadata]) + assert.Equal(t, strconv.FormatInt(m.TargetLag, 10), tc.metadata[lagMetadata]) if authParams != nil { // if authParam is used - assert.Equal(t, m.connectionInfo.username, authParams[usernameMetadata]) - assert.Equal(t, m.connectionInfo.password, authParams[passwordMetadata]) + assert.Equal(t, m.ConnectionInfo.Username, authParams[usernameMetadata]) + assert.Equal(t, m.ConnectionInfo.Password, authParams[passwordMetadata]) } else { // if metadata is used to pass credentials' env var names - assert.Equal(t, m.connectionInfo.username, tc.resolvedEnv[tc.metadata[usernameMetadata]]) - assert.Equal(t, m.connectionInfo.password, tc.resolvedEnv[tc.metadata[passwordMetadata]]) + assert.Equal(t, m.ConnectionInfo.Username, tc.resolvedEnv[tc.metadata[usernameMetadata]]) + assert.Equal(t, m.ConnectionInfo.Password, tc.resolvedEnv[tc.metadata[passwordMetadata]]) } - assert.Equal(t, strconv.Itoa(m.databaseIndex), tc.metadata[databaseIndexMetadata]) + assert.Equal(t, strconv.Itoa(m.DatabaseIndex), tc.metadata[databaseIndexMetadata]) b, err := strconv.ParseBool(tc.metadata[enableTLSMetadata]) assert.Nil(t, err) - assert.Equal(t, m.connectionInfo.enableTLS, b) + assert.Equal(t, m.ConnectionInfo.EnableTLS, b) }) } } @@ -160,7 +160,7 @@ func TestParseRedisStreamsMetadataForInvalidCases(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(te *testing.T) { - _, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: map[string]string{}}, parseRedisAddress) + _, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: tc.metadata, ResolvedEnv: tc.resolvedEnv, AuthParams: map[string]string{}}) assert.NotNil(t, err) }) } @@ -191,7 +191,7 @@ func TestRedisStreamsGetMetricSpecForScaling(t *testing.T) { } for _, testData := range redisStreamMetricIdentifiers { - meta, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: map[string]string{"REDIS_SERVICE": "my-address"}, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}, parseRedisAddress) + meta, err := parseRedisStreamsMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: map[string]string{"REDIS_SERVICE": "my-address"}, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) if err != nil { t.Fatal("Could not parse metadata:", err) } @@ -250,7 +250,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "pendingEntriesCount": "invalid", }, wantMeta: nil, - wantErr: strconv.ErrSyntax, + wantErr: ErrRedisStreamParse, }, { name: "invalid lag", @@ -263,7 +263,7 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "lagCount": "junk", }, wantMeta: nil, - wantErr: strconv.ErrSyntax, + wantErr: ErrRedisStreamParse, }, { name: "address is defined in auth params", @@ -277,13 +277,13 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 6, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 6, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, scaleFactor: lagFactor, }, @@ -300,12 +300,12 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, scaleFactor: xPendingFactor, }, @@ -324,15 +324,15 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 6, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 6, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, scaleFactor: lagFactor, }, @@ -350,14 +350,14 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, scaleFactor: xPendingFactor, }, @@ -377,16 +377,16 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "username": "username", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: lagFactor, }, @@ -405,15 +405,15 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "username": "username", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: xPendingFactor, }, @@ -431,15 +431,15 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: xPendingFactor, }, @@ -459,16 +459,16 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "none", }, scaleFactor: lagFactor, }, @@ -487,15 +487,15 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "none", }, scaleFactor: xPendingFactor, }, @@ -515,16 +515,16 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", }, scaleFactor: lagFactor, }, @@ -543,15 +543,15 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", }, scaleFactor: xPendingFactor, }, @@ -571,16 +571,16 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "none", }, scaleFactor: lagFactor, }, @@ -599,15 +599,15 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "none", }, scaleFactor: xPendingFactor, }, @@ -628,18 +628,18 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: false, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: false, }, scaleFactor: lagFactor, }, @@ -659,17 +659,17 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: false, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: false, }, scaleFactor: xPendingFactor, }, @@ -691,18 +691,18 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: true, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: true, }, scaleFactor: lagFactor, }, @@ -723,17 +723,17 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: true, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: true, }, scaleFactor: xPendingFactor, }, @@ -757,20 +757,20 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "keyPassword": "keeyPassword", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - ca: "caaa", - cert: "ceert", - key: "keey", - keyPassword: "keeyPassword", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + Ca: "caaa", + Cert: "ceert", + Key: "keey", + KeyPassword: "keeyPassword", }, scaleFactor: xPendingFactor, }, @@ -785,10 +785,10 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetStreamLength: 5, - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + StreamName: "my-stream", + TargetStreamLength: 5, + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, scaleFactor: xLengthFactor, }, @@ -804,13 +804,13 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - targetLag: 0, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + TargetLag: 0, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, scaleFactor: xPendingFactor, }, @@ -826,9 +826,9 @@ func TestParseRedisClusterStreamsMetadata(t *testing.T) { ResolvedEnv: c.resolvedEnv, AuthParams: c.authParams, } - meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) + meta, err := parseRedisStreamsMetadata(config) if c.wantErr != nil { - assert.ErrorIs(t, err, c.wantErr) + assert.ErrorContains(t, err, c.wantErr.Error()) } else { assert.NoError(t, err) } @@ -882,7 +882,7 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "activationLagCount": "3", }, wantMeta: nil, - wantErr: strconv.ErrSyntax, + wantErr: ErrRedisStreamParse, }, { name: "address is defined in auth params", @@ -896,13 +896,13 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, scaleFactor: lagFactor, }, @@ -919,12 +919,12 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "addresses": ":7001, :7002", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{":7001", ":7002"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{":7001", ":7002"}, }, scaleFactor: xPendingFactor, }, @@ -943,15 +943,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, scaleFactor: lagFactor, }, @@ -969,14 +969,14 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "ports": "1, 2, 3", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, scaleFactor: xPendingFactor, }, @@ -996,16 +996,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "username": "username", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: lagFactor, }, @@ -1024,15 +1024,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "username": "username", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: xPendingFactor, }, @@ -1051,15 +1051,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: lagFactor, }, @@ -1077,15 +1077,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "username", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "username", }, scaleFactor: xPendingFactor, }, @@ -1105,16 +1105,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "none", }, scaleFactor: lagFactor, }, @@ -1133,15 +1133,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - username: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Username: "none", }, scaleFactor: xPendingFactor, }, @@ -1161,16 +1161,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", }, scaleFactor: lagFactor, }, @@ -1189,15 +1189,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", }, scaleFactor: xPendingFactor, }, @@ -1217,16 +1217,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "none", }, scaleFactor: lagFactor, }, @@ -1245,15 +1245,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "none", }, scaleFactor: xPendingFactor, }, @@ -1273,16 +1273,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "sentinelUsername": "sentinelUsername", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "sentinelUsername", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "sentinelUsername", }, scaleFactor: lagFactor, }, @@ -1301,15 +1301,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "sentinelUsername": "sentinelUsername", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "sentinelUsername", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "sentinelUsername", }, scaleFactor: xPendingFactor, }, @@ -1328,16 +1328,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "sentinelUsername", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "sentinelUsername", }, scaleFactor: lagFactor, }, @@ -1355,15 +1355,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "sentinelUsername", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "sentinelUsername", }, scaleFactor: xPendingFactor, }, @@ -1383,16 +1383,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "none", }, scaleFactor: lagFactor, }, @@ -1411,15 +1411,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelUsername: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelUsername: "none", }, scaleFactor: xPendingFactor, }, @@ -1439,16 +1439,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "sentinelPassword": "sentinelPassword", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelPassword: "sentinelPassword", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelPassword: "sentinelPassword", }, scaleFactor: lagFactor, }, @@ -1467,15 +1467,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "sentinelPassword": "sentinelPassword", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelPassword: "sentinelPassword", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelPassword: "sentinelPassword", }, scaleFactor: xPendingFactor, }, @@ -1495,16 +1495,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelPassword: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelPassword: "none", }, scaleFactor: lagFactor, }, @@ -1523,15 +1523,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelPassword: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelPassword: "none", }, scaleFactor: xPendingFactor, }, @@ -1551,16 +1551,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "sentinelMaster": "sentinelMaster", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "sentinelMaster", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "sentinelMaster", }, scaleFactor: lagFactor, }, @@ -1579,15 +1579,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "sentinelMaster": "sentinelMaster", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "sentinelMaster", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "sentinelMaster", }, scaleFactor: xPendingFactor, }, @@ -1606,16 +1606,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "sentinelMaster", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "sentinelMaster", }, scaleFactor: lagFactor, }, @@ -1633,15 +1633,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "sentinelMaster", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "sentinelMaster", }, scaleFactor: xPendingFactor, }, @@ -1661,16 +1661,16 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "none", }, scaleFactor: lagFactor, }, @@ -1689,15 +1689,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - sentinelMaster: "none", + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + SentinelMaster: "none", }, scaleFactor: xPendingFactor, }, @@ -1718,18 +1718,18 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: false, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: false, }, scaleFactor: lagFactor, }, @@ -1749,17 +1749,17 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: false, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: false, }, scaleFactor: xPendingFactor, }, @@ -1781,18 +1781,18 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: true, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: true, }, scaleFactor: lagFactor, }, @@ -1813,17 +1813,17 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { "password": "password", }, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, - password: "password", - enableTLS: true, - unsafeSsl: true, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, + Password: "password", + EnableTLS: true, + UnsafeSsl: true, }, scaleFactor: xPendingFactor, }, @@ -1839,15 +1839,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetStreamLength: 15, - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1"}, - hosts: []string{"a"}, - ports: []string{"1"}, - password: "", - enableTLS: false, - unsafeSsl: false, + StreamName: "my-stream", + TargetStreamLength: 15, + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1"}, + Hosts: []string{"a"}, + Ports: []string{"1"}, + Password: "", + EnableTLS: false, + UnsafeSsl: false, }, scaleFactor: xLengthFactor, }, @@ -1866,18 +1866,18 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 70, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1"}, - hosts: []string{"a"}, - ports: []string{"1"}, - password: "", - enableTLS: false, - unsafeSsl: false, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 70, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1"}, + Hosts: []string{"a"}, + Ports: []string{"1"}, + Password: "", + EnableTLS: false, + UnsafeSsl: false, }, scaleFactor: lagFactor, }, @@ -1895,17 +1895,17 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 5, - activationLagCount: 0, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1"}, - hosts: []string{"a"}, - ports: []string{"1"}, - password: "", - enableTLS: false, - unsafeSsl: false, + StreamName: "my-stream", + TargetPendingEntriesCount: 5, + ActivationLagCount: 0, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1"}, + Hosts: []string{"a"}, + Ports: []string{"1"}, + Password: "", + EnableTLS: false, + UnsafeSsl: false, }, scaleFactor: xPendingFactor, }, @@ -1922,15 +1922,15 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { }, authParams: map[string]string{}, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetStreamLength: 15, - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1"}, - hosts: []string{"a"}, - ports: []string{"1"}, - password: "", - enableTLS: false, - unsafeSsl: false, + StreamName: "my-stream", + TargetStreamLength: 15, + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1"}, + Hosts: []string{"a"}, + Ports: []string{"1"}, + Password: "", + EnableTLS: false, + UnsafeSsl: false, }, scaleFactor: xLengthFactor, }, @@ -1946,9 +1946,9 @@ func TestParseRedisSentinelStreamsMetadata(t *testing.T) { ResolvedEnv: c.resolvedEnv, AuthParams: c.authParams, } - meta, err := parseRedisStreamsMetadata(config, parseRedisSentinelAddress) + meta, err := parseRedisStreamsMetadata(config) if c.wantErr != nil { - assert.ErrorIs(t, err, c.wantErr) + assert.ErrorContains(t, err, c.wantErr.Error()) } else { assert.NoError(t, err) } @@ -1982,15 +1982,15 @@ func TestActivityCount(t *testing.T) { authParams: map[string]string{}, resolvedEnv: testRedisResolvedEnv, wantMeta: &redisStreamsMetadata{ - streamName: "my-stream", - targetPendingEntriesCount: 0, - targetLag: 7, - activationLagCount: 3, - consumerGroupName: "consumer1", - connectionInfo: redisConnectionInfo{ - addresses: []string{"a:1", "b:2", "c:3"}, - hosts: []string{"a", "b", "c"}, - ports: []string{"1", "2", "3"}, + StreamName: "my-stream", + TargetPendingEntriesCount: 0, + TargetLag: 7, + ActivationLagCount: 3, + ConsumerGroupName: "consumer1", + ConnectionInfo: redisConnectionInfo{ + Addresses: []string{"a:1", "b:2", "c:3"}, + Hosts: []string{"a", "b", "c"}, + Ports: []string{"1", "2", "3"}, }, scaleFactor: lagFactor, }, @@ -2002,7 +2002,7 @@ func TestActivityCount(t *testing.T) { ResolvedEnv: c.resolvedEnv, AuthParams: c.authParams, } - meta, err := parseRedisStreamsMetadata(config, parseRedisClusterAddress) + meta, err := parseRedisStreamsMetadata(config) if c.wantErr != nil { assert.ErrorIs(t, err, c.wantErr) } else { diff --git a/pkg/scalers/scalersconfig/typed_config.go b/pkg/scalers/scalersconfig/typed_config.go index 9cba6254225..896955ec70a 100644 --- a/pkg/scalers/scalersconfig/typed_config.go +++ b/pkg/scalers/scalersconfig/typed_config.go @@ -84,7 +84,7 @@ type Params struct { FieldName string // Name is the 'name' tag parameter defining the key in triggerMetadata, resolvedEnv or authParams - Name string + Name []string // Optional is the 'optional' tag parameter defining if the parameter is optional Optional bool @@ -116,7 +116,7 @@ type Params struct { // IsNested is a function that returns true if the parameter is nested func (p Params) IsNested() bool { - return p.Name == "" + return len(p.Name) == 0 } // IsDeprecated is a function that returns true if the parameter is deprecated @@ -406,23 +406,24 @@ func setConfigValueHelper(params Params, valFromConfig string, field reflect.Val func (sc *ScalerConfig) configParamValue(params Params) (string, bool) { for _, po := range params.Order { var m map[string]string - key := params.Name - switch po { - case TriggerMetadata: - m = sc.TriggerMetadata - case AuthParams: - m = sc.AuthParams - case ResolvedEnv: - m = sc.ResolvedEnv - key = sc.TriggerMetadata[fmt.Sprintf("%sFromEnv", params.Name)] - default: - // this is checked when parsing the tags but adding as default case to avoid any potential future problems - return "", false - } - param, ok := m[key] - param = strings.TrimSpace(param) - if ok && param != "" { - return param, true + for _, key := range params.Name { + switch po { + case TriggerMetadata: + m = sc.TriggerMetadata + case AuthParams: + m = sc.AuthParams + case ResolvedEnv: + m = sc.ResolvedEnv + key = sc.TriggerMetadata[fmt.Sprintf("%sFromEnv", key)] + default: + // this is checked when parsing the tags but adding as default case to avoid any potential future problems + return "", false + } + param, ok := m[key] + param = strings.TrimSpace(param) + if ok && param != "" { + return param, true + } } } return "", params.IsNested() @@ -458,7 +459,7 @@ func paramsFromTag(tag string, field reflect.StructField) (Params, error) { } case nameTag: if len(tsplit) > 1 { - params.Name = strings.TrimSpace(tsplit[1]) + params.Name = strings.Split(strings.TrimSpace(tsplit[1]), tagValueSeparator) } case deprecatedTag: if len(tsplit) == 1 { diff --git a/pkg/scalers/scalersconfig/typed_config_test.go b/pkg/scalers/scalersconfig/typed_config_test.go index 996e45a1dd8..d817f9f108f 100644 --- a/pkg/scalers/scalersconfig/typed_config_test.go +++ b/pkg/scalers/scalersconfig/typed_config_test.go @@ -128,12 +128,12 @@ func TestMissing(t *testing.T) { sc := &ScalerConfig{} type testStruct struct { - StringVal string `keda:"name=stringVal, order=triggerMetadata"` + StringVal string `keda:"name=stringVal, order=triggerMetadata"` } ts := testStruct{} err := sc.TypedConfig(&ts) - Expect(err).To(MatchError(`missing required parameter "stringVal" in [triggerMetadata]`)) + Expect(err).To(MatchError(`missing required parameter ["stringVal"] in [triggerMetadata]`)) } // TestDeprecated tests the deprecated tag @@ -152,7 +152,7 @@ func TestDeprecated(t *testing.T) { ts := testStruct{} err := sc.TypedConfig(&ts) - Expect(err).To(MatchError(`parameter "stringVal" is deprecated`)) + Expect(err).To(MatchError(`parameter ["stringVal"] is deprecated`)) sc2 := &ScalerConfig{ TriggerMetadata: map[string]string{}, @@ -276,7 +276,7 @@ func TestEnum(t *testing.T) { ts2 := testStruct{} err = sc2.TypedConfig(&ts2) - Expect(err).To(MatchError(`parameter "enumVal" value "value3" must be one of [value1 value2]`)) + Expect(err).To(MatchError(`parameter ["enumVal"] value "value3" must be one of [value1 value2]`)) } // TestExclusive tests the exclusiveSet type @@ -305,7 +305,7 @@ func TestExclusive(t *testing.T) { ts2 := testStruct{} err = sc2.TypedConfig(&ts2) - Expect(err).To(MatchError(`parameter "intVal" value "1,4" must contain only one of [1 4 5]`)) + Expect(err).To(MatchError(`parameter ["intVal"] value "1,4" must contain only one of [1 4 5]`)) } // TestURLValues tests the url.Values type @@ -503,7 +503,7 @@ func TestNoParsingOrder(t *testing.T) { } tsm := testStructMissing{} err := sc.TypedConfig(&tsm) - Expect(err).To(MatchError(`missing required parameter "strVal", no 'order' tag, provide any from [authParams resolvedEnv triggerMetadata]`)) + Expect(err).To(MatchError(ContainSubstring(`missing required parameter ["strVal"], no 'order' tag, provide any from [authParams resolvedEnv triggerMetadata]`))) type testStructDefault struct { DefaultVal string `keda:"name=defaultVal, default=dv"` @@ -553,3 +553,33 @@ func TestRange(t *testing.T) { Expect(ts.DottedRange).To(ConsistOf(2, 3, 4, 5, 6, 7)) Expect(ts.WrongRange).To(HaveLen(0)) } + +// TestMultiName tests the multi name param +func TestMultiName(t *testing.T) { + RegisterTestingT(t) + + sc := &ScalerConfig{ + TriggerMetadata: map[string]string{ + "property1": "aaa", + }, + } + + sc2 := &ScalerConfig{ + TriggerMetadata: map[string]string{ + "property2": "bbb", + }, + } + + type testStruct struct { + Property string `keda:"name=property1;property2, order=triggerMetadata"` + } + + ts := testStruct{} + err := sc.TypedConfig(&ts) + Expect(err).To(BeNil()) + Expect(ts.Property).To(Equal("aaa")) + + err = sc2.TypedConfig(&ts) + Expect(err).To(BeNil()) + Expect(ts.Property).To(Equal("bbb")) +}