Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding functionality to cordon the node before destroying it. #3649

Merged
merged 1 commit into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,6 @@ type AutoscalingOptions struct {
// ClusterAPICloudConfigAuthoritative tells the Cluster API provider to treat the CloudConfig option as authoritative and
// not use KubeConfigPath as a fallback when it is not provided.
ClusterAPICloudConfigAuthoritative bool
// Enable or disable cordon nodes functionality before terminating the node during downscale process
CordonNodeBeforeTerminate bool
}
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/scale_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.NewAutoscalerError(
errors.CloudProviderError, "failed to find node group for %s", node.Name)
}
taintErr := deletetaint.MarkToBeDeleted(node, client)
taintErr := deletetaint.MarkToBeDeleted(node, client, sd.context.CordonNodeBeforeTerminate)
if taintErr != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", taintErr)
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
Expand All @@ -1075,7 +1075,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
// If we fail to delete the node we want to remove delete taint
defer func() {
if deleteErr != nil {
deletetaint.CleanToBeDeleted(nodeToDelete, client)
deletetaint.CleanToBeDeleted(nodeToDelete, client, sd.context.CordonNodeBeforeTerminate)
recorder.Eventf(nodeToDelete, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to delete empty node: %v", deleteErr)
} else {
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
Expand Down Expand Up @@ -1111,7 +1111,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod,
deleteSuccessful := false
drainSuccessful := false

if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet); err != nil {
if err := deletetaint.MarkToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate); err != nil {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
return status.NodeDeleteResult{ResultType: status.NodeDeleteErrorFailedToMarkToBeDeleted, Err: errors.ToAutoscalerError(errors.ApiCallError, err)}
}
Expand All @@ -1122,7 +1122,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod,
// If we fail to evict all the pods from the node we want to remove delete taint
defer func() {
if !deleteSuccessful {
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet)
deletetaint.CleanToBeDeleted(node, sd.context.ClientSet, sd.context.CordonNodeBeforeTerminate)
if !drainSuccessful {
sd.context.Recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to drain the node, aborting ScaleDown")
} else {
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
deletetaint.CleanAllToBeDeleted(readyNodes,
a.AutoscalingContext.ClientSet, a.Recorder)
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
deletetaint.CleanAllDeletionCandidates(readyNodes,
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ var (
awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
)

func createAutoscalingOptions() config.AutoscalingOptions {
Expand Down Expand Up @@ -243,6 +244,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout,
AWSUseStaticInstanceList: *awsUseStaticInstanceList,
ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,
}
}

Expand Down
39 changes: 24 additions & 15 deletions cluster-autoscaler/utils/deletetaint/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ func getKeyShortName(key string) string {
}

// MarkToBeDeleted sets a taint that makes the node unschedulable.
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) error {
return addTaint(node, client, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, cordonNode)
}

// MarkDeletionCandidate sets a soft taint that makes the node preferably unschedulable.
func MarkDeletionCandidate(node *apiv1.Node, client kube_client.Interface) error {
return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
return addTaint(node, client, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
}

func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect) error {
func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, effect apiv1.TaintEffect, cordonNode bool) error {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
Expand All @@ -81,7 +81,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
}
}

if !addTaintToSpec(freshNode, taintKey, effect) {
if !addTaintToSpec(freshNode, taintKey, effect, cordonNode) {
if !refresh {
// Make sure we have the latest version before skipping update.
refresh = true
Expand All @@ -105,7 +105,7 @@ func addTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, e
}
}

func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect) bool {
func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect, cordonNode bool) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == taintKey {
klog.V(2).Infof("%v already present on node %v, taint: %v", taintKey, node.Name, taint)
Expand All @@ -117,6 +117,10 @@ func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect)
Value: fmt.Sprint(time.Now().Unix()),
Effect: effect,
})
if cordonNode {
klog.V(1).Infof("Marking node %v to be cordoned by Cluster Autoscaler", node.Name)
node.Spec.Unschedulable = true
}
return true
}

Expand Down Expand Up @@ -164,16 +168,16 @@ func getTaintTime(node *apiv1.Node, taintKey string) (*time.Time, error) {
}

// CleanToBeDeleted cleans CA's NoSchedule taint from a node.
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
return cleanTaint(node, client, ToBeDeletedTaint)
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface, cordonNode bool) (bool, error) {
return cleanTaint(node, client, ToBeDeletedTaint, cordonNode)
}

// CleanDeletionCandidate cleans CA's soft NoSchedule taint from a node.
func CleanDeletionCandidate(node *apiv1.Node, client kube_client.Interface) (bool, error) {
return cleanTaint(node, client, DeletionCandidateTaint)
return cleanTaint(node, client, DeletionCandidateTaint, false)
}

