diff --git a/CHANGELOG.md b/CHANGELOG.md index bba14397e9c..50f3c62b886 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883)) - Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863)) - Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862)) +- Add Solace PubSub+ Event Broker Scaler ([#1945](https://github.com/kedacore/keda/pull/1945)) - Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872)) ### Improvements diff --git a/pkg/scalers/solace_scaler.go b/pkg/scalers/solace_scaler.go new file mode 100644 index 00000000000..34b77c69b9a --- /dev/null +++ b/pkg/scalers/solace_scaler.go @@ -0,0 +1,367 @@ +package scalers + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + + v2beta2 "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + solaceExtMetricType = "External" + solaceScalerID = "solace" + // REST ENDPOINT String Patterns + solaceSempEndpointURLTemplate = "%s/%s/%s/monitor/msgVpns/%s/%ss/%s" + // SEMP REST API Context + solaceAPIName = "SEMP" + solaceAPIVersion = "v2" + solaceAPIObjectTypeQueue = "queue" + // Log Message Templates + solaceFoundMetaFalse = "required Field %s NOT FOUND in Solace Metadata" + // YAML Configuration Metadata Field Names + // Broker Identifiers + solaceMetaSempBaseURL = "solaceSempBaseURL" + // Credential Identifiers + solaceMetaUsername = "username" + solaceMetaPassword = "password" + solaceMetaUsernameFromEnv = "usernameFromEnv" + solaceMetaPasswordFromEnv = "passwordFromEnv" + // Target Object Identifiers + solaceMetaMsgVpn = "messageVpn" + solaceMetaQueueName = "queueName" + // Metric Targets + solaceMetaMsgCountTarget = "messageCountTarget" + solaceMetaMsgSpoolUsageTarget = "messageSpoolUsageTarget" + // Trigger type identifiers + solaceTriggermsgcount = "msgcount" + solaceTriggermsgspoolusage = "msgspoolusage" +) + +// Struct for Observed Metric Values +type SolaceMetricValues struct { + // Observed Message Count + msgCount int + // Observed Message Spool Usage + msgSpoolUsage int +} + +type SolaceScaler struct { + metadata *SolaceMetadata + httpClient *http.Client +} + +type SolaceMetadata struct { + // Full SEMP URL to target queue (CONSTRUCTED IN CODE) + endpointURL string + solaceSempURL string + // Solace Message VPN + messageVpn string + queueName string + // Basic Auth Username + username string + // Basic Auth Password + password string + // Target Message Count + msgCountTarget int + msgSpoolUsageTarget int // Spool Use Target in Megabytes +} + +// SEMP API Response Root Struct +type solaceSEMPResponse struct { + Collections solaceSEMPCollections `json:"collections"` + Data solaceSEMPData `json:"data"` + Meta solaceSEMPMetadata `json:"meta"` +} + +// SEMP API Response Collections Struct +type solaceSEMPCollections struct { + Msgs solaceSEMPMessages `json:"msgs"` +} + +// SEMP API Response Queue Data Struct +type solaceSEMPData struct { + MsgSpoolUsage int `json:"msgSpoolUsage"` +} + +// SEMP API Messages Struct +type solaceSEMPMessages struct { + Count int `json:"count"` +} + +// SEMP API Metadata Struct +type solaceSEMPMetadata struct { + ResponseCode int `json:"responseCode"` +} + +// Solace Logger +var solaceLog = logf.Log.WithName(solaceScalerID + "_scaler") + +// Constructor for SolaceScaler +func NewSolaceScaler(config *ScalerConfig) (Scaler, error) { + // Create HTTP Client + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout) + + // Parse Solace Metadata + solaceMetadata, err := parseSolaceMetadata(config) + if err != nil { + solaceLog.Error(err, "Error parsing Solace Trigger Metadata or missing values") + return nil, err + } + + return &SolaceScaler{ + metadata: solaceMetadata, + httpClient: httpClient, + }, nil +} + +// Called by constructor +func parseSolaceMetadata(config *ScalerConfig) (*SolaceMetadata, error) { + meta := SolaceMetadata{} + // GET THE SEMP API ENDPOINT + if val, ok := config.TriggerMetadata[solaceMetaSempBaseURL]; ok && val != "" { + meta.solaceSempURL = val + } else { + return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaSempBaseURL) + } + // GET Message VPN + if val, ok := config.TriggerMetadata[solaceMetaMsgVpn]; ok && val != "" { + meta.messageVpn = val + } else { + return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaMsgVpn) + } + // GET Queue Name + if val, ok := config.TriggerMetadata[solaceMetaQueueName]; ok && val != "" { + meta.queueName = val + } else { + return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaQueueName) + } + + // GET METRIC TARGET VALUES + // GET msgCountTarget + if val, ok := config.TriggerMetadata[solaceMetaMsgCountTarget]; ok && val != "" { + if msgCount, err := strconv.Atoi(val); err == nil { + meta.msgCountTarget = msgCount + } else { + return nil, fmt.Errorf("can't parse [%s], not a valid integer: %s", solaceMetaMsgCountTarget, err) + } + } + // GET msgSpoolUsageTarget + if val, ok := config.TriggerMetadata[solaceMetaMsgSpoolUsageTarget]; ok && val != "" { + if msgSpoolUsage, err := strconv.Atoi(val); err == nil { + meta.msgSpoolUsageTarget = msgSpoolUsage * 1024 * 1024 + } else { + return nil, fmt.Errorf("can't parse [%s], not a valid integer: %s", solaceMetaMsgSpoolUsageTarget, err) + } + } + + // Check that we have at least one positive target value for the scaler + if meta.msgCountTarget < 1 && meta.msgSpoolUsageTarget < 1 { + return nil, fmt.Errorf("no target value found in the scaler configuration") + } + + // Format Solace SEMP Queue Endpoint (REST URL) + meta.endpointURL = fmt.Sprintf( + solaceSempEndpointURLTemplate, + meta.solaceSempURL, + solaceAPIName, + solaceAPIVersion, + meta.messageVpn, + solaceAPIObjectTypeQueue, + meta.queueName) + + // Get Credentials + var e error + if meta.username, meta.password, e = getSolaceSempCredentials(config); e != nil { + return nil, e + } + return &meta, nil +} + +func getSolaceSempCredentials(config *ScalerConfig) (u string, p string, err error) { + // GET CREDENTIALS + // The username must be a valid broker ADMIN user identifier with read access to SEMP for the broker, VPN, and relevant objects + // The scaler will attempt to acquire username and then password independently. For each: + // - Search K8S Secret (Encoded) + // - Search environment variable specified by config at 'usernameFromEnv' / 'passwordFromEnv' + // - Search 'username' / 'password' fields (Clear Text) + // Get username + if usernameSecret, ok := config.AuthParams[solaceMetaUsername]; ok && usernameSecret != "" { + u = usernameSecret + } else if usernameFromEnv, ok := config.TriggerMetadata[solaceMetaUsernameFromEnv]; ok && usernameFromEnv != "" { + if resolvedUser, ok := config.ResolvedEnv[config.TriggerMetadata[solaceMetaUsernameFromEnv]]; ok && resolvedUser != "" { + u = resolvedUser + } else { + return "", "", fmt.Errorf("username could not be resolved from the environment variable: %s", usernameFromEnv) + } + } else if usernameClear, ok := config.TriggerMetadata[solaceMetaUsername]; ok && usernameClear != "" { + u = usernameClear + } else { + return "", "", fmt.Errorf("username is required and not found in K8Secret, environment, or clear text") + } + // Get Password + if passwordSecret, ok := config.AuthParams[solaceMetaPassword]; ok && passwordSecret != "" { + p = passwordSecret + } else if passwordEnv, ok := config.TriggerMetadata[solaceMetaPasswordFromEnv]; ok && passwordEnv != "" { + if resolvedPassword, ok := config.ResolvedEnv[config.TriggerMetadata[solaceMetaPasswordFromEnv]]; ok && resolvedPassword != "" { + p = resolvedPassword + } else { + return "", "", fmt.Errorf("password could not be resolved from the environment variable: %s", passwordEnv) + } + } else if passwordClear, ok := config.TriggerMetadata[solaceMetaPassword]; ok && passwordClear != "" { + p = passwordClear + } else { + return "", "", fmt.Errorf("password is required and not found in K8Secret, environment, or clear text") + } + return u, p, nil +} + +// INTERFACE METHOD +// DEFINE METRIC FOR SCALING +// CURRENT SUPPORTED METRICS ARE: +// - QUEUE MESSAGE COUNT (msgCount) +// - QUEUE SPOOL USAGE (msgSpoolUsage in MBytes) +// METRIC IDENTIFIER HAS THE SIGNATURE: +// - solace-[VPN_Name]-[Queue_Name]-[metric_type] +// e.g. solace-myvpn-QUEUE1-msgCount +func (s *SolaceScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + var metricSpecList []v2beta2.MetricSpec + // Message Count Target Spec + if s.metadata.msgCountTarget > 0 { + targetMetricValue := resource.NewQuantity(int64(s.metadata.msgCountTarget), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", solaceScalerID, s.metadata.messageVpn, s.metadata.queueName, solaceTriggermsgcount)), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: solaceExtMetricType} + metricSpecList = append(metricSpecList, metricSpec) + } + // Message Spool Usage Target Spec + if s.metadata.msgSpoolUsageTarget > 0 { + targetMetricValue := resource.NewQuantity(int64(s.metadata.msgSpoolUsageTarget), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s-%s", solaceScalerID, s.metadata.messageVpn, s.metadata.queueName, solaceTriggermsgspoolusage)), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: solaceExtMetricType} + metricSpecList = append(metricSpecList, metricSpec) + } + return metricSpecList +} + +// returns SolaceMetricValues struct populated from broker SEMP endpoint +func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP() (SolaceMetricValues, error) { + var scaledMetricEndpointURL = s.metadata.endpointURL + var httpClient = s.httpClient + var sempResponse solaceSEMPResponse + var metricValues SolaceMetricValues + + // RETRIEVE METRICS FROM SOLACE SEMP API + // Define HTTP Request + request, err := http.NewRequest("GET", scaledMetricEndpointURL, nil) + if err != nil { + return SolaceMetricValues{}, fmt.Errorf("failed attempting request to solace semp api: %s", err) + } + + // Add HTTP Auth and Headers + request.SetBasicAuth(s.metadata.username, s.metadata.password) + request.Header.Set("Content-Type", "application/json") + + // Call Solace SEMP API + response, err := httpClient.Do(request) + if err != nil { + return SolaceMetricValues{}, fmt.Errorf("call to solace semp api failed: %s", err) + } + defer response.Body.Close() + + // Check HTTP Status Code + if response.StatusCode < 200 || response.StatusCode > 299 { + sempError := fmt.Errorf("semp request http status code: %s - %s", strconv.Itoa(response.StatusCode), response.Status) + return SolaceMetricValues{}, sempError + } + + // Decode SEMP Response and Test + if err := json.NewDecoder(response.Body).Decode(&sempResponse); err != nil { + return SolaceMetricValues{}, fmt.Errorf("failed to read semp response body: %s", err) + } + if sempResponse.Meta.ResponseCode < 200 || sempResponse.Meta.ResponseCode > 299 { + return SolaceMetricValues{}, fmt.Errorf("solace semp api returned error status: %d", sempResponse.Meta.ResponseCode) + } + + // Set Return Values + metricValues.msgCount = sempResponse.Collections.Msgs.Count + metricValues.msgSpoolUsage = sempResponse.Data.MsgSpoolUsage + return metricValues, nil +} + +// INTERFACE METHOD +// Call SEMP API to retrieve metrics +// returns value for named metric +func (s *SolaceScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + var metricValues, mv SolaceMetricValues + var mve error + if mv, mve = s.getSolaceQueueMetricsFromSEMP(); mve != nil { + solaceLog.Error(mve, "call to semp endpoint failed") + return []external_metrics.ExternalMetricValue{}, mve + } + metricValues = mv + + var metric external_metrics.ExternalMetricValue + switch { + case strings.HasSuffix(metricName, solaceTriggermsgcount): + metric = external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(metricValues.msgCount), resource.DecimalSI), + Timestamp: metav1.Now(), + } + case strings.HasSuffix(metricName, solaceTriggermsgspoolusage): + metric = external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(metricValues.msgSpoolUsage), resource.DecimalSI), + Timestamp: metav1.Now(), + } + default: + // Should never end up here + err := fmt.Errorf("unidentified metric: %s", metricName) + solaceLog.Error(err, "returning error to calling app") + return []external_metrics.ExternalMetricValue{}, err + } + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// INTERFACE METHOD +// Call SEMP API to retrieve metrics +// IsActive returns true if queue messageCount > 0 || msgSpoolUsage > 0 +func (s *SolaceScaler) IsActive(ctx context.Context) (bool, error) { + metricValues, err := s.getSolaceQueueMetricsFromSEMP() + if err != nil { + solaceLog.Error(err, "call to semp endpoint failed") + return false, err + } + return (metricValues.msgCount > 0 || metricValues.msgSpoolUsage > 0), nil +} + +// Do Nothing - Satisfies Interface +func (s *SolaceScaler) Close() error { + return nil +} diff --git a/pkg/scalers/solace_scaler_test.go b/pkg/scalers/solace_scaler_test.go new file mode 100644 index 00000000000..613f917f8f5 --- /dev/null +++ b/pkg/scalers/solace_scaler_test.go @@ -0,0 +1,435 @@ +package scalers + +import ( + "fmt" + "net/http" + "testing" + + "k8s.io/api/autoscaling/v2beta2" +) + +type testSolaceMetadata struct { + testID string + metadata map[string]string + isError bool +} + +var ( + soltestValidBaseURL = "http://localhost:8080" + soltestValidUsername = "admin" + soltestValidPassword = "admin" + soltestValidVpn = "dennis_vpn" + soltestValidQueueName = "queue3" + soltestValidMsgCountTarget = "10" + soltestValidMsgSpoolTarget = "20" + soltestEnvUsername = "SOLTEST_USERNAME" + soltestEnvPassword = "SOLTEST_PASSWORD" +) + +// AUTH RECORD FOR TEST +var testDataSolaceAuthParamsVALID = map[string]string{ + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, +} + +// ENV VARS FOR TEST -- VALID USER / PWD +var testDataSolaceResolvedEnvVALID = map[string]string{ + soltestEnvUsername: soltestValidUsername, // Sets the environment variables to the correct values + soltestEnvPassword: soltestValidPassword, +} + +// TEST CASES FOR SolaceParseMetadata() +var testParseSolaceMetadata = []testSolaceMetadata{ + // Empty + { + "#001 - EMPTY", map[string]string{}, + true, + }, + // +Case - brokerBaseUrl + { + "#002 - brokerBaseUrl", + map[string]string{ + "": "", + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + false, + }, + // -Case - missing username (clear) + { + "#007 - missing username (clear)", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: "", + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + true, + }, + // -Case - missing password (clear) + { + "#008 - missing password (clear)", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: "", + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + true, + }, + // -Case - missing queue + { + "#009 - missing queueName", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: "", + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + true, + }, + // -Case - missing msgCountTarget + { + "#010 - missing msgCountTarget", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: "", + solaceMetaMsgSpoolUsageTarget: "", + }, + true, + }, + // -Case - msgSpoolUsageTarget non-numeric + { + "#011 - msgSpoolUsageTarget non-numeric", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: "NOT_AN_INTEGER", + }, + true, + }, + // -Case - msgSpoolUsage non-numeric + { + "#012 - msgSpoolUsage non-numeric", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgSpoolUsageTarget: "NOT_AN_INTEGER", + }, + true, + }, + // +Case - Pass with msgSpoolUsageTarget and not msgCountTarget + { + "#013 - brokerBaseUrl", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgSpoolUsageTarget: soltestValidMsgSpoolTarget, + }, + false, + }, +} + +var testSolaceEnvCreds = []testSolaceMetadata{ + // +Case - Should find ENV vars + { + "#101 - Connect with Credentials in env", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: soltestEnvUsername, + solaceMetaPasswordFromEnv: soltestEnvPassword, + // solaceMetaUsername: "", + // solaceMetaPassword: "", + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + false, + }, + // -Case - Should fail with ENV var not found + { + "#102 - Environment vars referenced but not found", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "SOLTEST_DNE", + solaceMetaPasswordFromEnv: "SOLTEST_DNE", + // solaceMetaUsername: "", + // solaceMetaPassword: "", + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + true, + }, +} + +var testSolaceK8sSecretCreds = []testSolaceMetadata{ + // Records require Auth Record to be passed + + // +Case - Should find + { + "#201 - Connect with credentials from Auth Record (ENV VAR Present)", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: soltestEnvUsername, + solaceMetaPasswordFromEnv: soltestEnvPassword, + // solaceMetaUsername: "", + // solaceMetaPassword: "", + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + false, + }, + // +Case - should find creds + { + "#202 - Connect with credentials from Auth Record (ENV VAR and Clear Auth not present)", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + // solaceMetaUsernameFromEnv: soltestEnvUsername, + // solaceMetaPasswordFromEnv: soltestEnvPassword, + // solaceMetaUsername: "", + // solaceMetaPassword: "", + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + false, + }, + // +Case - Should find with creds + { + "#203 - Connect with credentials from Auth Record (ENV VAR Present, Clear Auth not present)", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "SOLTEST_DNE", + solaceMetaPasswordFromEnv: "SOLTEST_DNE", + // solaceMetaUsername: "", + // solaceMetaPassword: "", + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + }, + false, + }, +} + +var testSolaceGetMetricSpecData = []testSolaceMetadata{ + { + "#401 - Get Metric Spec - msgCountTarget", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + // solaceMetaMsgSpoolUsageTarget: soltestValidMsgSpoolTarget, + }, + false, + }, + { + "#402 - Get Metric Spec - msgSpoolUsageTarget", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + // solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + solaceMetaMsgSpoolUsageTarget: soltestValidMsgSpoolTarget, + }, + false, + }, + { + "#403 - Get Metric Spec - BOTH msgSpoolUsage and msgCountTarget", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + solaceMetaMsgSpoolUsageTarget: soltestValidMsgSpoolTarget, + }, + false, + }, + { + "#404 - Get Metric Spec - BOTH MISSING", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + // solaceMetaMsgCountTarget: soltestValidMsgCountTarget, + // solaceMetaMsgSpoolUsageTarget: soltestValidMsgSpoolTarget, + }, + true, + }, + { + "#405 - Get Metric Spec - BOTH ZERO", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: "0", + solaceMetaMsgSpoolUsageTarget: "0", + }, + true, + }, + { + "#406 - Get Metric Spec - ONE ZERO; OTHER VALID", + map[string]string{ + solaceMetaSempBaseURL: soltestValidBaseURL, + solaceMetaMsgVpn: soltestValidVpn, + solaceMetaUsernameFromEnv: "", + solaceMetaPasswordFromEnv: "", + solaceMetaUsername: soltestValidUsername, + solaceMetaPassword: soltestValidPassword, + solaceMetaQueueName: soltestValidQueueName, + solaceMetaMsgCountTarget: "0", + solaceMetaMsgSpoolUsageTarget: soltestValidMsgSpoolTarget, + }, + false, + }, +} + +var testSolaceExpectedMetricNames = map[string]string{ + solaceScalerID + "-" + soltestValidVpn + "-" + soltestValidQueueName + "-" + solaceTriggermsgcount: "", + solaceScalerID + "-" + soltestValidVpn + "-" + soltestValidQueueName + "-" + solaceTriggermsgspoolusage: "", +} + +func TestSolaceParseSolaceMetadata(t *testing.T) { + for _, testData := range testParseSolaceMetadata { + fmt.Print(testData.testID) + _, err := parseSolaceMetadata(&ScalerConfig{ResolvedEnv: nil, TriggerMetadata: testData.metadata, AuthParams: nil}) + switch { + case err != nil && !testData.isError: + t.Error("expected success but got error: ", err) + fmt.Println(" --> FAIL") + case testData.isError && err == nil: + t.Error("expected error but got success") + fmt.Println(" --> FAIL") + default: + fmt.Println(" --> PASS") + } + } + for _, testData := range testSolaceEnvCreds { + fmt.Print(testData.testID) + _, err := parseSolaceMetadata(&ScalerConfig{ResolvedEnv: testDataSolaceResolvedEnvVALID, TriggerMetadata: testData.metadata, AuthParams: nil}) + switch { + case err != nil && !testData.isError: + t.Error("expected success but got error: ", err) + fmt.Println(" --> FAIL") + case testData.isError && err == nil: + t.Error("expected error but got success") + fmt.Println(" --> FAIL") + default: + fmt.Println(" --> PASS") + } + } + for _, testData := range testSolaceK8sSecretCreds { + fmt.Print(testData.testID) + _, err := parseSolaceMetadata(&ScalerConfig{ResolvedEnv: nil, TriggerMetadata: testData.metadata, AuthParams: testDataSolaceAuthParamsVALID}) + switch { + case err != nil && !testData.isError: + t.Error("expected success but got error: ", err) + fmt.Println(" --> FAIL") + case testData.isError && err == nil: + t.Error("expected error but got success") + fmt.Println(" --> FAIL") + default: + fmt.Println(" --> PASS") + } + } +} + +func TestSolaceGetMetricSpec(t *testing.T) { + for idx := 0; idx < len(testSolaceGetMetricSpecData); idx++ { + testData := testSolaceGetMetricSpecData[idx] + fmt.Print(testData.testID) + var err error + var solaceMeta *SolaceMetadata + solaceMeta, err = parseSolaceMetadata(&ScalerConfig{ResolvedEnv: testDataSolaceResolvedEnvVALID, TriggerMetadata: testData.metadata, AuthParams: testDataSolaceAuthParamsVALID}) + if err != nil { + fmt.Printf("\n Failed to parse metadata: %v", err) + } else { + // DECLARE SCALER AND RUN METHOD TO GET METRICS + testSolaceScaler := SolaceScaler{ + metadata: solaceMeta, + httpClient: http.DefaultClient, + } + + var metric []v2beta2.MetricSpec + if metric = testSolaceScaler.GetMetricSpecForScaling(); len(metric) == 0 { + err = fmt.Errorf("metric value not found") + } else { + metricName := metric[0].External.Metric.Name + if _, ok := testSolaceExpectedMetricNames[metricName]; ok == false { + err = fmt.Errorf("expected Metric value not found") + } + } + } + switch { + case testData.isError && err == nil: + fmt.Println(" --> FAIL") + t.Error("expected to fail but passed", err) + case !testData.isError && err != nil: + fmt.Println(" --> FAIL") + t.Error("expected success but failed", err) + default: + fmt.Println(" --> PASS") + } + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 62776b15b38..90dd6e66990 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -449,6 +449,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewRedisStreamsScaler(true, config) case "redis-streams": return scalers.NewRedisStreamsScaler(false, config) + case "solace-event-queue": + return scalers.NewSolaceScaler(config) case "stan": return scalers.NewStanScaler(config) default: diff --git a/tests/scalers/solace-helpers.ts b/tests/scalers/solace-helpers.ts new file mode 100644 index 00000000000..c69ec6e3eac --- /dev/null +++ b/tests/scalers/solace-helpers.ts @@ -0,0 +1,247 @@ +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import * as fs from 'fs' + +export class SolaceHelper { + + static getUpdateSolaceHelmChart(t) { + t.is( + 0, + sh.exec(`helm repo add solacecharts https://solaceproducts.github.io/pubsubplus-kubernetes-quickstart/helm-charts`).code, + 'Should retrieve Solace Helm Chart from Repo' + ) + t.is( + 0, + sh.exec(`helm repo update`).code, + 'Should update Helm Charts' + ) + } + + static installSolaceBroker(t, testNamespace: string) { + t.is( + 0, + sh.exec(`kubectl create namespace ${testNamespace}`).code, 'Should create solace namespace' + ) + t.is( + 0, + sh.exec(`helm install kedalab solacecharts/pubsubplus-dev --namespace ${testNamespace} --set solace.usernameAdminPassword=KedaLabAdminPwd1 --set storage.persistent=false`).code, 'Solace Broker should install' + ) + sh.exec('sleep 2s') + t.is( + 0, + sh.exec(`kubectl -n ${testNamespace} wait --for=condition=Ready --timeout=120s pod/kedalab-pubsubplus-dev-0`).code, 'Solace should be available.' + ) + sh.exec('sleep 2s') + } + + static installSolaceTestHelper(t, testNamespace: string) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceTestHelperYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name}`).code, 'creating test helper pod should work' + ) + t.is( + 0, + sh.exec(`kubectl -n ${testNamespace} wait --for=condition=Ready --timeout=120s pod/kedalab-helper`).code, 'kedalab-helper should be available' + ) + sh.exec('sleep 5s') + } + + static configSolacePubSubBroker(t, testNamespace: string) { + t.is( + 0, + sh.exec(`kubectl exec -n ${testNamespace} kedalab-helper -- ./config/config_solace.sh`).code, 'should be able to configure Solace PubSub Broker' + ) + } + + static installSolaceConsumer(t) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceConsumerYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name}`).code, 'create solace-consumer deployment should work.' + ) + sh.exec('sleep 10s') + } + + static installSolaceKedaSecret(t) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceKedaSecretYaml) + sh.exec(`kubectl apply -f ${tmpFile.name}`).code, 'creating secret should work.' + } + + static installSolaceKedaTriggerAuth(t) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceKedaTriggerAuthYaml) + sh.exec(`kubectl apply -f ${tmpFile.name}`).code, 'creating scaled object should work.' + } + + static installSolaceKedaScaledObject(t) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceKedaScaledObjectYaml) + sh.exec(`kubectl apply -f ${tmpFile.name}`).code, 'creating scaled object should work.' + } +/* + static publishMessages(t, testNamespace: string, messageRate: string, messageNumber: string) { + t.is( + 0, + sh.exec(`kubectl exec -n ${testNamespace} kedalab-helper -- ./sdkperf/sdkperf_java.sh -cip=kedalab-pubsubplus-dev:55555 -cu consumer_user@keda_vpn -cp=consumer_pwd -mr ${messageRate} -mn ${messageNumber} -mt=persistent -pql=SCALED_CONSUMER_QUEUE1`).code, 'publishing messages should work' + ) + } +*/ + static publishMessages(t, testNamespace: string, messageRate: string, messageNumber: string, messageSize: string) { + t.is( + 0, + sh.exec(`kubectl exec -n ${testNamespace} kedalab-helper -- ./sdkperf/sdkperf_java.sh -cip=kedalab-pubsubplus-dev:55555 -cu consumer_user@keda_vpn -cp=consumer_pwd -mr ${messageRate} -mn ${messageNumber} -msx ${messageSize} -mt=persistent -pql=SCALED_CONSUMER_QUEUE1`).code, 'publishing large messages should work' + ) + } + + static uninstallSolaceKedaObjects(t){ + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceKedaScaledObjectYaml) + sh.exec(`kubectl delete -f ${tmpFile.name}`) + fs.writeFileSync(tmpFile.name, solaceKedaTriggerAuthYaml) + sh.exec(`kubectl delete -f ${tmpFile.name}`) + fs.writeFileSync(tmpFile.name, solaceKedaSecretYaml) + sh.exec(`kubectl delete -f ${tmpFile.name}`) + } + + static uninstallSolaceTestPods(t) { + const tmpFile = tmp.fileSync() + fs.writeFileSync(tmpFile.name, solaceConsumerYaml) + sh.exec(`kubectl delete -f ${tmpFile.name}`) + fs.writeFileSync(tmpFile.name, solaceTestHelperYaml) + sh.exec(`kubectl delete -f ${tmpFile.name}`) + } + + static uninstallSolace(t, solaceNamespace: string){ + sh.exec(`helm uninstall kedalab --namespace=${solaceNamespace}`) + sh.exec(`sleep 6s`) + sh.exec(`kubectl delete namespace ${solaceNamespace}`) + } +} + +const solaceKedaScaledObjectYaml = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: kedalab-scaled-object + namespace: solace +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: solace-consumer + pollingInterval: 5 + cooldownPeriod: 20 + minReplicaCount: 0 + maxReplicaCount: 10 + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 10 + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Pods + value: 10 + periodSeconds: 10 + selectPolicy: Max + triggers: + - type: solace-event-queue + metadata: + solaceSempBaseURL: http://kedalab-pubsubplus-dev.solace.svc.cluster.local:8080 + messageVpn: keda_vpn + queueName: SCALED_CONSUMER_QUEUE1 + messageCountTarget: '20' + messageSpoolUsageTarget: '1' + authenticationRef: + name: kedalab-trigger-auth +` + +const solaceKedaTriggerAuthYaml = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: kedalab-trigger-auth + namespace: solace +spec: + secretTargetRef: + - parameter: username + name: kedalab-solace-secret + key: SEMP_USER + - parameter: password + name: kedalab-solace-secret + key: SEMP_PASSWORD +` + +const solaceKedaSecretYaml = ` +apiVersion: v1 +kind: Secret +metadata: + name: kedalab-solace-secret + namespace: solace + labels: + app: solace-consumer +type: Opaque +data: + SEMP_USER: YWRtaW4= + SEMP_PASSWORD: S2VkYUxhYkFkbWluUHdkMQ== +` + +const solaceConsumerYaml = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: solace-consumer + namespace: solace +spec: + selector: + matchLabels: + app: web + template: + metadata: + labels: + app: web + name: docker-test-pod + spec: + containers: + - name: solace-jms-consumer + image: ghcr.io/solacelabs/kedalab-consumer:latest + env: + - name: SOLACE_CLIENT_HOST + value: tcp://kedalab-pubsubplus-dev:55555 + - name: SOLACE_CLIENT_MSGVPN + value: keda_vpn + - name: SOLACE_CLIENT_USERNAME + value: consumer_user + - name: SOLACE_CLIENT_PASSWORD + value: consumer_pwd + - name: SOLACE_CLIENT_QUEUENAME + value: SCALED_CONSUMER_QUEUE1 + - name: SOLACE_CLIENT_CONSUMER_DELAY + value: '1000' + imagePullPolicy: Always + restartPolicy: Always +` + +const solaceTestHelperYaml = ` +apiVersion: v1 +kind: Pod +metadata: + name: kedalab-helper + namespace: solace +spec: + containers: + - name: sdk-perf + image: ghcr.io/solacelabs/kedalab-helper:latest + # Just spin & wait forever + command: [ "/bin/bash", "-c", "--" ] + args: [ "while true; do sleep 10; done;" ] +` diff --git a/tests/scalers/solace.tests.ts b/tests/scalers/solace.tests.ts new file mode 100644 index 00000000000..4c7cd28d62c --- /dev/null +++ b/tests/scalers/solace.tests.ts @@ -0,0 +1,143 @@ +import * as sh from 'shelljs' +import test from 'ava' +import { SolaceHelper } from './solace-helpers' + +const testNamespace = 'solace' +const queueName = 'test' + +test.before(t => { + sh.config.silent = true + SolaceHelper.getUpdateSolaceHelmChart(t) + SolaceHelper.installSolaceBroker(t, testNamespace) + SolaceHelper.installSolaceTestHelper(t, testNamespace) + SolaceHelper.configSolacePubSubBroker(t, testNamespace) + SolaceHelper.installSolaceConsumer(t) +}); + +test.serial('#1 Consumer Deployment should have 1 replicas on start', t => { + let replicas = sh.exec(`kubectl get deployment.apps/solace-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout + + t.log('replica count: ' + replicas); + t.is(replicas, '1', 'replica count should start out as 1') +}) + +test.serial('#2 Create Scaled Object; Consumer Deployment replicas scale to zero', t => { + // deploy scaler and auth objects + SolaceHelper.installSolaceKedaSecret(t) + SolaceHelper.installSolaceKedaTriggerAuth(t) + SolaceHelper.installSolaceKedaScaledObject(t) + + let replicas = '1' + let success = false + for (let i = 0; i <= 20 && replicas !== '10'; i++) { + replicas = sh.exec(`kubectl get deployment.apps/solace-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.log('pod replicas (of 0 expected): ' + replicas) + if (replicas !== '0') { + sh.exec('sleep 3s') + } else { + t.log('scale to zero goal met') + success = true + break + } + } + + t.is('0', replicas, 'replica count should be 0 after 60 seconds') + if (success) { + sh.exec('sleep 5s') + } + }) + +test.serial('#3 Publish 400 messages to Consumer Queue; Scale Replicas to 10 for message count', t => { + // publish messages to queue -- 400 msgs at 50 msgs/sec + SolaceHelper.publishMessages(t, testNamespace, '50', '400', '256') + + // with messages published, the consumer deployment should start receiving the messages + let replicas = '0' + for (let i = 0; i < 30 && replicas !== '10'; i++) { + replicas = sh.exec(`kubectl get deployment.apps/solace-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.log('pod replicas (of 10 expected): ' + replicas) + if (replicas !== '10') { + sh.exec('sleep 2s') + } else { + t.log('max pod replica count goal met - msg count') + break + } + } + + t.is('10', replicas, 'replica count should be 10 after 60 seconds - msg count') +}) + +test.serial('#4 Consumer Deployment scales to zero replicas after all messages read', t => { + + let replicas = '10' + let success = false + + // Replicas should decrease as messages are consumed + for (let i = 0; i < 60 && replicas !== '0'; i++) { + replicas = sh.exec(`kubectl get deployment.apps/solace-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.log('pod replicas (of 0 expected): ' + replicas) + if (replicas !== '0') { + sh.exec('sleep 5s') + } else { + t.log('min pod replica count goal met (scale to zero)') + success = true + break + } + } + + t.is('0', replicas, 'replica count should be 0 after 5 minutes') + if (success) { + sh.exec('sleep 5s') + } +}) + +test.serial('#5 Publish 50 LARGE messages to Consumer Queue; Scale Replicas to 10 for spool usage', t => { + // publish messages to queue -- 400 msgs at 50 msgs/sec + SolaceHelper.publishMessages(t, testNamespace, '10', '50', '4194304') + + // with messages published, the consumer deployment should start receiving the messages + let replicas = '0' + for (let i = 0; i < 30 && replicas !== '10'; i++) { + replicas = sh.exec(`kubectl get deployment.apps/solace-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.log('pod replicas (of 10 expected): ' + replicas) + if (replicas !== '10') { + sh.exec('sleep 2s') + } else { + t.log('max pod replica count goal met - spool size') + break + } + } + + t.is('10', replicas, 'replica count should be 10 after 60 seconds - spool size') +}) + +test.serial('#6 Consumer Deployment scales to zero replicas after all messages read', t => { + + let replicas = '10' + let success = false + + // Replicas should decrease as messages are consumed + for (let i = 0; i < 60 && replicas !== '0'; i++) { + replicas = sh.exec(`kubectl get deployment.apps/solace-consumer --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`).stdout + t.log('pod replicas (of 0 expected): ' + replicas) + if (replicas !== '0') { + sh.exec('sleep 5s') + } else { + t.log('min pod replica count goal met (scale to zero)') + success = true + break + } + } + + t.is('0', replicas, 'Replica count should be 0 after 5 minutes') + if (success) { + sh.exec('sleep 5s') + } +}) + +test.after.always.cb('clean up the cluster', t => { + SolaceHelper.uninstallSolaceKedaObjects(t) + SolaceHelper.uninstallSolaceTestPods(t) + SolaceHelper.uninstallSolace(t, testNamespace) + t.end() +})