Skip to content

Commit

Permalink
Allow DaemonSet pods to opt in/out from eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
x13n committed Jun 29, 2021
1 parent 682dec3 commit 44b8d67
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 39 deletions.
22 changes: 8 additions & 14 deletions cluster-autoscaler/core/scale_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
Expand Down Expand Up @@ -1114,7 +1115,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
}
deletedNodes = append(deletedNodes, node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, needToEvictDaemonSetPods bool) {
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
var result status.NodeDeleteResult
Expand All @@ -1130,10 +1131,8 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
}
}()
if needToEvictDaemonSetPods {
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder, evictByDefault); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
if deleteErr != nil {
Expand Down Expand Up @@ -1161,7 +1160,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k

// Create eviction object for all DaemonSet pods on the node
func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete *apiv1.Node, client kube_client.Interface, maxGracefulTerminationSec int, timeNow time.Time, dsEvictionTimeout time.Duration, waitBetweenRetries time.Duration,
recorder kube_record.EventRecorder) error {
recorder kube_record.EventRecorder, evictByDefault bool) error {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeToDelete.Name)
if err != nil {
return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name)
Expand All @@ -1171,6 +1170,8 @@ func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}

daemonSetPods = daemonset.PodsToEvict(daemonSetPods, evictByDefault)

dsEviction := make(chan status.PodEvictionResult, len(daemonSetPods))

// Perform eviction of DaemonSet pods
Expand Down Expand Up @@ -1226,7 +1227,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo

sd.context.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "marked the node as toBeDeleted/unschedulable")

daemonSetPods = podsToEvict(daemonSetPods, sd.context.DaemonSetEvictionForOccupiedNodes)
daemonSetPods = daemonset.PodsToEvict(daemonSetPods, sd.context.DaemonSetEvictionForOccupiedNodes)

// attempt drain
evictionResults, err := drainNode(node, pods, daemonSetPods, sd.context.ClientSet, sd.context.Recorder, sd.context.MaxGracefulTerminationSec, MaxPodEvictionTime, EvictionRetryTime, PodEvictionHeadroom)
Expand All @@ -1249,13 +1250,6 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

func podsToEvict(pods []*apiv1.Pod, shouldEvict bool) []*apiv1.Pod {
if shouldEvict {
return pods
}
return []*apiv1.Pod{}
}

func evictPod(podToEvict *apiv1.Pod, isDaemonSetPod bool, client kube_client.Interface, recorder kube_record.EventRecorder,
maxGracefulTerminationSec int, retryUntil time.Time, waitBetweenRetries time.Duration) status.PodEvictionResult {
recorder.Eventf(podToEvict, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")
Expand Down
85 changes: 60 additions & 25 deletions cluster-autoscaler/core/scale_down_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
Expand Down Expand Up @@ -1217,33 +1218,35 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
dsEvictionTimeout time.Duration
evictionSuccess bool
err error
evictByDefault bool
extraAnnotationValue map[string]string
expectNotEvicted map[string]struct{}
}{
{
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: nil,
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
},
{
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
evictByDefault: true,
},
{
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
evictByDefault: true,
},
{
name: "Eviction timeout exceed",
Expand All @@ -1253,11 +1256,33 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
dsEvictionTimeout: 100 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to create DaemonSet eviction for"),
evictByDefault: true,
},
{
name: "Evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
extraAnnotationValue: map[string]string{"d1": "true"},
expectNotEvicted: map[string]struct{}{"d2": {}},
},
{
name: "Don't evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
extraAnnotationValue: map[string]string{"d1": "false"},
expectNotEvicted: map[string]struct{}{"d1": {}},
},
}

for _, scenario := range testScenarios {
scenario := scenario
t.Run(scenario.name, func(t *testing.T) {
t.Parallel()
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
Expand All @@ -1277,6 +1302,9 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
ds := BuildTestPod(dsName, 100, 0)
ds.Spec.NodeName = "n1"
ds.OwnerReferences = GenerateOwnerReferences("", "DaemonSet", "", "")
if v, ok := scenario.extraAnnotationValue[dsName]; ok {
ds.Annotations[daemonset.EnableDsEvictionKey] = v
}
dsPods[i] = ds
}

Expand Down Expand Up @@ -1312,18 +1340,25 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{})
}

err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, scenario.dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient))
err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, scenario.dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient), scenario.evictByDefault)
if scenario.err != nil {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), scenario.err.Error())
return
}
assert.Nil(t, err)
deleted := make([]string, len(scenario.dsPods))
for i := 0; i < len(scenario.dsPods); i++ {
var expectEvicted []string
for _, p := range scenario.dsPods {
if _, found := scenario.expectNotEvicted[p]; found {
continue
}
expectEvicted = append(expectEvicted, p)
}
deleted := make([]string, len(expectEvicted))
for i := 0; i < len(expectEvicted); i++ {
deleted[i] = utils.GetStringFromChan(deletedPods)
}
assert.ElementsMatch(t, deleted, scenario.dsPods)
assert.ElementsMatch(t, deleted, expectEvicted)
})
}
}
Expand Down
20 changes: 20 additions & 0 deletions cluster-autoscaler/utils/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
// EnableDsEvictionKey is the name of annotation controlling whether a
// certain DaemonSet pod should be evicted.
EnableDsEvictionKey = "cluster-autoscaler.kubernetes.io/enable-ds-eviction"
)