func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string) (bool, error) {
func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string, cordonNode bool) (bool, error) {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
Expand Down Expand Up @@ -205,6 +209,10 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
}

freshNode.Spec.Taints = newTaints
if cordonNode {
klog.V(1).Infof("Marking node %v to be uncordoned by Cluster Autoscaler", freshNode.Name)
freshNode.Spec.Unschedulable = false
}
_, err = client.CoreV1().Nodes().Update(context.TODO(), freshNode, metav1.UpdateOptions{})

if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
Expand All @@ -223,21 +231,22 @@ func cleanTaint(node *apiv1.Node, client kube_client.Interface, taintKey string)
}

// CleanAllToBeDeleted cleans ToBeDeleted taints from given nodes.
func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint)
func CleanAllToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, cordonNode bool) {
cleanAllTaints(nodes, client, recorder, ToBeDeletedTaint, cordonNode)
}

// CleanAllDeletionCandidates cleans DeletionCandidate taints from given nodes.
func CleanAllDeletionCandidates(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder) {
cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint)
cleanAllTaints(nodes, client, recorder, DeletionCandidateTaint, false)
}

func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder, taintKey string) {
func cleanAllTaints(nodes []*apiv1.Node, client kube_client.Interface, recorder kube_record.EventRecorder,
taintKey string, cordonNode bool) {
for _, node := range nodes {
if !hasTaint(node, taintKey) {
continue
}
cleaned, err := cleanTaint(node, client, taintKey)
cleaned, err := cleanTaint(node, client, taintKey, cordonNode)
if err != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ClusterAutoscalerCleanup",
"failed to clean %v on node %v: %v", getKeyShortName(taintKey), node.Name, err)
Expand Down
58 changes: 50 additions & 8 deletions cluster-autoscaler/utils/deletetaint/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMarkNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
fakeClient := buildFakeClientWithConflicts(t, node)
err := MarkToBeDeleted(node, fakeClient)
err := MarkToBeDeleted(node, fakeClient, false)
assert.NoError(t, err)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -65,7 +65,7 @@ func TestSoftMarkNodes(t *testing.T) {
func TestCheckNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -76,7 +76,7 @@ func TestCheckNodes(t *testing.T) {
func TestSoftCheckNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -88,7 +88,7 @@ func TestQueryNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
fakeClient := buildFakeClientWithConflicts(t, node)
err := MarkToBeDeleted(node, fakeClient)
err := MarkToBeDeleted(node, fakeClient, false)
assert.NoError(t, err)

updatedNode := getNode(t, fakeClient, "node")
Expand Down Expand Up @@ -119,25 +119,67 @@ func TestSoftQueryNodes(t *testing.T) {
func TestCleanNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient)
cleaned, err := CleanToBeDeleted(node, fakeClient, false)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)
}

func TestCleanNodesWithCordon(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, true)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient, true)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.False(t, updatedNode.Spec.Unschedulable)
}

func TestCleanNodesWithCordonOnOff(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, ToBeDeletedTaint, apiv1.TaintEffectNoSchedule, true)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
assert.True(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)

cleaned, err := CleanToBeDeleted(node, fakeClient, false)
assert.True(t, cleaned)
assert.NoError(t, err)

updatedNode = getNode(t, fakeClient, "node")
assert.NoError(t, err)
assert.False(t, HasToBeDeletedTaint(updatedNode))
assert.True(t, updatedNode.Spec.Unschedulable)
}

func TestSoftCleanNodes(t *testing.T) {
defer setConflictRetryInterval(setConflictRetryInterval(time.Millisecond))
node := BuildTestNode("node", 1000, 1000)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule)
addTaintToSpec(node, DeletionCandidateTaint, apiv1.TaintEffectPreferNoSchedule, false)
fakeClient := buildFakeClientWithConflicts(t, node)

updatedNode := getNode(t, fakeClient, "node")
Expand All @@ -162,7 +204,7 @@ func TestCleanAllToBeDeleted(t *testing.T) {

assert.Equal(t, 1, len(getNode(t, fakeClient, "n2").Spec.Taints))

CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder)
CleanAllToBeDeleted([]*apiv1.Node{n1, n2}, fakeClient, fakeRecorder, false)

assert.Equal(t, 0, len(getNode(t, fakeClient, "n1").Spec.Taints))
assert.Equal(t, 0, len(getNode(t, fakeClient, "n2").Spec.Taints))
Expand Down