From 68a479c981ecb18f460cf0fb994aa721330037c6 Mon Sep 17 00:00:00 2001 From: Noah Kantrowitz Date: Wed, 6 Jan 2021 10:02:50 -0800 Subject: [PATCH] For scaling deployments in particular, use a direct object get rather than the /scale API (#1458) * For scaling deployments in particular, use a direct object get rather than the /scale API so that it can use the informer cache for better performance. This is annoying as a special case but is so common and improves performance so much that I think it's worthwhile to include. Another option would be to majorly increase the QPS rate limit on the scaling API client however that would also increase kube-apiserver load while watches/informers are generally much less impactful. Signed-off-by: Noah Kantrowitz * Update changelog. Signed-off-by: Noah Kantrowitz * Use the already-normalized GVKR data so less weird string parsing. Also adds support for StatefulSets for symmetry. Signed-off-by: Noah Kantrowitz * Apply suggestions from code review Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> Signed-off-by: Noah Kantrowitz * Apply suggestions from code review Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> Signed-off-by: Noah Kantrowitz --- CHANGELOG.md | 1 + controllers/scaledobject_controller.go | 1 + pkg/scaling/executor/scale_scaledobjects.go | 78 ++++++++++++++++----- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b1a2b6aa5e8..44bdced1153 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ - Add support for the WATCH_NAMESPACE environment variable to the operator ([#1474](https://github.com/kedacore/keda/pull/1474)) - Automatically determine the RabbitMQ protocol when possible, and support setting the protocl via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459),[#1483](https://github.com/kedacore/keda/pull/1483)) - Improve performance when fetching pod information ([#1457](https://github.com/kedacore/keda/pull/1457)) +- Improve performance when fetching current scaling information on Deployments ([#1458](https://github.com/kedacore/keda/pull/1458)) ### Breaking Changes diff --git a/controllers/scaledobject_controller.go b/controllers/scaledobject_controller.go index 0073c1b019f..1d18c42911f 100644 --- a/controllers/scaledobject_controller.go +++ b/controllers/scaledobject_controller.go @@ -69,6 +69,7 @@ func init() { // SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance. func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error { // create Discovery clientset + // TODO If we need to increase the QPS of scaling API calls, copy and tweak this RESTConfig. clientset, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig()) if err != nil { r.Log.Error(err, "Not able to create Discovery clientset") diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 8e2f558430a..6e4a5c1cc4b 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -5,8 +5,10 @@ import ( "time" "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" ) @@ -16,18 +18,46 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name) - currentScale, err := e.getScaleTargetScale(ctx, scaledObject) - if err != nil { - logger.Error(err, "Error getting information on the current Scale (ie. replias count) on the scaleTarget") - return + // Get the current replica count. As a special case, Deployments and StatefulSets fetch directly from the object so they can use the informer cache + // to reduce API calls. Everything else uses the scale subresource. + var currentScale *autoscalingv1.Scale + var currentReplicas int32 + targetName := scaledObject.Spec.ScaleTargetRef.Name + targetGVKR := scaledObject.Status.ScaleTargetGVKR + switch { + case targetGVKR.Group == "apps" && targetGVKR.Kind == "Deployment": + deployment := &appsv1.Deployment{} + err := e.client.Get(ctx, client.ObjectKey{Name: targetName, Namespace: scaledObject.Namespace}, deployment) + if err != nil { + logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget") + return + } + currentReplicas = *deployment.Spec.Replicas + case targetGVKR.Group == "apps" && targetGVKR.Kind == "StatefulSet": + statefulSet := &appsv1.StatefulSet{} + err := e.client.Get(ctx, client.ObjectKey{Name: targetName, Namespace: scaledObject.Namespace}, statefulSet) + if err != nil { + logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget") + return + } + currentReplicas = *statefulSet.Spec.Replicas + default: + var err error + currentScale, err = e.getScaleTargetScale(ctx, scaledObject) + if err != nil { + logger.Error(err, "Error getting information on the current Scale (ie. replicas count) on the scaleTarget") + return + } + currentReplicas = currentScale.Spec.Replicas } + switch { - case currentScale.Spec.Replicas == 0 && isActive: + case currentReplicas == 0 && isActive: // current replica count is 0, but there is an active trigger. // scale the ScaleTarget up e.scaleFromZero(ctx, logger, scaledObject, currentScale) case !isActive && - currentScale.Spec.Replicas > 0 && + currentReplicas > 0 && (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0): // there are no active triggers, but the ScaleTarget has replicas. // AND @@ -37,14 +67,12 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al e.scaleToZero(ctx, logger, scaledObject, currentScale) case !isActive && scaledObject.Spec.MinReplicaCount != nil && - currentScale.Spec.Replicas < *scaledObject.Spec.MinReplicaCount: + currentReplicas < *scaledObject.Spec.MinReplicaCount: // there are no active triggers // AND // ScaleTarget replicas count is less than minimum replica count specified in ScaledObject // Let's set ScaleTarget replicas count to correct value - currentScale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount - - err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale) + _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, *scaledObject.Spec.MinReplicaCount) if err == nil { logger.Info("Successfully set ScaleTarget replicas count to ScaledObject minReplicaCount", "ScaleTarget.Replicas", currentScale.Spec.Replicas) @@ -93,8 +121,7 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca if scaledObject.Status.LastActiveTime == nil || scaledObject.Status.LastActiveTime.Add(cooldownPeriod).Before(time.Now()) { // or last time a trigger was active was > cooldown period, so scale down. - scale.Spec.Replicas = 0 - err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale) + _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, 0) if err == nil { logger.Info("Successfully scaled ScaleTarget to 0 replicas") if err := e.setActiveCondition(ctx, logger, scaledObject, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil { @@ -118,19 +145,19 @@ func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, sca } func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) { - currentReplicas := scale.Spec.Replicas + var replicas int32 if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 { - scale.Spec.Replicas = *scaledObject.Spec.MinReplicaCount + replicas = *scaledObject.Spec.MinReplicaCount } else { - scale.Spec.Replicas = 1 + replicas = 1 } - err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale) + currentReplicas, err := e.updateScaleOnScaleTarget(ctx, scaledObject, scale, replicas) if err == nil { logger.Info("Successfully updated ScaleTarget", "Original Replicas Count", currentReplicas, - "New Replicas Count", scale.Spec.Replicas) + "New Replicas Count", replicas) // Scale was successful. Update lastScaleTime and lastActiveTime on the scaledObject if err := e.updateLastActiveTime(ctx, logger, scaledObject); err != nil { @@ -144,7 +171,20 @@ func (e *scaleExecutor) getScaleTargetScale(ctx context.Context, scaledObject *k return (*e.scaleClient).Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) } -func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) error { +func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale, replicas int32) (int32, error) { + if scale == nil { + // Wasn't retrieved earlier, grab it now. + var err error + scale, err = e.getScaleTargetScale(ctx, scaledObject) + if err != nil { + return -1, err + } + } + + // Update with requested repliacs. + currentReplicas := scale.Spec.Replicas + scale.Spec.Replicas = replicas + _, err := (*e.scaleClient).Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) - return err + return currentReplicas, err }