// GetDaemonSetPodsForNode returns daemonset nodes for the given pod.
func GetDaemonSetPodsForNode(nodeInfo *schedulerframework.NodeInfo, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker) ([]*apiv1.Pod, error) {
result := make([]*apiv1.Pod, 0)
Expand Down Expand Up @@ -66,3 +72,17 @@ func newPod(ds *appsv1.DaemonSet, nodeName string) *apiv1.Pod {
newPod.Spec.NodeName = nodeName
return newPod
}

// PodsToEvict returns a list of DaemonSet pods that should be evicted during scale down.
func PodsToEvict(pods []*apiv1.Pod, evictByDefault bool) (evictable []*apiv1.Pod) {
for _, pod := range pods {
if a, ok := pod.Annotations[EnableDsEvictionKey]; ok {
if a == "true" {
evictable = append(evictable, pod)
}
} else if evictByDefault {
evictable = append(evictable, pod)
}
}
return
}
70 changes: 70 additions & 0 deletions cluster-autoscaler/utils/daemonset/daemonset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,76 @@ func TestGetDaemonSetPodsForNode(t *testing.T) {
}
}

func TestEvictedPodsFilter(t *testing.T) {
testCases := []struct {
name string
pods map[string]string
evictionDefault bool
expectedPods []string
}{
{
name: "all pods evicted by default",
pods: map[string]string{
"p1": "",
"p2": "",
"p3": "",
},
evictionDefault: true,
expectedPods: []string{"p1", "p2", "p3"},
},
{
name: "no pods evicted by default",
pods: map[string]string{
"p1": "",
"p2": "",
"p3": "",
},
evictionDefault: false,
expectedPods: []string{},
},
{
name: "all pods evicted by default, one opt-out",
pods: map[string]string{
"p1": "",
"p2": "false",
"p3": "",
},
evictionDefault: true,
expectedPods: []string{"p1", "p3"},
},
{
name: "no pods evicted by default, one opt-in",
pods: map[string]string{
"p1": "",
"p2": "true",
"p3": "",
},
evictionDefault: false,
expectedPods: []string{"p2"},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var dsPods []*apiv1.Pod
for n, av := range tc.pods {
p := BuildTestPod(n, 100, 0)
if av != "" {
p.Annotations[EnableDsEvictionKey] = av
}
dsPods = append(dsPods, p)
}
pte := PodsToEvict(dsPods, tc.evictionDefault)
got := make([]string, len(pte))
for i, p := range pte {
got[i] = p.Name
}
assert.ElementsMatch(t, got, tc.expectedPods)
})
}
}

func newDaemonSet(name string) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 44b8d67

Please sign in to comment.