Modify scale down set processor to add reasons to unremovable nodes
abdelrahman882 committed Oct 14, 2024
1 parent 04b1402 commit 9053d15
Showing 16 changed files with 638 additions and 136 deletions.
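Before the per-file diffs, a brief orientation: this commit replaces the scale-down set processor's GetNodesToRemove entry point with FilterUnremovableNodes, which additionally reports why rejected candidates cannot be removed, and it bundles the arguments the planner previously passed one by one into a shared ScaleDownContext (package k8s.io/autoscaler/cluster-autoscaler/processors/nodes). Those definitions are among the 16 changed files but are not included in this excerpt; the sketch below is reconstructed from the call sites in planner.go and unneeded/nodes.go further down, so the exact names and signatures should be read as assumptions.

// Sketch only: reconstructed from the call sites in the planner.go and
// unneeded/nodes.go diffs below. The real definitions live in
// cluster-autoscaler/processors/nodes and may differ in detail.
package nodes

import (
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// ScaleDownContext bundles the state that the planner now shares with the
// scale-down set processor instead of passing it as separate arguments.
type ScaleDownContext struct {
	ActuationStatus      scaledown.ActuationStatus
	ResourcesLeft        resource.Limits
	ResourcesWithLimits  []string
	MaxNodeCountToRemove int
}

// NewDefaultScaleDownContext returns a context that only caps how many nodes
// may be removed; the other fields are filled in on every planner iteration.
func NewDefaultScaleDownContext(maxNodeCountToRemove int) *ScaleDownContext {
	return &ScaleDownContext{MaxNodeCountToRemove: maxNodeCountToRemove}
}

// ScaleDownSetProcessor previously exposed
//
//	GetNodesToRemove(ctx, candidates, maxCount) []simulator.NodeToBeRemoved
//
// which silently dropped rejected candidates. The new method also returns the
// rejected candidates, each annotated with an UnremovableReason.
// (Other methods of the real interface are omitted here.)
type ScaleDownSetProcessor interface {
	FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownContext ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode)
}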
45 changes: 25 additions & 20 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -73,10 +73,10 @@ type Planner struct {
minUpdateInterval time.Duration
eligibilityChecker eligibilityChecker
nodeUtilizationMap map[string]utilization.Info
actuationStatus scaledown.ActuationStatus
resourceLimitsFinder *resource.LimitsFinder
cc controllerReplicasCalculator
scaleDownSetProcessor nodes.ScaleDownSetProcessor
scaleDownContext *nodes.ScaleDownContext
}

// New creates a new Planner object.
@@ -97,7 +97,11 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
resourceLimitsFinder: resourceLimitsFinder,
cc: newControllerReplicasCalculator(context.ListerRegistry),
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
minUpdateInterval: minUpdateInterval,
// Setting MaxNodeCountToRemove to math.MaxInt as there is no need to limit the number of nodes,
// since it will happen later, in the actuation stage. It will make a more appropriate decision
// by using additional information about deletions in progress.
scaleDownContext: nodes.NewDefaultScaleDownContext(math.MaxInt),
minUpdateInterval: minUpdateInterval,
}
}

@@ -110,7 +114,7 @@ func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*api
p.minUpdateInterval = updateInterval
}
p.latestUpdate = currentTime
p.actuationStatus = as
p.scaleDownContext.ActuationStatus = as
// Avoid persisting changes done by the simulation.
p.context.ClusterSnapshot.Fork()
defer p.context.ClusterSnapshot.Revert()
@@ -147,22 +151,17 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
klog.Errorf("Nothing will scale down, failed to create resource limiter: %v", err)
return nil, nil
}
limitsLeft := p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate)
emptyRemovable, needDrainRemovable, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus)
for _, u := range unremovable {
p.unremovableNodes.Add(u)
}
needDrainRemovable = sortByRisk(needDrainRemovable)
nodesToRemove := p.scaleDownSetProcessor.GetNodesToRemove(
p.context,
// We need to pass empty nodes first, as there might be some non-empty scale
// downs already in progress. If we pass the empty nodes first, they will be first
// to get deleted, thus we decrease chances of hitting the limit on non-empty scale down.
append(emptyRemovable, needDrainRemovable...),
// No need to limit the number of nodes, since it will happen later, in the actuation stage.
// It will make a more appropriate decision by using additional information about deletions
// in progress.
math.MaxInt)
p.scaleDownContext.ResourcesLeft = p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate).DeepCopy()
p.scaleDownContext.ResourcesWithLimits = resourceLimiter.GetResources()
emptyRemovableNodes, needDrainRemovableNodes, unremovableNodes := p.unneededNodes.RemovableAt(p.context, *p.scaleDownContext, p.latestUpdate)
p.addUnremovableNodes(unremovableNodes)

needDrainRemovableNodes = sortByRisk(needDrainRemovableNodes)
candidatesToBeRemoved := append(emptyRemovableNodes, needDrainRemovableNodes...)

nodesToRemove, unremovableNodes := p.scaleDownSetProcessor.FilterUnremovableNodes(p.context, *p.scaleDownContext, candidatesToBeRemoved)
p.addUnremovableNodes(unremovableNodes)

for _, nodeToRemove := range nodesToRemove {
if len(nodeToRemove.PodsToReschedule) > 0 {
needDrain = append(needDrain, nodeToRemove.Node)
@@ -174,6 +173,12 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
return empty, needDrain
}

func (p *Planner) addUnremovableNodes(unremovableNodes []simulator.UnremovableNode) {
for _, u := range unremovableNodes {
p.unremovableNodes.Add(&u)
}
}

func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
nodeInfos, err := s.NodeInfos().List()
if err != nil {
@@ -212,7 +217,7 @@ func (p *Planner) NodeUtilizationMap() map[string]utilization.Info {
// For pods that are controlled by a controller known to CA, it will check whether
// they have been recreated and will inject only the pods that have not yet been recreated.
func (p *Planner) injectRecentlyEvictedPods() error {
recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.actuationStatus.RecentEvictions())
recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.scaleDownContext.ActuationStatus.RecentEvictions())
return p.injectPods(filterOutRecreatedPods(recentlyEvictedRecreatablePods, p.cc))
}

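For processor implementers, the new contract means returning two slices: the nodes to remove and the rejected candidates with their reasons. A minimal, hypothetical implementation against the types sketched above — not the processor shipped in this commit — might cap the result at MaxNodeCountToRemove and mark the overflow, for example:

// Hypothetical example against the sketched interface above, not the
// processor shipped in this commit.
package nodes

import (
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
)

// countCappingProcessor keeps at most scaleDownContext.MaxNodeCountToRemove
// candidates and reports the rest as unremovable with a fixed reason.
type countCappingProcessor struct {
	// overflowReason is attached to candidates dropped because of the count
	// limit; the dedicated constant used upstream is not shown in this excerpt.
	overflowReason simulator.UnremovableReason
}

// newCountCappingProcessor builds the example processor; simulator.UnexpectedError
// is used purely as a stand-in reason here.
func newCountCappingProcessor() *countCappingProcessor {
	return &countCappingProcessor{overflowReason: simulator.UnexpectedError}
}

// FilterUnremovableNodes implements the (sketched) ScaleDownSetProcessor contract.
func (p *countCappingProcessor) FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownContext ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) {
	limit := scaleDownContext.MaxNodeCountToRemove
	if limit < 0 {
		limit = 0
	}
	if len(candidates) <= limit {
		return candidates, nil
	}
	toRemove := candidates[:limit]
	unremovable := make([]simulator.UnremovableNode, 0, len(candidates)-limit)
	for _, c := range candidates[limit:] {
		unremovable = append(unremovable, simulator.UnremovableNode{Node: c.Node, Reason: p.overflowReason})
	}
	return toRemove, unremovable
}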
106 changes: 90 additions & 16 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -18,6 +18,7 @@ package planner

import (
"fmt"
"math"
"testing"
"time"

@@ -35,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -498,7 +500,7 @@ func TestUpdateClusterState(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -694,7 +696,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -706,16 +708,18 @@

func TestNodesToDelete(t *testing.T) {
testCases := []struct {
name string
nodes map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved
wantEmpty []*apiv1.Node
wantDrain []*apiv1.Node
name string
nodes map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved
wantEmpty []*apiv1.Node
wantDrain []*apiv1.Node
maxNodeCountToBeRemoved int
}{
{
name: "empty",
nodes: map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{},
wantEmpty: []*apiv1.Node{},
wantDrain: []*apiv1.Node{},
name: "empty",
nodes: map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{},
wantEmpty: []*apiv1.Node{},
wantDrain: []*apiv1.Node{},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "single empty",
@@ -727,7 +731,26 @@
wantEmpty: []*apiv1.Node{
buildRemovableNode("test-node", 0).Node,
},
wantDrain: []*apiv1.Node{},
wantDrain: []*apiv1.Node{},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "multiple empty with limit",
nodes: map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{
sizedNodeGroup("test-ng", 3, false): {
buildRemovableNode("node-1", 0),
buildRemovableNode("node-2", 0),
buildRemovableNode("node-3", 0),
buildRemovableNode("node-4", 1),
},
},
wantEmpty: []*apiv1.Node{
buildRemovableNode("node-1", 0).Node,
buildRemovableNode("node-2", 0).Node,
buildRemovableNode("node-3", 0).Node,
},
wantDrain: []*apiv1.Node{},
maxNodeCountToBeRemoved: 3,
},
{
name: "single drain",
@@ -740,6 +763,7 @@
wantDrain: []*apiv1.Node{
buildRemovableNode("test-node", 1).Node,
},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "single empty atomic",
@@ -748,8 +772,9 @@
buildRemovableNode("node-1", 0),
},
},
wantEmpty: []*apiv1.Node{},
wantDrain: []*apiv1.Node{},
wantEmpty: []*apiv1.Node{},
wantDrain: []*apiv1.Node{},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "all empty atomic",
@@ -765,7 +790,8 @@
buildRemovableNode("node-2", 0).Node,
buildRemovableNode("node-3", 0).Node,
},
wantDrain: []*apiv1.Node{},
wantDrain: []*apiv1.Node{},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "some drain atomic",
@@ -783,6 +809,7 @@
wantDrain: []*apiv1.Node{
buildRemovableNode("node-3", 1).Node,
},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "different groups",
@@ -836,6 +863,52 @@
buildRemovableNode("node-14", 0).Node,
buildRemovableNode("node-15", 0).Node,
},
maxNodeCountToBeRemoved: math.MaxInt,
},
{
name: "different groups with max count equal to all empty",
nodes: map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{
sizedNodeGroup("standard-empty-ng", 3, false): {
buildRemovableNode("node-1", 0),
buildRemovableNode("node-2", 0),
buildRemovableNode("node-3", 0),
},
sizedNodeGroup("standard-drain-ng", 3, false): {
buildRemovableNode("node-4", 1),
buildRemovableNode("node-5", 2),
buildRemovableNode("node-6", 3),
},
sizedNodeGroup("standard-mixed-ng", 3, false): {
buildRemovableNode("node-7", 0),
buildRemovableNode("node-8", 1),
buildRemovableNode("node-9", 2),
},
sizedNodeGroup("atomic-empty-ng", 3, true): {
buildRemovableNode("node-10", 0),
buildRemovableNode("node-11", 0),
buildRemovableNode("node-12", 0),
},
sizedNodeGroup("atomic-mixed-ng", 3, true): {
buildRemovableNode("node-13", 0),
buildRemovableNode("node-14", 1),
buildRemovableNode("node-15", 2),
},
sizedNodeGroup("atomic-partial-ng", 3, true): {
buildRemovableNode("node-16", 0),
buildRemovableNode("node-17", 1),
},
},
wantEmpty: []*apiv1.Node{
buildRemovableNode("node-1", 0).Node,
buildRemovableNode("node-2", 0).Node,
buildRemovableNode("node-3", 0).Node,
buildRemovableNode("node-7", 0).Node,
buildRemovableNode("node-10", 0).Node,
buildRemovableNode("node-11", 0).Node,
buildRemovableNode("node-12", 0).Node,
},
wantDrain: []*apiv1.Node{},
maxNodeCountToBeRemoved: 9,
},
}
for _, tc := range testCases {
@@ -862,9 +935,10 @@
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
p.latestUpdate = time.Now()
p.actuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.scaleDownContext.MaxNodeCountToRemove = tc.maxNodeCountToBeRemoved
p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(allNodes))}
empty, drain := p.NodesToDelete(time.Now())
20 changes: 10 additions & 10 deletions cluster-autoscaler/core/scaledown/unneeded/nodes.go
@@ -26,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -117,31 +118,30 @@ func (n *Nodes) Drop(node string) {
// RemovableAt returns all nodes that can be removed at a given time, divided
// into empty and non-empty node lists, as well as a list of nodes that were
// unneeded, but are not removable, annotated by reason.
func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []*simulator.UnremovableNode) {
func (n *Nodes) RemovableAt(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, ts time.Time) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []simulator.UnremovableNode) {
nodeGroupSize := utils.GetNodeGroupSizeMap(context.CloudProvider)
resourcesLeftCopy := resourcesLeft.DeepCopy()
emptyNodes, drainNodes := n.splitEmptyAndNonEmptyNodes()

for nodeName, v := range emptyNodes {
klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason {
unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason {
unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
continue
}
empty = append(empty, v.ntbr)
}
for nodeName, v := range drainNodes {
klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason {
unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason {
unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
continue
}
needDrain = append(needDrain, v.ntbr)
}
return
}

func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node, ts time.Time, nodeGroupSize map[string]int, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) simulator.UnremovableReason {
func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, v *node, ts time.Time, nodeGroupSize map[string]int) simulator.UnremovableReason {
node := v.ntbr.Node
// Check if node is marked with no scale down annotation.
if eligibility.HasNoScaleDownAnnotation(node) {
@@ -182,17 +182,17 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node,
}
}

if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, as); reason != simulator.NoReason {
if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, scaleDownContext.ActuationStatus); reason != simulator.NoReason {
return reason
}

resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, resourcesWithLimits)
resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, scaleDownContext.ResourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
return simulator.UnexpectedError
}

checkResult := resourcesLeft.TryDecrementBy(resourceDelta)
checkResult := scaleDownContext.ResourcesLeft.TryDecrementBy(resourceDelta)
if checkResult.Exceeded() {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
for _, resource := range checkResult.ExceededResources {
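Callers of the new RemovableAt receive the rejected nodes by value, each annotated with the reason computed in unremovableReason above. As a quick, hypothetical illustration of consuming those values (only the Node and Reason fields that appear in this diff are assumed; the helper name and logging are invented):

// Hypothetical consumer of the new RemovableAt return values; field names are
// taken from the composite literals in the diff above.
package unneeded

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
	"k8s.io/klog/v2"
)

func logRemovableAt(n *Nodes, autoscalingCtx *context.AutoscalingContext, scaleDownCtx nodes.ScaleDownContext) {
	empty, needDrain, unremovable := n.RemovableAt(autoscalingCtx, scaleDownCtx, time.Now())
	for _, u := range unremovable {
		// Reason explains why an otherwise-unneeded node cannot be removed right now.
		klog.V(2).Infof("node %s is unremovable: %v", u.Node.Name, u.Reason)
	}
	klog.V(2).Infof("%d empty and %d drainable nodes could be removed", len(empty), len(needDrain))
}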
15 changes: 11 additions & 4 deletions cluster-autoscaler/core/scaledown/unneeded/nodes_test.go
@@ -18,6 +18,7 @@ package unneeded

import (
"fmt"
"math"
"testing"
"time"

@@ -28,6 +29,7 @@
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -186,10 +188,10 @@ func TestRemovableAt(t *testing.T) {
})
}

nodes := append(empty, drain...)
removableNodes := append(empty, drain...)
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.InsertNodeGroup(ng)
for _, node := range nodes {
for _, node := range removableNodes {
provider.AddNode("ng", node.Node)
}

@@ -202,8 +204,13 @@
assert.NoError(t, err)

n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{})
n.Update(nodes, time.Now())
gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, time.Now(), resource.Limits{}, []string{}, as)
n.Update(removableNodes, time.Now())
gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodes.ScaleDownContext{
ActuationStatus: as,
ResourcesLeft: resource.Limits{},
ResourcesWithLimits: []string{},
MaxNodeCountToRemove: math.MaxInt,
}, time.Now())
if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove {
t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove)
}