Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve reconcile loop #581

Merged
merged 1 commit into from
Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 96 additions & 34 deletions pkg/controller/scaledobject/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scaledobject
import (
"context"
"fmt"
"reflect"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -44,7 +43,7 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileScaledObject{client: mgr.GetClient(), scheme: mgr.GetScheme(), scaleLoopContexts: &sync.Map{}}
return &ReconcileScaledObject{client: mgr.GetClient(), scheme: mgr.GetScheme(), scaleLoopContexts: &sync.Map{}, scaledObjectsGenerations: &sync.Map{}}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand All @@ -68,6 +67,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
if err != nil {
return err
}

// Watch for changes to secondary resource HPA and requeue the owner ScaledObject
err = c.Watch(&source.Kind{Type: &autoscalingv2beta1.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kedav1alpha1.ScaledObject{},
})
if err != nil {
return err
}
return nil
}

Expand All @@ -78,9 +86,10 @@ var _ reconcile.Reconciler = &ReconcileScaledObject{}
type ReconcileScaledObject struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
scaleLoopContexts *sync.Map
client client.Client
scheme *runtime.Scheme
scaleLoopContexts *sync.Map
scaledObjectsGenerations *sync.Map
}

// Reconcile reads that state of the cluster for a ScaledObject object and makes changes based on the state read
Expand Down Expand Up @@ -137,7 +146,7 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.
}
}

reqLogger.Info("Detecting ScaleType from ScaledObject")
reqLogger.V(1).Info("Detecting ScaleType from ScaledObject")
if scaledObject.Spec.ScaleTargetRef == nil || scaledObject.Spec.ScaleTargetRef.DeploymentName == "" {
reqLogger.Info("Detected ScaleType = Job")
return r.reconcileJobType(reqLogger, scaledObject)
Expand Down Expand Up @@ -175,7 +184,11 @@ func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObjec
}

// ScaledObject was created or modified - let's start a new ScaleLoop
r.startScaleLoop(logger, scaledObject)
err = r.startScaleLoop(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to start a new ScaleLoop")
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -216,7 +229,11 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
}

// ScaledObject was created - let's start a new ScaleLoop
r.startScaleLoop(logger, scaledObject)
err = r.startScaleLoop(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to start a new ScaleLoop")
return reconcile.Result{}, err
}

// HPA created successfully & ScaleLoop started - don't requeue
return reconcile.Result{}, nil
Expand All @@ -225,31 +242,13 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
return reconcile.Result{}, err
}

// Check whether update of HPA is needed
updateHPA := false
scaledObjectMinReplicaCount := getHpaMinReplicas(scaledObject)
if foundHpa.Spec.MinReplicas != scaledObjectMinReplicaCount {
updateHPA = true
foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount
}

scaledObjectMaxReplicaCount := getHpaMaxReplicas(scaledObject)
if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount {
updateHPA = true
foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
}

newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
// Update hpa HPA if needed
updateHpa, err := r.checkHPAForUpdate(logger, scaledObject, foundHpa, deploymentName)
if err != nil {
logger.Error(err, "Failed to create MetricSpec")
logger.Error(err, "Failed to check HPA for possible update")
return reconcile.Result{}, err
}
if !reflect.DeepEqual(foundHpa.Spec.Metrics, newMetricSpec) {
updateHPA = true
foundHpa.Spec.Metrics = newMetricSpec
}

if updateHPA {
if updateHpa {
err = r.client.Update(context.TODO(), foundHpa)
if err != nil {
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
Expand All @@ -258,8 +257,19 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
logger.Info("Updated HPA according to ScaledObject", "HPA.Namespace", hpaNamespace, "HPA.Name", hpaName)
}

// ScaledObject was modified - let's start a new ScaleLoop
r.startScaleLoop(logger, scaledObject)
// Let's start a new ScaleLoop if ScaledObject's Generation was changed
updateNeeded, err := r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to check ScaledObject's Generation change")
return reconcile.Result{}, err
}
if updateNeeded {
err = r.startScaleLoop(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to start a new ScaleLoop")
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -293,16 +303,21 @@ func checkDeploymentTypeScaledObject(scaledObject *kedav1alpha1.ScaledObject) (s
}

// startScaleLoop starts ScaleLoop handler for the respective ScaledObject
func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) {
func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {

logger.V(1).Info("Starting a new ScaleLoop")

scaleHandler := scalehandler.NewScaleHandler(r.client, r.scheme)

key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
return
return err
}

// store ScaledObject's current Generation
r.scaledObjectsGenerations.Store(key, scaledObject.Generation)

ctx, cancel := context.WithCancel(context.TODO())

// cancel the outdated ScaleLoop for the same ScaledObject (if exists)
Expand All @@ -315,6 +330,25 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject
r.scaleLoopContexts.Store(key, cancel)
}
go scaleHandler.HandleScaleLoop(ctx, scaledObject)

return nil
}

func (r *ReconcileScaledObject) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
return true, err
}

value, loaded := r.scaledObjectsGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == scaledObject.Generation {
return false, nil
}
}
return true, nil
}

// newHPAForScaledObject returns HPA as it is specified in ScaledObject
Expand Down Expand Up @@ -346,6 +380,34 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
}, nil
}

// checkHPAForUpdate checks whether update of HPA is needed
func (r *ReconcileScaledObject) checkHPAForUpdate(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta1.HorizontalPodAutoscaler, deploymentName string) (bool, error) {
updateHPA := false
scaledObjectMinReplicaCount := getHpaMinReplicas(scaledObject)
if *foundHpa.Spec.MinReplicas != *scaledObjectMinReplicaCount {
updateHPA = true
foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount
}

scaledObjectMaxReplicaCount := getHpaMaxReplicas(scaledObject)
if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount {
updateHPA = true
foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
}

newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
if err != nil {
logger.Error(err, "Failed to create MetricSpec")
return true, err
}
if fmt.Sprintf("%v", foundHpa.Spec.Metrics) != fmt.Sprintf("%v", newMetricSpec) {
updateHPA = true
foundHpa.Spec.Metrics = newMetricSpec
}

return updateHPA, nil
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, deploymentName string) ([]autoscalingv2beta1.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/scaledobject/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func (r *ReconcileScaledObject) finalizeScaledObject(logger logr.Logger, scaledO
return err
}

// store ScaledObject's current Generation
r.scaledObjectsGenerations.Delete(key)

result, ok := r.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
Expand Down