diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go index 3ece1db00490..9550516eacf9 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner.go +++ b/cluster-autoscaler/core/scaledown/planner/planner.go @@ -18,7 +18,6 @@ package planner import ( "fmt" - "math" "time" apiv1 "k8s.io/api/core/v1" @@ -73,10 +72,10 @@ type Planner struct { minUpdateInterval time.Duration eligibilityChecker eligibilityChecker nodeUtilizationMap map[string]utilization.Info - actuationStatus scaledown.ActuationStatus resourceLimitsFinder *resource.LimitsFinder cc controllerReplicasCalculator scaleDownSetProcessor nodes.ScaleDownSetProcessor + scaleDownContext *nodes.ScaleDownContext } // New creates a new Planner object. @@ -97,6 +96,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling resourceLimitsFinder: resourceLimitsFinder, cc: newControllerReplicasCalculator(context.ListerRegistry), scaleDownSetProcessor: processors.ScaleDownSetProcessor, + scaleDownContext: nodes.NewDefaultScaleDownContext(), minUpdateInterval: minUpdateInterval, } } @@ -110,7 +110,7 @@ func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*api p.minUpdateInterval = updateInterval } p.latestUpdate = currentTime - p.actuationStatus = as + p.scaleDownContext.ActuationStatus = as // Avoid persisting changes done by the simulation. p.context.ClusterSnapshot.Fork() defer p.context.ClusterSnapshot.Revert() @@ -147,22 +147,17 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) { klog.Errorf("Nothing will scale down, failed to create resource limiter: %v", err) return nil, nil } - limitsLeft := p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate) - emptyRemovable, needDrainRemovable, unremovable := p.unneededNodes.RemovableAt(p.context, p.latestUpdate, limitsLeft, resourceLimiter.GetResources(), p.actuationStatus) - for _, u := range unremovable { - p.unremovableNodes.Add(u) - } - needDrainRemovable = sortByRisk(needDrainRemovable) - nodesToRemove := p.scaleDownSetProcessor.GetNodesToRemove( - p.context, - // We need to pass empty nodes first, as there might be some non-empty scale - // downs already in progress. If we pass the empty nodes first, they will be first - // to get deleted, thus we decrease chances of hitting the limit on non-empty scale down. - append(emptyRemovable, needDrainRemovable...), - // No need to limit the number of nodes, since it will happen later, in the actuation stage. - // It will make a more appropriate decision by using additional information about deletions - // in progress. - math.MaxInt) + p.scaleDownContext.ResourcesLeft = p.resourceLimitsFinder.LimitsLeft(p.context, nodes, resourceLimiter, p.latestUpdate).DeepCopy() + p.scaleDownContext.ResourcesWithLimits = resourceLimiter.GetResources() + emptyRemovableNodes, needDrainRemovableNodes, unremovableNodes := p.unneededNodes.RemovableAt(p.context, *p.scaleDownContext, p.latestUpdate) + p.addUnremovableNodes(unremovableNodes) + + needDrainRemovableNodes = sortByRisk(needDrainRemovableNodes) + candidatesToBeRemoved := append(emptyRemovableNodes, needDrainRemovableNodes...) + + nodesToRemove, unremovableNodes := p.scaleDownSetProcessor.FilterUnremovableNodes(p.context, p.scaleDownContext, candidatesToBeRemoved) + p.addUnremovableNodes(unremovableNodes) + for _, nodeToRemove := range nodesToRemove { if len(nodeToRemove.PodsToReschedule) > 0 { needDrain = append(needDrain, nodeToRemove.Node) @@ -174,6 +169,12 @@ func (p *Planner) NodesToDelete(_ time.Time) (empty, needDrain []*apiv1.Node) { return empty, needDrain } +func (p *Planner) addUnremovableNodes(unremovableNodes []simulator.UnremovableNode) { + for _, u := range unremovableNodes { + p.unremovableNodes.Add(&u) + } +} + func allNodes(s clustersnapshot.ClusterSnapshot) ([]*apiv1.Node, error) { nodeInfos, err := s.NodeInfos().List() if err != nil { @@ -212,7 +213,7 @@ func (p *Planner) NodeUtilizationMap() map[string]utilization.Info { // For pods that are controlled by controller known by CA, it will check whether // they have been recreated and will inject only not yet recreated pods. func (p *Planner) injectRecentlyEvictedPods() error { - recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.actuationStatus.RecentEvictions()) + recentlyEvictedRecreatablePods := pod_util.FilterRecreatablePods(p.scaleDownContext.ActuationStatus.RecentEvictions()) return p.injectPods(filterOutRecreatedPods(recentlyEvictedRecreatablePods, p.cc)) } diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go index ce2e210bf92b..0aa4881a9613 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner_test.go +++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable" . "k8s.io/autoscaler/cluster-autoscaler/core/test" + processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/options" @@ -501,7 +502,7 @@ func TestUpdateClusterState(t *testing.T) { assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods) deleteOptions := options.NodeDeleteOptions{} - p := New(&context, NewTestProcessors(&context), deleteOptions, nil) + p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)} if tc.isSimulationTimeout { context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second @@ -697,7 +698,7 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) { assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil) deleteOptions := options.NodeDeleteOptions{} - p := New(&context, NewTestProcessors(&context), deleteOptions, nil) + p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))} p.minUpdateInterval = tc.updateInterval p.unneededNodes.Update(previouslyUnneeded, time.Now()) @@ -865,9 +866,9 @@ func TestNodesToDelete(t *testing.T) { assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil) deleteOptions := options.NodeDeleteOptions{} - p := New(&context, NewTestProcessors(&context), deleteOptions, nil) + p := New(&context, processorstest.NewTestProcessors(&context), deleteOptions, nil) p.latestUpdate = time.Now() - p.actuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second) + p.scaleDownContext.ActuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second) p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour)) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(allNodes))} empty, drain := p.NodesToDelete(time.Now()) diff --git a/cluster-autoscaler/core/scaledown/unneeded/nodes.go b/cluster-autoscaler/core/scaledown/unneeded/nodes.go index 47f4797ce2af..9c29787608f9 100644 --- a/cluster-autoscaler/core/scaledown/unneeded/nodes.go +++ b/cluster-autoscaler/core/scaledown/unneeded/nodes.go @@ -26,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -117,23 +118,22 @@ func (n *Nodes) Drop(node string) { // RemovableAt returns all nodes that can be removed at a given time, divided // into empty and non-empty node lists, as well as a list of nodes that were // unneeded, but are not removable, annotated by reason. -func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []*simulator.UnremovableNode) { +func (n *Nodes) RemovableAt(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, ts time.Time) (empty, needDrain []simulator.NodeToBeRemoved, unremovable []simulator.UnremovableNode) { nodeGroupSize := utils.GetNodeGroupSizeMap(context.CloudProvider) - resourcesLeftCopy := resourcesLeft.DeepCopy() emptyNodes, drainNodes := n.splitEmptyAndNonEmptyNodes() for nodeName, v := range emptyNodes { klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String()) - if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason { - unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r}) + if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason { + unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r}) continue } empty = append(empty, v.ntbr) } for nodeName, v := range drainNodes { klog.V(2).Infof("%s was unneeded for %s", nodeName, ts.Sub(v.since).String()) - if r := n.unremovableReason(context, v, ts, nodeGroupSize, resourcesLeftCopy, resourcesWithLimits, as); r != simulator.NoReason { - unremovable = append(unremovable, &simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r}) + if r := n.unremovableReason(context, scaleDownContext, v, ts, nodeGroupSize); r != simulator.NoReason { + unremovable = append(unremovable, simulator.UnremovableNode{Node: v.ntbr.Node, Reason: r}) continue } needDrain = append(needDrain, v.ntbr) @@ -141,7 +141,7 @@ func (n *Nodes) RemovableAt(context *context.AutoscalingContext, ts time.Time, r return } -func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node, ts time.Time, nodeGroupSize map[string]int, resourcesLeft resource.Limits, resourcesWithLimits []string, as scaledown.ActuationStatus) simulator.UnremovableReason { +func (n *Nodes) unremovableReason(context *context.AutoscalingContext, scaleDownContext nodes.ScaleDownContext, v *node, ts time.Time, nodeGroupSize map[string]int) simulator.UnremovableReason { node := v.ntbr.Node // Check if node is marked with no scale down annotation. if eligibility.HasNoScaleDownAnnotation(node) { @@ -182,17 +182,17 @@ func (n *Nodes) unremovableReason(context *context.AutoscalingContext, v *node, } } - if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, as); reason != simulator.NoReason { + if reason := verifyMinSize(node.Name, nodeGroup, nodeGroupSize, scaleDownContext.ActuationStatus); reason != simulator.NoReason { return reason } - resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, resourcesWithLimits) + resourceDelta, err := n.limitsFinder.DeltaForNode(context, node, nodeGroup, scaleDownContext.ResourcesWithLimits) if err != nil { klog.Errorf("Error getting node resources: %v", err) return simulator.UnexpectedError } - checkResult := resourcesLeft.TryDecrementBy(resourceDelta) + checkResult := scaleDownContext.ResourcesLeft.TryDecrementBy(resourceDelta) if checkResult.Exceeded() { klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources) for _, resource := range checkResult.ExceededResources { diff --git a/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go b/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go index 6d8fc7161f29..51048eb57df5 100644 --- a/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go +++ b/cluster-autoscaler/core/scaledown/unneeded/nodes_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" . "k8s.io/autoscaler/cluster-autoscaler/core/test" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/simulator" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -188,10 +189,10 @@ func TestRemovableAt(t *testing.T) { }) } - nodes := append(empty, drain...) + removableNodes := append(empty, drain...) provider := testprovider.NewTestCloudProvider(nil, nil) provider.InsertNodeGroup(ng) - for _, node := range nodes { + for _, node := range removableNodes { provider.AddNode("ng", node.Node) } @@ -204,8 +205,12 @@ func TestRemovableAt(t *testing.T) { assert.NoError(t, err) n := NewNodes(&fakeScaleDownTimeGetter{}, &resource.LimitsFinder{}) - n.Update(nodes, time.Now()) - gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, time.Now(), resource.Limits{}, []string{}, as) + n.Update(removableNodes, time.Now()) + gotEmptyToRemove, gotDrainToRemove, _ := n.RemovableAt(&ctx, nodes.ScaleDownContext{ + ActuationStatus: as, + ResourcesLeft: resource.Limits{}, + ResourcesWithLimits: []string{}, + }, time.Now()) if len(gotDrainToRemove) != tc.numDrainToRemove || len(gotEmptyToRemove) != tc.numEmptyToRemove { t.Errorf("%s: getNodesToRemove() return %d, %d, want %d, %d", tc.name, len(gotEmptyToRemove), len(gotDrainToRemove), tc.numEmptyToRemove, tc.numDrainToRemove) } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index 04c9ed4844a3..eba980f48c9e 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -46,6 +46,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -1055,7 +1056,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR assert.NoError(t, err) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) processors.ScaleStateNotifier.Register(clusterState) if config.EnableAutoprovisioning { processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} @@ -1163,7 +1164,7 @@ func TestScaleUpUnhealthy(t *testing.T) { clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 550, 0) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) @@ -1209,7 +1210,7 @@ func TestBinpackingLimiter(t *testing.T) { extraPod := BuildTestPod("p-new", 500, 0) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) // We should stop binpacking after finding expansion option from first node group. processors.BinpackingLimiter = &MockBinpackingLimiter{} @@ -1263,7 +1264,7 @@ func TestScaleUpNoHelp(t *testing.T) { clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) p3 := BuildTestPod("p-new", 500, 0) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) @@ -1485,7 +1486,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0)) } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) suOrchestrator := New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false) @@ -1541,7 +1542,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0} @@ -1596,7 +1597,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2} @@ -1654,7 +1655,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) { nodes := []*apiv1.Node{n1, n2} nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -1746,7 +1747,7 @@ func TestScaleupAsyncNodeGroupsEnabled(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 0} processors.AsyncNodeGroupStateChecker = &asyncnodegroups.MockAsyncNodeGroupStateChecker{IsUpcomingNodeGroup: tc.isUpcomingMockMap} diff --git a/cluster-autoscaler/core/scaleup/resource/manager_test.go b/cluster-autoscaler/core/scaleup/resource/manager_test.go index 0d01f9d831c1..ac1204be1b55 100644 --- a/cluster-autoscaler/core/scaleup/resource/manager_test.go +++ b/cluster-autoscaler/core/scaleup/resource/manager_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" + processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -68,7 +69,7 @@ func TestDeltaForNode(t *testing.T) { for _, testCase := range testCases { cp := testprovider.NewTestCloudProvider(nil, nil) ctx := newContext(t, cp) - processors := test.NewTestProcessors(&ctx) + processors := processorstest.NewTestProcessors(&ctx) ng := testCase.nodeGroupConfig group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem) @@ -109,7 +110,7 @@ func TestResourcesLeft(t *testing.T) { for _, testCase := range testCases { cp := newCloudProvider(t, 1000, 1000) ctx := newContext(t, cp) - processors := test.NewTestProcessors(&ctx) + processors := processorstest.NewTestProcessors(&ctx) ng := testCase.nodeGroupConfig _, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem) @@ -160,7 +161,7 @@ func TestApplyLimits(t *testing.T) { for _, testCase := range testCases { cp := testprovider.NewTestCloudProvider(nil, nil) ctx := newContext(t, cp) - processors := test.NewTestProcessors(&ctx) + processors := processorstest.NewTestProcessors(&ctx) ng := testCase.nodeGroupConfig group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem) @@ -218,7 +219,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) { provider.SetResourceLimiter(resourceLimiter) context := newContext(t, provider) - processors := test.NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) n1 := newNode(t, "n1", 8, 16) utils_test.AddGpusToNode(n1, 4) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index d4767ad20859..3433bcb5ef12 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -49,6 +49,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" + processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" @@ -164,7 +165,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error { func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) { deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) - ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), NewTestProcessors(ctx).NodeGroupConfigProcessor) + ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), processorstest.NewTestProcessors(ctx).NodeGroupConfigProcessor) } type nodeGroup struct { @@ -284,7 +285,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { ngConfigProcesssor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.autoscalingOptions.NodeGroupDefaults) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, context.LogRecorder, NewBackoff(), ngConfigProcesssor, processors.AsyncNodeGroupStateChecker) @@ -383,7 +384,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() @@ -651,7 +652,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) sddProcessor := scaledowncandidates.NewScaleDownCandidatesDelayProcessor() processors.ScaleStateNotifier.Register(sddProcessor) scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{} @@ -797,7 +798,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { setUpScaleDownActuator(&context, options) - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) processors.NodeGroupManager = nodeGroupManager processors.NodeGroupListProcessor = nodeGroupListProcessor @@ -950,7 +951,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) // broken node detected as unregistered @@ -1109,7 +1110,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() @@ -1241,7 +1242,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) @@ -1340,7 +1341,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) @@ -1443,7 +1444,7 @@ func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) { clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ OkTotalUnreadyCount: 1, } - processors := NewTestProcessors(&context) + processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults), processors.AsyncNodeGroupStateChecker) @@ -2088,14 +2089,14 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { ctx, err := NewScaleTestAutoscalingContext(autoscalingOptions, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil) assert.NoError(t, err) - processors := NewTestProcessors(&ctx) + processors := processorstest.NewTestProcessors(&ctx) // Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic. csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount} csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), processors.AsyncNodeGroupStateChecker) // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test. - actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, NewTestProcessors(&ctx).NodeGroupConfigProcessor) + actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, processorstest.NewTestProcessors(&ctx).NodeGroupConfigProcessor) ctx.ScaleDownActuator = actuator // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState. diff --git a/cluster-autoscaler/core/test/common.go b/cluster-autoscaler/core/test/common.go index 5eb7cd20709b..49a44476a5c5 100644 --- a/cluster-autoscaler/core/test/common.go +++ b/cluster-autoscaler/core/test/common.go @@ -27,30 +27,17 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/random" "k8s.io/autoscaler/cluster-autoscaler/metrics" - "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" - "k8s.io/autoscaler/cluster-autoscaler/processors" - "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" - "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" - "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" - "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" - "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" - "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -175,32 +162,6 @@ func ExtractPodNames(pods []*apiv1.Pod) []string { return podNames } -// NewTestProcessors returns a set of simple processors for use in tests. -func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors { - return &processors.AutoscalingProcessors{ - PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker, scheduling.ScheduleAnywhere), - NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}, - BinpackingLimiter: binpacking.NewTimeLimiter(context.MaxNodeGroupBinpackingDuration), - NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}), - ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor([]nodes.ScaleDownSetProcessor{ - nodes.NewMaxNodesProcessor(), - nodes.NewAtomicResizeFilteringProcessor(), - }), - // TODO(bskiba): change scale up test so that this can be a NoOpProcessor - ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}, - ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{}, - AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{}, - NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), - TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), - NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults), - CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), - ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), - ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), - ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), - AsyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), - } -} - // NewScaleTestAutoscalingContext creates a new test autoscaling context for scaling tests. func NewScaleTestAutoscalingContext( options config.AutoscalingOptions, fakeClient kube_client.Interface, diff --git a/cluster-autoscaler/processors/nodes/scale_down_context.go b/cluster-autoscaler/processors/nodes/scale_down_context.go new file mode 100644 index 000000000000..0f5b7247dde4 --- /dev/null +++ b/cluster-autoscaler/processors/nodes/scale_down_context.go @@ -0,0 +1,34 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodes + +import ( + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource" +) + +// ScaleDownContext keeps an updated version actuationStatus and resourcesLeft for the scaling down process +type ScaleDownContext struct { + ActuationStatus scaledown.ActuationStatus + ResourcesLeft resource.Limits + ResourcesWithLimits []string +} + +// NewDefaultScaleDownContext returns ScaleDownContext with passed MaxNodeCountToBeRemoved +func NewDefaultScaleDownContext() *ScaleDownContext { + return &ScaleDownContext{} +} diff --git a/cluster-autoscaler/processors/nodes/scale_down_set_processor.go b/cluster-autoscaler/processors/nodes/scale_down_set_processor.go index 9c86aadafa8a..cc624578d6d4 100644 --- a/cluster-autoscaler/processors/nodes/scale_down_set_processor.go +++ b/cluster-autoscaler/processors/nodes/scale_down_set_processor.go @@ -37,12 +37,24 @@ func NewCompositeScaleDownSetProcessor(orderedProcessorList []ScaleDownSetProces } } -// GetNodesToRemove selects nodes to remove. -func (p *CompositeScaleDownSetProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { - for _, p := range p.orderedProcessorList { - candidates = p.GetNodesToRemove(ctx, candidates, maxCount) +// FilterUnremovableNodes filters the passed removable candidates from unremovable nodes by calling orderedProcessorList in order +func (p *CompositeScaleDownSetProcessor) FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) { + unremovableNodes := []simulator.UnremovableNode{} + nodesToBeRemoved := []simulator.NodeToBeRemoved{} + nodesToBeRemoved = append(nodesToBeRemoved, candidates...) + + for indx, p := range p.orderedProcessorList { + processorRemovableNodes, processorUnremovableNodes := p.FilterUnremovableNodes(ctx, scaleDownCtx, nodesToBeRemoved) + + if len(processorRemovableNodes)+len(processorUnremovableNodes) != len(candidates) { + klog.Errorf("Scale down set composite processor failed with processor at index %d: removable nodes (%d) + unremovable nodes (%d) != candidates nodes (%d)", + indx, len(processorRemovableNodes), len(processorUnremovableNodes), len(candidates)) + } + + nodesToBeRemoved = processorRemovableNodes + unremovableNodes = append(unremovableNodes, processorUnremovableNodes...) } - return candidates + return nodesToBeRemoved, unremovableNodes } // CleanUp is called at CA termination @@ -52,28 +64,6 @@ func (p *CompositeScaleDownSetProcessor) CleanUp() { } } -// MaxNodesProcessor selects first maxCount nodes (if possible) to be removed -type MaxNodesProcessor struct { -} - -// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates -func (p *MaxNodesProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { - end := len(candidates) - if len(candidates) > maxCount { - end = maxCount - } - return candidates[:end] -} - -// CleanUp is called at CA termination -func (p *MaxNodesProcessor) CleanUp() { -} - -// NewMaxNodesProcessor returns a new MaxNodesProcessor -func NewMaxNodesProcessor() *MaxNodesProcessor { - return &MaxNodesProcessor{} -} - // AtomicResizeFilteringProcessor removes node groups which should be scaled down as one unit // if only part of these nodes were scheduled for scale down. // NOTE! When chaining with other processors, AtomicResizeFilteringProcessors should be always used last. @@ -82,21 +72,25 @@ func NewMaxNodesProcessor() *MaxNodesProcessor { type AtomicResizeFilteringProcessor struct { } -// GetNodesToRemove selects up to maxCount nodes for deletion, by selecting a first maxCount candidates -func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved { +// FilterUnremovableNodes marks all candidate nodes as unremovable if ZeroOrMaxNodeScaling is enabled and number of nodes to remove are not equal to target size +func (p *AtomicResizeFilteringProcessor) FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) { + nodesToBeRemoved := []simulator.NodeToBeRemoved{} + unremovableNodes := []simulator.UnremovableNode{} + atomicQuota := klogx.NodesLoggingQuota() standardQuota := klogx.NodesLoggingQuota() nodesByGroup := map[cloudprovider.NodeGroup][]simulator.NodeToBeRemoved{} - result := []simulator.NodeToBeRemoved{} for _, node := range candidates { nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node.Node) if err != nil { klog.Errorf("Node %v will not scale down, failed to get node info: %s", node.Node.Name, err) + unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.UnexpectedError}) continue } autoscalingOptions, err := nodeGroup.GetOptions(ctx.NodeGroupDefaults) if err != nil && err != cloudprovider.ErrNotImplemented { klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err) + unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.UnexpectedError}) continue } if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling { @@ -104,7 +98,7 @@ func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.Autoscali nodesByGroup[nodeGroup] = append(nodesByGroup[nodeGroup], node) } else { klogx.V(2).UpTo(standardQuota).Infof("Considering node %s for standard scale down", node.Node.Name) - result = append(result, node) + nodesToBeRemoved = append(nodesToBeRemoved, node) } } klogx.V(2).Over(atomicQuota).Infof("Considering %d other nodes for atomic scale down", -atomicQuota.Left()) @@ -113,16 +107,22 @@ func (p *AtomicResizeFilteringProcessor) GetNodesToRemove(ctx *context.Autoscali ngSize, err := nodeGroup.TargetSize() if err != nil { klog.Errorf("Nodes from group %s will not scale down, failed to get target size: %s", nodeGroup.Id(), err) + for _, node := range nodes { + unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.UnexpectedError}) + } continue } if ngSize == len(nodes) { klog.V(2).Infof("Scheduling atomic scale down for all %v nodes from node group %s", len(nodes), nodeGroup.Id()) - result = append(result, nodes...) + nodesToBeRemoved = append(nodesToBeRemoved, nodes...) } else { klog.V(2).Infof("Skipping scale down for %v nodes from node group %s, all %v nodes have to be scaled down atomically", len(nodes), nodeGroup.Id(), ngSize) + for _, node := range nodes { + unremovableNodes = append(unremovableNodes, simulator.UnremovableNode{Node: node.Node, Reason: simulator.AtomicScaleDownFailed}) + } } } - return result + return nodesToBeRemoved, unremovableNodes } // CleanUp is called at CA termination diff --git a/cluster-autoscaler/processors/nodes/scale_down_set_processor_test.go b/cluster-autoscaler/processors/nodes/scale_down_set_processor_test.go new file mode 100644 index 000000000000..dc43792c7183 --- /dev/null +++ b/cluster-autoscaler/processors/nodes/scale_down_set_processor_test.go @@ -0,0 +1,251 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodes + +import ( + "testing" + + "github.com/stretchr/testify/assert" + testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/config" + . "k8s.io/autoscaler/cluster-autoscaler/core/test" + "k8s.io/autoscaler/cluster-autoscaler/simulator" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/client-go/kubernetes/fake" + schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics" +) + +func TestAtomicResizeFilterUnremovableNodes(t *testing.T) { + schedulermetrics.Register() + testCases := []struct { + name string + nodeGroups []struct { + nodeGroupName string + nodeGroupTargetSize int + zeroOrMaxNodeScaling bool + } + removableCandidates []struct { + candidate simulator.NodeToBeRemoved + nodeGroup string + } + scaleDownContext *ScaleDownContext + expectedToBeRemoved []simulator.NodeToBeRemoved + expectedUnremovable []simulator.UnremovableNode + }{ + { + name: "Atomic removal", + nodeGroups: []struct { + nodeGroupName string + nodeGroupTargetSize int + zeroOrMaxNodeScaling bool + }{ + { + nodeGroupName: "ng1", + nodeGroupTargetSize: 3, + zeroOrMaxNodeScaling: true, + }, + { + nodeGroupName: "ng2", + nodeGroupTargetSize: 4, + zeroOrMaxNodeScaling: true, + }, + }, + removableCandidates: []struct { + candidate simulator.NodeToBeRemoved + nodeGroup string + }{ + { + candidate: buildRemovableNode("ng1-node-1"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng1-node-2"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng1-node-3"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng2-node-1"), + nodeGroup: "ng2", + }, + { + candidate: buildRemovableNode("ng2-node-2"), + nodeGroup: "ng2", + }, + }, + scaleDownContext: NewDefaultScaleDownContext(), + expectedToBeRemoved: []simulator.NodeToBeRemoved{ + buildRemovableNode("ng1-node-1"), + buildRemovableNode("ng1-node-2"), + buildRemovableNode("ng1-node-3"), + }, + expectedUnremovable: []simulator.UnremovableNode{ + buildUnremovableNode("ng2-node-1", simulator.AtomicScaleDownFailed), + buildUnremovableNode("ng2-node-2", simulator.AtomicScaleDownFailed), + }, + }, + { + name: "Mixed Groups", + nodeGroups: []struct { + nodeGroupName string + nodeGroupTargetSize int + zeroOrMaxNodeScaling bool + }{ + { + nodeGroupName: "ng1", + nodeGroupTargetSize: 3, + zeroOrMaxNodeScaling: false, + }, + { + nodeGroupName: "ng2", + nodeGroupTargetSize: 4, + zeroOrMaxNodeScaling: true, + }, + }, + removableCandidates: []struct { + candidate simulator.NodeToBeRemoved + nodeGroup string + }{ + { + candidate: buildRemovableNode("ng1-node-1"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng1-node-2"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng1-node-3"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng2-node-1"), + nodeGroup: "ng2", + }, + { + candidate: buildRemovableNode("ng2-node-2"), + nodeGroup: "ng2", + }, + }, + scaleDownContext: NewDefaultScaleDownContext(), + expectedToBeRemoved: []simulator.NodeToBeRemoved{ + buildRemovableNode("ng1-node-1"), + buildRemovableNode("ng1-node-2"), + buildRemovableNode("ng1-node-3"), + }, + expectedUnremovable: []simulator.UnremovableNode{ + buildUnremovableNode("ng2-node-1", simulator.AtomicScaleDownFailed), + buildUnremovableNode("ng2-node-2", simulator.AtomicScaleDownFailed), + }, + }, + { + name: "No atomic groups", + nodeGroups: []struct { + nodeGroupName string + nodeGroupTargetSize int + zeroOrMaxNodeScaling bool + }{ + { + nodeGroupName: "ng1", + nodeGroupTargetSize: 3, + zeroOrMaxNodeScaling: false, + }, + { + nodeGroupName: "ng2", + nodeGroupTargetSize: 4, + zeroOrMaxNodeScaling: false, + }, + }, + removableCandidates: []struct { + candidate simulator.NodeToBeRemoved + nodeGroup string + }{ + { + candidate: buildRemovableNode("ng1-node-1"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng1-node-2"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng1-node-3"), + nodeGroup: "ng1", + }, + { + candidate: buildRemovableNode("ng2-node-1"), + nodeGroup: "ng2", + }, + { + candidate: buildRemovableNode("ng2-node-2"), + nodeGroup: "ng2", + }, + }, + scaleDownContext: NewDefaultScaleDownContext(), + expectedToBeRemoved: []simulator.NodeToBeRemoved{ + buildRemovableNode("ng1-node-1"), + buildRemovableNode("ng1-node-2"), + buildRemovableNode("ng1-node-3"), + buildRemovableNode("ng2-node-1"), + buildRemovableNode("ng2-node-2"), + }, + expectedUnremovable: []simulator.UnremovableNode{}, + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + processor := NewAtomicResizeFilteringProcessor() + provider := testprovider.NewTestCloudProvider(nil, nil) + for _, ng := range tc.nodeGroups { + provider.AddNodeGroupWithCustomOptions(ng.nodeGroupName, 0, 100, ng.nodeGroupTargetSize, &config.NodeGroupAutoscalingOptions{ + ZeroOrMaxNodeScaling: ng.zeroOrMaxNodeScaling, + }) + } + candidates := []simulator.NodeToBeRemoved{} + for _, node := range tc.removableCandidates { + provider.AddNode(node.nodeGroup, node.candidate.Node) + candidates = append(candidates, node.candidate) + } + context, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{}, + }, &fake.Clientset{}, nil, provider, nil, nil) + + toBeRemoved, unRemovable := processor.FilterUnremovableNodes(&context, tc.scaleDownContext, candidates) + + assert.ElementsMatch(t, tc.expectedToBeRemoved, toBeRemoved) + assert.ElementsMatch(t, tc.expectedUnremovable, unRemovable) + }) + } +} + +func buildRemovableNode(name string) simulator.NodeToBeRemoved { + return simulator.NodeToBeRemoved{ + Node: BuildTestNode(name, 1000, 10), + } +} + +func buildUnremovableNode(name string, reason simulator.UnremovableReason) simulator.UnremovableNode { + return simulator.UnremovableNode{ + Node: BuildTestNode(name, 1000, 10), + Reason: reason, + } +} diff --git a/cluster-autoscaler/processors/nodes/types.go b/cluster-autoscaler/processors/nodes/types.go index 3bc671084fc9..606911b9d54d 100644 --- a/cluster-autoscaler/processors/nodes/types.go +++ b/cluster-autoscaler/processors/nodes/types.go @@ -18,7 +18,6 @@ package nodes import ( apiv1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -37,8 +36,10 @@ type ScaleDownNodeProcessor interface { // ScaleDownSetProcessor contains a method to select nodes for deletion type ScaleDownSetProcessor interface { - // GetNodesToRemove selects up to maxCount nodes for deletion - GetNodesToRemove(ctx *context.AutoscalingContext, candidates []simulator.NodeToBeRemoved, maxCount int) []simulator.NodeToBeRemoved + // FilterUnremovableNodes divides all candidates into removable nodes and unremovable nodes with reason + // Note that len(removableNodes) + len(unremovableNode) should equal len(candidates) + // in other words, each candidate should end up in one and only one of the resulting node lists. + FilterUnremovableNodes(ctx *context.AutoscalingContext, scaleDownCtx *ScaleDownContext, candidates []simulator.NodeToBeRemoved) ([]simulator.NodeToBeRemoved, []simulator.UnremovableNode) // CleanUp is called at CA termination CleanUp() } diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 69ae6273cc4d..cefdc36dcb22 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -87,14 +87,9 @@ func DefaultProcessors(options config.AutoscalingOptions) *AutoscalingProcessors MaxCapacityMemoryDifferenceRatio: config.DefaultMaxCapacityMemoryDifferenceRatio, MaxFreeDifferenceRatio: config.DefaultMaxFreeDifferenceRatio, }), - ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(), - ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(), - ScaleDownSetProcessor: nodes.NewCompositeScaleDownSetProcessor( - []nodes.ScaleDownSetProcessor{ - nodes.NewMaxNodesProcessor(), - nodes.NewAtomicResizeFilteringProcessor(), - }, - ), + ScaleUpStatusProcessor: status.NewDefaultScaleUpStatusProcessor(), + ScaleDownNodeProcessor: nodes.NewPreFilteringScaleDownNodeProcessor(), + ScaleDownSetProcessor: nodes.NewAtomicResizeFilteringProcessor(), ScaleDownStatusProcessor: status.NewDefaultScaleDownStatusProcessor(), AutoscalingStatusProcessor: status.NewDefaultAutoscalingStatusProcessor(), NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), diff --git a/cluster-autoscaler/processors/test/common.go b/cluster-autoscaler/processors/test/common.go new file mode 100644 index 000000000000..dd45d6569f91 --- /dev/null +++ b/cluster-autoscaler/processors/test/common.go @@ -0,0 +1,60 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor" + "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" + "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" + "k8s.io/autoscaler/cluster-autoscaler/processors/binpacking" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" + "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" + "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates" + "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" +) + +// NewTestProcessors returns a set of simple processors for use in tests. +func NewTestProcessors(context *context.AutoscalingContext) *processors.AutoscalingProcessors { + return &processors.AutoscalingProcessors{ + PodListProcessor: podlistprocessor.NewDefaultPodListProcessor(context.PredicateChecker, scheduling.ScheduleAnywhere), + NodeGroupListProcessor: &nodegroups.NoOpNodeGroupListProcessor{}, + BinpackingLimiter: binpacking.NewTimeLimiter(context.MaxNodeGroupBinpackingDuration), + NodeGroupSetProcessor: nodegroupset.NewDefaultNodeGroupSetProcessor([]string{}, config.NodeGroupDifferenceRatios{}), + ScaleDownSetProcessor: nodes.NewAtomicResizeFilteringProcessor(), + // TODO(bskiba): change scale up test so that this can be a NoOpProcessor + ScaleUpStatusProcessor: &status.EventingScaleUpStatusProcessor{}, + ScaleDownStatusProcessor: &status.NoOpScaleDownStatusProcessor{}, + AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{}, + NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(), + TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false), + NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(context.NodeGroupDefaults), + CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), + ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), + ScaleDownCandidatesNotifier: scaledowncandidates.NewObserversList(), + ScaleStateNotifier: nodegroupchange.NewNodeGroupChangeObserversList(), + AsyncNodeGroupStateChecker: asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker(), + } +} diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go index 59f0aaedbc5e..0b527625f589 100644 --- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go @@ -26,7 +26,7 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" + v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1" testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/config" @@ -36,6 +36,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider" "k8s.io/autoscaler/cluster-autoscaler/processors/provreq" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" @@ -486,7 +487,7 @@ func setupTest(t *testing.T, client *provreqclient.ProvisioningRequestClient, no assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil) - processors := NewTestProcessors(&autoscalingContext) + processors := processorstest.NewTestProcessors(&autoscalingContext) if autoprovisioning { processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2} diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 52ceacff8a37..34288e013039 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -73,6 +73,10 @@ const ( NotUnreadyLongEnough // NodeGroupMinSizeReached - node can't be removed because its node group is at its minimal size already. NodeGroupMinSizeReached + // NodeGroupMaxDeletionCountReached - node can't be removed because max node count to be removed value set in planner reached + NodeGroupMaxDeletionCountReached + // AtomicScaleDownFailed - node can't be removed as node group has ZeroOrMaxNodeScaling enabled and number of nodes to remove are not equal to target size + AtomicScaleDownFailed // MinimalResourceLimitExceeded - node can't be removed because it would violate cluster-wide minimal resource limits. MinimalResourceLimitExceeded // CurrentlyBeingDeleted - node can't be removed because it's already in the process of being deleted.