From 8f7b8c97b9f690f8d27e07f629694ebcc3422458 Mon Sep 17 00:00:00 2001 From: Joe Stephenson Date: Mon, 6 Jun 2022 14:26:16 +0100 Subject: [PATCH] Failover ETCD quicker - Report if etcd peer is unreachable (#20) --- api/v1alpha1/etcdpeer_types.go | 8 + api/v1alpha1/zz_generated.deepcopy.go | 4 +- .../bases/etcd.improbable.io_etcdpeers.yaml | 8 + config/rbac/role.yaml | 15 ++ controllers/etcdpeer_controller.go | 170 +++++++++++++++++- controllers/suite_test.go | 1 + main.go | 6 +- 7 files changed, 204 insertions(+), 8 deletions(-) diff --git a/api/v1alpha1/etcdpeer_types.go b/api/v1alpha1/etcdpeer_types.go index 1c0c5d15..eb405024 100644 --- a/api/v1alpha1/etcdpeer_types.go +++ b/api/v1alpha1/etcdpeer_types.go @@ -96,6 +96,14 @@ type EtcdPeerStatus struct { // ServerVersion contains the Member server version ServerVersion string `json:"serverVersion"` + // Healthy states whether the peer is reachable from the controller + // +optional + Healthy bool `json:"healthy"` + + // LastHealthy states when the peer was last healthy + // +optional + LastHealthy metav1.Time `json:"lastHealthy"` + // TLS configuration // +optional TLSEnabled bool `json:"tlsEnabled"` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 5e8ee65a..4d11c65f 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1,3 +1,4 @@ +//go:build !ignore_autogenerated // +build !ignore_autogenerated // Code generated by controller-gen. DO NOT EDIT. @@ -399,7 +400,7 @@ func (in *EtcdPeer) DeepCopyInto(out *EtcdPeer) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EtcdPeer. @@ -490,6 +491,7 @@ func (in *EtcdPeerSpec) DeepCopy() *EtcdPeerSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EtcdPeerStatus) DeepCopyInto(out *EtcdPeerStatus) { *out = *in + in.LastHealthy.DeepCopyInto(&out.LastHealthy) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EtcdPeerStatus. diff --git a/config/bases/crd/bases/etcd.improbable.io_etcdpeers.yaml b/config/bases/crd/bases/etcd.improbable.io_etcdpeers.yaml index 67a774a1..86a2eade 100644 --- a/config/bases/crd/bases/etcd.improbable.io_etcdpeers.yaml +++ b/config/bases/crd/bases/etcd.improbable.io_etcdpeers.yaml @@ -1068,6 +1068,14 @@ spec: status: description: EtcdPeerStatus defines the observed state of EtcdPeer properties: + healthy: + description: Healthy states whether the peer is reachable from the + controller + type: boolean + lastHealthy: + description: LastHealthy states when the peer was last healthy + format: date-time + type: string serverVersion: description: ServerVersion contains the Member server version type: string diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 8d4c9735..6ed8bebc 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -43,6 +43,13 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list - apiGroups: - "" resources: @@ -69,6 +76,7 @@ rules: - pods verbs: - create + - delete - get - list - watch @@ -210,3 +218,10 @@ rules: - list - patch - watch +- apiGroups: + - storage + resources: + - volumeattachments + verbs: + - delete + - list diff --git a/controllers/etcdpeer_controller.go b/controllers/etcdpeer_controller.go index b464bcab..0e54ea5f 100644 --- a/controllers/etcdpeer_controller.go +++ b/controllers/etcdpeer_controller.go @@ -9,6 +9,10 @@ import ( "strings" "time" + "github.com/pkg/errors" + + storage "k8s.io/api/storage/v1" + "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -16,10 +20,12 @@ import ( apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -40,6 +46,7 @@ type EtcdPeerReconciler struct { Log logr.Logger EtcdRepository string Etcd etcd.APIBuilder + Recorder record.EventRecorder } const ( @@ -47,10 +54,15 @@ const ( pvcCleanupFinalizer = "etcdpeer.etcd.improbable.io/pvc-cleanup" ) +var NodeNotOffline = errors.New("node not offline") + // +kubebuilder:rbac:groups=etcd.improbable.io,resources=etcdpeers,verbs=get;list;watch;patch // +kubebuilder:rbac:groups=etcd.improbable.io,resources=etcdpeers/status;etcdpeers/finalizers,verbs=get;update;patch // +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=list;get;create;watch +// +kubebuilder:rbac:groups=core,resources=pods,verbs=list;delete // +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=list;get;create;watch;delete +// +kubebuilder:rbac:groups=storage,resources=volumeattachments,verbs=list;delete +// +kubebuilder:rbac:groups=core,resources=nodes,verbs=list;get func initialMemberURL(member etcdv1alpha1.InitialClusterMember, etcdScheme string) *url.URL { return &url.URL{ @@ -580,6 +592,133 @@ func (r *EtcdPeerReconciler) updateStatus(peer *etcdv1alpha1.EtcdPeer, serverVer peer.Status.ServerVersion = serverVersion } +func (r *EtcdPeerReconciler) updateHealth(peer *etcdv1alpha1.EtcdPeer, healthy bool) { + peer.Status.Healthy = healthy +} + +func (r *EtcdPeerReconciler) updateLastHealthy(peer *etcdv1alpha1.EtcdPeer, lastHealthy time.Time) { + peer.Status.LastHealthy = metav1.NewTime(lastHealthy) +} + +func (r *EtcdPeerReconciler) getPVCForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*corev1.PersistentVolumeClaim, error) { + expectedPvc := pvcForPeer(peer) + expectedPvcNamespacedName := client.ObjectKeyFromObject(expectedPvc) + + var actualPvc corev1.PersistentVolumeClaim + err := r.Client.Get(ctx, expectedPvcNamespacedName, &actualPvc) + if err != nil { + return nil, err + } + + return &actualPvc, nil +} + +func (r *EtcdPeerReconciler) findPodForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*corev1.Pod, error) { + var podList corev1.PodList + err := r.Client.List(ctx, &podList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + peerLabel: peer.Name, + clusterLabel: peer.Spec.ClusterName, + }), + Namespace: peer.Namespace, + }) + + if err != nil { + return nil, errors.Wrap(err, "failed to list pods") + } + + if len(podList.Items) != 1 { + return nil, errors.Errorf("number of pods for peer not as expected, found %v, expected to find 1", len(podList.Items)) + } + + return &podList.Items[0], nil +} + +func (r *EtcdPeerReconciler) findVolumeAttachmentForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*storage.VolumeAttachment, error) { + pvc, err := r.getPVCForPeer(ctx, peer) + if err != nil { + return nil, errors.Wrap(err, "failed to find pvc whilst looking for volume attachment") + } + + volumeAttachments := &storage.VolumeAttachmentList{} + err = r.Client.List(ctx, volumeAttachments, &client.ListOptions{ + // VolumeAttachments aren't namespaced (i.e they all exist in the "" namespace) + // so this needs to be here - otherwise it defaults to "default", which doesn't contain any VolumeAttachments + Namespace: "", + }) + if err != nil { + return nil, errors.Wrap(err, "failed to list volume attachments") + } + + for _, va := range volumeAttachments.Items { + if va.Spec.Source.PersistentVolumeName != nil && *va.Spec.Source.PersistentVolumeName == pvc.Spec.VolumeName { + return &va, nil + } + } + return nil, fmt.Errorf("failed to find volume attachment for pvc: %v", pvc) +} + +func (r *EtcdPeerReconciler) cleanupOrphanedVolumeAttachmentForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer, expectedNodeName string) error { + va, err := r.findVolumeAttachmentForPeer(ctx, peer) + if err != nil { + return errors.Wrap(err, "failed to find volume attachment to delete") + } + + // perform a sanity check to ensure we're deleting the old volume attachment and not the new one + if va.Spec.NodeName != expectedNodeName { + return errors.New("volume attachment is not on the expected node - not deleting") + } + + gracePeriod := int64(0) + return r.Client.Delete(ctx, va, &client.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, + }) +} + +// restartUnhealthyPod attempts to kickstart a new etcd pod It does this by +// deleting the volume attachment to the old associated pod (the volume +// attachment can take a while to be deleted in the event of a node going down +// which is why we do it manually). It then deletes the old associated pod +func (r *EtcdPeerReconciler) restartUnhealthyPod(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) error { + pod, err := r.findPodForPeer(ctx, peer) + if err != nil { + return errors.Wrap(err, "failed to find pod for peer") + } + + nodeKey := client.ObjectKey{Namespace: "", Name: pod.Spec.NodeName} + var node corev1.Node + err = r.Client.Get(ctx, nodeKey, &node) + if err != nil { + return errors.Wrap(err, "failed to get node hosting broken etcd peer") + } + nodeOffline := false + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady && condition.Status != corev1.ConditionTrue { + nodeOffline = true + break + } + } + + if !nodeOffline { + return NodeNotOffline + } + + err = r.cleanupOrphanedVolumeAttachmentForPeer(ctx, peer, pod.Spec.NodeName) + if err != nil { + return errors.Wrap(err, "failed to cleanup volume attachment for unhealthy pod") + } + + gracePeriod := int64(0) + err = r.Client.Delete(ctx, pod, &client.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, + }) + if err != nil { + return errors.Wrap(err, "failed to delete unhealthy pod") + } + + return nil +} + func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() @@ -646,8 +785,30 @@ func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ TLS: tlsConfig, } + // Always requeue after ten seconds, as we don't watch on the membership list. So we don't auto-detect changes made + // to the etcd membership API. + // TODO(#76) Implement custom watch on etcd membership API, and remove this `requeueAfter` + result := ctrl.Result{RequeueAfter: time.Second * 10} + + var healthy bool + lastHealthy := peer.Status.LastHealthy.Time + if c, err := r.Etcd.New(etcdConfig); err != nil { log.Error(err, "Unable to connect to etcd") + healthy = false + r.Recorder.Event(&peer, "Warning", "MemberConnectionFailure", "Cannot connect to etcd member") + if lastHealthy.IsZero() { + log.Info("Peer has never been healthy") + } else if time.Since(lastHealthy) > time.Second*30 { + log.Info("Peer unhealthy for more than 30s - attempting to delete pod", "unhealthy_time", time.Since(lastHealthy).Seconds()) + err := r.restartUnhealthyPod(ctx, &peer) + if err != nil { + log.Error(err, "failed to restart unhealthy pod - continuing") + } else { + r.updateLastHealthy(&peer, time.Time{}) + } + + } } else { defer c.Close() if version, err := c.GetVersion(ctx); err != nil { @@ -655,14 +816,13 @@ func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ } else { serverVersion = version.Server } + healthy = true + r.updateLastHealthy(&peer, time.Now()) + } r.updateStatus(&peer, serverVersion) - - // Always requeue after ten seconds, as we don't watch on the membership list. So we don't auto-detect changes made - // to the etcd membership API. - // TODO(#76) Implement custom watch on etcd membership API, and remove this `requeueAfter` - result := ctrl.Result{RequeueAfter: time.Second * 10} + r.updateHealth(&peer, healthy) // Check if the peer has been marked for deletion if !peer.ObjectMeta.DeletionTimestamp.IsZero() { diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 6cca5fe6..e27b607b 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -102,6 +102,7 @@ func (s *controllerSuite) setupTest(t *testing.T, etcdAPI etcd.APIBuilder) (tear Log: logger.WithName("EtcdPeer"), Etcd: etcdAPI, EtcdRepository: "quay.io/coreos/etcd", + Recorder: mgr.GetEventRecorderFor("etcdpeer-reconciler"), } err = peerController.SetupWithManager(mgr) require.NoError(t, err, "failed to set up EtcdPeer controller") diff --git a/main.go b/main.go index ae95014c..bb4f5b63 100644 --- a/main.go +++ b/main.go @@ -114,8 +114,10 @@ func main() { )) if err = (&controllers.EtcdPeerReconciler{ - Client: mgr.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("EtcdPeer"), + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("EtcdPeer"), + Recorder: mgr.GetEventRecorderFor("etcdpeer-reconciler"), + Etcd: &etcd.ClientEtcdAPIBuilder{}, EtcdRepository: etcdRepository, }).SetupWithManager(mgr); err != nil {