Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid scaling triggered by events/messages when ScaledObject is paused #3011

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

- **General**: Fix CVE-2022-21221 in `github.com/valyala/fasthttp` ([#2775](https://github.com/kedacore/keda/issue/2775))
- **General**: Bump Golang to 1.17.9 ([#3016](https://github.com/kedacore/keda/issues/3016))
- **General**: Fix autoscaling behaviour while paused. ([#3009](https://github.com/kedacore/keda/issues/3009))

## v2.7.0

Expand Down
35 changes: 20 additions & 15 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,26 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
}

status := scaledObject.Status.DeepCopy()
if pausedCount != nil && *pausedCount != currentReplicas && status.PausedReplicaCount == nil {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
if pausedCount != nil {
// Scale the target to the paused replica count
if *pausedCount != currentReplicas {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
return
}
return
}
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
return
}

Expand Down Expand Up @@ -355,6 +358,8 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool
return false, *scaledObject.Spec.MinReplicaCount
}

// GetPausedReplicaCount returns the paused replica count of the ScaledObject.
// If not paused, it returns nil.
func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) {
if scaledObject.Annotations != nil {
if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok {
Expand Down
38 changes: 37 additions & 1 deletion tests/scalers/azure-queue-pause.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as azure from 'azure-storage'
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import test from 'ava'
import {createNamespace, waitForDeploymentReplicaCount} from "./helpers";
import {createNamespace, waitForDeploymentReplicaCount, sleep} from "./helpers";

const testNamespace = 'pause-test'
const deploymentFile = tmp.fileSync()
Expand Down Expand Up @@ -60,6 +60,24 @@ test.serial(
}
)

test.serial.cb(
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
'Deployment should remain at pausedReplicaCount (0) even with messages on storage',
t => {
const queueSvc = azure.createQueueService(connectionString)
queueSvc.messageEncoder = new azure.QueueMessageEncoder.TextBase64QueueMessageEncoder()
async.mapLimit(
Array(1000).keys(),
20,
(n, cb) => queueSvc.createMessage(queueName, `test ${n}`, cb),
async () => {
t.true(await checkIfReplicaCountGreater(0, 'test-deployment', testNamespace, 60, 1000), 'replica count remain 0 after 1 minute')
queueSvc.clearMessages(queueName, _ => {})
t.end()
}
)
}
)

test.serial(`Updsating ScaledObject (without annotation) should work`, async t => {
fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml)
t.is(
Expand Down Expand Up @@ -118,6 +136,24 @@ test.after.always.cb('clean up workload test related deployments', t => {
t.end()
})


// checks if the current replica count is greater than the given target count for a given interval.
// returns false if it is greater, otherwise true.
async function checkIfReplicaCountGreater(target: number, name: string, namespace: string, iterations = 10, interval = 3000): Promise<boolean> {
for (let i = 0; i < iterations; i++) {
let replicaCountStr = sh.exec(`kubectl get deployment.apps/${name} --namespace ${namespace} -o jsonpath="{.spec.replicas}"`).stdout
try {
const replicaCount = parseInt(replicaCountStr, 10)
if (replicaCount > target) {
return false
}
} catch { }

await sleep(interval)
}
return true
}

const deployYaml = `apiVersion: v1
kind: Secret
metadata:
Expand Down