graceful shutdown of nodes
watch lifecycle transitions of nodes in AWSMachinePools
as soon as a node enters the termination wait state, cordon it and evict its pods
once pod eviction is complete, complete the lifecycle hook that prevents immediate deletion of the node on a scale-in event

Signed-off-by: akash-gautam <gautamakash04@gmail.com>
akash-gautam authored and Akash Gautam committed Oct 13, 2022
1 parent 151ba05 commit 2e63b89
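For context before the diff: the controller added below consumes Auto Scaling lifecycle notifications that EventBridge delivers to a per-cluster SQS queue. A minimal, self-contained sketch of decoding such a notification and applying the same terminating check, mirroring the message types added in this commit — the instance ID is the example value reused from the code comments, and the message body as a whole is a hypothetical sample, not taken from this commit:

package main

import (
	"encoding/json"
	"fmt"
)

// mirrors the types added in exp/controllers/awsmachinepool_controller.go below
type messageDetail struct {
	EC2InstanceId       string `json:"EC2InstanceId,omitempty"`
	LifecycleTransition string `json:"LifecycleTransition"`
}

type message struct {
	Source string         `json:"source"`
	Detail *messageDetail `json:"detail,omitempty"`
}

func main() {
	// hypothetical SQS message body for an instance entering the termination wait state
	body := `{
		"source": "aws.autoscaling",
		"detail-type": "EC2 Instance-terminate Lifecycle Action",
		"detail": {
			"EC2InstanceId": "i-016bbceabf8257d39",
			"LifecycleTransition": "autoscaling:EC2_INSTANCE_TERMINATING"
		}
	}`

	m := message{}
	if err := json.Unmarshal([]byte(body), &m); err != nil {
		panic(err)
	}
	terminating := m.Source == "aws.autoscaling" && m.Detail != nil &&
		m.Detail.LifecycleTransition == "autoscaling:EC2_INSTANCE_TERMINATING"
	fmt.Println(terminating) // prints: true
}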
Showing 13 changed files with 295 additions and 22 deletions.
275 changes: 274 additions & 1 deletion exp/controllers/awsmachinepool_controller.go
@@ -18,15 +18,24 @@ package controllers

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/go-logr/logr"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
@@ -44,9 +53,11 @@ import (
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/scope"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/asginstancestate"
asg "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/autoscaling"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/ec2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
@@ -295,12 +306,18 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP
machinePoolScope.AWSMachinePool.Spec.ProviderID = asg.ID
providerIDList := make([]string, len(asg.Instances))

instancestateSvc := asginstancestate.NewService(ec2Scope)
if err := instancestateSvc.ReconcileASGEC2Events(); err != nil {
// non-fatal error, so we continue
clusterScope.Error(err, "non-fatal: failed to set up EventBridge")
}

for i, ec2 := range asg.Instances {
providerIDList[i] = fmt.Sprintf("aws:///%s/%s", ec2.AvailabilityZone, ec2.ID)
}

machinePoolScope.SetAnnotation("cluster-api-provider-aws", "true")

r.checkForTerminatingInstancesInMachinePool(ctx, clusterScope, asgsvc, machinePoolScope.Name())
machinePoolScope.AWSMachinePool.Spec.ProviderIDList = providerIDList
machinePoolScope.AWSMachinePool.Status.Replicas = int32(len(providerIDList))
machinePoolScope.AWSMachinePool.Status.Ready = true
@@ -370,6 +387,11 @@ func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.Machi
// remove finalizer
controllerutil.RemoveFinalizer(machinePoolScope.AWSMachinePool, expinfrav1.MachinePoolFinalizer)

instancestateSvc := asginstancestate.NewService(ec2Scope)
if err := instancestateSvc.DeleteASGEC2Events(); err != nil {
clusterScope.Error(err, "non-fatal: failed to delete EventBridge notifications")
}

return ctrl.Result{}, nil
}

@@ -604,3 +626,254 @@ func (r *AWSMachinePoolReconciler) getInfraCluster(ctx context.Context, log logr

return clusterScope, nil
}

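// checkForTerminatingInstancesInMachinePool polls the cluster's instance state
// SQS queue and, for every instance entering the termination wait state,
// cordons the corresponding node, evicts its pods, and completes the ASG
// lifecycle hook so that the scale-in can proceed.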
func (r *AWSMachinePoolReconciler) checkForTerminatingInstancesInMachinePool(ctx context.Context, clusterScope cloud.ClusterScoper, asgsvc services.ASGInterface, asgName string) {
sqsSvs := scope.NewGlobalSQSClient(clusterScope, clusterScope)
queueURL, err := r.getQueueURL(clusterScope.Name(), sqsSvs)
if err != nil {
clusterScope.Error(err, "non-fatal: failed to get the URL of the instance state queue")
return
}
resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String(queueURL)})
if err != nil {
clusterScope.Error(err, "non-fatal: failed to receive messages from instance state queue")
return
}
for _, msg := range resp.Messages {
m := message{}
err := json.Unmarshal([]byte(*msg.Body), &m)
if err != nil {
clusterScope.Error(err, "non-fatal: failed to unmarshal message from instance state queue")
continue
}
if r.isInstanceTerminating(m) {
clusterScope.Info("Instance shutting down", "instance-id", m.Detail.EC2InstanceId)
err = r.cordonNode(ctx, clusterScope, m.Detail.EC2InstanceId)
if err != nil {
clusterScope.Error(err, "non-fatal: failed to cordon the node", "instance-id", m.Detail.EC2InstanceId)
} else {
clusterScope.Info("Node cordoned successfully", "instance-id", m.Detail.EC2InstanceId)
}
errs := r.evictPods(ctx, clusterScope, m.Detail.EC2InstanceId)
if len(errs) > 0 {
for _, evictErr := range errs {
clusterScope.Error(evictErr, "non-fatal: failed to evict pod from the node", "instance-id", m.Detail.EC2InstanceId)
}
} else {
clusterScope.Info("All pods evicted successfully from the node", "instance-id", m.Detail.EC2InstanceId)
}
err = asgsvc.CompleteLifeCycleEvent(asgName, m.Detail.EC2InstanceId)
if err != nil {
clusterScope.Error(err, "non-fatal: failed to complete the pre-deletion lifecycle hook for the node", "instance-id", m.Detail.EC2InstanceId)
} else {
clusterScope.Info("Pre-deletion lifecycle hook completed successfully for the node", "instance-id", m.Detail.EC2InstanceId)
}
}
// delete the message in either case so that it is not processed again
_, err = sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: msg.ReceiptHandle,
})
if err != nil {
clusterScope.Error(err, "error deleting message", "queueURL", queueURL, "messageReceiptHandle", msg.ReceiptHandle)
}
}
}

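// getQueueURL returns the URL of the instance state SQS queue generated for the cluster.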
func (r *AWSMachinePoolReconciler) getQueueURL(clusterName string, sqsSvs sqsiface.SQSAPI) (string, error) {
queueName := asginstancestate.GenerateQueueName(clusterName)
resp, err := sqsSvs.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(queueName)})
if err != nil {
return "", err
}

return *resp.QueueUrl, nil
}

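// isInstanceTerminating reports whether the message is an autoscaling lifecycle
// notification for an instance entering the termination wait state.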
func (r *AWSMachinePoolReconciler) isInstanceTerminating(msg message) bool {
if msg.Source != "aws.autoscaling" || msg.Detail == nil {
return false
}
return msg.Detail.LifecycleTransition == LifecycleTransitionTerminating
}

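// cordonNode marks the node backing the given EC2 instance as unschedulable in
// the workload cluster.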
func (r *AWSMachinePoolReconciler) cordonNode(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) error {
node, err := r.getNodeFromInstanceID(ctx, clusterScope, nodeInstanceID)
if err != nil {
return err
}

// mark the node as unschedulable so that no new pods are scheduled on it
node.Spec.Unschedulable = true

workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
if err != nil {
return err
}

if err := workloadClient.Update(ctx, node); err != nil {
return err
}

return nil
}

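// evictPods evicts all evictable pods from the node backing the given EC2
// instance in parallel and returns the errors encountered.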
func (r *AWSMachinePoolReconciler) evictPods(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) []error {
pods, err := r.podsToEvict(ctx, clusterScope, nodeInstanceID)
if err != nil {
return []error{fmt.Errorf("failed to get the list of pods to evict: %v", err)}
}
errCh := make(chan error, len(pods))

var wg sync.WaitGroup
wg.Add(len(pods))

// evict the pods in parallel, collecting any errors on the buffered channel
for _, pod := range pods {
go func(p corev1.Pod) {
defer wg.Done()
if err := r.evictPod(ctx, clusterScope, &p); err != nil {
errCh <- fmt.Errorf("error evicting pod %s/%s on node: %v", p.Namespace, p.Name, err)
return
}
clusterScope.Info("Successfully evicted pod", "namespace", p.Namespace, "pod", p.Name, "instance-id", nodeInstanceID)
}(pod)
}

wg.Wait()
close(errCh)

retErrs := []error{}
for err := range errCh {
retErrs = append(retErrs, err)
}

return retErrs
}

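// podsToEvict lists the pods running on the node backing the given EC2
// instance, skipping completed pods, DaemonSet pods, and static (mirror) pods.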
func (r *AWSMachinePoolReconciler) podsToEvict(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) ([]corev1.Pod, error) {
var podsToEvict []corev1.Pod
node, err := r.getNodeFromInstanceID(ctx, clusterScope, nodeInstanceID)
if err != nil {
return nil, err
}
workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
if err != nil {
return nil, err
}
// use a field selector to list all the pods running on the node.
listOptions := client.ListOptions{
Namespace: metav1.NamespaceAll,
Raw: &metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String()},
}


podList := &corev1.PodList{}

if err := workloadClient.List(ctx, podList, &listOptions); err != nil {
return nil, err
}
for _, pod := range podList.Items {
// skip pods that have already finished running
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
continue
}
// skip DaemonSet pods: the DaemonSet controller would recreate them immediately
if controllerRef := metav1.GetControllerOf(&pod); controllerRef != nil && controllerRef.Kind == "DaemonSet" {
continue
}
// skip static (mirror) pods: they are managed directly by the kubelet and cannot be evicted
if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found {
continue
}
podsToEvict = append(podsToEvict, pod)
}

return podsToEvict, nil
}

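// evictPod evicts a single pod from the workload cluster via the policy/v1beta1 Eviction API.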
func (r *AWSMachinePoolReconciler) evictPod(ctx context.Context, clusterScope cloud.ClusterScoper, pod *corev1.Pod) error {
// get the config of the cluster where we need to evict pods
clusterConfig, err := remote.RESTConfig(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
if err != nil {
return err
}

kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
return err
}

eviction := &policy.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}

return kubeClient.PolicyV1beta1().Evictions(eviction.Namespace).Evict(ctx, eviction)
}

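// getNodeFromInstanceID finds the workload cluster node whose provider ID
// contains the given EC2 instance ID.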
func (r *AWSMachinePoolReconciler) getNodeFromInstanceID(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) (*corev1.Node, error) {
nodes := &corev1.NodeList{}
workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
if err != nil {
return nil, err
}
if err := workloadClient.List(ctx, nodes); err != nil {
return nil, err
}
for i := range nodes.Items {
// provider ID is of the format aws:///us-east-1c/i-016bbceabf8257d39
if strings.Contains(nodes.Items[i].Spec.ProviderID, nodeInstanceID) {
return &nodes.Items[i], nil
}
}
return nil, fmt.Errorf("no node found with instance ID %s", nodeInstanceID)
}

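// message is the subset of an EventBridge instance state notification that the
// controller inspects.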
type message struct {
Source string `json:"source"`
DetailType string `json:"detail-type,omitempty"`
Detail *messageDetail `json:"detail,omitempty"`
}

type messageDetail struct {
EC2InstanceId string `json:"EC2InstanceId,omitempty"`
LifecycleTransition LifecycleTransition `json:"LifecycleTransition"`
}

// LifecycleTransition describes the lifecycle state of an EC2 instance launched via an AWS Auto Scaling group
type LifecycleTransition string

var (
// LifecycleTransitionTerminating is the transition string for an instance in the termination wait state
LifecycleTransitionTerminating = LifecycleTransition("autoscaling:EC2_INSTANCE_TERMINATING")
)
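The implementation of asgsvc.CompleteLifeCycleEvent lives in the autoscaling service package and is not rendered on this page. Presumably it wraps the aws-sdk-go CompleteLifecycleAction call; a minimal sketch under that assumption, with hypothetical group and hook names:

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/autoscaling"
)

// completeTerminationHook is a sketch only; group and hook names are hypothetical.
func completeTerminationHook(instanceID string) error {
	sess := session.Must(session.NewSession())
	client := autoscaling.New(sess)
	_, err := client.CompleteLifecycleAction(&autoscaling.CompleteLifecycleActionInput{
		AutoScalingGroupName:  aws.String("my-machine-pool-asg"), // hypothetical ASG name
		LifecycleHookName:     aws.String("pre-terminate-wait"),  // hypothetical hook name
		InstanceId:            aws.String(instanceID),
		LifecycleActionResult: aws.String("CONTINUE"), // allow the scale-in to proceed
	})
	return err
}

A related caveat: the Eviction API refuses evictions that would violate a PodDisruptionBudget and answers them with 429 TooManyRequests, which evictPods above records as a plain error. A sketch of a retry loop a caller could layer on top (apierrors is the k8s.io/apimachinery/pkg/api/errors package already imported in this file; time would also need to be imported):

// retry an eviction blocked by a PodDisruptionBudget; sketch only
for {
	err := r.evictPod(ctx, clusterScope, &pod)
	if err == nil {
		break // evicted
	}
	if !apierrors.IsTooManyRequests(err) {
		return err // a real failure, not a PDB rejection
	}
	time.Sleep(5 * time.Second) // wait for the PDB to allow another disruption
}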
4 changes: 2 additions & 2 deletions pkg/cloud/services/asginstancestate/asgec2events.go
@@ -1,11 +1,11 @@
 /*
-Copyright 2020 The Kubernetes Authors.
+Copyright 2021 The Kubernetes Authors.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
4 changes: 2 additions & 2 deletions pkg/cloud/services/asginstancestate/helpers_test.go
@@ -1,11 +1,11 @@
 /*
-Copyright 2020 The Kubernetes Authors.
+Copyright 2021 The Kubernetes Authors.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -1,11 +1,11 @@
 /*
-Copyright 2020 The Kubernetes Authors.
+Copyright 2021 The Kubernetes Authors.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
4 changes: 2 additions & 2 deletions pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go
@@ -1,11 +1,11 @@
 /*
-Copyright 2020 The Kubernetes Authors.
+Copyright 2021 The Kubernetes Authors.
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
+http://www.apache.org/licenses/LICENSE-2.0
 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
