diff --git a/exp/controllers/awsmachinepool_controller.go b/exp/controllers/awsmachinepool_controller.go
index cc88861f64..4744c087c4 100644
--- a/exp/controllers/awsmachinepool_controller.go
+++ b/exp/controllers/awsmachinepool_controller.go
@@ -18,15 +18,24 @@ package controllers
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
+	"strings"
+	"sync"
 
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/sqs"
+	"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
 	"github.com/go-logr/logr"
 	"github.com/google/go-cmp/cmp"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
+	policy "k8s.io/api/policy/v1beta1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -44,9 +53,11 @@ import (
 	"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud"
 	"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/scope"
 	"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services"
+	"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/asginstancestate"
 	asg "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/autoscaling"
 	"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/services/ec2"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
+	"sigs.k8s.io/cluster-api/controllers/remote"
 	expclusterv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/conditions"
@@ -295,12 +306,18 @@ func (r *AWSMachinePoolReconciler) reconcileNormal(ctx context.Context, machineP
 	machinePoolScope.AWSMachinePool.Spec.ProviderID = asg.ID
 	providerIDList := make([]string, len(asg.Instances))
 
+	instancestateSvc := asginstancestate.NewService(ec2Scope)
+	if err := instancestateSvc.ReconcileASGEC2Events(); err != nil {
+		// non fatal error, so we continue
+		clusterScope.Error(err, "non-fatal: failed to set up EventBridge")
+	}
+
 	for i, ec2 := range asg.Instances {
 		providerIDList[i] = fmt.Sprintf("aws:///%s/%s", ec2.AvailabilityZone, ec2.ID)
 	}
 
 	machinePoolScope.SetAnnotation("cluster-api-provider-aws", "true")
-
+	r.checkForTerminatingInstancesInMachinePool(ctx, clusterScope, asgsvc, machinePoolScope.Name())
 	machinePoolScope.AWSMachinePool.Spec.ProviderIDList = providerIDList
 	machinePoolScope.AWSMachinePool.Status.Replicas = int32(len(providerIDList))
 	machinePoolScope.AWSMachinePool.Status.Ready = true
@@ -370,6 +387,11 @@ func (r *AWSMachinePoolReconciler) reconcileDelete(machinePoolScope *scope.Machi
 	// remove finalizer
 	controllerutil.RemoveFinalizer(machinePoolScope.AWSMachinePool, expinfrav1.MachinePoolFinalizer)
 
+	instancestateSvc := asginstancestate.NewService(ec2Scope)
+	if err := instancestateSvc.DeleteASGEC2Events(); err != nil {
+		clusterScope.Error(err, "non-fatal: failed to delete EventBridge notifications")
+	}
+
 	return ctrl.Result{}, nil
 }
 
@@ -604,3 +626,258 @@ func (r *AWSMachinePoolReconciler) getInfraCluster(ctx context.Context, log logr
 
 	return clusterScope, nil
 }
+
+// checkForTerminatingInstancesInMachinePool polls the cluster's ASG instance
+// state queue once. A message reporting a terminating instance gets the
+// matching node drained and the ASG lifecycle action completed; every message
+// received, including one that cannot be parsed, is then deleted. The work is
+// best effort: failures are logged and never returned, so they cannot fail the
+// surrounding reconcile.
+//
+// NOTE(review): the queue name is derived from the cluster name only, while the
+// lifecycle action is completed against asgName. Confirm the queue cannot
+// carry events for an instance of a different machine pool.
+func (r *AWSMachinePoolReconciler) checkForTerminatingInstancesInMachinePool(ctx context.Context, clusterScope cloud.ClusterScoper, asgsvc services.ASGInterface, asgName string) {
+	sqsSvs := scope.NewGlobalSQSClient(clusterScope, clusterScope)
+	queueURL, err := r.getQueueURL(clusterScope.Name(), sqsSvs)
+	if err != nil {
+		// Without a queue URL there is nothing to poll.
+		clusterScope.Error(err, "non-fatal: failed to get the instance state queue URL")
+		return
+	}
+	resp, err := sqsSvs.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: aws.String(queueURL)})
+	if err != nil {
+		clusterScope.Error(err, "non-fatal: failed to receive messages from instance state queue")
+		return
+	}
+	for _, msg := range resp.Messages {
+		m := message{}
+		// aws.StringValue maps a nil body to "", which then fails to parse.
+		if err := json.Unmarshal([]byte(aws.StringValue(msg.Body)), &m); err != nil {
+			clusterScope.Error(err, "non-fatal: failed to parse message from instance state queue")
+		} else if r.isInstanceTerminating(m) {
+			r.drainTerminatingInstance(ctx, clusterScope, asgsvc, asgName, m.Detail.EC2InstanceId)
+		}
+		// Handled or not, delete the message so it is not delivered again and a
+		// malformed one cannot block the queue.
+		if _, err := sqsSvs.DeleteMessage(&sqs.DeleteMessageInput{
+			QueueUrl:      aws.String(queueURL),
+			ReceiptHandle: msg.ReceiptHandle,
+		}); err != nil {
+			clusterScope.Error(err, "error deleting message", "queueURL", queueURL, "messageReceiptHandle", aws.StringValue(msg.ReceiptHandle))
+		}
+	}
+}
+
+// drainTerminatingInstance cordons the node backed by instanceID, evicts its
+// pods and completes the ASG lifecycle action for asgName. Each step is
+// attempted even when the previous one failed, and failures are only logged.
+func (r *AWSMachinePoolReconciler) drainTerminatingInstance(ctx context.Context, clusterScope cloud.ClusterScoper, asgsvc services.ASGInterface, asgName, instanceID string) {
+	clusterScope.Info("Instance shutting down", "instance-id", instanceID)
+	if err := r.cordonNode(ctx, clusterScope, instanceID); err != nil {
+		clusterScope.Error(err, "non-fatal: failed to cordon the node", "instance-id", instanceID)
+	} else {
+		clusterScope.Info("Node cordoned sucessfully", "instance-id", instanceID)
+	}
+	if errs := r.evictPods(ctx, clusterScope, instanceID); len(errs) > 0 {
+		// Log every eviction failure with its own error.
+		for _, evictErr := range errs {
+			clusterScope.Error(evictErr, "non-fatal: failed to evict pod from the node", "instance-id", instanceID)
+		}
+	} else {
+		clusterScope.Info("All pods evicted sucessfully from the node", "instance-id", instanceID)
+	}
+	if err := asgsvc.CompleteLifeCycleEvent(asgName, instanceID); err != nil {
+		clusterScope.Error(err, "non-fatal: failed to remove pre deletion hook for the node", "instance-id", instanceID)
+	} else {
+		clusterScope.Info("Pre deletion hook removed sucessfully from the node", "instance-id", instanceID)
+	}
+}
+
+// getQueueURL resolves the URL of the instance state queue that
+// asginstancestate.GenerateQueueName names for clusterName.
+func (r *AWSMachinePoolReconciler) getQueueURL(clusterName string, sqsSvs sqsiface.SQSAPI) (string, error) {
+	queueName := asginstancestate.GenerateQueueName(clusterName)
+	resp, err := sqsSvs.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(queueName)})
+	if err != nil {
+		return "", err
+	}
+
+	return aws.StringValue(resp.QueueUrl), nil
+}
+
+// isInstanceTerminating reports whether msg is an auto scaling lifecycle
+// message for an instance in the terminating transition.
+func (r *AWSMachinePoolReconciler) isInstanceTerminating(msg message) bool {
+	// Detail is optional in the payload, so it may be nil after decoding.
+	if msg.Source != "aws.autoscaling" || msg.Detail == nil {
+		return false
+	}
+	return msg.Detail.LifecycleTransition == LifecycleTransitionTerminating
+}
+
+// cordonNode marks the workload-cluster node backed by nodeInstanceID as
+// unschedulable. It fails when no such node exists.
+func (r *AWSMachinePoolReconciler) cordonNode(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) error {
+	node, err := r.getNodeFromInstanceID(ctx, clusterScope, nodeInstanceID)
+	if err != nil {
+		return err
+	}
+
+	workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
+	if err != nil {
+		return err
+	}
+
+	// Stop new pods from being scheduled onto the node.
+	node.Spec.Unschedulable = true
+	return workloadClient.Update(ctx, node)
+}
+
+// evictPods evicts, concurrently, every pod returned by podsToEvict for the
+// node backed by nodeInstanceID. It waits for all evictions to finish and
+// returns one error per pod that could not be evicted.
+func (r *AWSMachinePoolReconciler) evictPods(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) []error {
+	pods, err := r.podsToEvict(ctx, clusterScope, nodeInstanceID)
+	if err != nil {
+		return []error{fmt.Errorf("failed to get the list of pods to evict: %v", err)}
+	}
+
+	var (
+		wg      sync.WaitGroup
+		mu      sync.Mutex // guards retErrs
+		retErrs []error
+	)
+	wg.Add(len(pods))
+	for i := range pods {
+		go func(p *corev1.Pod) {
+			defer wg.Done()
+			if err := r.evictPod(ctx, clusterScope, p); err != nil {
+				mu.Lock()
+				retErrs = append(retErrs, fmt.Errorf("error evicting pod %s/%s on instance %s: %w", p.Namespace, p.Name, nodeInstanceID, err))
+				mu.Unlock()
+				return
+			}
+			clusterScope.Info("Successfully evicted pod", "namespace", p.Namespace, "pod", p.Name, "instance-id", nodeInstanceID)
+		}(&pods[i])
+	}
+	// Wait for every eviction so the caller does not complete the lifecycle
+	// action while evictions are still in flight.
+	wg.Wait()
+
+	return retErrs
+}
+
+// podsToEvict lists the pods on the node backed by nodeInstanceID that should
+// be evicted: everything except finished pods, DaemonSet-controlled pods and
+// mirror pods.
+func (r *AWSMachinePoolReconciler) podsToEvict(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) ([]corev1.Pod, error) {
+	node, err := r.getNodeFromInstanceID(ctx, clusterScope, nodeInstanceID)
+	if err != nil {
+		return nil, err
+	}
+	workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
+	if err != nil {
+		return nil, err
+	}
+
+	// Restrict the list to the pods scheduled on the node.
+	listOptions := client.ListOptions{
+		Namespace: metav1.NamespaceAll,
+		Raw: &metav1.ListOptions{
+			FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": node.Name}).String(),
+		},
+	}
+	podList := &corev1.PodList{}
+	if err := workloadClient.List(ctx, podList, &listOptions); err != nil {
+		return nil, err
+	}
+
+	var podsToEvict []corev1.Pod
+	for i := range podList.Items {
+		pod := &podList.Items[i]
+		if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
+			continue
+		}
+		if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil && controllerRef.Kind == "DaemonSet" {
+			continue
+		}
+		if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found {
+			continue
+		}
+		podsToEvict = append(podsToEvict, *pod)
+	}
+
+	return podsToEvict, nil
+}
+
+// evictPod requests eviction of pod through the policy/v1beta1 eviction API of
+// the cluster referenced by clusterScope. A new clientset is built on every
+// call.
+func (r *AWSMachinePoolReconciler) evictPod(ctx context.Context, clusterScope cloud.ClusterScoper, pod *corev1.Pod) error {
+	// get the config of the cluster where we need to evict pods
+	clusterConfig, err := remote.RESTConfig(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
+	if err != nil {
+		return err
+	}
+
+	kubeClient, err := kubernetes.NewForConfig(clusterConfig)
+	if err != nil {
+		return err
+	}
+
+	eviction := &policy.Eviction{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pod.Name,
+			Namespace: pod.Namespace,
+		},
+	}
+
+	return kubeClient.PolicyV1beta1().Evictions(eviction.Namespace).Evict(ctx, eviction)
+}
+
+// getNodeFromInstanceID returns the workload-cluster node whose provider ID
+// ends in "/"+nodeInstanceID. It returns an error when the node list cannot be
+// read or no node matches, so callers never act on an unrelated node.
+func (r *AWSMachinePoolReconciler) getNodeFromInstanceID(ctx context.Context, clusterScope cloud.ClusterScoper, nodeInstanceID string) (*corev1.Node, error) {
+	if nodeInstanceID == "" {
+		return nil, errors.New("instance ID must not be empty")
+	}
+	workloadClient, err := remote.NewClusterClient(ctx, "", r.Client, util.ObjectKey(clusterScope.ClusterObj()))
+	if err != nil {
+		return nil, err
+	}
+	nodes := &corev1.NodeList{}
+	if err := workloadClient.List(ctx, nodes); err != nil {
+		return nil, err
+	}
+	for i := range nodes.Items {
+		// provider ID is of the format aws:///us-east-1c/i-016bbceabf8257d39
+		if strings.HasSuffix(nodes.Items[i].Spec.ProviderID, "/"+nodeInstanceID) {
+			return &nodes.Items[i], nil
+		}
+	}
+	return nil, errors.Errorf("no node found for instance %q", nodeInstanceID)
+}
+
+// message is the part of an instance state queue message body that this
+// controller decodes.
+type message struct {
+	Source     string         `json:"source"`
+	DetailType string         `json:"detail-type,omitempty"`
+	Detail     *messageDetail `json:"detail,omitempty"`
+}
+
+// messageDetail carries the instance ID and lifecycle transition of a message.
+type messageDetail struct {
+	EC2InstanceId       string              `json:"EC2InstanceId,omitempty"`
+	LifecycleTransition LifecycleTransition `json:"LifecycleTransition"`
+}
+
+// LifecycleTransition describes the state of an AWS instance launched via an AWS auto scaling group.
+type LifecycleTransition string
+
+var (
+	// LifecycleTransitionTerminating is the string representing an instance in a terminating wait state
+	LifecycleTransitionTerminating = LifecycleTransition("autoscaling:EC2_INSTANCE_TERMINATING")
+)
diff --git a/pkg/cloud/services/asginstancestate/asgec2events.go b/pkg/cloud/services/asginstancestate/asgec2events.go
index 0aa63cd644..d44a57865e 100644
--- a/pkg/cloud/services/asginstancestate/asgec2events.go
+++ b/pkg/cloud/services/asginstancestate/asgec2events.go
@@ -1,11 +1,11 @@
 /*
-Copyright 2020 The Kubernetes Authors.
+Copyright 2021 The Kubernetes Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/helpers_test.go b/pkg/cloud/services/asginstancestate/helpers_test.go index fbf3ccc554..cd83c569b5 100644 --- a/pkg/cloud/services/asginstancestate/helpers_test.go +++ b/pkg/cloud/services/asginstancestate/helpers_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go index 14097cb477..1d109ed53b 100644 --- a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go +++ b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/doc.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go index f5c723c472..4d533cb05d 100644 --- a/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go +++ b/pkg/cloud/services/asginstancestate/mock_eventbridgeiface/eventbridgeiface_mock.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go b/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go index 70b8a24262..eaba8f6020 100644 --- a/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go +++ b/pkg/cloud/services/asginstancestate/mock_sqsiface/doc.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go b/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go index 718c914354..1e946029dd 100644 --- a/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go +++ b/pkg/cloud/services/asginstancestate/mock_sqsiface/sqsiface_mock.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/queue.go b/pkg/cloud/services/asginstancestate/queue.go index 8fd6027e6a..2fa3228d6b 100644 --- a/pkg/cloud/services/asginstancestate/queue.go +++ b/pkg/cloud/services/asginstancestate/queue.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/queue_test.go b/pkg/cloud/services/asginstancestate/queue_test.go index 2c8786c22a..78b1e871b7 100644 --- a/pkg/cloud/services/asginstancestate/queue_test.go +++ b/pkg/cloud/services/asginstancestate/queue_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/rule.go b/pkg/cloud/services/asginstancestate/rule.go index 16648c7f0d..7ac46f15ce 100644 --- a/pkg/cloud/services/asginstancestate/rule.go +++ b/pkg/cloud/services/asginstancestate/rule.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/rule_test.go b/pkg/cloud/services/asginstancestate/rule_test.go index 8fe6bd13b3..6cb7db841d 100644 --- a/pkg/cloud/services/asginstancestate/rule_test.go +++ b/pkg/cloud/services/asginstancestate/rule_test.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/asginstancestate/service.go b/pkg/cloud/services/asginstancestate/service.go index 4afe3342f7..74d3b894e5 100644 --- a/pkg/cloud/services/asginstancestate/service.go +++ b/pkg/cloud/services/asginstancestate/service.go @@ -1,11 +1,11 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/cloud/services/interfaces.go b/pkg/cloud/services/interfaces.go index 34de3b0d30..46e52e3545 100644 --- a/pkg/cloud/services/interfaces.go +++ b/pkg/cloud/services/interfaces.go @@ -44,7 +44,7 @@ type ASGInterface interface { DeleteASGAndWait(id string) error SuspendProcesses(name string, processes []string) error ResumeProcesses(name string, processes []string) error - CompleteLifeCycleEvent(asgName string, instanceID string) errorßßß + CompleteLifeCycleEvent(asgName string, instanceID string) error } // EC2Interface encapsulates the methods exposed to the machine