Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor IBM MQ scaler and remove and deprecate variables #6034

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ Here is an overview of all new **experimental** features:

### Deprecations

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

You can find all deprecations in [this overview](https://github.com/kedacore/keda/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3Abreaking-change) and [join the discussion here](https://github.com/kedacore/keda/discussions/categories/deprecations).

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- IBM MQ Scaler: Remove and deprecate unused variables in IBM MQ scaler ([#6033](https://github.com/kedacore/keda/issues/6033))

### Breaking Changes

Expand Down
217 changes: 70 additions & 147 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -20,39 +18,28 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// Default variables and settings
const (
defaultTargetQueueDepth = 20
defaultTLSDisabled = false
)

// IBMMQScaler assigns struct data pointer to metadata variable
type IBMMQScaler struct {
metricType v2.MetricTargetType
metadata *IBMMQMetadata
defaultHTTPTimeout time.Duration
httpClient *http.Client
logger logr.Logger
type ibmmqScaler struct {
metricType v2.MetricTargetType
metadata ibmmqMetadata
httpClient *http.Client
logger logr.Logger
}

// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale
type IBMMQMetadata struct {
host string
queueManager string
queueName string
username string
password string
queueDepth int64
activationQueueDepth int64
tlsDisabled bool
triggerIndex int

// TLS
ca string
cert string
key string
keyPassword string
unsafeSsl bool
type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}

// CommandResponse Full structured response from MQ admin REST query
Expand All @@ -71,142 +58,79 @@ type Parameters struct {
Curdepth int `json:"curdepth"`
}

// NewIBMMQScaler creates a new IBM MQ scaler
func (m *ibmmqMetadata) Validate() error {
_, err := url.ParseRequestURI(m.Host)
if err != nil {
return fmt.Errorf("invalid URL: %w", err)
}

if (m.Cert == "") != (m.Key == "") {
return fmt.Errorf("both cert and key must be provided when using TLS")
}

// TODO: DEPRECATED to be removed in v2.18
if m.TLS && m.UnsafeSsl {
return fmt.Errorf("'tls' and 'unsafeSsl' are both specified. Please use only 'unsafeSsl'")
}

return nil
}

func NewIBMMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "ibm_mq_scaler")

meta, err := parseIBMMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing IBM MQ metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.tlsDisabled)
// TODO: DEPRECATED to be removed in v2.18
if meta.TLS {
logger.Info("The 'tls' setting is DEPRECATED and will be removed in v2.18 - Use 'unsafeSsl' instead")
meta.UnsafeSsl = meta.TLS
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl)

// Configure TLS if cert and key are specified
if meta.cert != "" && meta.key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl)
if meta.Cert != "" && meta.Key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.CA, meta.UnsafeSsl)
if err != nil {
return nil, err
}
httpClient.Transport = kedautil.CreateHTTPTransportWithTLSConfig(tlsConfig)
}

return &IBMMQScaler{
metricType: metricType,
metadata: meta,
defaultHTTPTimeout: config.GlobalHTTPTimeout,
httpClient: httpClient,
logger: InitializeLogger(config, "ibm_mq_scaler"),
return &ibmmqScaler{
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
}, nil
}

// Close closes and returns nil
func (s *IBMMQScaler) Close(context.Context) error {
func (s *ibmmqScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided
func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (*IBMMQMetadata, error) {
meta := IBMMQMetadata{}

if val, ok := config.TriggerMetadata["host"]; ok {
_, err := url.ParseRequestURI(val)
if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}
meta.host = val
} else {
return nil, fmt.Errorf("no host URI given")
}

if val, ok := config.TriggerMetadata["queueManager"]; ok {
meta.queueManager = val
} else {
return nil, fmt.Errorf("no queue manager given")
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, error) {
meta := ibmmqMetadata{triggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, err
}

if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" {
queueDepth, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid queueDepth - must be an integer")
}
meta.queueDepth = queueDepth
} else {
fmt.Println("No target depth defined - setting default")
meta.queueDepth = defaultTargetQueueDepth
}

meta.activationQueueDepth = 0
if val, ok := config.TriggerMetadata["activationQueueDepth"]; ok && val != "" {
activationQueueDepth, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationQueueDepth - must be an integer")
}
meta.activationQueueDepth = activationQueueDepth
}

if val, ok := config.TriggerMetadata["tls"]; ok {
tlsDisabled, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("invalid tls setting: %w", err)
}
meta.tlsDisabled = tlsDisabled
} else {
fmt.Println("No tls setting defined - setting default")
meta.tlsDisabled = defaultTLSDisabled
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["usernameFromEnv"]; ok && val != "" {
meta.username = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no username given")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["passwordFromEnv"]; ok && val != "" {
meta.password = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no password given")
}

// TLS config (optional)
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.keyPassword = config.AuthParams["keyPassword"]

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("failed to parse unsafeSsl value. Must be either true or false")
}
meta.unsafeSsl = boolVal
}

meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

// getQueueDepthViaHTTP returns the depth of the MQ Queue from the Admin endpoint
func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.queueName
url := s.metadata.host
func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
Expand All @@ -216,7 +140,7 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
Expand Down Expand Up @@ -251,26 +175,25 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
return int64(response.CommandResponse[0].Parameters.Curdepth), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *IBMMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.queueDepth),
Target: GetMetricTarget(s.metricType, s.metadata.QueueDepth),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *IBMMQScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
func (s *ibmmqScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueDepth, err := s.getQueueDepthViaHTTP(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting IBM MQ queue depth: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueDepth))

return []external_metrics.ExternalMetricValue{metric}, queueDepth > s.metadata.activationQueueDepth, nil
return []external_metrics.ExternalMetricValue{metric}, queueDepth > s.metadata.ActivationQueueDepth, nil
}
Loading
Loading