
Allow DaemonSet pods to opt in/out from eviction
x13n committed Jun 29, 2021
1 parent 682dec3 commit dd1748e
Showing 2 changed files with 148 additions and 35 deletions.
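
In short: Cluster Autoscaler already evicts DaemonSet pods from empty nodes before deleting them (when configured to do so), and this commit adds a per-pod annotation, cluster-autoscaler.kubernetes.io/enable-ds-eviction, that lets individual DaemonSet pods override that behavior: "true" forces eviction, "false" suppresses it, and an unannotated pod follows the cluster-wide default. The sketch below is an illustrative, self-contained rendering of that selection rule; the helper name selectForEviction and the pod names are invented for the example, while the real logic lives in podsToEvict in scale_down.go (shown in the diff below).

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// enableDsEvictionKey mirrors the annotation key introduced by this commit.
const enableDsEvictionKey = "cluster-autoscaler.kubernetes.io/enable-ds-eviction"

// selectForEviction mirrors the podsToEvict rule: a pod annotated "true" is
// always evicted, any other annotation value opts the pod out, and an
// unannotated pod follows the evictByDefault flag.
func selectForEviction(pods []*apiv1.Pod, evictByDefault bool) []*apiv1.Pod {
	var evictable []*apiv1.Pod
	for _, pod := range pods {
		if v, ok := pod.Annotations[enableDsEvictionKey]; ok {
			if v == "true" {
				evictable = append(evictable, pod)
			}
			continue
		}
		if evictByDefault {
			evictable = append(evictable, pod)
		}
	}
	return evictable
}

func main() {
	// Hypothetical pods: one opted out via the annotation, one unannotated.
	optOut := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:        "logging-agent",
		Annotations: map[string]string{enableDsEvictionKey: "false"},
	}}
	plain := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "node-exporter"}}

	for _, p := range selectForEviction([]*apiv1.Pod{optOut, plain}, true) {
		fmt.Println("would evict:", p.Name) // prints only "node-exporter"
	}
}
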
29 changes: 19 additions & 10 deletions cluster-autoscaler/core/scale_down.go
@@ -59,6 +59,9 @@ const (
// DelayDeletionAnnotationPrefix is the prefix of annotation marking node as it needs to wait
// for other K8s components before deleting node.
DelayDeletionAnnotationPrefix = "delay-deletion.cluster-autoscaler.kubernetes.io/"
// EnableDsEvictionKey is the name of annotation controlling whether a
// certain DaemonSet pod should be evicted.
EnableDsEvictionKey = "cluster-autoscaler.kubernetes.io/enable-ds-eviction"
)

const (
@@ -1114,7 +1117,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
}
deletedNodes = append(deletedNodes, node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, needToEvictDaemonSetPods bool) {
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
var result status.NodeDeleteResult
@@ -1130,10 +1133,8 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
}
}()
if needToEvictDaemonSetPods {
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder, evictByDefault); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
if deleteErr != nil {
@@ -1161,7 +1162,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k

// Create eviction object for all DaemonSet pods on the node
func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete *apiv1.Node, client kube_client.Interface, maxGracefulTerminationSec int, timeNow time.Time, dsEvictionTimeout time.Duration, waitBetweenRetries time.Duration,
recorder kube_record.EventRecorder) error {
recorder kube_record.EventRecorder, evictByDefault bool) error {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeToDelete.Name)
if err != nil {
return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name)
@@ -1171,6 +1172,8 @@ func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}

daemonSetPods = podsToEvict(daemonSetPods, evictByDefault)

dsEviction := make(chan status.PodEvictionResult, len(daemonSetPods))

// Perform eviction of DaemonSet pods
@@ -1249,11 +1252,17 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

func podsToEvict(pods []*apiv1.Pod, shouldEvict bool) []*apiv1.Pod {
if shouldEvict {
return pods
func podsToEvict(pods []*apiv1.Pod, evictByDefault bool) (evictable []*apiv1.Pod) {
for _, pod := range pods {
if a, ok := pod.Annotations[EnableDsEvictionKey]; ok {
if a == "true" {
evictable = append(evictable, pod)
}
} else if evictByDefault {
evictable = append(evictable, pod)
}
}
return []*apiv1.Pod{}
return
}

func evictPod(podToEvict *apiv1.Pod, isDaemonSetPod bool, client kube_client.Interface, recorder kube_record.EventRecorder,
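
From a workload author's point of view, the annotation is expected to go on the DaemonSet's pod template metadata so that every pod created from it carries the key. A minimal sketch using the standard k8s.io/api types — the DaemonSet name and namespace are placeholders, and a real manifest would also need a selector and container spec:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	ds := &appsv1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "node-logger", Namespace: "kube-system"},
		Spec: appsv1.DaemonSetSpec{
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					// Every pod stamped out from this template carries the
					// annotation, so Cluster Autoscaler skips evicting it even
					// when it removes the otherwise-empty node.
					Annotations: map[string]string{
						"cluster-autoscaler.kubernetes.io/enable-ds-eviction": "false",
					},
				},
			},
		},
	}
	fmt.Println(ds.Spec.Template.Annotations)
}
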
154 changes: 129 additions & 25 deletions cluster-autoscaler/core/scale_down_test.go
@@ -1217,33 +1217,35 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
dsEvictionTimeout time.Duration
evictionSuccess bool
err error
evictByDefault bool
extraAnnotationValue map[string]string
expectNotEvicted map[string]struct{}
}{
{
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: nil,
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
},
{
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
evictByDefault: true,
},
{
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
evictByDefault: true,
},
{
name: "Eviction timeout exceed",
@@ -1253,11 +1255,33 @@
dsEvictionTimeout: 100 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to create DaemonSet eviction for"),
evictByDefault: true,
},
{
name: "Evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
extraAnnotationValue: map[string]string{"d1": "true"},
expectNotEvicted: map[string]struct{}{"d2": {}},
},
{
name: "Don't evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
extraAnnotationValue: map[string]string{"d1": "false"},
expectNotEvicted: map[string]struct{}{"d1": {}},
},
}

for _, scenario := range testScenarios {
scenario := scenario
t.Run(scenario.name, func(t *testing.T) {
t.Parallel()
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
@@ -1277,6 +1301,9 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
ds := BuildTestPod(dsName, 100, 0)
ds.Spec.NodeName = "n1"
ds.OwnerReferences = GenerateOwnerReferences("", "DaemonSet", "", "")
if v, ok := scenario.extraAnnotationValue[dsName]; ok {
ds.Annotations[EnableDsEvictionKey] = v
}
dsPods[i] = ds
}

@@ -1312,18 +1339,95 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{})
}

err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, scenario.dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient))
err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, scenario.dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient), scenario.evictByDefault)
if scenario.err != nil {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), scenario.err.Error())
return
}
assert.Nil(t, err)
deleted := make([]string, len(scenario.dsPods))
for i := 0; i < len(scenario.dsPods); i++ {
var expectEvicted []string
for _, p := range scenario.dsPods {
if _, found := scenario.expectNotEvicted[p]; found {
continue
}
expectEvicted = append(expectEvicted, p)
}
deleted := make([]string, len(expectEvicted))
for i := 0; i < len(expectEvicted); i++ {
deleted[i] = utils.GetStringFromChan(deletedPods)
}
assert.ElementsMatch(t, deleted, scenario.dsPods)
assert.ElementsMatch(t, deleted, expectEvicted)
})
}
}

func TestEvictedPodsFilter(t *testing.T) {
testCases := []struct {
name string
pods map[string]string
evictionDefault bool
expectedPods []string
}{
{
name: "all pods evicted by default",
pods: map[string]string{
"p1": "",
"p2": "",
"p3": "",
},
evictionDefault: true,
expectedPods: []string{"p1", "p2", "p3"},
},
{
name: "no pods evicted by default",
pods: map[string]string{
"p1": "",
"p2": "",
"p3": "",
},
evictionDefault: false,
expectedPods: []string{},
},
{
name: "all pods evicted by default, one opt-out",
pods: map[string]string{
"p1": "",
"p2": "false",
"p3": "",
},
evictionDefault: true,
expectedPods: []string{"p1", "p3"},
},
{
name: "no pods evicted by default, one opt-in",
pods: map[string]string{
"p1": "",
"p2": "true",
"p3": "",
},
evictionDefault: false,
expectedPods: []string{"p2"},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var dsPods []*apiv1.Pod
for n, av := range tc.pods {
p := BuildTestPod(n, 100, 0)
if av != "" {
p.Annotations[EnableDsEvictionKey] = av
}
dsPods = append(dsPods, p)
}
pte := podsToEvict(dsPods, tc.evictionDefault)
got := make([]string, len(pte))
for i, p := range pte {
got[i] = p.Name
}
assert.ElementsMatch(t, got, tc.expectedPods)
})
}
}
