Allow DaemonSet pods to opt in/out from eviction #4172

Merged · 1 commit · Jun 29, 2021
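This change lets individual DaemonSet pods opt in to or out of eviction during scale-down through the cluster-autoscaler.kubernetes.io/enable-ds-eviction annotation: a pod annotated "true" is always evicted, a pod annotated with any other value is never evicted, and a pod without the annotation follows the configured default (evictByDefault). A minimal sketch of opting a DaemonSet's pods out by annotating its pod template — the DaemonSet below is illustrative and not part of this PR (assumes the usual aliases appsv1 = k8s.io/api/apps/v1, apiv1 = k8s.io/api/core/v1, metav1 = k8s.io/apimachinery/pkg/apis/meta/v1):

ds := &appsv1.DaemonSet{
	ObjectMeta: metav1.ObjectMeta{Name: "node-exporter", Namespace: "monitoring"},
	Spec: appsv1.DaemonSetSpec{
		Template: apiv1.PodTemplateSpec{
			ObjectMeta: metav1.ObjectMeta{
				// Pods created from this template opt out of Cluster Autoscaler
				// eviction during scale-down, regardless of the configured default.
				Annotations: map[string]string{
					"cluster-autoscaler.kubernetes.io/enable-ds-eviction": "false",
				},
			},
		},
	},
}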
22 changes: 8 additions & 14 deletions cluster-autoscaler/core/scale_down.go
@@ -34,6 +34,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1114,7 +1115,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
return deletedNodes, errors.ToAutoscalerError(errors.ApiCallError, taintErr)
}
deletedNodes = append(deletedNodes, node)
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, needToEvictDaemonSetPods bool) {
go func(nodeToDelete *apiv1.Node, nodeGroupForDeletedNode cloudprovider.NodeGroup, evictByDefault bool) {
sd.nodeDeletionTracker.StartDeletion(nodeGroupForDeletedNode.Id())
defer sd.nodeDeletionTracker.EndDeletion(nodeGroupForDeletedNode.Id())
var result status.NodeDeleteResult
@@ -1130,10 +1131,8 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: empty node %s removed", nodeToDelete.Name)
}
}()
if needToEvictDaemonSetPods {
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
if err := evictDaemonSetPods(sd.context.ClusterSnapshot, nodeToDelete, client, sd.context.MaxGracefulTerminationSec, time.Now(), DaemonSetEvictionEmptyNodeTimeout, DeamonSetTimeBetweenEvictionRetries, recorder, evictByDefault); err != nil {
klog.Warningf("error while evicting DS pods from an empty node: %v", err)
}
deleteErr = waitForDelayDeletion(nodeToDelete, sd.context.ListerRegistry.AllNodeLister(), sd.context.AutoscalingOptions.NodeDeletionDelayTimeout)
if deleteErr != nil {
@@ -1161,7 +1160,7 @@ func (sd *ScaleDown) scheduleDeleteEmptyNodes(emptyNodes []*apiv1.Node, client k

// Create eviction object for all DaemonSet pods on the node
func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete *apiv1.Node, client kube_client.Interface, maxGracefulTerminationSec int, timeNow time.Time, dsEvictionTimeout time.Duration, waitBetweenRetries time.Duration,
recorder kube_record.EventRecorder) error {
recorder kube_record.EventRecorder, evictByDefault bool) error {
nodeInfo, err := clusterSnapshot.NodeInfos().Get(nodeToDelete.Name)
if err != nil {
return fmt.Errorf("failed to get node info for %s", nodeToDelete.Name)
@@ -1171,6 +1170,8 @@ func evictDaemonSetPods(clusterSnapshot simulator.ClusterSnapshot, nodeToDelete
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}

daemonSetPods = daemonset.PodsToEvict(daemonSetPods, evictByDefault)

dsEviction := make(chan status.PodEvictionResult, len(daemonSetPods))

// Perform eviction of DaemonSet pods
@@ -1226,7 +1227,7 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo

sd.context.Recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "marked the node as toBeDeleted/unschedulable")

daemonSetPods = podsToEvict(daemonSetPods, sd.context.DaemonSetEvictionForOccupiedNodes)
daemonSetPods = daemonset.PodsToEvict(daemonSetPods, sd.context.DaemonSetEvictionForOccupiedNodes)

// attempt drain
evictionResults, err := drainNode(node, pods, daemonSetPods, sd.context.ClientSet, sd.context.Recorder, sd.context.MaxGracefulTerminationSec, MaxPodEvictionTime, EvictionRetryTime, PodEvictionHeadroom)
@@ -1249,13 +1250,6 @@ func (sd *ScaleDown) deleteNode(node *apiv1.Node, pods []*apiv1.Pod, daemonSetPo
return status.NodeDeleteResult{ResultType: status.NodeDeleteOk}
}

func podsToEvict(pods []*apiv1.Pod, shouldEvict bool) []*apiv1.Pod {
if shouldEvict {
return pods
}
return []*apiv1.Pod{}
}

func evictPod(podToEvict *apiv1.Pod, isDaemonSetPod bool, client kube_client.Interface, recorder kube_record.EventRecorder,
maxGracefulTerminationSec int, retryUntil time.Time, waitBetweenRetries time.Duration) status.PodEvictionResult {
recorder.Eventf(podToEvict, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")
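In the empty-node path the eviction default is now passed explicitly into evictDaemonSetPods, and the per-pod annotation always takes precedence over it. A minimal sketch of the resulting per-pod decision, equivalent to the filtering done by daemonset.PodsToEvict (shouldEvict is an illustrative helper, not part of this PR):

// Per-pod decision implemented by daemonset.PodsToEvict:
//   annotation "true"               -> evict, regardless of the default
//   annotation with any other value -> skip, regardless of the default
//   no annotation                   -> evict only if evictByDefault is true
func shouldEvict(pod *apiv1.Pod, evictByDefault bool) bool {
	if v, ok := pod.Annotations[daemonset.EnableDsEvictionKey]; ok {
		return v == "true"
	}
	return evictByDefault
}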
85 changes: 60 additions & 25 deletions cluster-autoscaler/core/scale_down_test.go
@@ -38,6 +38,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -1217,33 +1218,35 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
dsEvictionTimeout time.Duration
evictionSuccess bool
err error
evictByDefault bool
extraAnnotationValue map[string]string
expectNotEvicted map[string]struct{}
}{
{
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: nil,
name: "Successful attempt to evict DaemonSet pods",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
},
{
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
name: "Failed to get node info",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to get node info"),
evictByDefault: true,
},
{
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
evictionTimeoutExceed: false,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
name: "Failed to create DaemonSet eviction",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: false,
err: fmt.Errorf("following DaemonSet pod failed to evict on the"),
evictByDefault: true,
},
{
name: "Eviction timeout exceed",
@@ -1253,11 +1256,33 @@
dsEvictionTimeout: 100 * time.Millisecond,
evictionSuccess: true,
err: fmt.Errorf("failed to create DaemonSet eviction for"),
evictByDefault: true,
},
{
name: "Evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
extraAnnotationValue: map[string]string{"d1": "true"},
expectNotEvicted: map[string]struct{}{"d2": {}},
},
{
name: "Don't evict single pod due to annotation",
dsPods: []string{"d1", "d2"},
nodeInfoSuccess: true,
dsEvictionTimeout: 5000 * time.Millisecond,
evictionSuccess: true,
evictByDefault: true,
extraAnnotationValue: map[string]string{"d1": "false"},
expectNotEvicted: map[string]struct{}{"d1": {}},
},
}

for _, scenario := range testScenarios {
scenario := scenario
t.Run(scenario.name, func(t *testing.T) {
t.Parallel()
options := config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.5,
@@ -1277,6 +1302,9 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
ds := BuildTestPod(dsName, 100, 0)
ds.Spec.NodeName = "n1"
ds.OwnerReferences = GenerateOwnerReferences("", "DaemonSet", "", "")
if v, ok := scenario.extraAnnotationValue[dsName]; ok {
ds.Annotations[daemonset.EnableDsEvictionKey] = v
}
dsPods[i] = ds
}

@@ -1312,18 +1340,25 @@ func TestDaemonSetEvictionForEmptyNodes(t *testing.T) {
simulator.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, []*apiv1.Node{}, []*apiv1.Pod{})
}

err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, scenario.dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient))
err = evictDaemonSetPods(context.ClusterSnapshot, n1, fakeClient, options.MaxGracefulTerminationSec, timeNow, scenario.dsEvictionTimeout, waitBetweenRetries, kube_util.CreateEventRecorder(fakeClient), scenario.evictByDefault)
if scenario.err != nil {
assert.NotNil(t, err)
assert.Contains(t, err.Error(), scenario.err.Error())
return
}
assert.Nil(t, err)
deleted := make([]string, len(scenario.dsPods))
for i := 0; i < len(scenario.dsPods); i++ {
var expectEvicted []string
for _, p := range scenario.dsPods {
if _, found := scenario.expectNotEvicted[p]; found {
continue
}
expectEvicted = append(expectEvicted, p)
}
deleted := make([]string, len(expectEvicted))
for i := 0; i < len(expectEvicted); i++ {
deleted[i] = utils.GetStringFromChan(deletedPods)
}
assert.ElementsMatch(t, deleted, scenario.dsPods)
assert.ElementsMatch(t, deleted, expectEvicted)
})
}
}
20 changes: 20 additions & 0 deletions cluster-autoscaler/utils/daemonset/daemonset.go
@@ -27,6 +27,12 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
// EnableDsEvictionKey is the name of the annotation controlling whether a
// given DaemonSet pod should be evicted.
EnableDsEvictionKey = "cluster-autoscaler.kubernetes.io/enable-ds-eviction"
)

// GetDaemonSetPodsForNode returns DaemonSet pods for the given node.
func GetDaemonSetPodsForNode(nodeInfo *schedulerframework.NodeInfo, daemonsets []*appsv1.DaemonSet, predicateChecker simulator.PredicateChecker) ([]*apiv1.Pod, error) {
result := make([]*apiv1.Pod, 0)
@@ -66,3 +72,17 @@ func newPod(ds *appsv1.DaemonSet, nodeName string) *apiv1.Pod {
newPod.Spec.NodeName = nodeName
return newPod
}

// PodsToEvict returns a list of DaemonSet pods that should be evicted during scale down.
func PodsToEvict(pods []*apiv1.Pod, evictByDefault bool) (evictable []*apiv1.Pod) {
for _, pod := range pods {
if a, ok := pod.Annotations[EnableDsEvictionKey]; ok {
if a == "true" {
evictable = append(evictable, pod)
}
} else if evictByDefault {
evictable = append(evictable, pod)
}
}
return
}
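
A short usage sketch of the new helper (illustrative only; pod construction abbreviated, assumes apiv1 = k8s.io/api/core/v1 and metav1 = k8s.io/apimachinery/pkg/apis/meta/v1):

optIn := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{
	Name:        "ds-opt-in",
	Annotations: map[string]string{daemonset.EnableDsEvictionKey: "true"},
}}
optOut := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{
	Name:        "ds-opt-out",
	Annotations: map[string]string{daemonset.EnableDsEvictionKey: "false"},
}}
noAnnotation := &apiv1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ds-plain"}}

// With evictByDefault=false, only the explicitly opted-in pod passes the filter.
evictable := daemonset.PodsToEvict([]*apiv1.Pod{optIn, optOut, noAnnotation}, false)
// evictable contains only "ds-opt-in".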
70 changes: 70 additions & 0 deletions cluster-autoscaler/utils/daemonset/daemonset_test.go
@@ -68,6 +68,76 @@ func TestGetDaemonSetPodsForNode(t *testing.T) {
}
}

func TestEvictedPodsFilter(t *testing.T) {
testCases := []struct {
name string
pods map[string]string
evictionDefault bool
expectedPods []string
}{
{
name: "all pods evicted by default",
pods: map[string]string{
"p1": "",
"p2": "",
"p3": "",
},
evictionDefault: true,
expectedPods: []string{"p1", "p2", "p3"},
},
{
name: "no pods evicted by default",
pods: map[string]string{
"p1": "",
"p2": "",
"p3": "",
},
evictionDefault: false,
expectedPods: []string{},
},
{
name: "all pods evicted by default, one opt-out",
pods: map[string]string{
"p1": "",
"p2": "false",
"p3": "",
},
evictionDefault: true,
expectedPods: []string{"p1", "p3"},
},
{
name: "no pods evicted by default, one opt-in",
pods: map[string]string{
"p1": "",
"p2": "true",
"p3": "",
},
evictionDefault: false,
expectedPods: []string{"p2"},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var dsPods []*apiv1.Pod
for n, av := range tc.pods {
p := BuildTestPod(n, 100, 0)
if av != "" {
p.Annotations[EnableDsEvictionKey] = av
}
dsPods = append(dsPods, p)
}
pte := PodsToEvict(dsPods, tc.evictionDefault)
got := make([]string, len(pte))
for i, p := range pte {
got[i] = p.Name
}
assert.ElementsMatch(t, got, tc.expectedPods)
})
}
}

func newDaemonSet(name string) *appsv1.DaemonSet {
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{