Commit

For scaling deployments in particular, use a direct object get rather than the /scale API (#1458)

* For scaling deployments in particular, use a direct object get rather than the /scale API so that it can use the informer cache for better performance.

This is annoying as a special case, but it is so common and improves performance so much that I think it's worthwhile to include. Another option would be to greatly increase the QPS rate limit on the scaling API client; however, that would also increase kube-apiserver load, while watches/informers are generally much less impactful.

Signed-off-by: Noah Kantrowitz <noah@coderanger.net>
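
A rough sketch of the change described above (a hedged illustration only: scaleClient, kubeClient, gvkr, namespace, and name are placeholder variables, while the calls mirror the KEDA code in the diff below). Reading the Deployment object through the controller-runtime client lets repeated polls be served from the shared informer cache, whereas the scale subresource always round-trips to kube-apiserver.

// Before: every replica-count check goes through the /scale subresource on kube-apiserver.
scale, err := scaleClient.Scales(namespace).Get(ctx, gvkr.GroupResource(), name, metav1.GetOptions{})
currentReplicas := scale.Spec.Replicas

// After (Deployments and StatefulSets only): a direct object get via the cached
// controller-runtime client, so a synced informer answers without an extra API call.
deployment := &appsv1.Deployment{}
err = kubeClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, deployment)
currentReplicas = *deployment.Spec.Replicas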

* Update changelog.

Signed-off-by: Noah Kantrowitz <noah@coderanger.net>

* Use the already-normalized GVKR data so there is less awkward string parsing.

Also adds support for StatefulSets for symmetry.

Signed-off-by: Noah Kantrowitz <noah@coderanger.net>
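
A hedged illustration of what the normalized GVKR buys (the field names match the ScaledObject status used in the diff below; the comments stand in for the real branches): Group and Kind are already separate fields, so choosing the code path is a plain comparison rather than parsing an apiVersion/kind string, and the StatefulSet case slots in next to the Deployment one.

targetGVKR := scaledObject.Status.ScaleTargetGVKR
switch {
case targetGVKR.Group == "apps" && targetGVKR.Kind == "Deployment":
	// direct, cache-backed get of the Deployment
case targetGVKR.Group == "apps" && targetGVKR.Kind == "StatefulSet":
	// direct, cache-backed get of the StatefulSet (added for symmetry)
default:
	// fall back to the /scale subresource via targetGVKR.GroupResource()
}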

* Apply suggestions from code review

Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>

Signed-off-by: Noah Kantrowitz <noah@coderanger.net>

* Apply suggestions from code review

Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>

Signed-off-by: Noah Kantrowitz <noah@coderanger.net>
coderanger authored Jan 6, 2021
1 parent c6c2cd4 commit 68a479c
Showing 3 changed files with 61 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -34,6 +34,7 @@
- Add support for the WATCH_NAMESPACE environment variable to the operator ([#1474](https://github.com/kedacore/keda/pull/1474))
- Automatically determine the RabbitMQ protocol when possible, and support setting the protocol via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459),[#1483](https://github.com/kedacore/keda/pull/1483))
- Improve performance when fetching pod information ([#1457](https://github.com/kedacore/keda/pull/1457))
- Improve performance when fetching current scaling information on Deployments ([#1458](https://github.com/kedacore/keda/pull/1458))

### Breaking Changes

1 change: 1 addition & 0 deletions controllers/scaledobject_controller.go
@@ -69,6 +69,7 @@ func init() {
// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// create Discovery clientset
// TODO If we need to increase the QPS of scaling API calls, copy and tweak this RESTConfig.
clientset, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
r.Log.Error(err, "Not able to create Discovery clientset")
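
The TODO above points at the fallback the commit message mentions: if the scaling API client ever needs more throughput, one option (a sketch only — the QPS/Burst values are illustrative and the rest import is k8s.io/client-go/rest, neither of which is part of this change) would be to copy the manager's RESTConfig and raise the client-side rate limits before building clients from it.

// Hypothetical tweak: client-go defaults to QPS 5 / Burst 10; a copied config with
// higher limits lets the derived clients make more calls per second, at the cost of
// extra kube-apiserver load (which is why the informer-cache route is preferred here).
cfg := rest.CopyConfig(mgr.GetConfig())
cfg.QPS = 50
cfg.Burst = 100
clientset, err := discovery.NewDiscoveryClientForConfig(cfg)
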
78 changes: 59 additions & 19 deletions pkg/scaling/executor/scale_scaledobjects.go
@@ -5,8 +5,10 @@ import (
"time"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
)
@@ -16,18 +18,46 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al
"scaledObject.Namespace", scaledObject.Namespace,
"scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name)

currentScale, err := e.getScaleTargetScale(ctx, scaledObject)
if err != nil {
logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget")
return
// Get the current replica count. As a special case, Deployments and StatefulSets fetch directly from the object so they can use the informer cache
// to reduce API calls. Everything else uses the scale subresource.
var currentScale *autoscalingv1.Scale
var currentReplicas int32
targetName := scaledObject.Spec.ScaleTargetRef.Name
targetGVKR := scaledObject.Status.ScaleTargetGVKR
switch {
case targetGVKR.Group == "apps" && targetGVKR.Kind == "Deployment":
deployment := &appsv1.Deployment{}
err := e.client.Get(ctx, client.ObjectKey{Name: targetName, Namespace: scaledObject.Namespace}, deployment)
if err != nil {
logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget")
return
}
currentReplicas = *deployment.Spec.Replicas
case targetGVKR.Group == "apps" && targetGVKR.Kind == "StatefulSet":
statefulSet := &appsv1.StatefulSet{}
err := e.client.Get(ctx, client.ObjectKey{Name: targetName, Namespace: scaledObject.Namespace}, statefulSet)
if err != nil {
logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget")
return
}
currentReplicas = *statefulSet.Spec.Replicas
default:
var err error
currentScale, err = e.getScaleTargetScale(ctx, scaledObject)
if err != nil {
logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget")
return
}
currentReplicas = currentScale.Spec.Replicas
}

switch {
case currentScale.Spec.Replicas == 0 && isActive:
case currentReplicas == 0 && isActive:
// current replica count is 0, but there is an active trigger.
// scale the ScaleTarget up
e.scaleFromZero(ctx, logger, scaledObject, currentScale)
case !isActive &&
currentScale.Spec.Replicas > 0 &&
currentReplicas > 0 &&
(scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0):
// there are no active triggers, but the ScaleTarget has replicas.
// AND
@@ -37,14 +67,12 @@
e.scaleToZero(ctx, logger, scaledObject, currentScale)
case !isActive &&
scaledObject.Spec.MinReplicaCount != nil &&
currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount:
currentReplicas < *scaledObject.Spec.MinReplicaCount:
// there are no active triggers
// AND
// ScaleTarget replicas count is less than minimum replica count specified in ScaledObject
// Let's set ScaleTarget replicas count to correct value
currentScale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount

err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale)
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *scaledObject.Spec.MinReplicaCount)
if err == nil {
logger.Info("Successfully set ScaleTarget replicas count to ScaledObject minReplicaCount",
"ScaleTarget.Replicas", currentScale.Spec.Replicas)
@@ -93,8 +121,7 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca
if scaledObject.Status.LastActiveTime == nil ||
scaledObject.Status.LastActiveTime.Add(cooldownPeriod).Before(time.Now()) {
// or last time a trigger was active was > cooldown period, so scale down.
scale.Spec.Replicas = 0
err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale)
_, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0)
if err == nil {
logger.Info("Successfully scaled ScaleTarget to 0 replicas")
if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
@@ -118,19 +145,19 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca
}

func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) {
currentReplicas := scale.Spec.Replicas
var replicas int32
if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 {
scale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount
replicas = *scaledObject.Spec.MinReplicaCount
} else {
scale.Spec.Replicas = 1
replicas = 1
}

err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale)
currentReplicas, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, replicas)

if err == nil {
logger.Info("Successfully updated ScaleTarget",
"Original Replicas Count", currentReplicas,
"New Replicas Count", scale.Spec.Replicas)
"New Replicas Count", replicas)

// Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject
if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil {
@@ -144,7 +171,20 @@ func (e *scaleExecutor) getScaleTargetScale(ctx context.Context, scaledObject *k
return (*e.scaleClient).Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
}

func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) error {
func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale, replicas int32) (int32, error) {
if scale == nil {
// Wasn't retrieved earlier, grab it now.
var err error
scale, err = e.getScaleTargetScale(ctx, scaledObject)
if err != nil {
return -1, err
}
}

// Update with requested replicas.
currentReplicas := scale.Spec.Replicas
scale.Spec.Replicas = replicas

_, err := (*e.scaleClient).Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
return err
return currentReplicas, err
}
