Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

Commit

Permalink
Failover ETCD quicker - Report if etcd peer is unreachable (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mojachieee authored Jun 6, 2022
1 parent c8851aa commit 8f7b8c9
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 8 deletions.
8 changes: 8 additions & 0 deletions api/v1alpha1/etcdpeer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ type EtcdPeerStatus struct {
// ServerVersion contains the Member server version
ServerVersion string `json:"serverVersion"`

// Healthy states whether the peer is reachable from the controller
// +optional
Healthy bool `json:"healthy"`

// LastHealthy states when the peer was last healthy
// +optional
LastHealthy metav1.Time `json:"lastHealthy"`

// TLS configuration
// +optional
TLSEnabled bool `json:"tlsEnabled"`
Expand Down
4 changes: 3 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions config/bases/crd/bases/etcd.improbable.io_etcdpeers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,14 @@ spec:
status:
description: EtcdPeerStatus defines the observed state of EtcdPeer
properties:
healthy:
description: Healthy states whether the peer is reachable from the
controller
type: boolean
lastHealthy:
description: LastHealthy states when the peer was last healthy
format: date-time
type: string
serverVersion:
description: ServerVersion contains the Member server version
type: string
Expand Down
15 changes: 15 additions & 0 deletions config/rbac/role.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

170 changes: 165 additions & 5 deletions controllers/etcdpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ import (
"strings"
"time"

"github.com/pkg/errors"

storage "k8s.io/api/storage/v1"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -40,17 +46,23 @@ type EtcdPeerReconciler struct {
Log logr.Logger
EtcdRepository string
Etcd etcd.APIBuilder
Recorder record.EventRecorder
}

const (
// peerLabel is the label key whose value is the EtcdPeer's name; it is used
// (together with the cluster label) to select the pods belonging to a peer.
peerLabel = "etcd.improbable.io/peer-name"
// pvcCleanupFinalizer is presumably the finalizer that gates PVC cleanup
// when an EtcdPeer is deleted — its use is outside this view; confirm.
pvcCleanupFinalizer = "etcdpeer.etcd.improbable.io/pvc-cleanup"
)

// NodeNotOffline is the sentinel error returned by restartUnhealthyPod when
// the node hosting the peer's pod does not report a NodeReady condition with
// a status other than True, i.e. the node is not considered offline and the
// pod is therefore left alone.
// NOTE(review): Go convention would name this ErrNodeNotOffline; it is kept
// as-is because the identifier is exported.
var NodeNotOffline = errors.New("node not offline")

// +kubebuilder:rbac:groups=etcd.improbable.io,resources=etcdpeers,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=etcd.improbable.io,resources=etcdpeers/status;etcdpeers/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=list;get;create;watch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=list;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=list;get;create;watch;delete
// +kubebuilder:rbac:groups=storage,resources=volumeattachments,verbs=list;delete
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=list;get

func initialMemberURL(member etcdv1alpha1.InitialClusterMember, etcdScheme string) *url.URL {
return &url.URL{
Expand Down Expand Up @@ -580,6 +592,133 @@ func (r *EtcdPeerReconciler) updateStatus(peer *etcdv1alpha1.EtcdPeer, serverVer
peer.Status.ServerVersion = serverVersion
}

// updateHealth records on the in-memory peer status whether the peer was
// reachable. It only mutates the passed object; it performs no API call.
func (r *EtcdPeerReconciler) updateHealth(peer *etcdv1alpha1.EtcdPeer, healthy bool) {
	peer.Status.Healthy = healthy
}

// updateLastHealthy stores lastHealthy on the in-memory peer status as the
// moment the peer was last seen healthy. Passing the zero time.Time resets
// the field. It only mutates the passed object; it performs no API call.
func (r *EtcdPeerReconciler) updateLastHealthy(peer *etcdv1alpha1.EtcdPeer, lastHealthy time.Time) {
	peer.Status.LastHealthy = metav1.Time{Time: lastHealthy}
}

// getPVCForPeer fetches the PersistentVolumeClaim that currently exists for
// the given peer. The lookup key (namespace/name) is taken from the claim
// that pvcForPeer builds for the peer. Any error from the client Get call is
// returned unwrapped.
func (r *EtcdPeerReconciler) getPVCForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*corev1.PersistentVolumeClaim, error) {
	key := client.ObjectKeyFromObject(pvcForPeer(peer))

	pvc := &corev1.PersistentVolumeClaim{}
	if err := r.Client.Get(ctx, key, pvc); err != nil {
		return nil, err
	}
	return pvc, nil
}

// findPodForPeer looks up the single pod that belongs to the given peer.
//
// Pods are listed in the peer's namespace and selected by the peer-name and
// cluster-name labels. An error is returned if the list call fails or if the
// number of matching pods is anything other than exactly one.
func (r *EtcdPeerReconciler) findPodForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*corev1.Pod, error) {
	selector := labels.SelectorFromSet(labels.Set{
		peerLabel:    peer.Name,
		clusterLabel: peer.Spec.ClusterName,
	})
	opts := &client.ListOptions{
		LabelSelector: selector,
		Namespace:     peer.Namespace,
	}

	pods := &corev1.PodList{}
	if err := r.Client.List(ctx, pods, opts); err != nil {
		return nil, errors.Wrap(err, "failed to list pods")
	}

	if found := len(pods.Items); found != 1 {
		return nil, errors.Errorf("number of pods for peer not as expected, found %v, expected to find 1", found)
	}

	return &pods.Items[0], nil
}

// findVolumeAttachmentForPeer returns the VolumeAttachment whose source
// PersistentVolume is the volume bound to the peer's PersistentVolumeClaim.
//
// An error is returned when the claim cannot be fetched, when the claim is
// not bound to a volume yet, when listing VolumeAttachments fails, or when no
// attachment references the bound volume.
func (r *EtcdPeerReconciler) findVolumeAttachmentForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*storage.VolumeAttachment, error) {
	pvc, err := r.getPVCForPeer(ctx, peer)
	if err != nil {
		return nil, errors.Wrap(err, "failed to find pvc whilst looking for volume attachment")
	}

	// An unbound claim has no volume name, so no attachment can match it;
	// bail out before issuing a cluster-wide list.
	if pvc.Spec.VolumeName == "" {
		return nil, fmt.Errorf("failed to find volume attachment for pvc %s/%s: pvc is not bound to a volume", pvc.Namespace, pvc.Name)
	}

	volumeAttachments := &storage.VolumeAttachmentList{}
	err = r.Client.List(ctx, volumeAttachments, &client.ListOptions{
		// VolumeAttachments aren't namespaced (i.e they all exist in the "" namespace)
		// so this needs to be here - otherwise it defaults to "default", which doesn't contain any VolumeAttachments
		Namespace: "",
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to list volume attachments")
	}

	// Iterate by index so the returned pointer refers to the list element
	// itself rather than to the per-iteration copy held by a range variable.
	for i := range volumeAttachments.Items {
		va := &volumeAttachments.Items[i]
		if pvName := va.Spec.Source.PersistentVolumeName; pvName != nil && *pvName == pvc.Spec.VolumeName {
			return va, nil
		}
	}

	// Identify the claim by namespace/name instead of dumping the whole object.
	return nil, fmt.Errorf("failed to find volume attachment for pvc %s/%s (volume %q)", pvc.Namespace, pvc.Name, pvc.Spec.VolumeName)
}

// cleanupOrphanedVolumeAttachmentForPeer deletes, with a zero-second grace
// period, the VolumeAttachment backing the peer's volume — but only if that
// attachment is on expectedNodeName.
//
// It returns an error if the attachment cannot be found, if it is attached to
// a different node than expected (in which case nothing is deleted), or if
// the delete call itself fails.
func (r *EtcdPeerReconciler) cleanupOrphanedVolumeAttachmentForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer, expectedNodeName string) error {
	attachment, err := r.findVolumeAttachmentForPeer(ctx, peer)
	if err != nil {
		return errors.Wrap(err, "failed to find volume attachment to delete")
	}

	// Guard against removing a fresh attachment on another node: only the
	// attachment on the node we expect may be deleted.
	if attachment.Spec.NodeName != expectedNodeName {
		return errors.New("volume attachment is not on the expected node - not deleting")
	}

	var noGrace int64
	deleteOpts := &client.DeleteOptions{GracePeriodSeconds: &noGrace}
	return r.Client.Delete(ctx, attachment, deleteOpts)
}

// restartUnhealthyPod attempts to kickstart a replacement etcd pod for the
// peer when the node hosting its current pod is offline.
//
// Steps, in order:
//  1. find the peer's pod and fetch the node it is scheduled on;
//  2. treat the node as offline only if it reports a NodeReady condition whose
//     status is not True — otherwise return NodeNotOffline and change nothing;
//  3. delete the volume attachment tied to that node (it can take a while to
//     be removed on its own when a node goes down, hence the manual delete);
//  4. delete the old pod with a zero-second grace period.
//
// Any failure along the way is returned wrapped with context.
func (r *EtcdPeerReconciler) restartUnhealthyPod(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) error {
	pod, err := r.findPodForPeer(ctx, peer)
	if err != nil {
		return errors.Wrap(err, "failed to find pod for peer")
	}
	hostNode := pod.Spec.NodeName

	// Nodes are cluster-scoped, so the key carries no namespace.
	node := &corev1.Node{}
	if err := r.Client.Get(ctx, client.ObjectKey{Name: hostNode}, node); err != nil {
		return errors.Wrap(err, "failed to get node hosting broken etcd peer")
	}

	offline := false
	for i := range node.Status.Conditions {
		cond := &node.Status.Conditions[i]
		if cond.Type != corev1.NodeReady {
			continue
		}
		if cond.Status != corev1.ConditionTrue {
			offline = true
			break
		}
	}
	if !offline {
		return NodeNotOffline
	}

	if err := r.cleanupOrphanedVolumeAttachmentForPeer(ctx, peer, hostNode); err != nil {
		return errors.Wrap(err, "failed to cleanup volume attachment for unhealthy pod")
	}

	var noGrace int64
	deleteOpts := &client.DeleteOptions{GracePeriodSeconds: &noGrace}
	if err := r.Client.Delete(ctx, pod, deleteOpts); err != nil {
		return errors.Wrap(err, "failed to delete unhealthy pod")
	}

	return nil
}

func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -646,23 +785,44 @@ func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
TLS: tlsConfig,
}

// Always requeue after ten seconds, as we don't watch on the membership list. So we don't auto-detect changes made
// to the etcd membership API.
// TODO(#76) Implement custom watch on etcd membership API, and remove this `requeueAfter`
result := ctrl.Result{RequeueAfter: time.Second * 10}

var healthy bool
lastHealthy := peer.Status.LastHealthy.Time

if c, err := r.Etcd.New(etcdConfig); err != nil {
log.Error(err, "Unable to connect to etcd")
healthy = false
r.Recorder.Event(&peer, "Warning", "MemberConnectionFailure", "Cannot connect to etcd member")
if lastHealthy.IsZero() {
log.Info("Peer has never been healthy")
} else if time.Since(lastHealthy) > time.Second*30 {
log.Info("Peer unhealthy for more than 30s - attempting to delete pod", "unhealthy_time", time.Since(lastHealthy).Seconds())
err := r.restartUnhealthyPod(ctx, &peer)
if err != nil {
log.Error(err, "failed to restart unhealthy pod - continuing")
} else {
r.updateLastHealthy(&peer, time.Time{})
}

}
} else {
defer c.Close()
if version, err := c.GetVersion(ctx); err != nil {
log.Error(err, "Unable to get Etcd version", "error", err)
} else {
serverVersion = version.Server
}
healthy = true
r.updateLastHealthy(&peer, time.Now())

}

r.updateStatus(&peer, serverVersion)

// Always requeue after ten seconds, as we don't watch on the membership list. So we don't auto-detect changes made
// to the etcd membership API.
// TODO(#76) Implement custom watch on etcd membership API, and remove this `requeueAfter`
result := ctrl.Result{RequeueAfter: time.Second * 10}
r.updateHealth(&peer, healthy)

// Check if the peer has been marked for deletion
if !peer.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down
1 change: 1 addition & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *controllerSuite) setupTest(t *testing.T, etcdAPI etcd.APIBuilder) (tear
Log: logger.WithName("EtcdPeer"),
Etcd: etcdAPI,
EtcdRepository: "quay.io/coreos/etcd",
Recorder: mgr.GetEventRecorderFor("etcdpeer-reconciler"),
}
err = peerController.SetupWithManager(mgr)
require.NoError(t, err, "failed to set up EtcdPeer controller")
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ func main() {
))

if err = (&controllers.EtcdPeerReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("EtcdPeer"),
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("EtcdPeer"),
Recorder: mgr.GetEventRecorderFor("etcdpeer-reconciler"),

Etcd: &etcd.ClientEtcdAPIBuilder{},
EtcdRepository: etcdRepository,
}).SetupWithManager(mgr); err != nil {
Expand Down

0 comments on commit 8f7b8c9

Please sign in to comment.