Skip to content

Commit

Permalink
feat: Introduce activationThreshold/minMetricValue for ActiveMQ Scaler (
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Turrado Ferrero authored Jul 11, 2022
1 parent 9e1d6ac commit 6318652
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 188 deletions.
38 changes: 25 additions & 13 deletions pkg/scalers/activemq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ type activeMQScaler struct {
}

type activeMQMetadata struct {
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
targetQueueSize int64
corsHeader string
metricName string
scalerIndex int
managementEndpoint string
destinationName string
brokerName string
username string
password string
restAPITemplate string
targetQueueSize int64
activationTargetQueueSize int64
corsHeader string
metricName string
scalerIndex int
}

type activeMQMonitoring struct {
Expand All @@ -46,8 +47,9 @@ type activeMQMonitoring struct {
}

const (
defaultTargetQueueSize = 10
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
defaultTargetQueueSize = 10
defaultActivationTargetQueueSize = 0
defaultActiveMQRestAPITemplate = "http://{{.ManagementEndpoint}}/api/jolokia/read/org.apache.activemq:type=Broker,brokerName={{.BrokerName}},destinationType=Queue,destinationName={{.DestinationName}}/QueueSize"
)

var activeMQLog = logf.Log.WithName("activeMQ_scaler")
Expand Down Expand Up @@ -110,6 +112,16 @@ func parseActiveMQMetadata(config *ScalerConfig) (*activeMQMetadata, error) {
meta.targetQueueSize = defaultTargetQueueSize
}

if val, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok {
activationTargetQueueSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer")
}
meta.activationTargetQueueSize = activationTargetQueueSize
} else {
meta.activationTargetQueueSize = defaultActivationTargetQueueSize
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["username"]; ok && val != "" {
Expand Down Expand Up @@ -162,7 +174,7 @@ func (s *activeMQScaler) IsActive(ctx context.Context) (bool, error) {
return false, err
}

return queueSize > 0, nil
return queueSize > s.metadata.activationTargetQueueSize, nil
}

// getRestAPIParameters parse restAPITemplate to provide managementEndpoint, brokerName, destinationName
Expand Down
27 changes: 22 additions & 5 deletions pkg/scalers/activemq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type activeMQMetricIdentifier struct {
// Setting metric identifier mock name
var activeMQMetricIdentifiers = []activeMQMetricIdentifier{
{&testActiveMQMetadata[1], 0, "s0-activemq-testQueue"},
{&testActiveMQMetadata[9], 1, "s1-activemq-testQueue"},
{&testActiveMQMetadata[10], 1, "s1-activemq-testQueue"},
}

var testActiveMQMetadata = []parseActiveMQMetadataTestData{
Expand All @@ -40,10 +40,11 @@ var testActiveMQMetadata = []parseActiveMQMetadataTestData{
{
name: "properly formed metadata",
metadata: map[string]string{
"managementEndpoint": "localhost:8161",
"destinationName": "testQueue",
"brokerName": "localhost",
"targetQueueSize": "10",
"managementEndpoint": "localhost:8161",
"destinationName": "testQueue",
"brokerName": "localhost",
"targetQueueSize": "10",
"activationTargetQueueSize": "0",
},
authParams: map[string]string{
"username": "testUsername",
Expand Down Expand Up @@ -80,6 +81,22 @@ var testActiveMQMetadata = []parseActiveMQMetadataTestData{
},
isError: true,
},
{
name: "Invalid activatingTargetQueueSize using a string",
metadata: map[string]string{
"managementEndpoint": "localhost:8161",
"destinationName": "testQueue",
"brokerName": "localhost",
"targetQueueSize": "10",
"activationTargetQueueSize": "AA",
"metricName": "testMetricName",
},
authParams: map[string]string{
"username": "testUsername",
"password": "pass123",
},
isError: true,
},
{
name: "missing management endpoint should fail",
metadata: map[string]string{
Expand Down
18 changes: 18 additions & 0 deletions tests/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,24 @@ func WaitForDeploymentReplicaCountChange(t *testing.T, kc *kubernetes.Clientset,
return int(replicas)
}

// Waits some time to ensure that the replica count doesn't change.
func AssertReplicaCountNotChangeDuringTimePeriod(t *testing.T, kc *kubernetes.Clientset, name, namespace string, target, intervalSeconds int) {
t.Log("Waiting for some time to ensure deployment replica count doesn't change")
var replicas int32

for i := 0; i < intervalSeconds; i++ {
deployment, _ := kc.AppsV1().Deployments(namespace).Get(context.Background(), name, metav1.GetOptions{})
replicas = deployment.Status.Replicas

if replicas != int32(target) {
assert.Fail(t, fmt.Sprintf("%s replica count has changed from %d to %d", name, target, replicas))
return
}

time.Sleep(time.Second)
}
}

func WaitForHpaCreation(t *testing.T, kc *kubernetes.Clientset, name, namespace string,
iterations, intervalSeconds int) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
Expand Down
4 changes: 2 additions & 2 deletions tests/run-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
set -u

# TODO - Remove TypeScript regex after all tests have been migrated to Go.
E2E_REGEX_GO="./${E2E_TEST_REGEX:-*_test.go}"
E2E_REGEX_TS="./${E2E_TEST_REGEX:-*.test.ts}"
E2E_REGEX_GO="./*${E2E_TEST_REGEX:-*_test.go}"
E2E_REGEX_TS="./*${E2E_TEST_REGEX:-*.test.ts}"

DIR=$(dirname "$0")
cd $DIR
Expand Down
2 changes: 1 addition & 1 deletion tests/run-arm-smoke-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ cd $DIR
# TODO - Replace with Go tests.
test_files=(
"scalers_go/kubernetes_workload/kubernetes_workload_test.go"
"scalers/activemq.test.ts"
"scalers_go/activemq/activemq_test.go"
"scalers_go/cron/cron_test.go"
)

Expand Down
Loading

0 comments on commit 6318652

Please sign in to comment.