Skip to content

Commit

Permalink
exit early while scaling if the object is paused
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
  • Loading branch information
aryan9600 committed May 5, 2022
1 parent 3e44209 commit e9591df
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 15 deletions.
37 changes: 22 additions & 15 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,28 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
}

status := scaledObject.Status.DeepCopy()
if pausedCount != nil && *pausedCount != currentReplicas && status.PausedReplicaCount == nil {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
if pausedCount != nil {
// Scale the target to the paused replica count
if *pausedCount != currentReplicas {
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *pausedCount)
if err != nil {
logger.Error(err, "error scaling target to paused replicas count", "paused replicas", *pausedCount)
if err := e.setReadyCondition(ctx, logger, scaledObject, metav1.ConditionUnknown,
kedav1alpha1.ScaledObjectConditionReadySucccesReason, kedav1alpha1.ScaledObjectConditionReadySuccessMessage); err != nil {
logger.Error(err, "error setting ready condition")
}
return
}
return
}
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
if *status.PausedReplicaCount != *pausedCount {
status.PausedReplicaCount = pausedCount
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "error updating status paused replica count")
return
}
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
}
logger.Info("Successfully scaled target to paused replicas count", "paused replicas", *pausedCount)
return
}

Expand Down Expand Up @@ -355,6 +360,8 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool
return false, *scaledObject.Spec.MinReplicaCount
}

// GetPausedReplicaCount returns the paused replica count of the ScaledObject.
// If not paused, it returns nil.
func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) {
if scaledObject.Annotations != nil {
if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok {
Expand Down
19 changes: 19 additions & 0 deletions tests/scalers/azure-queue-pause.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,25 @@ test.serial(
}
)

test.serial.cb(
'deployment should remain at pausedreplicacount (0) even with messages on storage',
t => {
const queuesvc = azure.createqueueservice(connectionstring)
queuesvc.messageencoder = new azure.queuemessageencoder.textbase64queuemessageencoder()
async.maplimit(
array(1000).keys(),
20,
(n, cb) => queuesvc.createmessage(queuename, `test ${n}`, cb),
async () => {
// scaling is paused even with messages in storage.
t.true(await waitfordeploymentreplicacount(0, 'test-deployment', testnamespace, 60, 1000), 'replica count should remain 0 after 1 minute')
queuesvc.clearmessages(queuename, _ => {})
t.end()
}
)
}
)

test.serial(`Updsating ScaledObject (without annotation) should work`, async t => {
fs.writeFileSync(scaledObjectFile.name, scaledObjectYaml)
t.is(
Expand Down

0 comments on commit e9591df

Please sign in to comment.