From d9100cd707ced5a41e8c15bf9301012ed181a3c4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20K=C5=82obuszewski?=
Date: Wed, 23 Nov 2022 13:20:44 +0100
Subject: [PATCH 1/3] Log node group min and current size when skipping scale
 down

---
 .../processors/nodes/pre_filtering_processor.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/cluster-autoscaler/processors/nodes/pre_filtering_processor.go b/cluster-autoscaler/processors/nodes/pre_filtering_processor.go
index 2973d8105d50..d8428426ccce 100644
--- a/cluster-autoscaler/processors/nodes/pre_filtering_processor.go
+++ b/cluster-autoscaler/processors/nodes/pre_filtering_processor.go
@@ -62,8 +62,9 @@ func (n *PreFilteringScaleDownNodeProcessor) GetScaleDownCandidates(ctx *context
 			klog.Errorf("Error while checking node group size %s: group size not found", nodeGroup.Id())
 			continue
 		}
-		if size <= nodeGroup.MinSize() {
-			klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
+		minSize := nodeGroup.MinSize()
+		if size <= minSize {
+			klog.V(1).Infof("Skipping %s - node group min size reached (current: %d, min: %d)", node.Name, size, minSize)
 			continue
 		}
 		result = append(result, node)

From 6b7c2911c277918881bd1b41de950188c93e29f7 Mon Sep 17 00:00:00 2001
From: Aleksandra Gacek
Date: Mon, 31 Oct 2022 15:25:55 +0100
Subject: [PATCH 2/3] Check owner reference in scale down planner to avoid
 double-counting already deleted pods.

---
 .../core/scaledown/planner/controller.go      |  88 +++++++
 .../core/scaledown/planner/controller_test.go | 230 ++++++++++++++++++
 .../core/scaledown/planner/planner.go         |  65 ++++-
 .../core/scaledown/planner/planner_test.go    | 227 +++++++++++++----
 4 files changed, 548 insertions(+), 62 deletions(-)
 create mode 100644 cluster-autoscaler/core/scaledown/planner/controller.go
 create mode 100644 cluster-autoscaler/core/scaledown/planner/controller_test.go

diff --git a/cluster-autoscaler/core/scaledown/planner/controller.go b/cluster-autoscaler/core/scaledown/planner/controller.go
new file mode 100644
index 000000000000..153dc205129d
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/planner/controller.go
@@ -0,0 +1,88 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package planner
+
+import (
+	"fmt"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
+)
+
+type controllerCalculatorImpl struct {
+	listers kubernetes.ListerRegistry
+}
+
+func newControllerReplicasCalculator(listers kubernetes.ListerRegistry) controllerReplicasCalculator {
+	return &controllerCalculatorImpl{listers: listers}
+}
+
+func (c *controllerCalculatorImpl) getReplicas(ownerRef metav1.OwnerReference, namespace string) (*replicasInfo, error) {
+	result := &replicasInfo{}
+	switch ownerRef.Kind {
+	case "StatefulSet":
+		sSet, err := c.listers.StatefulSetLister().StatefulSets(namespace).Get(ownerRef.Name)
+		if err != nil {
+			return nil, err
+		}
+		result.currentReplicas = sSet.Status.CurrentReplicas
+		if sSet.Spec.Replicas != nil {
+			result.targetReplicas = *sSet.Spec.Replicas
+		} else {
+			result.targetReplicas = 1
+		}
+	case "ReplicaSet":
+		rSet, err := c.listers.ReplicaSetLister().ReplicaSets(namespace).Get(ownerRef.Name)
+		if err != nil {
+			return nil, err
+		}
+		result.currentReplicas = rSet.Status.Replicas
+		if rSet.Spec.Replicas != nil {
+			result.targetReplicas = *rSet.Spec.Replicas
+		} else {
+			result.targetReplicas = 1
+		}
+	case "ReplicationController":
+		rController, err := c.listers.ReplicationControllerLister().ReplicationControllers(namespace).Get(ownerRef.Name)
+		if err != nil {
+			return nil, err
+		}
+		result.currentReplicas = rController.Status.Replicas
+		if rController.Spec.Replicas != nil {
+			result.targetReplicas = *rController.Spec.Replicas
+		} else {
+			result.targetReplicas = 1
+		}
+	case "Job":
+		job, err := c.listers.JobLister().Jobs(namespace).Get(ownerRef.Name)
+		if err != nil {
+			return nil, err
+		}
+		result.currentReplicas = job.Status.Active
+		if job.Spec.Parallelism != nil {
+			result.targetReplicas = *job.Spec.Parallelism
+		} else {
+			result.targetReplicas = 1
+		}
+		if job.Spec.Completions != nil && *job.Spec.Completions-job.Status.Succeeded < result.targetReplicas {
+			result.targetReplicas = *job.Spec.Completions - job.Status.Succeeded
+		}
+	default:
+		return nil, fmt.Errorf("unhandled controller type: %s", ownerRef.Kind)
+	}
+	return result, nil
+}
diff --git a/cluster-autoscaler/core/scaledown/planner/controller_test.go b/cluster-autoscaler/core/scaledown/planner/controller_test.go
new file mode 100644
index 000000000000..bd73ff3f5c10
--- /dev/null
+++ b/cluster-autoscaler/core/scaledown/planner/controller_test.go
@@ -0,0 +1,230 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package planner
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/google/go-cmp/cmp"
+	"github.com/stretchr/testify/assert"
+	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/test"
+)
+
+var podLabels = map[string]string{
+	"app": "test",
+}
+
+func TestReplicasCounter(t *testing.T) {
+	job := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "job",
+			Namespace: "default",
+			UID:       types.UID("batch/v1/namespaces/default/jobs/job"),
+		},
+		Spec: batchv1.JobSpec{
+			Parallelism: proto.Int32(3),
+			Selector:    metav1.SetAsLabelSelector(podLabels),
+		},
+		Status: batchv1.JobStatus{Active: 1},
+	}
+	unsetJob := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "unset_job",
+			Namespace: "default",
+			UID:       types.UID("batch/v1/namespaces/default/jobs/unset_job"),
+		},
+	}
+	jobWithSucceededReplicas := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "succeeded_job",
+			Namespace: "default",
+			UID:       types.UID("batch/v1/namespaces/default/jobs/succeeded_job"),
+		},
+		Spec: batchv1.JobSpec{
+			Parallelism: proto.Int32(3),
+			Completions: proto.Int32(3),
+			Selector:    metav1.SetAsLabelSelector(podLabels),
+		},
+		Status: batchv1.JobStatus{
+			Active:    1,
+			Succeeded: 2,
+		},
+	}
+	rs := &appsv1.ReplicaSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "rs",
+			Namespace: "default",
+			UID:       types.UID("apps/v1/namespaces/default/replicasets/rs"),
+		},
+		Spec: appsv1.ReplicaSetSpec{
+			Replicas: proto.Int32(1),
+			Selector: metav1.SetAsLabelSelector(podLabels),
+		},
+		Status: appsv1.ReplicaSetStatus{
+			Replicas: 1,
+		},
+	}
+	unsetRs := &appsv1.ReplicaSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "unset_rs",
+			Namespace: "default",
+			UID:       types.UID("apps/v1/namespaces/default/replicasets/unset_rs"),
+		},
+	}
+	rC := &apiv1.ReplicationController{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "rc",
+			Namespace: "default",
+			UID:       types.UID("core/v1/namespaces/default/replicationcontrollers/rc"),
+		},
+		Spec: apiv1.ReplicationControllerSpec{
+			Replicas: proto.Int32(1),
+			Selector: podLabels,
+		},
+		Status: apiv1.ReplicationControllerStatus{
+			Replicas: 0,
+		},
+	}
+	sS := &appsv1.StatefulSet{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "sset",
+			Namespace: "default",
+			UID:       types.UID("apps/v1/namespaces/default/statefulsets/sset"),
+		},
+		Spec: appsv1.StatefulSetSpec{
+			Replicas: proto.Int32(3),
+			Selector: metav1.SetAsLabelSelector(podLabels),
+		},
+		Status: appsv1.StatefulSetStatus{
+			Replicas: 1,
+		},
+	}
+	rcLister, _ := kube_util.NewTestReplicationControllerLister([]*apiv1.ReplicationController{rC})
+	jobLister, _ := kube_util.NewTestJobLister([]*batchv1.Job{job, unsetJob, jobWithSucceededReplicas})
+	rsLister, _ := kube_util.NewTestReplicaSetLister([]*appsv1.ReplicaSet{rs, unsetRs})
+	ssLister, _ := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{sS})
+	listers := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, rcLister, jobLister, rsLister, ssLister)
+	testCases := []struct {
+		name         string
+		ownerRef     metav1.OwnerReference
+		wantReplicas replicasInfo
+		expectErr    bool
+	}{
+		{
+			name:     "job owner reference",
+			ownerRef: ownerRef("Job", job.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 1,
+				targetReplicas:  3,
+			},
+		},
+		{
+			name:     "job without parallelism owner reference",
+			ownerRef: ownerRef("Job", unsetJob.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 0,
+				targetReplicas:  1,
+			},
+		},
+		{
+			name:     "job with succeeded replicas owner reference",
+			ownerRef: ownerRef("Job", jobWithSucceededReplicas.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 1,
+				targetReplicas:  1,
+			},
+		},
+		{
+			name:     "replica set owner reference",
+			ownerRef: ownerRef("ReplicaSet", rs.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 1,
+				targetReplicas:  1,
+			},
+		},
+		{
+			name:     "replica set without replicas spec specified owner reference",
+			ownerRef: ownerRef("ReplicaSet", unsetRs.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 0,
+				targetReplicas:  1,
+			},
+		},
+		{
+			name:     "replica controller owner reference",
+			ownerRef: ownerRef("ReplicationController", rC.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 0,
+				targetReplicas:  1,
+			},
+		},
+		{
+			name:     "stateful set owner reference",
+			ownerRef: ownerRef("StatefulSet", sS.Name),
+			wantReplicas: replicasInfo{
+				currentReplicas: 0,
+				targetReplicas:  3,
+			},
+		},
+		{
+			name:      "not existing job owner ref",
+			ownerRef:  ownerRef("Job", "j"),
+			expectErr: true,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			c := newControllerReplicasCalculator(listers)
+			res, err := c.getReplicas(tc.ownerRef, "default")
+			if tc.expectErr {
+				assert.Error(t, err)
+			} else {
+				if diff := cmp.Diff(tc.wantReplicas, *res, cmp.AllowUnexported(replicasInfo{})); diff != "" {
+					t.Errorf("getReplicas() diff (-want +got):\n%s", diff)
+				}
+			}
+		})
+	}
+}
+
+func ownerRef(ownerType, ownerName string) metav1.OwnerReference {
+	api := ""
+	strType := ""
+	switch ownerType {
+	case "ReplicaSet":
+		api = "apps/v1"
+		strType = "replicasets"
+	case "StatefulSet":
+		api = "apps/v1"
+		strType = "statefulsets"
+	case "ReplicationController":
+		api = "core/v1"
+		strType = "replicationcontrollers"
+	case "Job":
+		api = "batch/v1"
+		strType = "jobs"
+	}
+	return test.GenerateOwnerReferences(ownerName, ownerType, api, types.UID(fmt.Sprintf("%s/namespaces/default/%s/%s", api, strType, ownerName)))[0]
+}
diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go
index c84a844300a6..705b575b3ca0 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"time"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
@@ -49,6 +50,15 @@ type removalSimulator interface {
 	SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, pdbs []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode)
 }
 
+// controllerReplicasCalculator calculates the target and current number of replicas for a given controller.
+type controllerReplicasCalculator interface {
+	getReplicas(metav1.OwnerReference, string) (*replicasInfo, error)
+}
+
+type replicasInfo struct {
+	targetReplicas, currentReplicas int32
+}
+
 // Planner is responsible for deciding which nodes should be deleted during scale down.
 type Planner struct {
 	context              *context.AutoscalingContext
@@ -61,6 +71,7 @@ type Planner struct {
 	nodeUtilizationMap   map[string]utilization.Info
 	actuationStatus      scaledown.ActuationStatus
 	resourceLimitsFinder *resource.LimitsFinder
+	cc                   controllerReplicasCalculator
 }
 
 // New creates a new Planner object.
@@ -75,6 +86,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
 		eligibilityChecker:   eligibility.NewChecker(processors.NodeGroupConfigProcessor),
 		nodeUtilizationMap:   make(map[string]utilization.Info),
 		resourceLimitsFinder: resourceLimitsFinder,
+		cc:                   newControllerReplicasCalculator(context.ListerRegistry),
 	}
 }
 
@@ -165,19 +177,42 @@ func (p *Planner) NodeUtilizationMap() map[string]utilization.Info {
 // - pods which were recently evicted (it is up to ActuationStatus to decide
 //   what "recently" means in this case).
 //
-// It is entirely possible for some external controller to have already created
-// a replacement pod for such recent evictions, in which case the subsequent
-// simulation will count them twice. This is ok: it is much safer to disrupt
-// the scale down because of double-counting some pods than it is to scale down
-// too aggressively.
+// For pods controlled by a controller known to CA, it checks whether they have
+// already been recreated, and injects only those not yet recreated.
 func (p *Planner) injectOngoingActuation() error {
-	err := p.injectPods(currentlyDrainedPods(p.context.ClusterSnapshot.NodeInfos(), p.actuationStatus))
+	currentlyDrainedRecreatablePods := filterRecreatable(currentlyDrainedPods(p.context.ClusterSnapshot.NodeInfos(), p.actuationStatus))
+	recentlyEvictedRecreatablePods := filterRecreatable(p.actuationStatus.RecentEvictions())
+	err := p.injectPods(currentlyDrainedRecreatablePods)
 	if err != nil {
 		return err
 	}
-	// TODO(x13n): Check owner references to avoid double-counting already
-	// recreated pods.
-	return p.injectPods(p.actuationStatus.RecentEvictions())
+	return p.injectPods(filterOutRecreatedPods(recentlyEvictedRecreatablePods, p.cc))
+}
+
+func filterOutRecreatedPods(pods []*apiv1.Pod, cc controllerReplicasCalculator) []*apiv1.Pod {
+	var podsToInject []*apiv1.Pod
+	addedReplicas := make(map[string]int32)
+	for _, pod := range pods {
+		ownerRef := getKnownOwnerRef(pod.GetOwnerReferences())
+		// In case of an unknown ownerRef (i.e. not recognized by CA) we still
+		// inject the pod, to be on the safe side in case there is some custom
+		// controller that will recreate the pod.
+		if ownerRef == nil {
+			podsToInject = append(podsToInject, pod)
+			continue
+		}
+		rep, err := cc.getReplicas(*ownerRef, pod.Namespace)
+		if err != nil {
+			podsToInject = append(podsToInject, pod)
+			continue
+		}
+		ownerUID := string(ownerRef.UID)
+		if rep.targetReplicas > rep.currentReplicas && addedReplicas[ownerUID] < rep.targetReplicas-rep.currentReplicas {
+			podsToInject = append(podsToInject, pod)
+			addedReplicas[ownerUID] += 1
+		}
+	}
+	return podsToInject
+}
 
 func currentlyDrainedPods(niLister framework.NodeInfoLister, as scaledown.ActuationStatus) []*apiv1.Pod {
@@ -208,7 +243,6 @@ func filterRecreatable(pods []*apiv1.Pod) []*apiv1.Pod {
 }
 
 func (p *Planner) injectPods(pods []*apiv1.Pod) error {
-	pods = filterRecreatable(pods)
 	pods = clearNodeName(pods)
 	// Note: We're using ScheduleAnywhere, but the pods won't schedule back
 	// on the drained nodes due to taints.
@@ -252,6 +286,17 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
 	}
 }
 
+// getKnownOwnerRef returns the first ownerRef that CA recognizes, i.e. a controller whose pod-recreation logic CA understands.
+func getKnownOwnerRef(ownerRefs []metav1.OwnerReference) *metav1.OwnerReference {
+	for _, ownerRef := range ownerRefs {
+		switch ownerRef.Kind {
+		case "StatefulSet", "Job", "ReplicaSet", "ReplicationController":
+			return &ownerRef
+		}
+	}
+	return nil
+}
+
 func merged(a, b []string) []string {
 	return append(append(make([]string, 0, len(a)+len(b)), a...), b...)
 }
diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go
index 538faa5c70a7..509147208804 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner_test.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -17,9 +17,11 @@ limitations under the License.
 package planner
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
+	"k8s.io/apimachinery/pkg/types"
 	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -40,6 +42,10 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 )
 
+var rSetLabels = map[string]string{
+	"app": "rs",
+}
+
 func TestUpdateClusterState(t *testing.T) {
 	testCases := []struct {
 		name            string
@@ -49,6 +55,7 @@ func TestUpdateClusterState(t *testing.T) {
 		eligible        []string
 		wantUnneeded    []string
 		wantErr         bool
+		replicasSets    []*appsv1.ReplicaSet
 	}{
 		{
 			name: "all eligible",
@@ -93,8 +100,8 @@ func TestUpdateClusterState(t *testing.T) {
 				nodeUndergoingDeletion("n2", 2000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n2"),
-				scheduledPod("p2", 500, 1, "n2"),
+				scheduledPod("p1", 500, 1, "n2", "rs"),
+				scheduledPod("p2", 500, 1, "n2", "rs"),
 			},
 			eligible: []string{"n1"},
 			actuationStatus: &fakeActuationStatus{
@@ -109,9 +116,9 @@ func TestUpdateClusterState(t *testing.T) {
 				nodeUndergoingDeletion("n2", 2000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n2"),
-				scheduledPod("p2", 500, 1, "n2"),
-				scheduledPod("p3", 500, 1, "n2"),
+				scheduledPod("p1", 500, 1, "n2", "rs"),
+				scheduledPod("p2", 500, 1, "n2", "rs"),
+				scheduledPod("p3", 500, 1, "n2", "rs"),
 			},
 			eligible: []string{"n1"},
 			actuationStatus: &fakeActuationStatus{
@@ -129,10 +136,10 @@ func TestUpdateClusterState(t *testing.T) {
 				nodeUndergoingDeletion("n4", 2000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n2"),
-				scheduledPod("p2", 500, 1, "n2"),
-				scheduledPod("p4", 500, 1, "n4"),
-				scheduledPod("p5", 500, 1, "n4"),
+				scheduledPod("p1", 500, 1, "n2", "rs"),
+				scheduledPod("p2", 500, 1, "n2", "rs"),
+				scheduledPod("p4", 500, 1, "n4", "rs"),
+				scheduledPod("p5", 500, 1, "n4", "rs"),
 			},
 			eligible: []string{"n1", "n3"},
 			actuationStatus: &fakeActuationStatus{
@@ -149,11 +156,11 @@ func TestUpdateClusterState(t *testing.T) {
 				nodeUndergoingDeletion("n4", 2000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n2"),
-				scheduledPod("p2", 500, 1, "n2"),
-				scheduledPod("p3", 500, 1, "n2"),
-				scheduledPod("p4", 500, 1, "n4"),
-				scheduledPod("p5", 500, 1, "n4"),
+				scheduledPod("p1", 500, 1, "n2", "rs"),
+				scheduledPod("p2", 500, 1, "n2", "rs"),
+				scheduledPod("p3", 500, 1, "n2", "rs"),
+				scheduledPod("p4", 500, 1, "n4", "rs"),
+				scheduledPod("p5", 500, 1, "n4", "rs"),
 			},
 			eligible: []string{"n1", "n3"},
 			actuationStatus: &fakeActuationStatus{
@@ -172,11 +179,11 @@ func TestUpdateClusterState(t *testing.T) {
 				BuildTestNode("n5", 2000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 400, 1, "n1"),
-				scheduledPod("p2", 400, 1, "n2"),
-				scheduledPod("p3", 400, 1, "n3"),
-				scheduledPod("p4", 400, 1, "n4"),
-				scheduledPod("p5", 400, 1, "n5"),
+				scheduledPod("p1", 400, 1, "n1", "rs"),
+				scheduledPod("p2", 400, 1, "n2", "rs"),
+				scheduledPod("p3", 400, 1, "n3", "rs"),
+				scheduledPod("p4", 400, 1, "n4", "rs"),
+				scheduledPod("p5", 400, 1, "n5", "rs"),
 			},
 			eligible: []string{"n1", "n3", "n5"},
 			actuationStatus: &fakeActuationStatus{
@@ -192,13 +199,13 @@ func TestUpdateClusterState(t *testing.T) {
 				BuildTestNode("n3", 1000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n2"),
-				scheduledPod("p2", 500, 1, "n2"),
+				scheduledPod("p1", 500, 1, "n2", "rs"),
+				scheduledPod("p2", 500, 1, "n2", "rs"),
 			},
 			eligible: []string{"n1", "n2"},
 			actuationStatus: &fakeActuationStatus{
 				recentEvictions: []*apiv1.Pod{
-					scheduledPod("p3", 500, 1, "n4"),
+					scheduledPod("p3", 500, 1, "n4", "rs"),
 				},
 			},
 			wantUnneeded: []string{"n1"},
@@ -211,15 +218,15 @@ func TestUpdateClusterState(t *testing.T) {
 				BuildTestNode("n3", 1000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n2"),
-				scheduledPod("p2", 500, 1, "n2"),
+				scheduledPod("p1", 500, 1, "n2", "rs"),
+				scheduledPod("p2", 500, 1, "n2", "rs"),
 			},
 			eligible: []string{"n1", "n2"},
 			actuationStatus: &fakeActuationStatus{
 				recentEvictions: []*apiv1.Pod{
-					scheduledPod("p3", 500, 1, "n4"),
-					scheduledPod("p4", 500, 1, "n4"),
-					scheduledPod("p5", 500, 1, "n4"),
+					scheduledPod("p3", 500, 1, "n4", "rs"),
+					scheduledPod("p4", 500, 1, "n4", "rs"),
+					scheduledPod("p5", 500, 1, "n4", "rs"),
 				},
 			},
 			wantUnneeded: []string{},
@@ -231,15 +238,15 @@ func TestUpdateClusterState(t *testing.T) {
 				BuildTestNode("n2", 1000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 500, 1, "n1"),
-				scheduledPod("p2", 500, 1, "n1"),
+				scheduledPod("p1", 500, 1, "n1", "rs"),
+				scheduledPod("p2", 500, 1, "n1", "rs"),
 			},
 			eligible: []string{"n1", "n2"},
 			actuationStatus: &fakeActuationStatus{
 				recentEvictions: []*apiv1.Pod{
-					scheduledPod("p3", 500, 1, "n3"),
-					scheduledPod("p4", 500, 1, "n3"),
-					scheduledPod("p5", 500, 1, "n3"),
+					scheduledPod("p3", 500, 1, "n3", "rs"),
+					scheduledPod("p4", 500, 1, "n3", "rs"),
+					scheduledPod("p5", 500, 1, "n3", "rs"),
 				},
 			},
 			wantUnneeded: []string{},
@@ -255,18 +262,18 @@ func TestUpdateClusterState(t *testing.T) {
 				BuildTestNode("n5", 1000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 200, 1, "n1"),
-				scheduledPod("p2", 200, 1, "n2"),
-				scheduledPod("p3", 200, 1, "n3"),
-				scheduledPod("p4", 200, 1, "n4"),
-				scheduledPod("p5", 200, 1, "n5"),
+				scheduledPod("p1", 200, 1, "n1", "rs"),
+				scheduledPod("p2", 200, 1, "n2", "rs"),
+				scheduledPod("p3", 200, 1, "n3", "rs"),
+				scheduledPod("p4", 200, 1, "n4", "rs"),
+				scheduledPod("p5", 200, 1, "n5", "rs"),
 			},
 			eligible: []string{"n1", "n3", "n5"},
 			actuationStatus: &fakeActuationStatus{
 				currentlyDrained: []string{"n2", "n4"},
 				recentEvictions: []*apiv1.Pod{
-					scheduledPod("p6", 600, 1, "n6"),
-					scheduledPod("p7", 600, 1, "n6"),
+					scheduledPod("p6", 600, 1, "n6", "rs"),
+					scheduledPod("p7", 600, 1, "n6", "rs"),
 				},
 			},
 			wantUnneeded: []string{},
@@ -281,27 +288,110 @@ func TestUpdateClusterState(t *testing.T) {
 				BuildTestNode("n5", 1000, 10),
 			},
 			pods: []*apiv1.Pod{
-				scheduledPod("p1", 200, 1, "n1"),
-				scheduledPod("p2", 200, 1, "n2"),
-				scheduledPod("p3", 200, 1, "n3"),
-				scheduledPod("p4", 200, 1, "n4"),
-				scheduledPod("p5", 200, 1, "n5"),
+				scheduledPod("p1", 200, 1, "n1", "rs"),
+				scheduledPod("p2", 200, 1, "n2", "rs"),
+				scheduledPod("p3", 200, 1, "n3", "rs"),
+				scheduledPod("p4", 200, 1, "n4", "rs"),
+				scheduledPod("p5", 200, 1, "n5", "rs"),
+			},
+			eligible: []string{"n1", "n3", "n5"},
+			actuationStatus: &fakeActuationStatus{
+				currentlyDrained: []string{"n2", "n4"},
+				recentEvictions: []*apiv1.Pod{
+					scheduledPod("p6", 600, 1, "n6", "rs"),
+				},
+			},
+			wantUnneeded: []string{"n1"},
+		},
+		{
+			name: "multiple drained nodes and recent evictions, replicas rescheduled, two nodes unneeded",
+			nodes: []*apiv1.Node{
+				BuildTestNode("n1", 1000, 10),
+				nodeUndergoingDeletion("n2", 1000, 10),
+				BuildTestNode("n3", 1000, 10),
+				nodeUndergoingDeletion("n4", 1000, 10),
+				BuildTestNode("n5", 1000, 10),
+			},
+			pods: []*apiv1.Pod{
+				scheduledPod("p1", 200, 1, "n1", "rs"),
+				scheduledPod("p2", 200, 1, "n2", "rs"),
+				scheduledPod("p3", 200, 1, "n3", "rs"),
+				scheduledPod("p4", 200, 1, "n4", "rs"),
+				scheduledPod("p5", 200, 1, "n5", "rs"),
+			},
+			eligible: []string{"n1", "n3", "n5"},
+			actuationStatus: &fakeActuationStatus{
+				currentlyDrained: []string{"n2", "n4"},
+				recentEvictions: []*apiv1.Pod{
+					scheduledPod("p6", 600, 1, "n1", "rs1"),
+					scheduledPod("p7", 600, 1, "n3", "rs1"),
+				},
+			},
+			replicasSets: append(generateReplicaSetWithReplicas("rs1", 2, 2, rSetLabels), generateReplicaSets("rs", 5)...),
+			wantUnneeded: []string{"n1", "n3"},
+		},
+		{
+			name: "multiple drained nodes and recent evictions, some replicas rescheduled, one node unneeded",
+			nodes: []*apiv1.Node{
+				BuildTestNode("n1", 1000, 10),
+				nodeUndergoingDeletion("n2", 1000, 10),
+				BuildTestNode("n3", 1000, 10),
+				nodeUndergoingDeletion("n4", 1000, 10),
+				BuildTestNode("n5", 1000, 10),
+			},
+			pods: []*apiv1.Pod{
+				scheduledPod("p1", 200, 1, "n1", "rs"),
+				scheduledPod("p2", 200, 1, "n2", "rs"),
+				scheduledPod("p3", 200, 1, "n3", "rs"),
+				scheduledPod("p4", 200, 1, "n4", "rs"),
+				scheduledPod("p5", 200, 1, "n5", "rs"),
 			},
 			eligible: []string{"n1", "n3", "n5"},
 			actuationStatus: &fakeActuationStatus{
 				currentlyDrained: []string{"n2", "n4"},
 				recentEvictions: []*apiv1.Pod{
-					scheduledPod("p6", 600, 1, "n6"),
+					scheduledPod("p6", 600, 1, "n1", "rs1"),
+					scheduledPod("p7", 600, 1, "n3", "rs1"),
 				},
 			},
+			replicasSets: append(generateReplicaSetWithReplicas("rs1", 2, 1, rSetLabels), generateReplicaSets("rs", 5)...),
 			wantUnneeded: []string{"n1"},
 		},
+		{
+			name: "multiple drained nodes and recent evictions, pods belonging to ds",
+			nodes: []*apiv1.Node{
+				BuildTestNode("n1", 1000, 10),
+				nodeUndergoingDeletion("n2", 1000, 10),
+				BuildTestNode("n3", 1000, 10),
+				nodeUndergoingDeletion("n4", 1000, 10),
+				BuildTestNode("n5", 1000, 10),
+			},
+			pods: []*apiv1.Pod{
+				scheduledPod("p1", 200, 1, "n1", "rs"),
+				scheduledPod("p2", 200, 1, "n2", "rs"),
+				scheduledPod("p3", 200, 1, "n3", "rs"),
+				scheduledPod("p4", 200, 1, "n4", "rs"),
+				scheduledPod("p5", 200, 1, "n5", "rs"),
+			},
+			eligible: []string{"n1", "n3", "n5"},
+			actuationStatus: &fakeActuationStatus{
+				currentlyDrained: []string{"n2", "n4"},
+				recentEvictions: []*apiv1.Pod{
+					scheduledDSPod("p6", 600, 1, "n1"),
+					scheduledDSPod("p7", 600, 1, "n3"),
+				},
+			},
+			wantUnneeded: []string{"n1", "n3"},
+		},
 	}
 	for _, tc := range testCases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
 			t.Parallel()
-			rsLister, err := kube_util.NewTestReplicaSetLister(generateReplicaSets())
+			if tc.replicasSets == nil {
+				tc.replicasSets = generateReplicaSets("rs", 5)
+			}
+			rsLister, err := kube_util.NewTestReplicaSetLister(tc.replicasSets)
 			assert.NoError(t, err)
 			registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
 			provider := testprovider.NewTestCloudProvider(nil, nil)
@@ -330,14 +420,13 @@ func TestUpdateClusterState(t *testing.T) {
 	}
 }
 
-func generateReplicaSets() []*appsv1.ReplicaSet {
-	replicas := int32(5)
+func generateReplicaSets(name string, replicas int32) []*appsv1.ReplicaSet {
 	return []*appsv1.ReplicaSet{
 		{
 			ObjectMeta: metav1.ObjectMeta{
-				Name:      "rs",
+				Name:      name,
 				Namespace: "default",
-				SelfLink:  "api/v1/namespaces/default/replicasets/rs",
+				UID:       rSetUID(name),
 			},
 			Spec: appsv1.ReplicaSetSpec{
 				Replicas: &replicas,
@@ -346,10 +435,44 @@ func generateReplicaSets() []*appsv1.ReplicaSet {
 	}
 }
 
-func scheduledPod(name string, cpu, memory int64, nodeName string) *apiv1.Pod {
+func generateReplicaSetWithReplicas(name string, specReplicas, statusReplicas int32, labels map[string]string) []*appsv1.ReplicaSet {
+	return []*appsv1.ReplicaSet{
+		{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: "default",
+				UID:       rSetUID(name),
+			},
+			Spec: appsv1.ReplicaSetSpec{
+				Replicas: &specReplicas,
+				Selector: metav1.SetAsLabelSelector(labels),
+			},
+			Status: appsv1.ReplicaSetStatus{
+				Replicas: statusReplicas,
+			},
+		},
+	}
+}
+
+func rSetUID(name string) types.UID {
+	return types.UID(fmt.Sprintf("api/v1/namespaces/default/replicasets/%s", name))
+}
+
+func scheduledDSPod(name string, cpu, memory int64, nodeName string) *apiv1.Pod {
+	p := BuildTestPod(name, cpu, memory)
+	p.OwnerReferences = GenerateOwnerReferences("ds", "DaemonSet", "extensions/v1beta1", "api/v1/namespaces/default/daemonsets/ds")
+	p.Spec.NodeName = nodeName
+	p.Namespace = "default"
+	p.Labels = rSetLabels
+	return p
+}
+
+func scheduledPod(name string, cpu, memory int64, nodeName, rSetName string) *apiv1.Pod {
 	p := BuildTestPod(name, cpu, memory)
-	p.OwnerReferences = GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", "")
+	p.OwnerReferences = GenerateOwnerReferences(rSetName, "ReplicaSet", "extensions/v1beta1", rSetUID(rSetName))
 	p.Spec.NodeName = nodeName
+	p.Namespace = "default"
+	p.Labels = rSetLabels
 	return p
 }

From a20685b745b194993bb9a2f2aae9ccd684891f3d Mon Sep 17 00:00:00 2001
From: Aleksandra Gacek
Date: Fri, 25 Nov 2022 14:08:08 +0100
Subject: [PATCH 3/3] Use ScaleDownSetProcessor.GetNodesToRemove in scale down
 planner to filter NodesToDelete.

---
 .../core/scaledown/legacy/legacy.go   |  6 +-
 .../core/scaledown/planner/planner.go | 60 ++++++++++++-------
 .../core/scaledown/unneeded/nodes.go  |  9 ++-
 3 files changed, 45 insertions(+), 30 deletions(-)

diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy.go b/cluster-autoscaler/core/scaledown/legacy/legacy.go
index 9e2cf563b19e..c224a74a43a0 100644
--- a/cluster-autoscaler/core/scaledown/legacy/legacy.go
+++ b/cluster-autoscaler/core/scaledown/legacy/legacy.go
@@ -254,7 +254,7 @@ func (sd *ScaleDown) mapNodesToStatusScaleDownNodes(nodes []*apiv1.Node, nodeGro
 }
 
 // NodesToDelete selects the nodes to delete for scale down.
-func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDisruptionBudget) (empty, drain []*apiv1.Node, res status.ScaleDownResult, err errors.AutoscalerError) {
+func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDisruptionBudget) (_, drain []*apiv1.Node, res status.ScaleDownResult, err errors.AutoscalerError) {
 	_, drained := sd.nodeDeletionTracker.DeletionsInProgress()
 	if len(drained) > 0 {
 		return nil, nil, status.ScaleDownInProgress, nil
@@ -288,10 +288,10 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
 	}
 	candidateNames := make([]string, 0, len(empty)+len(nonEmpty))
 	for _, n := range empty {
-		candidateNames = append(candidateNames, n.Name)
+		candidateNames = append(candidateNames, n.Node.Name)
 	}
 	for _, n := range nonEmpty {
-		candidateNames = append(candidateNames, n.Name)
+		candidateNames = append(candidateNames, n.Node.Name)
 	}
 
 	if len(candidateNames) == 0 {
diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go
index 6fdb7b413eca..29ce2fc52d44 100644
--- a/cluster-autoscaler/core/scaledown/planner/planner.go
+++ b/cluster-autoscaler/core/scaledown/planner/planner.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
 	"k8s.io/autoscaler/cluster-autoscaler/processors"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
@@ -61,32 +62,34 @@ type replicasInfo struct {
 
 // Planner is responsible for deciding which nodes should be deleted during scale down.
 type Planner struct {
-	context              *context.AutoscalingContext
-	unremovableNodes     *unremovable.Nodes
-	unneededNodes        *unneeded.Nodes
-	rs                   removalSimulator
-	actuationInjector    *scheduling.HintingSimulator
-	latestUpdate         time.Time
-	eligibilityChecker   eligibilityChecker
-	nodeUtilizationMap   map[string]utilization.Info
-	actuationStatus      scaledown.ActuationStatus
-	resourceLimitsFinder *resource.LimitsFinder
-	cc                   controllerReplicasCalculator
+	context               *context.AutoscalingContext
+	unremovableNodes      *unremovable.Nodes
+	unneededNodes         *unneeded.Nodes
+	rs                    removalSimulator
+	actuationInjector     *scheduling.HintingSimulator
+	latestUpdate          time.Time
+	eligibilityChecker    eligibilityChecker
+	nodeUtilizationMap    map[string]utilization.Info
+	actuationStatus       scaledown.ActuationStatus
+	resourceLimitsFinder  *resource.LimitsFinder
+	cc                    controllerReplicasCalculator
+	scaleDownSetProcessor nodes.ScaleDownSetProcessor
 }
 
 // New creates a new Planner object.
 func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions simulator.NodeDeleteOptions) *Planner {
 	resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
 	return &Planner{
-		context:              context,
-		unremovableNodes:     unremovable.NewNodes(),
-		unneededNodes:        unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
-		rs:                   simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, true),
-		actuationInjector:    scheduling.NewHintingSimulator(context.PredicateChecker),
-		eligibilityChecker:   eligibility.NewChecker(processors.NodeGroupConfigProcessor),
-		nodeUtilizationMap:   make(map[string]utilization.Info),
-		resourceLimitsFinder: resourceLimitsFinder,
-		cc:                   newControllerReplicasCalculator(context.ListerRegistry),
+		context:               context,
+		unremovableNodes:      unremovable.NewNodes(),
+		unneededNodes:         unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
+		rs:                    simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, true),
+		actuationInjector:     scheduling.NewHintingSimulator(context.PredicateChecker),
+		eligibilityChecker:    eligibility.NewChecker(processors.NodeGroupConfigProcessor),
+		nodeUtilizationMap:    make(map[string]utilization.Info),
+		resourceLimitsFinder:  resourceLimitsFinder,
+		cc:                    newControllerReplicasCalculator(context.ListerRegistry),
+		scaleDownSetProcessor: processors.ScaleDownSetProcessor,
 	}
 }
 
@@ -133,11 +136,24 @@ func (p *Planner) NodesToDelete() (empty, needDrain []*apiv1.Node) {
 		return nil, nil
 	}
 	limitsLeft := p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate)
-	empty, needDrain, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus)
+	emptyRemovable, needDrainRemovable, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus)
 	for _, u := range unremovable {
 		p.unremovableNodes.Add(u)
 	}
-	// TODO: filter results with ScaleDownSetProcessor.GetNodesToRemove
+	nodesToRemove := p.scaleDownSetProcessor.GetNodesToRemove(
+		p.context,
+		// We need to pass the empty nodes first, as there might be some non-empty
+		// scale-downs already in progress. Deleting the empty nodes first reduces
+		// the chance of hitting the limit on non-empty scale-down.
+		append(emptyRemovable, needDrainRemovable...),
+		p.context.AutoscalingOptions.MaxScaleDownParallelism)
+	for _, nodeToRemove := range nodesToRemove {
+		if len(nodeToRemove.PodsToReschedule) > 0 {
+			needDrain = append(needDrain, nodeToRemove.Node)
+		} else {
+			empty = append(empty, nodeToRemove.Node)
+		}
+	}
 	return empty, needDrain
 }
 
diff --git a/cluster-autoscaler/core/scaledown/unneeded/nodes.go b/cluster-autoscaler/core/scaledown/unneeded/nodes.go
index c0c60e1b8d48..70d690694199 100644
--- a/cluster-autoscaler/core/scaledown/unneeded/nodes.go
+++ b/cluster-autoscaler/core/scaledown/unneeded/nodes.go
@@ -117,21 +117,20 @@ func (n *Nodes) Drop(node string) {
 // RemovableAt returns all nodes that can be removed at a given time, divided
 // into empty and non-empty node lists, as well as a list of nodes that were
 // unneeded, but are not removable, annotated by reason.
-func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) (empty, needDrain []*apiv1.Node, unremovable []*simulator.UnremovableNode) {
+func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []*simulator.UnremovableNode) {
 	nodeGroupSize := utils.GetNodeGroupSizeMap(context.CloudProvider)
 	for nodeName, v := range n.byName {
 		klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
-		node := v.ntbr.Node
 		if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeft, resourcesWithLimits, as); r != simulator.NoReason {
-			unremovable = append(unremovable, &simulator.UnremovableNode{Node: node, Reason: r})
+			unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
 			continue
 		}
 		if len(v.ntbr.PodsToReschedule) > 0 {
-			needDrain = append(needDrain, node)
+			needDrain = append(needDrain, v.ntbr)
 		} else {
-			empty = append(empty, node)
+			empty = append(empty, v.ntbr)
 		}
 	}
 	return
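
Notes on the series (code sketches below are illustrative commentary, not part of the commits):

In patch 2, the Job branch of getReplicas computes the target replica count as Parallelism (defaulting to 1 when unset), further capped by the number of completions still missing. A minimal sketch of that arithmetic; targetReplicasForJob is a hypothetical helper name, not a function in the patch:

	// targetReplicasForJob mirrors the Job branch of getReplicas.
	func targetReplicasForJob(parallelism, completions *int32, succeeded int32) int32 {
		target := int32(1) // default when Parallelism is unset
		if parallelism != nil {
			target = *parallelism
		}
		// A Job never needs more pods than it has completions left to run.
		if completions != nil && *completions-succeeded < target {
			target = *completions - succeeded
		}
		return target
	}

For example, with Parallelism=3, Completions=3 and 2 pods already succeeded, the target is min(3, 3-2) = 1, which is exactly what the "job with succeeded replicas owner reference" test case expects.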
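The core idea of filterOutRecreatedPods is to re-inject a recently evicted pod only while its controller still reports fewer current replicas than target replicas, counting injections per owner UID so the same gap is never filled twice; pods with no recognizable owner are always injected to stay on the safe side. A self-contained sketch of that decision, where pod and the replicas map are simplified stand-ins for *apiv1.Pod and the controllerReplicasCalculator:

	package main

	import "fmt"

	// replicasInfo mirrors the planner's replicasInfo type.
	type replicasInfo struct {
		targetReplicas, currentReplicas int32
	}

	// pod is a simplified stand-in for *apiv1.Pod: a name plus the UID of its
	// controller, if CA recognizes one ("" plays the role of a nil ownerRef).
	type pod struct {
		name     string
		ownerUID string
	}

	// filterOutRecreated mimics filterOutRecreatedPods; the replicas map plays
	// the role of controllerReplicasCalculator.getReplicas.
	func filterOutRecreated(pods []pod, replicas map[string]replicasInfo) []pod {
		var podsToInject []pod
		addedReplicas := make(map[string]int32)
		for _, p := range pods {
			if p.ownerUID == "" {
				// Unknown controller: inject the pod to err on the safe side.
				podsToInject = append(podsToInject, p)
				continue
			}
			rep := replicas[p.ownerUID]
			missing := rep.targetReplicas - rep.currentReplicas
			// Inject only as many pods per owner as replicas are still missing;
			// anything beyond that has presumably been recreated already.
			if missing > 0 && addedReplicas[p.ownerUID] < missing {
				podsToInject = append(podsToInject, p)
				addedReplicas[p.ownerUID]++
			}
		}
		return podsToInject
	}

	func main() {
		// rs1 wants 3 replicas but only 1 is currently running: at most 2 of
		// its evicted pods are injected; the rest count as already recreated.
		replicas := map[string]replicasInfo{
			"rs1": {targetReplicas: 3, currentReplicas: 1},
		}
		evicted := []pod{{"p1", "rs1"}, {"p2", "rs1"}, {"p3", "rs1"}, {"p4", ""}}
		fmt.Println(filterOutRecreated(evicted, replicas)) // [{p1 rs1} {p2 rs1} {p4 }]
	}

This is also why the DS-owned pods in the planner_test case "pods belonging to ds" are always injected: DaemonSet is not one of the kinds getKnownOwnerRef recognizes, so CA keeps the conservative behavior for them.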
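Patch 3 concatenates the empty candidates ahead of the drainable ones before calling GetNodesToRemove, because the processor may cut the candidate list down (the call passes MaxScaleDownParallelism as the limit); with empty nodes in front, it is the more expensive drain candidates that get dropped first. A rough illustration of that interaction, assuming a simple truncating processor — real ScaleDownSetProcessor implementations may apply additional filtering:

	package main

	import "fmt"

	// nodeToBeRemoved is a simplified stand-in for simulator.NodeToBeRemoved.
	type nodeToBeRemoved struct {
		name             string
		podsToReschedule []string
	}

	// truncate stands in for a ScaleDownSetProcessor that merely caps how many
	// nodes may be removed in one iteration (an assumption for illustration).
	func truncate(candidates []nodeToBeRemoved, limit int) []nodeToBeRemoved {
		if len(candidates) > limit {
			return candidates[:limit]
		}
		return candidates
	}

	func main() {
		empty := []nodeToBeRemoved{{name: "n1"}, {name: "n2"}}
		drain := []nodeToBeRemoved{{name: "n3", podsToReschedule: []string{"p1"}}}

		// Empty candidates go first, so when the limit (2 here) bites, it is
		// the drain candidate that gets dropped, mirroring the comment in
		// Planner.NodesToDelete.
		var emptyOut, drainOut []string
		candidates := append(append([]nodeToBeRemoved{}, empty...), drain...)
		for _, n := range truncate(candidates, 2) {
			if len(n.podsToReschedule) > 0 {
				drainOut = append(drainOut, n.name)
			} else {
				emptyOut = append(emptyOut, n.name)
			}
		}
		fmt.Println(emptyOut, drainOut) // [n1 n2] []
	}

The split back into empty and needDrain after the call relies on PodsToReschedule, which is why RemovableAt was changed in the same patch to return full simulator.NodeToBeRemoved values instead of bare *apiv1.Node.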