From e92f8547a2aab650bc4c3ce7bb0adfe52bda82e5 Mon Sep 17 00:00:00 2001
From: Oleksandr Fradynskyi
Date: Wed, 1 Apr 2020 20:51:09 +0300
Subject: [PATCH 1/3] Add option to cordon nodes before downscaling

---
 .../config/autoscaling_options.go         |   2 +
 cluster-autoscaler/main.go                |   2 +
 cluster-autoscaler/utils/cordon/cordon.go | 132 ++++++++++++++++++
 .../utils/deletetaint/delete.go           |  17 ++-
 4 files changed, 151 insertions(+), 2 deletions(-)
 create mode 100644 cluster-autoscaler/utils/cordon/cordon.go

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index b7c5c4c6e939..8c871c31716c 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -142,4 +142,6 @@ type AutoscalingOptions struct {
 	AWSUseStaticInstanceList bool
 	// Path to kube configuration if available
 	KubeConfigPath string
+	// Enable or disable cordoning of nodes before termination during the downscale process
+	CordonNodeBeforeTerminate bool
 }
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index bcbf14c28c4b..7c5b2cb97e4a 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -172,6 +172,7 @@ var (
 	balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar")
 	awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
 	enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
+	cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminate", false, "Should CA cordon nodes before terminating them during downscale")
 )

 func createAutoscalingOptions() config.AutoscalingOptions {
@@ -240,6 +241,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 		KubeConfigPath:            *kubeConfigFile,
 		NodeDeletionDelayTimeout:  *nodeDeletionDelayTimeout,
 		AWSUseStaticInstanceList:  *awsUseStaticInstanceList,
+		CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
 	}
 }
diff --git a/cluster-autoscaler/utils/cordon/cordon.go b/cluster-autoscaler/utils/cordon/cordon.go
new file mode 100644
index 000000000000..0a95287a9538
--- /dev/null
+++ b/cluster-autoscaler/utils/cordon/cordon.go
@@ -0,0 +1,132 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cordon
+
+import (
+	"time"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kube_client "k8s.io/client-go/kubernetes"
+	"k8s.io/klog"
+)
+
+const (
+	// CordonTaint is a taint used to show that the node is unschedulable and cordoned by CA.
+	CordonTaint = "CordonedByClusterAutoscaler"
+)
+
+var (
+	maxRetryDeadlineForCordon   time.Duration = 5 * time.Second
+	conflictCordonRetryInterval time.Duration = 750 * time.Millisecond
+)
+
+// CordonNode marks the node as unschedulable by adding the CA cordon taint and setting Spec.Unschedulable.
+func CordonNode(node *apiv1.Node, client kube_client.Interface) error {
+	if node.Spec.Unschedulable {
+		if hasNodeCordonTaint(node) {
+			klog.V(1).Infof("Node %v was already cordoned by Cluster Autoscaler", node.Name)
+			return nil
+		} else {
+			klog.V(1).Infof("Skip cordoning because node %v was not cordoned by Cluster Autoscaler", node.Name)
+			return nil
+		}
+	}
+	freshNode := node.DeepCopy()
+	retryDeadline := time.Now().Add(maxRetryDeadlineForCordon)
+	var err error
+	refresh := false
+	for {
+		if refresh {
+			// Get the newest version of the node.
+			freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
+		}
+		freshNode.Spec.Taints = append(freshNode.Spec.Taints, apiv1.Taint{
+			Key:    CordonTaint,
+			Value:  "",
+			Effect: apiv1.TaintEffectNoSchedule,
+		})
+		freshNode.Spec.Unschedulable = true
+		_, err = client.CoreV1().Nodes().Update(freshNode)
+		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
+			refresh = true
+			time.Sleep(conflictCordonRetryInterval)
+			continue
+		}
+
+		if err != nil {
+			klog.Warningf("Error while cordoning node %v: %v", node.Name, err)
+			return nil
+		}
+	}
+
+}
+
+// UnCordonNode makes the node schedulable again by removing the CA cordon taint and clearing Spec.Unschedulable.
+func UnCordonNode(node *apiv1.Node, client kube_client.Interface) error {
+	if !hasNodeCordonTaint(node) {
+		klog.V(1).Infof("Skip uncordoning because node %v was not cordoned by Cluster Autoscaler", node.Name)
+		return nil
+	}
+	freshNode := node.DeepCopy()
+	retryDeadline := time.Now().Add(maxRetryDeadlineForCordon)
+	var err error
+	refresh := false
+	for {
+		if refresh {
+			// Get the newest version of the node.
+			freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
+			refresh = false
+		}
+		newTaints := make([]apiv1.Taint, 0)
+		for _, taint := range freshNode.Spec.Taints {
+			if taint.Key != CordonTaint {
+				newTaints = append(newTaints, taint)
+			}
+		}
+
+		if len(newTaints) != len(freshNode.Spec.Taints) {
+			freshNode.Spec.Taints = newTaints
+		} else {
+			// The cordon taint is already gone, so there is nothing left to clean up.
+			return nil
+		}
+
+		freshNode.Spec.Unschedulable = false
+		_, err = client.CoreV1().Nodes().Update(freshNode)
+		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
+			refresh = true
+			time.Sleep(conflictCordonRetryInterval)
+			continue
+		}
+
+		if err != nil {
+			klog.Warningf("Error while uncordoning node %v: %v", node.Name, err)
+			return nil
+		}
+	}
+}
+
+func hasNodeCordonTaint(node *apiv1.Node) bool {
+	for _, taint := range node.Spec.Taints {
+		if taint.Key == CordonTaint {
+			return true
+		}
+	}
+	return false
+}
diff --git a/cluster-autoscaler/utils/deletetaint/delete.go b/cluster-autoscaler/utils/deletetaint/delete.go
index b3ea3dda57d9..4e90e73e03f8 100644
--- a/cluster-autoscaler/utils/deletetaint/delete.go
+++ b/cluster-autoscaler/utils/deletetaint/delete.go
@@ -25,6 +25,8 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/config"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/cordon"
 	kube_client "k8s.io/client-go/kubernetes"
 	kube_record "k8s.io/client-go/tools/record"
@@ -42,6 +44,8 @@ const (
 var (
 	maxRetryDeadline      time.Duration = 5 * time.Second
 	conflictRetryInterval time.Duration = 750 * time.Millisecond
+	// cordonNodeOptions variable for cordon options
+	cordonNodeOptions config.AutoscalingOptions
 )

 // getKeyShortName converts taint key to short name for logging
@@ -58,7 +62,12 @@ func getKeyShortName(key string) string {

 // MarkToBeDeleted sets a taint that makes the node unschedulable.
 func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
-	return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
+	var err error
+	err = addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
+	if err == nil && cordonNodeOptions.CordonNodeBeforeTerminate {
+		_ = cordon.CordonNode(node, client)
+	}
+	return err
 }

 // MarkDeletionCandidate sets a soft taint that makes the node preferably unschedulable.
@@ -165,7 +174,11 @@ func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {

 // CleanToBeDeleted cleans CA's NoSchedule taint from a node.
 func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
-	return cleanTaint(node, client, ToBeDeletedTaint)
+	successfulDelete, err := cleanTaint(node, client, ToBeDeletedTaint)
+	if successfulDelete && cordonNodeOptions.CordonNodeBeforeTerminate {
+		_ = cordon.UnCordonNode(node, client)
+	}
+	return successfulDelete, err
 }

 // CleanDeletionCandidate cleans CA's soft NoSchedule taint from a node.
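As a quick illustration of the helpers introduced in PATCH 1/3, the following sketch cordons and then uncordons a node against a fake clientset. It is not part of the series: the `main` wrapper and the node name "node-1" are invented for the example, it assumes the context-free client-go API used throughout these patches, and it relies on the success-path returns that PATCH 3/3 later adds to the retry loops.

    package main

    import (
    	"fmt"

    	apiv1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes/fake"

    	"k8s.io/autoscaler/cluster-autoscaler/utils/cordon"
    )

    func main() {
    	// "node-1" is an invented name used only for this sketch.
    	node := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
    	client := fake.NewSimpleClientset(node)

    	// Cordon: sets Spec.Unschedulable and adds the CordonedByClusterAutoscaler taint.
    	if err := cordon.CordonNode(node, client); err != nil {
    		fmt.Println("cordon failed:", err)
    	}

    	updated, _ := client.CoreV1().Nodes().Get("node-1", metav1.GetOptions{})
    	fmt.Printf("unschedulable=%v taints=%d\n", updated.Spec.Unschedulable, len(updated.Spec.Taints))

    	// Uncordon: removes the taint and makes the node schedulable again.
    	if err := cordon.UnCordonNode(updated, client); err != nil {
    		fmt.Println("uncordon failed:", err)
    	}
    }

Pairing the taint with Spec.Unschedulable mirrors what kubectl cordon does, while the extra taint records that CA, not an operator, cordoned the node; that record is what lets UnCordonNode safely skip nodes cordoned by someone else.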
From 9df2de95625103080f03ab70daaf0bc3692ef1ab Mon Sep 17 00:00:00 2001
From: Oleksandr Fradynskyi
Date: Wed, 1 Apr 2020 21:38:37 +0300
Subject: [PATCH 2/3] fix 'golint' problems:
 /home/travis/gopath/src/k8s.io/autoscaler/cluster-autoscaler/utils/cordon/cordon.go:45:10:
 if block ends with a return statement, so drop this else and outdent its
 block

---
 cluster-autoscaler/utils/cordon/cordon.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cluster-autoscaler/utils/cordon/cordon.go b/cluster-autoscaler/utils/cordon/cordon.go
index 0a95287a9538..dded044892db 100644
--- a/cluster-autoscaler/utils/cordon/cordon.go
+++ b/cluster-autoscaler/utils/cordon/cordon.go
@@ -42,7 +42,8 @@ func CordonNode(node *apiv1.Node, client kube_client.Interface) error {
 		if hasNodeCordonTaint(node) {
 			klog.V(1).Infof("Node %v was already cordoned by Cluster Autoscaler", node.Name)
 			return nil
-		} else {
+		}
+		if !hasNodeCordonTaint(node) {
 			klog.V(1).Infof("Skip cordoning because node %v was not cordoned by Cluster Autoscaler", node.Name)
 			return nil
 		}

From 7515e801ade2cb393325f44504492b272623f22e Mon Sep 17 00:00:00 2001
From: Oleksandr Fradynskyi
Date: Tue, 14 Apr 2020 14:33:27 +0300
Subject: [PATCH 3/3] Fix startup option and improve cordon functionality

---
 cluster-autoscaler/core/scale_down.go               |  8 ++++----
 cluster-autoscaler/utils/cordon/cordon.go           |  4 ++++
 cluster-autoscaler/utils/deletetaint/delete.go      | 11 ++++-------
 cluster-autoscaler/utils/deletetaint/delete_test.go |  6 +++---
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go
index 2df6cb072c62..4ef26fbedb1f 100644
--- a/cluster-autoscaler/core/scale_down.go
+++ b/cluster-autoscaler/core/scale_down.go
@@ -1058,7 +1058,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
 			return deletedNodes, errors.NewAutoscalerError(
 				errors.CloudProviderError, "failed to find node group for %s", node.Name)
 		}
-		taintErr := deletetaint.MarkToBeDeleted(node, client)
+		taintErr := deletetaint.MarkToBeDeleted(node, client, sd.context.CordonNodeBeforeTerminate)
 		if taintErr != nil {
 			recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
 			return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
@@ -1074,7 +1074,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
 			// If we fail to delete the node we want to remove delete taint
 			defer func() {
 				if deleteErr != nil {
-					deletetaint.CleanToBeDeleted(nodeToDelete, client)
+					deletetaint.CleanToBeDeleted(nodeToDelete, client, sd.context.CordonNodeBeforeTerminate)
 					recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", deleteErr)
 				} else {
 					sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
@@ -1110,7 +1110,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod,
 	deleteSuccessful := false
 	drainSuccessful := false

-	if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet); err != nil {
+	if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate); err != nil {
 		sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
 		return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
 	}
@@ -1121,7 +1121,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod,
 	// If we fail to evict all the pods from the node we want to remove delete taint
 	defer func() {
 		if !deleteSuccessful {
-			deletetaint.CleanToBeDeleted(node, sd.context.ClientSet)
+			deletetaint.CleanToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate)
 			if !drainSuccessful {
 				sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
 			} else {
diff --git a/cluster-autoscaler/utils/cordon/cordon.go b/cluster-autoscaler/utils/cordon/cordon.go
index dded044892db..2dccfb1e3c3f 100644
--- a/cluster-autoscaler/utils/cordon/cordon.go
+++ b/cluster-autoscaler/utils/cordon/cordon.go
@@ -74,6 +74,8 @@ func CordonNode(node *apiv1.Node, client kube_client.Interface) error {
 			klog.Warningf("Error while cordoning node %v: %v", node.Name, err)
 			return nil
 		}
+		klog.V(1).Infof("Node %v successfully cordoned by Cluster Autoscaler", node.Name)
+		return nil
 	}

 }
@@ -120,6 +122,8 @@ func UnCordonNode(node *apiv1.Node, client kube_client.Interface) error {
 			klog.Warningf("Error while uncordoning node %v: %v", node.Name, err)
 			return nil
 		}
+		klog.V(1).Infof("Node %v successfully uncordoned by Cluster Autoscaler", node.Name)
+		return nil
 	}
 }
diff --git a/cluster-autoscaler/utils/deletetaint/delete.go b/cluster-autoscaler/utils/deletetaint/delete.go
index 4e90e73e03f8..07bb317b01c4 100644
--- a/cluster-autoscaler/utils/deletetaint/delete.go
+++ b/cluster-autoscaler/utils/deletetaint/delete.go
@@ -25,7 +25,6 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/cluster-autoscaler/config"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/cordon"
 	kube_client "k8s.io/client-go/kubernetes"
 	kube_record "k8s.io/client-go/tools/record"
@@ -44,8 +43,6 @@ const (
 var (
 	maxRetryDeadline      time.Duration = 5 * time.Second
 	conflictRetryInterval time.Duration = 750 * time.Millisecond
-	// cordonNodeOptions variable for cordon options
-	cordonNodeOptions config.AutoscalingOptions
 )

 // getKeyShortName converts taint key to short name for logging
@@ -58,10 +58,10 @@ func getKeyShortName(key string) string {
 }

 // MarkToBeDeleted sets a taint that makes the node unschedulable.
-func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
+func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) error {
 	var err error
 	err = addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
-	if err == nil && cordonNodeOptions.CordonNodeBeforeTerminate {
+	if err == nil && cordonNode {
 		_ = cordon.CordonNode(node, client)
 	}
 	return err
@@ -173,9 +170,9 @@ func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {
 }

 // CleanToBeDeleted cleans CA's NoSchedule taint from a node.
-func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
+func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) (bool, error) {
 	successfulDelete, err := cleanTaint(node, client, ToBeDeletedTaint)
-	if successfulDelete && cordonNodeOptions.CordonNodeBeforeTerminate {
+	if successfulDelete && cordonNode {
 		_ = cordon.UnCordonNode(node, client)
 	}
 	return successfulDelete, err
diff --git a/cluster-autoscaler/utils/deletetaint/delete_test.go b/cluster-autoscaler/utils/deletetaint/delete_test.go
index a36f05da07ca..3e80d90a84cc 100644
--- a/cluster-autoscaler/utils/deletetaint/delete_test.go
+++ b/cluster-autoscaler/utils/deletetaint/delete_test.go
@@ -42,7 +42,7 @@ func TestMarkNodes(t *testing.T) {
 	defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
 	node := BuildTestNode("node", 1000, 1000)
 	fakeClient := buildFakeClientWithConflicts(t, node)
-	err := MarkToBeDeleted(node, fakeClient)
+	err := MarkToBeDeleted(node, fakeClient, false)
 	assert.NoError(t, err)

 	updatedNode := getNode(t, fakeClient, "node")
@@ -88,7 +88,7 @@ func TestQueryNodes(t *testing.T) {
 	defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
 	node := BuildTestNode("node", 1000, 1000)
 	fakeClient := buildFakeClientWithConflicts(t, node)
-	err := MarkToBeDeleted(node, fakeClient)
+	err := MarkToBeDeleted(node, fakeClient, false)
 	assert.NoError(t, err)

 	updatedNode := getNode(t, fakeClient, "node")
@@ -125,7 +125,7 @@ func TestCleanNodes(t *testing.T) {
 	updatedNode := getNode(t, fakeClient, "node")
 	assert.True(t, HasToBeDeletedTaint(updatedNode))

-	cleaned, err := CleanToBeDeleted(node, fakeClient)
+	cleaned, err := CleanToBeDeleted(node, fakeClient, false)
 	assert.True(t, cleaned)
 	assert.NoError(t, err)
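The behavior stays off by default and is opt-in at startup via the new flag (illustrative invocation; all other flags elided):

    cluster-autoscaler --cordon-node-before-terminate=true ...

The series only updates the existing tests to pass cordonNode=false. A follow-up test exercising the true path could look like the sketch below; it is hypothetical (TestMarkToBeDeletedCordonsNode is not part of this series) and reuses the helpers already present in delete_test.go:

    func TestMarkToBeDeletedCordonsNode(t *testing.T) {
    	// Hypothetical test, modeled on TestMarkNodes; not part of this series.
    	defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
    	node := BuildTestNode("node", 1000, 1000)
    	fakeClient := buildFakeClientWithConflicts(t, node)

    	// With cordonNode=true, MarkToBeDeleted should also mark the node unschedulable.
    	err := MarkToBeDeleted(node, fakeClient, true)
    	assert.NoError(t, err)

    	updatedNode := getNode(t, fakeClient, "node")
    	assert.True(t, updatedNode.Spec.Unschedulable)
    }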