diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index b7c5c4c6e939..8c871c31716c 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -142,4 +142,6 @@ type AutoscalingOptions struct {
 	AWSUseStaticInstanceList bool
 	// Path to kube configuration if available
 	KubeConfigPath string
+	// Enable or disable cordoning of nodes before termination during the scale-down process
+	CordonNodeBeforeTerminate bool
 }
diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go
index 2df6cb072c62..4ef26fbedb1f 100644
--- a/cluster-autoscaler/core/scale_down.go
+++ b/cluster-autoscaler/core/scale_down.go
@@ -1058,7 +1058,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
 			return deletedNodes, errors.NewAutoscalerError(
 				errors.CloudProviderError, "failed to find node group for %s", node.Name)
 		}
-		taintErr := deletetaint.MarkToBeDeleted(node, client)
+		taintErr := deletetaint.MarkToBeDeleted(node, client, sd.context.CordonNodeBeforeTerminate)
 		if taintErr != nil {
 			recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
 			return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
@@ -1074,7 +1074,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
 			// If we fail to delete the node we want to remove delete taint
 			defer func() {
 				if deleteErr != nil {
-					deletetaint.CleanToBeDeleted(nodeToDelete, client)
+					deletetaint.CleanToBeDeleted(nodeToDelete, client, sd.context.CordonNodeBeforeTerminate)
 					recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", deleteErr)
 				} else {
 					sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
@@ -1110,7 +1110,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod,
 	deleteSuccessful := false
 	drainSuccessful := false
 
-	if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet); err != nil {
+	if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate); err != nil {
 		sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
 		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
 	}
@@ -1121,7 +1121,7 @@
 	// If we fail to evict all the pods from the node we want to remove delete taint
 	defer func() {
 		if !deleteSuccessful {
-			deletetaint.CleanToBeDeleted(node, sd.context.ClientSet)
+			deletetaint.CleanToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate)
 			if !drainSuccessful {
 				sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
 			} else {
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index bcbf14c28c4b..7c5b2cb97e4a 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -172,6 +172,7 @@ var (
 	balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
 	awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
 	enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
+	cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminate", false, "Should CA cordon nodes before terminating them during the scale-down process")
 )
 
 func createAutoscalingOptions() config.AutoscalingOptions {
@@ -240,6 +241,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		KubeConfigPath: *kubeConfigFile,
 		NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
 		AWSUseStaticInstanceList: *awsUseStaticInstanceList,
+		CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
 	}
 }
 
diff --git a/cluster-autoscaler/utils/cordon/cordon.go b/cluster-autoscaler/utils/cordon/cordon.go
new file mode 100644
index 000000000000..2dccfb1e3c3f
--- /dev/null
+++ b/cluster-autoscaler/utils/cordon/cordon.go
@@ -0,0 +1,137 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cordon
+
+import (
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kube_client "k8s.io/client-go/kubernetes"
+	"k8s.io/klog"
+)
+
+const (
+	// CordonTaint is the key of the taint used to mark a node as cordoned by Cluster Autoscaler.
+	CordonTaint = "CordonedByClusterAutoscaler"
+)
+
+var (
+	maxRetryDeadlineForCordon time.Duration = 5 * time.Second
+	conflictCordonRetryInterval time.Duration = 750 * time.Millisecond
+)
+
+// CordonNode cordons the given node, marking it unschedulable.
+func CordonNode(node *apiv1.Node, client kube_client.Interface) error {
+	if node.Spec.Unschedulable {
+		if hasNodeCordonTaint(node) {
+			klog.V(1).Infof("Node %v was already cordoned by Cluster Autoscaler", node.Name)
+			return nil
+		}
+		if !hasNodeCordonTaint(node) {
+			klog.V(1).Infof("Skipping cordon of node %v because it was made unschedulable outside Cluster Autoscaler", node.Name)
+			return nil
+		}
+	}
+	freshNode := node.DeepCopy()
+	retryDeadline := time.Now().Add(maxRetryDeadlineForCordon)
+	var err error
+	refresh := false
+	for {
+		if refresh {
+			// Get the newest version of the node.
+			freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
+		}
+		freshNode.Spec.Taints = append(freshNode.Spec.Taints, apiv1.Taint{
+			Key:    CordonTaint,
+			Value:  "",
+			Effect: apiv1.TaintEffectNoSchedule,
+		})
+		freshNode.Spec.Unschedulable = true
+		_, err = client.CoreV1().Nodes().Update(freshNode)
+		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
+			refresh = true
+			time.Sleep(conflictCordonRetryInterval)
+			continue
+		}
+
+		if err != nil {
+			klog.Warningf("Error while cordoning node %v: %v", node.Name, err)
+			return nil // cordoning is best effort, so the error is not propagated
+		}
+		klog.V(1).Infof("Successfully cordoned node %v", node.Name)
+		return nil
+	}
+
+}
+
+// UnCordonNode removes the Cluster Autoscaler cordon taint from the node and marks it schedulable again.
+func UnCordonNode(node *apiv1.Node, client kube_client.Interface) error {
+	if node.Spec.Unschedulable && !hasNodeCordonTaint(node) {
+		klog.V(1).Infof("Skipping uncordon of node %v because it was not cordoned by Cluster Autoscaler", node.Name)
+		return nil
+	}
+	freshNode := node.DeepCopy()
+	retryDeadline := time.Now().Add(maxRetryDeadlineForCordon)
+	var err error
+	refresh := false
+	for {
+		if refresh {
+			// Get the newest version of the node.
+			freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
+			refresh = false
+		}
+		newTaints := make([]apiv1.Taint, 0)
+		for _, taint := range freshNode.Spec.Taints {
+			if taint.Key != CordonTaint {
+				newTaints = append(newTaints, taint)
+			}
+		}
+
+		if len(newTaints) != len(freshNode.Spec.Taints) {
+			freshNode.Spec.Taints = newTaints
+		} else {
+			refresh = true
+			continue
+		}
+
+		freshNode.Spec.Unschedulable = false
+		_, err = client.CoreV1().Nodes().Update(freshNode)
+		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
+			refresh = true
+			time.Sleep(conflictCordonRetryInterval)
+			continue
+		}
+
+		if err != nil {
+			klog.Warningf("Error while uncordoning node %v: %v", node.Name, err)
+			return nil // uncordoning is best effort, so the error is not propagated
+		}
+		klog.V(1).Infof("Successfully uncordoned node %v", node.Name)
+		return nil
+	}
+}
+
+func hasNodeCordonTaint(node *apiv1.Node) bool {
+	for _, taint := range node.Spec.Taints {
+		if taint.Key == CordonTaint {
+			return true
+		}
+	}
+	return false
+}
diff --git a/cluster-autoscaler/utils/deletetaint/delete.go b/cluster-autoscaler/utils/deletetaint/delete.go
index b3ea3dda57d9..07bb317b01c4 100644
--- a/cluster-autoscaler/utils/deletetaint/delete.go
+++ b/cluster-autoscaler/utils/deletetaint/delete.go
@@ -25,6 +25,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/cordon"
 	kube_client "k8s.io/client-go/kubernetes"
 	kube_record "k8s.io/client-go/tools/record"
 
@@ -57,8 +58,13 @@ func getKeyShortName(key string) string {
 }
 
 // MarkToBeDeleted sets a taint that makes the node unschedulable.
-func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
-	return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
-}
+func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) error {
+	var err error
+	err = addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
+	if err == nil && cordonNode {
+		_ = cordon.CordonNode(node, client) // cordoning is best effort; errors are only logged
+	}
+	return err
+}
 
 // MarkDeletionCandidate sets a soft taint that makes the node preferably unschedulable.
@@ -164,8 +170,12 @@ func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {
 }
 
 // CleanToBeDeleted cleans CA's NoSchedule taint from a node.
-func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
-	return cleanTaint(node, client, ToBeDeletedTaint)
-}
+func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) (bool, error) {
+	successfulDelete, err := cleanTaint(node, client, ToBeDeletedTaint)
+	if successfulDelete && cordonNode {
+		_ = cordon.UnCordonNode(node, client) // uncordoning is best effort; errors are only logged
+	}
+	return successfulDelete, err
+}
 
 // CleanDeletionCandidate cleans CA's soft NoSchedule taint from a node.
diff --git a/cluster-autoscaler/utils/deletetaint/delete_test.go b/cluster-autoscaler/utils/deletetaint/delete_test.go
index a36f05da07ca..3e80d90a84cc 100644
--- a/cluster-autoscaler/utils/deletetaint/delete_test.go
+++ b/cluster-autoscaler/utils/deletetaint/delete_test.go
@@ -42,7 +42,7 @@ func TestMarkNodes(t *testing.T) {
 	defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
 	node := BuildTestNode("node", 1000, 1000)
 	fakeClient := buildFakeClientWithConflicts(t, node)
-	err := MarkToBeDeleted(node, fakeClient)
+	err := MarkToBeDeleted(node, fakeClient, false)
 	assert.NoError(t, err)
 
 	updatedNode := getNode(t, fakeClient, "node")
@@ -88,7 +88,7 @@ func TestQueryNodes(t *testing.T) {
 	defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
 	node := BuildTestNode("node", 1000, 1000)
 	fakeClient := buildFakeClientWithConflicts(t, node)
-	err := MarkToBeDeleted(node, fakeClient)
+	err := MarkToBeDeleted(node, fakeClient, false)
 	assert.NoError(t, err)
 
 	updatedNode := getNode(t, fakeClient, "node")
@@ -125,7 +125,7 @@ func TestCleanNodes(t *testing.T) {
 	updatedNode := getNode(t, fakeClient, "node")
 	assert.True(t, HasToBeDeletedTaint(updatedNode))
 
-	cleaned, err := CleanToBeDeleted(node, fakeClient)
+	cleaned, err := CleanToBeDeleted(node, fakeClient, false)
 	assert.True(t, cleaned)
 	assert.NoError(t, err)
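
Reviewer note, not part of the diff above: the existing deletetaint tests only exercise the new parameter with cordonNode=false, so the cordon helpers themselves are not covered. Below is a minimal round-trip sketch of a unit test for cordon.go, assuming it would live in the cordon package (so the unexported hasNodeCordonTaint is visible), that it uses client-go's fake clientset, and that it follows the same context-less client-go call style as the diff; the test name and structure are illustrative, not taken from this PR.

package cordon

import (
	"testing"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// TestCordonUnCordonRoundTrip (hypothetical) cordons a node, checks the taint and the
// unschedulable flag, then uncordons it and checks that both are gone again.
func TestCordonUnCordonRoundTrip(t *testing.T) {
	node := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node"}}
	client := fake.NewSimpleClientset(node)

	if err := CordonNode(node, client); err != nil {
		t.Fatalf("CordonNode returned error: %v", err)
	}
	cordoned, err := client.CoreV1().Nodes().Get("node", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("failed to get node: %v", err)
	}
	if !cordoned.Spec.Unschedulable || !hasNodeCordonTaint(cordoned) {
		t.Fatalf("expected node to be unschedulable and tainted with %s", CordonTaint)
	}

	if err := UnCordonNode(cordoned, client); err != nil {
		t.Fatalf("UnCordonNode returned error: %v", err)
	}
	uncordoned, err := client.CoreV1().Nodes().Get("node", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("failed to get node: %v", err)
	}
	if uncordoned.Spec.Unschedulable || hasNodeCordonTaint(uncordoned) {
		t.Fatal("expected node to be schedulable with the cordon taint removed")
	}
}

Because CordonNode and UnCordonNode deliberately swallow API errors (best effort), the assertions go through the object stored in the fake clientset rather than the returned error.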