From 2e63b89874e08445f722e419eaa7d11836ea3e72 Mon Sep 17 00:00:00 2001 From: akash-gautam Date: Fri, 25 Feb 2022 22:03:01 +0530 Subject: [PATCH] graceful shutdown of nodes watch lifecycle transition of nodes in awsmachinepools as soon as they enter the termination wait condition, cordon it and evict the pods post eviction of pods complete the lifecycle hook that prevents immediate deletion of node on scale in event Signed-off-by: akash-gautam --- exp/controllers/awsmachinepool_controller.go | 275 +++++++++++++++++- .../services/asginstancestate/asgec2events.go | 4 +- .../services/asginstancestate/helpers_test.go | 4 +- .../mock_eventbridgeiface/doc.go | 4 +- .../eventbridgeiface_mock.go | 2 +- .../asginstancestate/mock_sqsiface/doc.go | 4 +- .../mock_sqsiface/sqsiface_mock.go | 2 +- pkg/cloud/services/asginstancestate/queue.go | 4 +- .../services/asginstancestate/queue_test.go | 4 +- pkg/cloud/services/asginstancestate/rule.go | 4 +- .../services/asginstancestate/rule_test.go | 4 +- .../services/asginstancestate/service.go | 4 +- pkg/cloud/services/interfaces.go | 2 +- 13 files changed, 295 insertions(+), 22 deletions(-) diff --git a/exp/controllers/awsmachinepool_controller.go b/exp/controllers/awsmachinepool_controller.go index cc88861f64..4744c087c4 100644 --- a/exp/controllers/awsmachinepool_controller.go +++ b/exp/controllers/awsmachinepool_controller.go @@ -18,15 +18,24 @@ package controllers import ( "context" + "encoding/json" "fmt" + "strings" + "sync" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sqs/sqsiface" "github.com/go-logr/logr" "github.com/google/go-cmp/cmp" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" @@ -44,9 +53,11 @@ import ( "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud" "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/scope" "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services" + "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/asginstancestate" asg "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/autoscaling" "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/ec2" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "sigs.k8s.io/cluster-api/controllers/remote" expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1" "sigs.k8s.io/cluster-api/util" "sigs.k8s.io/cluster-api/util/conditions" @@ -295,12 +306,18 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP machinePoolScope.AWSMachinePool.Spec.ProviderID = asg.ID providerIDList := make([]string, len(asg.Instances)) + instancestateSvc := asginstancestate.NewService(ec2Scope) + if err := instancestateSvc.ReconcileASGEC2Events(); err != nil { + // non fatal error, so we continue + clusterScope.Error(err, "non-fatal: failed to set up EventBridge") + } + for i, ec2 := range asg.Instances { providerIDList[i] = fmt.Sprintf("aws:///%s/%s", ec2.AvailabilityZone, ec2.ID) } machinePoolScope.SetAnnotation("cluster-api-provider-aws", "true") - + r.checkForTerminatingInstancesInMachinePool(ctx, clusterScope, asgsvc, machinePoolScope.Name()) machinePoolScope.AWSMachinePool.Spec.ProviderIDList = providerIDList machinePoolScope.AWSMachinePool.Status.Replicas = int32(len(providerIDList)) machinePoolScope.AWSMachinePool.Status.Ready = true @@ -370,6 +387,11 @@ func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.Machi // remove finalizer controllerutil.RemoveFinalizer(machinePoolScope.AWSMachinePool, expinfrav1.MachinePoolFinalizer) + instancestateSvc := asginstancestate.NewService(ec2Scope) + if err := instancestateSvc.DeleteASGEC2Events(); err != nil { + clusterScope.Error(err, "non-fatal: failed to delete EventBridge notifications") + } + return ctrl.Result{}, nil } @@ -604,3 +626,254 @@ func (r *AWSMachinePoolReconciler) getInfraCluster(ctx context.Context, log logr return clusterScope, nil } + +func (r *AWSMachinePoolReconciler) checkForTerminatingInstancesInMachinePool(ctx context.Context, clusterScope cloud.ClusterScoper, asgsvc services.ASGInterface, asgName string) { + + sqsSvs := scope.NewGlobalSQSClient(clusterScope, clusterScope) + queueURL, _ := r.getQueueURL(clusterScope.Name(), sqsSvs) + resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String(queueURL)}) + if err != nil { + clusterScope.Error(err, "non-fatal: failed to receive messages from instance state queue") + return + } + for _, msg := range resp.Messages { + + m := message{} + err := json.Unmarshal([]byte(*msg.Body), &m) + if err != nil { + clusterScope.Error(err, "non-fatal: failed to receive messages from instance state queue") + return + } + if r.isInstanceTerminating(m) { + clusterScope.Info("Instance shutting down", "instance-id", m.Detail.EC2InstanceId) + err = r.cordonNode(ctx, clusterScope, m.Detail.EC2InstanceId) + if err != nil { + clusterScope.Error(err, "non-fatal: failed to cordon the node", "instance-id", m.Detail.EC2InstanceId) + } else { + clusterScope.Info("Node cordoned sucessfully", "instance-id", m.Detail.EC2InstanceId) + } + errs := r.evictPods(ctx, clusterScope, m.Detail.EC2InstanceId) + if len(errs) > 0 { + for i := 0; i < len(errs); i++ { + clusterScope.Error(err, "non-fatal: failed to evict pod from the node", "instance-id", m.Detail.EC2InstanceId) + } + } else { + clusterScope.Info("All pods evicted sucessfully from the node", "instance-id", m.Detail.EC2InstanceId) + } + err = asgsvc.CompleteLifeCycleEvent(asgName, m.Detail.EC2InstanceId) + if err != nil { + clusterScope.Error(err, "non-fatal: failed to remove pre deletion hook for the node", "instance-id", m.Detail.EC2InstanceId) + } else { + clusterScope.Info("Pre deletion hook removed sucessfully from the node", "instance-id", m.Detail.EC2InstanceId) + } + _, err = sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: aws.String(queueURL), + ReceiptHandle: msg.ReceiptHandle, + }) + if err != nil { + clusterScope.Error(err, "error deleting message", "queueURL", queueURL, "messageReceiptHandle", msg.ReceiptHandle) + } + } else { + _, err = sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{ + QueueUrl: aws.String(queueURL), + ReceiptHandle: msg.ReceiptHandle, + }) + + if err != nil { + clusterScope.Error(err, "error deleting message", "queueURL", queueURL, "messageReceiptHandle", msg.ReceiptHandle) + } + } + } +} + +func (r *AWSMachinePoolReconciler) getQueueURL(clusterName string, sqsSvs sqsiface.SQSAPI) (string, error) { + queueName := asginstancestate.GenerateQueueName(clusterName) + resp, err := sqsSvs.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(queueName)}) + if err != nil { + return "", err + } + + return *resp.QueueUrl, nil +} + +func (r *AWSMachinePoolReconciler) isInstanceTerminating(msg message) bool { + + if msg.Source != "aws.autoscaling" || msg.Detail.LifecycleTransition == "" { + return false + } + if msg.Detail.LifecycleTransition == LifecycleTransitionTerminating { + return true + } + return false +} + +func (r *AWSMachinePoolReconciler) cordonNode(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) error { + + node, err := r.getNodeFromInstanceID(ctx, clusterScope, nodeInstanceID) + if err != nil { + return err + } + + // disable schedulabling of pods on the node + node.Spec.Unschedulable = true + + workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj())) + if err != nil { + return err + } + + if err := workloadClient.Update(context.Background(), node); err != nil { + return err + } + + return nil +} + +func (r *AWSMachinePoolReconciler) evictPods(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) []error { + pods, err := r.podsToEvict(ctx, clusterScope, nodeInstanceID) + if err != nil { + return []error{fmt.Errorf("failed to get the list of pods to evict: %v", err)} + } + errCh := make(chan error, len(pods)) + retErrs := []error{} + + var wg sync.WaitGroup + var isDone bool + defer func() { isDone = true }() + + wg.Add(len(pods)) + + for _, pod := range pods { + go func(p corev1.Pod) { + defer wg.Done() + for { + if isDone { + return + } + err := r.evictPod(ctx, clusterScope, &p) + if err == nil { + clusterScope.Info("Successfully evicted pod", "namespace", p.Namespace, "pod", p.Name, "instance-id", nodeInstanceID) + return + } else { + errCh <- fmt.Errorf("error evicting pod %s/%s on node : %v", p.Namespace, p.Name, err) + return + } + } + }(pod) + } + + finished := make(chan struct{}) + go func() { wg.Wait(); finished <- struct{}{} }() + + select { + case <-finished: + break + case err := <-errCh: + retErrs = append(retErrs, err) + } + + return retErrs +} + +func (r *AWSMachinePoolReconciler) podsToEvict(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) ([]corev1.Pod, error) { + + var podsToEvict []corev1.Pod + node, err := r.getNodeFromInstanceID(ctx, clusterScope, nodeInstanceID) + if err != nil { + return podsToEvict, nil + } + workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj())) + if err != nil { + return podsToEvict, nil + } + // filter to list all the pods on the node. + listOptions := client.ListOptions{ + Namespace: metav1.NamespaceAll, + Raw: &metav1.ListOptions{ + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String()}, + } + + listOptions.ApplyToList(&listOptions) + + podList := &corev1.PodList{} + + if err := workloadClient.List(context.Background(), podList, &listOptions); err != nil { + return podsToEvict, nil + } + for _, pod := range podList.Items { + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + continue + } + if controllerRef := metav1.GetControllerOf(&pod); controllerRef != nil && controllerRef.Kind == "DaemonSet" { + continue + } + if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { + continue + } + podsToEvict = append(podsToEvict, pod) + } + + return podsToEvict, nil +} + +func (r *AWSMachinePoolReconciler) evictPod(ctx context.Context, clusterScope cloud.ClusterScoper, pod *corev1.Pod) error { + + // get the config of the cluster where we need to evict pods + clusterConfig, err := remote.RESTConfig(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj())) + if err != nil { + return err + } + + kubeClient, err := kubernetes.NewForConfig(clusterConfig) + if err != nil { + return err + } + + eviction := &policy.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + } + + return kubeClient.PolicyV1beta1().Evictions(eviction.Namespace).Evict(ctx, eviction) +} + +func (r *AWSMachinePoolReconciler) getNodeFromInstanceID(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) (*corev1.Node, error) { + + nodes := &corev1.NodeList{} + workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj())) + if err != nil { + return &corev1.Node{}, nil + } + if err := workloadClient.List(context.Background(), nodes); err != nil { + return &corev1.Node{}, nil + } + node := corev1.Node{} + for _, node = range nodes.Items { + // provider ID is of the format aws:///us-east-1c/i-016bbceabf8257d39 + if strings.Contains(node.Spec.ProviderID, nodeInstanceID) { + break + } + } + return &node, nil +} + +type message struct { + Source string `json:"source"` + DetailType string `json:"detail-type,omitempty"` + Detail *messageDetail `json:"detail,omitempty"` +} + +type messageDetail struct { + EC2InstanceId string `json:"EC2InstanceId,omitempty"` + LifecycleTransition LifecycleTransition `json:"LifecycleTransition"` +} + +// LifecycleTransition describes the state of an AWS instance lauched via. AWS auto scaling group +type LifecycleTransition string + +var ( + // LifecycleTransitionTerminating is the string representing an instance in a terminating wait state + LifecycleTransitionTerminating = LifecycleTransition("autoscaling:EC2_INSTANCE_TERMINATING") +) diff --git a/pkg/cloud/services/asginstancestate/asgec2events.go b/pkg/cloud/services/asginstancestate/asgec2events.go index 0aa63cd644..d44a57865e 100644 --- a/pkg/cloud/services/asginstancestate/asgec2events.go +++ b/pkg/cloud/services/asginstancestate/asgec2events.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/helpers_test.go b/pkg/cloud/services/asginstancestate/helpers_test.go index fbf3ccc554..cd83c569b5 100644 --- a/pkg/cloud/services/asginstancestate/helpers_test.go +++ b/pkg/cloud/services/asginstancestate/helpers_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go index 14097cb477..1d109ed53b 100644 --- a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go +++ b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go index f5c723c472..4d533cb05d 100644 --- a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go +++ b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go b/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go index 70b8a24262..eaba8f6020 100644 --- a/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go +++ b/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go b/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go index 718c914354..1e946029dd 100644 --- a/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go +++ b/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/queue.go b/pkg/cloud/services/asginstancestate/queue.go index 8fd6027e6a..2fa3228d6b 100644 --- a/pkg/cloud/services/asginstancestate/queue.go +++ b/pkg/cloud/services/asginstancestate/queue.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/queue_test.go b/pkg/cloud/services/asginstancestate/queue_test.go index 2c8786c22a..78b1e871b7 100644 --- a/pkg/cloud/services/asginstancestate/queue_test.go +++ b/pkg/cloud/services/asginstancestate/queue_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/rule.go b/pkg/cloud/services/asginstancestate/rule.go index 16648c7f0d..7ac46f15ce 100644 --- a/pkg/cloud/services/asginstancestate/rule.go +++ b/pkg/cloud/services/asginstancestate/rule.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/rule_test.go b/pkg/cloud/services/asginstancestate/rule_test.go index 8fe6bd13b3..6cb7db841d 100644 --- a/pkg/cloud/services/asginstancestate/rule_test.go +++ b/pkg/cloud/services/asginstancestate/rule_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/service.go b/pkg/cloud/services/asginstancestate/service.go index 4afe3342f7..74d3b894e5 100644 --- a/pkg/cloud/services/asginstancestate/service.go +++ b/pkg/cloud/services/asginstancestate/service.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/interfaces.go b/pkg/cloud/services/interfaces.go index 34de3b0d30..46e52e3545 100644 --- a/pkg/cloud/services/interfaces.go +++ b/pkg/cloud/services/interfaces.go @@ -44,7 +44,7 @@ type ASGInterface interface { DeleteASGAndWait(id string) error SuspendProcesses(name string, processes []string) error ResumeProcesses(name string, processes []string) error - CompleteLifeCycleEvent(asgName string, instanceID string) errorßßß + CompleteLifeCycleEvent(asgName string, instanceID string) error } // EC2Interface encapsulates the methods exposed to the machine