Modify scale down set processor to add reasons to unremovable nodes
abdelrahman882 committed Oct 28, 2024
1 parent 00e19fd commit 98396b4
Showing 16 changed files with 466 additions and 141 deletions.
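
Note: the diffs below reference a new nodes.ScaleDownContext type whose definition sits in one of the changed files not shown here. Reconstructed from its call sites in this commit, it presumably looks roughly like the following sketch (field names and types inferred from usage; not the committed source):

    package nodes

    import (
        "k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
        "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
    )

    // ScaleDownContext bundles the scale-down state that used to be passed
    // around as separate arguments: the actuation status, the remaining
    // resource budget, and the resources that have limits configured.
    type ScaleDownContext struct {
        ActuationStatus     scaledown.ActuationStatus
        ResourcesLeft       resource.Limits
        ResourcesWithLimits []string
    }

    // NewDefaultScaleDownContext returns an empty context that the planner
    // fills in on each UpdateClusterState/NodesToDelete cycle.
    func NewDefaultScaleDownContext() *ScaleDownContext {
        return &ScaleDownContext{}
    }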
41 changes: 21 additions & 20 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -18,7 +18,6 @@ package planner

 import (
     "fmt"
-    "math"
     "time"

     apiv1 "k8s.io/api/core/v1"
@@ -73,10 +72,10 @@ type Planner struct {
     minUpdateInterval     time.Duration
     eligibilityChecker    eligibilityChecker
     nodeUtilizationMap    map[string]utilization.Info
-    actuationStatus       scaledown.ActuationStatus
     resourceLimitsFinder  *resource.LimitsFinder
     cc                    controllerReplicasCalculator
     scaleDownSetProcessor nodes.ScaleDownSetProcessor
+    scaleDownContext      *nodes.ScaleDownContext
 }

 // New creates a new Planner object.
@@ -97,6 +96,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
         resourceLimitsFinder:  resourceLimitsFinder,
         cc:                    newControllerReplicasCalculator(context.ListerRegistry),
         scaleDownSetProcessor: processors.ScaleDownSetProcessor,
+        scaleDownContext:      nodes.NewDefaultScaleDownContext(),
         minUpdateInterval:     minUpdateInterval,
     }
 }
@@ -110,7 +110,7 @@ func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*api
         p.minUpdateInterval = updateInterval
     }
     p.latestUpdate = currentTime
-    p.actuationStatus = as
+    p.scaleDownContext.ActuationStatus = as
     // Avoid persisting changes done by the simulation.
     p.context.ClusterSnapshot.Fork()
     defer p.context.ClusterSnapshot.Revert()
@@ -147,22 +147,17 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
         klog.Errorf("Nothing will scale down, failed to create resource limiter: %v", err)
         return nil, nil
     }
-    limitsLeft := p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate)
-    emptyRemovable, needDrainRemovable, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus)
-    for _, u := range unremovable {
-        p.unremovableNodes.Add(u)
-    }
-    needDrainRemovable = sortByRisk(needDrainRemovable)
-    nodesToRemove := p.scaleDownSetProcessor.GetNodesToRemove(
-        p.context,
-        // We need to pass empty nodes first, as there might be some non-empty scale
-        // downs already in progress. If we pass the empty nodes first, they will be first
-        // to get deleted, thus we decrease chances of hitting the limit on non-empty scale down.
-        append(emptyRemovable, needDrainRemovable...),
-        // No need to limit the number of nodes, since it will happen later, in the actuation stage.
-        // It will make a more appropriate decision by using additional information about deletions
-        // in progress.
-        math.MaxInt)
+    p.scaleDownContext.ResourcesLeft = p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate).DeepCopy()
+    p.scaleDownContext.ResourcesWithLimits = resourceLimiter.GetResources()
+    emptyRemovableNodes, needDrainRemovableNodes, unremovableNodes := p.unneededNodes.RemovableAt(p.context, *p.scaleDownContext, p.latestUpdate)
+    p.addUnremovableNodes(unremovableNodes)
+
+    needDrainRemovableNodes = sortByRisk(needDrainRemovableNodes)
+    candidatesToBeRemoved := append(emptyRemovableNodes, needDrainRemovableNodes...)
+
+    nodesToRemove, unremovableNodes := p.scaleDownSetProcessor.FilterUnremovableNodes(p.context, p.scaleDownContext, candidatesToBeRemoved)
+    p.addUnremovableNodes(unremovableNodes)
+
     for _, nodeToRemove := range nodesToRemove {
         if len(nodeToRemove.PodsToReschedule) > 0 {
             needDrain = append(needDrain, nodeToRemove.Node)
@@ -174,6 +169,12 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) {
     return empty, needDrain
 }

+func (p *Planner) addUnremovableNodes(unremovableNodes []simulator.UnremovableNode) {
+    for _, u := range unremovableNodes {
+        p.unremovableNodes.Add(&u)
+    }
+}
+
 func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) {
     nodeInfos, err := s.NodeInfos().List()
     if err != nil {
@@ -212,7 +213,7 @@ func (p *Planner) NodeUtilizationMap() map[string]utilization.Info {
 // For pods that are controlled by controller known by CA, it will check whether
 // they have been recreated and will inject only not yet recreated pods.
 func (p *Planner) injectRecentlyEvictedPods() error {
-    recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.actuationStatus.RecentEvictions())
+    recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.scaleDownContext.ActuationStatus.RecentEvictions())
     return p.injectPods(filterOutRecreatedPods(recentlyEvictedRecreatablePods, p.cc))
 }

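The core API change above: the planner no longer calls GetNodesToRemove with a math.MaxInt cap and discards the rejected candidates; it calls FilterUnremovableNodes, which hands back the rejects annotated with a reason so they can be recorded via addUnremovableNodes. Inferred from the call site, the processor method change is roughly the following (a sketch, not the committed interface definition):

    // Old method (removed): pick up to maxCount nodes out of the
    // candidates, silently dropping the rest.
    // GetNodesToRemove(ctx *context.AutoscalingContext,
    //     candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved

    // New method (added): split candidates into nodes to remove and nodes
    // that cannot be removed, the latter carrying an UnremovableReason.
    FilterUnremovableNodes(ctx *context.AutoscalingContext,
        scaleDownCtx *nodes.ScaleDownContext,
        candidates []simulator.NodeToBeRemoved,
    ) (nodesToRemove []simulator.NodeToBeRemoved, unremovable []simulator.UnremovableNode)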
9 changes: 5 additions & 4 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -35,6 +35,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
     . "k8s.io/autoscaler/cluster-autoscaler/core/test"
+    processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -501,7 +502,7 @@ func TestUpdateClusterState(t *testing.T) {
     assert.NoError(t, err)
     clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
     deleteOptions := options.NodeDeleteOptions{}
-    p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
+    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
     p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
     if tc.isSimulationTimeout {
         context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -697,7 +698,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
     assert.NoError(t, err)
     clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
     deleteOptions := options.NodeDeleteOptions{}
-    p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
+    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
     p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
     p.minUpdateInterval = tc.updateInterval
     p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -865,9 +866,9 @@ func TestNodesToDelete(t *testing.T) {
     assert.NoError(t, err)
     clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
     deleteOptions := options.NodeDeleteOptions{}
-    p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
+    p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil)
     p.latestUpdate = time.Now()
-    p.actuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
+    p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
     p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
     p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(allNodes))}
     empty, drain := p.NodesToDelete(time.Now())
20 changes: 10 additions & 10 deletions cluster-autoscaler/core/scaledown/unneeded/nodes.go
@@ -26,6 +26,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/metrics"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/utils"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -117,31 +118,30 @@ func (n *Nodes) Drop(node string) {
 // RemovableAt returns all nodes that can be removed at a given time, divided
 // into empty and non-empty node lists, as well as a list of nodes that were
 // unneeded, but are not removable, annotated by reason.
-func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []*simulator.UnremovableNode) {
+func (n *Nodes) RemovableAt(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, ts time.Time) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []simulator.UnremovableNode) {
     nodeGroupSize := utils.GetNodeGroupSizeMap(context.CloudProvider)
-    resourcesLeftCopy := resourcesLeft.DeepCopy()
     emptyNodes, drainNodes := n.splitEmptyAndNonEmptyNodes()

     for nodeName, v := range emptyNodes {
         klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
-        if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason {
-            unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
+        if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason {
+            unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
             continue
         }
         empty = append(empty, v.ntbr)
     }
     for nodeName, v := range drainNodes {
         klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String())
-        if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason {
-            unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
+        if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason {
+            unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r})
             continue
         }
         needDrain = append(needDrain, v.ntbr)
     }
     return
 }

-func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node, ts time.Time, nodeGroupSize map[string]int, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) simulator.UnremovableReason {
+func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, v *node, ts time.Time, nodeGroupSize map[string]int) simulator.UnremovableReason {
     node := v.ntbr.Node
     // Check if node is marked with no scale down annotation.
     if eligibility.HasNoScaleDownAnnotation(node) {
@@ -182,17 +182,17 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node,
         }
     }

-    if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, as); reason != simulator.NoReason {
+    if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, scaleDownContext.ActuationStatus); reason != simulator.NoReason {
         return reason
     }

-    resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, resourcesWithLimits)
+    resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, scaleDownContext.ResourcesWithLimits)
     if err != nil {
         klog.Errorf("Error getting node resources: %v", err)
         return simulator.UnexpectedError
     }

-    checkResult := resourcesLeft.TryDecrementBy(resourceDelta)
+    checkResult := scaleDownContext.ResourcesLeft.TryDecrementBy(resourceDelta)
     if checkResult.Exceeded() {
         klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
         for _, resource := range checkResult.ExceededResources {
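One detail worth noting in RemovableAt above: TryDecrementBy mutates the remaining-resources budget, so each node that passes the check shrinks the budget available to the nodes checked after it. The planner deep-copies the limits before storing them in the context, presumably so the mutation stays local to one scale-down iteration, and if resource.Limits is map-backed, passing the context by value would still share that running budget. A minimal, self-contained sketch of the running-budget pattern under those assumptions (simplified types, not the real resource.Limits implementation):

    package main

    import "fmt"

    // Limits maps a resource name to the remaining allowed decrease.
    type Limits map[string]int64

    type CheckResult struct {
        ExceededResources []string
    }

    func (r CheckResult) Exceeded() bool { return len(r.ExceededResources) > 0 }

    // TryDecrementBy charges delta against the remaining budget; if any
    // resource would go negative it reports the offenders and charges nothing.
    func (l Limits) TryDecrementBy(delta map[string]int64) CheckResult {
        var exceeded []string
        for name, d := range delta {
            if left, ok := l[name]; ok && left < d {
                exceeded = append(exceeded, name)
            }
        }
        if len(exceeded) > 0 {
            return CheckResult{ExceededResources: exceeded}
        }
        for name, d := range delta {
            if _, ok := l[name]; ok {
                l[name] -= d
            }
        }
        return CheckResult{}
    }

    func main() {
        left := Limits{"cpu": 4, "memory": 8}
        fmt.Println(left.TryDecrementBy(map[string]int64{"cpu": 2}).Exceeded()) // false; 2 cpu left
        fmt.Println(left.TryDecrementBy(map[string]int64{"cpu": 3}).Exceeded()) // true; only 2 cpu left
    }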
13 changes: 9 additions & 4 deletions cluster-autoscaler/core/scaledown/unneeded/nodes_test.go
@@ -28,6 +28,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
     . "k8s.io/autoscaler/cluster-autoscaler/core/test"
+    "k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -188,10 +189,10 @@ func TestRemovableAt(t *testing.T) {
         })
     }

-    nodes := append(empty, drain...)
+    removableNodes := append(empty, drain...)
     provider := testprovider.NewTestCloudProvider(nil, nil)
     provider.InsertNodeGroup(ng)
-    for _, node := range nodes {
+    for _, node := range removableNodes {
         provider.AddNode("ng", node.Node)
     }

@@ -204,8 +205,12 @@
     assert.NoError(t, err)

     n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{})
-    n.Update(nodes, time.Now())
-    gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, time.Now(), resource.Limits{}, []string{}, as)
+    n.Update(removableNodes, time.Now())
+    gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodes.ScaleDownContext{
+        ActuationStatus:     as,
+        ResourcesLeft:       resource.Limits{},
+        ResourcesWithLimits: []string{},
+    }, time.Now())
     if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove {
         t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove)
     }
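Since the commit's purpose is surfacing why candidates were filtered out, here is a hedged sketch of what an implementation of the new contract could look like: a hypothetical max-count processor that reports overflow candidates as unremovable instead of silently truncating them (illustrative only; the processor name and the reason value are assumptions, not code from this commit):

    import (
        "k8s.io/autoscaler/cluster-autoscaler/context"
        "k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
        "k8s.io/autoscaler/cluster-autoscaler/simulator"
    )

    // maxCountFilter is a hypothetical ScaleDownSetProcessor: it keeps at
    // most maxCount candidates and returns the rest annotated as unremovable.
    type maxCountFilter struct {
        maxCount int
    }

    func (f *maxCountFilter) FilterUnremovableNodes(
        ctx *context.AutoscalingContext,
        scaleDownCtx *nodes.ScaleDownContext,
        candidates []simulator.NodeToBeRemoved,
    ) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) {
        if len(candidates) <= f.maxCount {
            return candidates, nil
        }
        var unremovable []simulator.UnremovableNode
        for _, c := range candidates[f.maxCount:] {
            // UnexpectedError stands in for whatever reason a real processor
            // would report; the UnremovableReason values live in simulator.
            unremovable = append(unremovable, simulator.UnremovableNode{
                Node:   c.Node,
                Reason: simulator.UnexpectedError,
            })
        }
        return candidates[:f.maxCount], unremovable
    }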
19 changes: 10 additions & 9 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
@@ -46,6 +46,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
     "k8s.io/autoscaler/cluster-autoscaler/processors/status"
+    processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -1055,7 +1056,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
     assert.NoError(t, err)
     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
     clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     processors.ScaleStateNotifier.Register(clusterState)
     if config.EnableAutoprovisioning {
         processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
@@ -1163,7 +1164,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
     clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
     p3 := BuildTestPod("p-new", 550, 0)

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
     scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
@@ -1209,7 +1210,7 @@ func TestBinpackingLimiter(t *testing.T) {

     extraPod := BuildTestPod("p-new", 500, 0)

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)

     // We should stop binpacking after finding expansion option from first node group.
     processors.BinpackingLimiter = &MockBinpackingLimiter{}
@@ -1263,7 +1264,7 @@ func TestScaleUpNoHelp(t *testing.T) {
     clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
     p3 := BuildTestPod("p-new", 500, 0)

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
     scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
@@ -1485,7 +1486,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
         pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
     }

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     suOrchestrator := New()
     suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
     scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
@@ -1541,7 +1542,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {

     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
     processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}

@@ -1596,7 +1597,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {

     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
     processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2}

@@ -1654,7 +1655,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {

     nodes := []*apiv1.Node{n1, n2}
     nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
     clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -1746,7 +1747,7 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) {

     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())

-    processors := NewTestProcessors(&context)
+    processors := processorstest.NewTestProcessors(&context)
     processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t}
     processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0}
     processors.AsyncNodeGroupStateChecker = &asyncnodegroups.MockAsyncNodeGroupStateChecker{IsUpcomingNodeGroup: tc.isUpcomingMockMap}
(Diffs for the remaining 11 changed files were not loaded in this view.)
