Skip to content

Commit

Permalink
Escape queueName and vhostName in RabbitMQ Scaler before use them…
Browse files Browse the repository at this point in the history
… in query string (bug fix) (#2055)
  • Loading branch information
Jorge Turrado Ferrero authored Aug 26, 2021
1 parent 58160c0 commit 0e4a01f
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve validation in Cron scaler in case start & end input is same.([#2032](https://github.com/kedacore/keda/pull/2032))
- Improve the cron validation in Cron Scaler ([#2038](https://github.com/kedacore/keda/pull/2038))
- Add Bearer auth for Metrics API scaler ([#2028](https://github.com/kedacore/keda/pull/2028))
- Escape `queueName` and `vhostName` in RabbitMQ Scaler before use them in query string (bug fix) ([#2055](https://github.com/kedacore/keda/pull/2055))

### Breaking Changes

Expand Down
24 changes: 14 additions & 10 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type queueInfo struct {
Name string `json:"name"`
}

type regexQueueInfo struct {
Queues []queueInfo `json:"items"`
}

type messageStat struct {
PublishDetail publishDetail `json:"publish_details"`
}
Expand Down Expand Up @@ -200,12 +204,12 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {

// Resolve metricName
if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", val))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", url.QueryEscape(val)))
} else {
if meta.mode == rabbitModeQueueLength {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", meta.queueName))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq", url.QueryEscape(meta.queueName)))
} else {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", meta.queueName))
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("%s-%s", "rabbitmq-rate", url.QueryEscape(meta.queueName)))
}
}

Expand Down Expand Up @@ -340,12 +344,12 @@ func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) {

if r.StatusCode == 200 {
if s.metadata.useRegex {
var results []queueInfo
err = json.NewDecoder(r.Body).Decode(&results)
var queues regexQueueInfo
err = json.NewDecoder(r.Body).Decode(&queues)
if err != nil {
return result, err
return queueInfo{}, err
}
result, err := getComposedQueue(s, results)
result, err := getComposedQueue(s, queues.Queues)
return result, err
}

Expand All @@ -368,7 +372,7 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {

// Override vhost if requested.
if s.metadata.vhostName != nil {
vhost = "/" + *s.metadata.vhostName
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
}

if vhost == "" || vhost == "/" || vhost == "//" {
Expand All @@ -378,9 +382,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?use_regex=true&pagination=false&name=", s.metadata.queueName)
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, s.metadata.queueName)
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
}

var info queueInfo
Expand Down
78 changes: 39 additions & 39 deletions pkg/scalers/rabbitmq_scaler_test.go

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions tests/scalers/rabbitmq-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ export class RabbitMQHelper {
)
}

static publishMessages(t, namespace: string, connectionString: string, messageCount: number) {
static publishMessages(t, namespace: string, connectionString: string, messageCount: number, queueName: string) {
// publish messages
const tmpFile = tmp.fileSync()
fs.writeFileSync(tmpFile.name, publishYaml.replace('{{CONNECTION_STRING}}', connectionString)
.replace('{{MESSAGE_COUNT}}', messageCount.toString()))
.replace('{{MESSAGE_COUNT}}', messageCount.toString())
.replace('{{QUEUE_NAME}}', queueName)
.replace('{{QUEUE_NAME}}', queueName))

t.is(
0,
sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${namespace}`).code,
Expand All @@ -52,15 +55,15 @@ export class RabbitMQHelper {
const publishYaml = `apiVersion: batch/v1
kind: Job
metadata:
name: rabbitmq-publish
name: rabbitmq-publish-{{QUEUE_NAME}}
spec:
template:
spec:
containers:
- name: rabbitmq-client
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}"]
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}", "{{QUEUE_NAME}}"]
restartPolicy: Never`

const rabbitmqDeployYaml = `apiVersion: v1
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-amqp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
Expand Down Expand Up @@ -79,7 +79,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down
12 changes: 8 additions & 4 deletions tests/scalers/rabbitmq-queue-http-regex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {waitForDeploymentReplicaCount} from "./helpers";
const testNamespace = 'rabbitmq-queue-http-regex-test'
const rabbitmqNamespace = 'rabbitmq-http-regex-test'
const queueName = 'hello'
const dummyQueueName1 = 'hello-1'
const dummyQueueName2 = 'hellohellohello'
const username = "test-user"
const password = "test-password"
const vhost = "test-vh-regex"
Expand All @@ -20,7 +22,7 @@ test.before(t => {

sh.config.silent = true
// create deployment
const httpConnectionString = `http://${username}:${password}@rabbitmq.${rabbitmqNamespace}.svc.cluster.local/${vhost}`
const httpConnectionString = `http://${username}:${password}@rabbitmq.${rabbitmqNamespace}.svc.cluster.local`

RabbitMQHelper.createDeployment(t, testNamespace, deployYaml, connectionString, httpConnectionString, queueName)
})
Expand All @@ -33,7 +35,9 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, dummyQueueName1)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, dummyQueueName2)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
Expand Down Expand Up @@ -81,7 +85,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand All @@ -105,7 +109,7 @@ spec:
triggers:
- type: rabbitmq
metadata:
queueName: {{QUEUE_NAME}}
queueName: "^hell.{1}$"
hostFromEnv: RabbitApiHost
protocol: http
useRegex: 'true'
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-http.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ test.serial('Deployment should have 0 replicas on start', async t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 30, 5000))
Expand Down Expand Up @@ -78,7 +78,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down
4 changes: 2 additions & 2 deletions tests/scalers/rabbitmq-queue-trigger-auth.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test.serial('Deployment should have 0 replicas on start', t => {
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
let replicaCount = '0'
Expand Down Expand Up @@ -102,7 +102,7 @@ spec:
spec:
containers:
- name: rabbitmq-consumer
image: jeffhollan/rabbitmq-client:dev
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
Expand Down

0 comments on commit 0e4a01f

Please sign in to comment.