Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove IP Address based node replace procedure #2027

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions pkg/controller/scyllacluster/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,6 @@ func MemberService(sc *scyllav1.ScyllaCluster, rackName, name string, oldService
svcLabels[naming.RackNameLabel] = rackName
svcLabels[naming.ScyllaServiceTypeLabel] = string(naming.ScyllaServiceTypeMember)

var replaceAddr string
var hasReplaceLabel bool
if oldService != nil {
replaceAddr, hasReplaceLabel = oldService.Labels[naming.ReplaceLabel]
}

// Only new service should get the replace address, old service keeps "" until deleted.
if !hasReplaceLabel || len(replaceAddr) != 0 {
rackStatus, ok := sc.Status.Racks[rackName]
if ok {
replaceAddr := rackStatus.ReplaceAddressFirstBoot[name]
if len(replaceAddr) != 0 {
svcLabels[naming.ReplaceLabel] = replaceAddr
}
}
}

svcAnnotations := map[string]string{}
if sc.Spec.ExposeOptions != nil && sc.Spec.ExposeOptions.NodeService.Annotations != nil {
maps.Copy(svcAnnotations, sc.Spec.ExposeOptions.NodeService.Annotations)
Expand Down
90 changes: 4 additions & 86 deletions pkg/controller/scyllacluster/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,80 +158,6 @@ func TestMemberService(t *testing.T) {
},
},
},
{
name: "new service with saved IP",
scyllaCluster: func() *scyllav1.ScyllaCluster {
sc := basicSC.DeepCopy()
sc.Status.Racks[basicRackName] = scyllav1.RackStatus{
ReplaceAddressFirstBoot: map[string]string{
basicSVCName: "10.0.0.1",
},
}
return sc
}(),
rackName: basicRackName,
svcName: basicSVCName,
oldService: nil,
jobs: nil,
expectedService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: basicSVCName,
Labels: func() map[string]string {
labels := basicSVCLabels()
labels[naming.ReplaceLabel] = "10.0.0.1"
return labels
}(),
Annotations: basicSVCAnnotations(),
OwnerReferences: basicSCOwnerRefs,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: basicSVCSelector,
PublishNotReadyAddresses: true,
Ports: basicPorts,
},
},
},
{
name: "new service with saved IP and existing replace address",
scyllaCluster: func() *scyllav1.ScyllaCluster {
sc := basicSC.DeepCopy()
sc.Status.Racks[basicRackName] = scyllav1.RackStatus{
ReplaceAddressFirstBoot: map[string]string{
basicSVCName: "10.0.0.1",
},
}
return sc
}(),
rackName: basicRackName,
svcName: basicSVCName,
oldService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
naming.ReplaceLabel: "10.0.0.1",
},
},
},
jobs: nil,
expectedService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: basicSVCName,
Labels: func() map[string]string {
labels := basicSVCLabels()
labels[naming.ReplaceLabel] = "10.0.0.1"
return labels
}(),
Annotations: basicSVCAnnotations(),
OwnerReferences: basicSCOwnerRefs,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Selector: basicSVCSelector,
PublishNotReadyAddresses: true,
Ports: basicPorts,
},
},
},
// This behaviour is based on the fact the we merge labels on apply.
// TODO: to be addressed with https://github.com/scylladb/scylla-operator/issues/1440.
{
Expand Down Expand Up @@ -263,18 +189,10 @@ func TestMemberService(t *testing.T) {
},
},
{
name: "existing initial service with IP",
scyllaCluster: func() *scyllav1.ScyllaCluster {
sc := basicSC.DeepCopy()
sc.Status.Racks[basicRackName] = scyllav1.RackStatus{
ReplaceAddressFirstBoot: map[string]string{
basicSVCName: "10.0.0.1",
},
}
return sc
}(),
rackName: basicRackName,
svcName: basicSVCName,
name: "existing initial service with IP",
scyllaCluster: basicSC.DeepCopy(),
rackName: basicRackName,
svcName: basicSVCName,
oldService: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
Expand Down
180 changes: 9 additions & 171 deletions pkg/controller/scyllacluster/sync_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1"
"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/naming"
"github.com/scylladb/scylla-operator/pkg/pointer"
"github.com/scylladb/scylla-operator/pkg/resourceapply"
"github.com/scylladb/scylla-operator/pkg/scyllafeatures"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -251,24 +250,17 @@ func (scc *Controller) syncServices(
if err != nil {
return progressingConditions, fmt.Errorf("can't determine if ScyllaCluster %q supports replacing using hostID: %w", naming.ObjRef(sc), err)
}
if !supportsReplaceUsingHostID {
return progressingConditions, fmt.Errorf("can't replace node %q, ScyllaDB version of %q ScyllaCluster doesn't support HostID based replace procedure", naming.ObjRef(svc), naming.ObjRef(sc))
}

if supportsReplaceUsingHostID {
klog.V(4).InfoS("Replacing node using HostID", "ScyllaCluster", klog.KObj(sc), "Service", klog.KObj(svc))
pcs, err := scc.replaceNodeUsingHostID(ctx, sc, svc)
if err != nil {
return progressingConditions, fmt.Errorf("can't replace node using Host ID: %w", err)
}

progressingConditions = append(progressingConditions, pcs...)
} else {
klog.V(4).InfoS("Replacing node using ClusterIP", "ScyllaCluster", klog.KObj(sc), "Service", klog.KObj(svc))
pcs, err := scc.replaceNodeUsingClusterIP(ctx, sc, status, svc, rackName)
if err != nil {
return progressingConditions, fmt.Errorf("can't replace node using ClusterIP: %w", err)
}

progressingConditions = append(progressingConditions, pcs...)
klog.V(4).InfoS("Replacing node using HostID", "ScyllaCluster", klog.KObj(sc), "Service", klog.KObj(svc))
pcs, err := scc.replaceNodeUsingHostID(ctx, sc, svc)
if err != nil {
return progressingConditions, fmt.Errorf("can't replace node using Host ID: %w", err)
}

progressingConditions = append(progressingConditions, pcs...)
}

return progressingConditions, nil
Expand Down Expand Up @@ -426,160 +418,6 @@ func (scc *Controller) finishOngoingReplaceNodeUsingHostID(ctx context.Context,
return progressingConditions, nil
}

func (scc *Controller) replaceNodeUsingClusterIP(ctx context.Context, sc *scyllav1.ScyllaCluster, status *scyllav1.ScyllaClusterStatus, svc *corev1.Service, rackName string) ([]metav1.Condition, error) {
var progressingConditions []metav1.Condition

replaceAddr, ok := svc.Labels[naming.ReplaceLabel]
if !ok {
return progressingConditions, nil
}

if len(replaceAddr) == 0 {
pcs, err := scc.initializeReplaceNodeUsingClusterIP(ctx, sc, status, svc, rackName)
if err != nil {
return progressingConditions, fmt.Errorf("can't initialize replace using ClusterIP: %w", err)
}
progressingConditions = append(progressingConditions, pcs...)
} else {
pcs, err := scc.finishOngoingReplaceNodeUsingClusterIP(ctx, sc, status, svc, rackName)
if err != nil {
return progressingConditions, fmt.Errorf("can't finish replace using ClusterIP: %w", err)
}
progressingConditions = append(progressingConditions, pcs...)
}

return progressingConditions, nil
}

func (scc *Controller) initializeReplaceNodeUsingClusterIP(ctx context.Context, sc *scyllav1.ScyllaCluster, status *scyllav1.ScyllaClusterStatus, svc *corev1.Service, rackName string) ([]metav1.Condition, error) {
var progressingConditions []metav1.Condition

// Member needs to be replaced.
// Save replace address in RackStatus
if len(status.Racks[rackName].ReplaceAddressFirstBoot[svc.Name]) == 0 {
status.Racks[rackName].ReplaceAddressFirstBoot[svc.Name] = svc.Spec.ClusterIP
klog.V(2).InfoS("Adding member address to replace address list", "Member", svc.Name, "IP", svc.Spec.ClusterIP, "ReplaceAddresses", status.Racks[rackName].ReplaceAddressFirstBoot)

// Make sure the address is stored before proceeding further.
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "WaitingToPickUpReplaceAddress",
Message: fmt.Sprintf("Service %q is waiting to pick up stored replace address.", naming.ObjRef(svc)),
ObservedGeneration: sc.Generation,
})
return progressingConditions, nil
}

pcs, err := scc.removePodAndAssociatedPVC(ctx, sc, svc)
if err != nil {
return progressingConditions, fmt.Errorf("can't remove ScyllaCluster %q Pod %q and associated PVC: %w", naming.ObjRef(sc), naming.ObjRef(svc), err)
}
progressingConditions = append(progressingConditions, pcs...)

// Delete the member Service.
klog.V(2).InfoS("Deleting the member service to replace member",
"ScyllaCluster", klog.KObj(sc),
"Service", klog.KObj(svc),
)
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, serviceControllerProgressingCondition, svc, "delete", sc.Generation)
err = scc.kubeClient.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{
PropagationPolicy: pointer.Ptr(metav1.DeletePropagationBackground),
Preconditions: &metav1.Preconditions{
UID: &svc.UID,
// Delete only if its the version we see in cache. (Has the same replace label state.)
ResourceVersion: &svc.ResourceVersion,
},
})
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).InfoS("Pod not found", "Pod", klog.ObjectRef{Namespace: svc.Namespace, Name: svc.Name})
} else {
resourceapply.ReportDeleteEvent(scc.eventRecorder, svc, err)
return progressingConditions, err
}
} else {
resourceapply.ReportDeleteEvent(scc.eventRecorder, svc, nil)

// FIXME: The pod could have been already up and read the old ClusterIP - make sure it will restart.
// We can't delete the pod here as it wouldn't retry failures.

// We have deleted the service. Wait for re-queue.
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "WaitingToObserveDeletedService",
Message: fmt.Sprintf("Waiting to observe service %q as deleted to proceeed with other services.", naming.ObjRef(svc)),
ObservedGeneration: sc.Generation,
})
}

return progressingConditions, nil
}

func (scc *Controller) finishOngoingReplaceNodeUsingClusterIP(ctx context.Context, sc *scyllav1.ScyllaCluster, status *scyllav1.ScyllaClusterStatus, svc *corev1.Service, rackName string) ([]metav1.Condition, error) {
var progressingConditions []metav1.Condition

// Member is being replaced. Wait for readiness and clear the replace label.

pod, err := scc.podLister.Pods(svc.Namespace).Get(svc.Name)
if err != nil {
if !apierrors.IsNotFound(err) {
return progressingConditions, err
}

klog.V(2).InfoS("Pod has not been recreated by the StatefulSet controller yet",
"ScyllaCluster", klog.KObj(sc),
"Pod", klog.KObj(svc),
)
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "WaitingForPodRecreation",
Message: fmt.Sprintf("Service %q is waiting for StatefulSet controller to recreate pod.", naming.ObjRef(svc)),
ObservedGeneration: sc.Generation,
})
} else {
sc := scc.resolveScyllaClusterControllerThroughStatefulSet(pod)
if sc == nil {
return progressingConditions, fmt.Errorf("pod %q is not owned by us anymore", naming.ObjRef(pod))
}

// We could still see an old pod in the caches - verify with a live call.
podReady, pod, err := controllerhelpers.IsPodReadyWithPositiveLiveCheck(ctx, scc.kubeClient.CoreV1(), pod)
if err != nil {
return progressingConditions, err
}
if podReady {
scc.eventRecorder.Eventf(svc, corev1.EventTypeNormal, "FinishedReplacingNode", "New pod %s/%s is ready.", pod.Namespace, pod.Name)
svcCopy := svc.DeepCopy()
delete(svcCopy.Labels, naming.ReplaceLabel)
controllerhelpers.AddGenericProgressingStatusCondition(&progressingConditions, serviceControllerProgressingCondition, svcCopy, "update", sc.Generation)
_, err := scc.kubeClient.CoreV1().Services(svcCopy.Namespace).Update(ctx, svcCopy, metav1.UpdateOptions{})
resourceapply.ReportUpdateEvent(scc.eventRecorder, svc, err)
if err != nil {
return progressingConditions, err
}

delete(status.Racks[rackName].ReplaceAddressFirstBoot, svc.Name)
} else {
klog.V(2).InfoS("Pod isn't ready yet",
"ScyllaCluster", klog.KObj(sc),
"Pod", klog.KObj(pod),
)
progressingConditions = append(progressingConditions, metav1.Condition{
Type: serviceControllerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "WaitingForPodReadiness",
Message: fmt.Sprintf("Service %q is waiting for Pod %q to become ready.", naming.ObjRef(svc), naming.ObjRef(pod)),
ObservedGeneration: sc.Generation,
})
}
}

return progressingConditions, nil
}

func (scc *Controller) removePodAndAssociatedPVC(ctx context.Context, sc *scyllav1.ScyllaCluster, svc *corev1.Service) ([]metav1.Condition, error) {
var progressingConditions []metav1.Condition
backgroundPropagationPolicy := metav1.DeletePropagationBackground
Expand Down
8 changes: 0 additions & 8 deletions pkg/sidecar/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,6 @@ func (s *ScyllaConfig) setupEntrypoint(ctx context.Context) (*exec.Cmd, error) {
"broadcast-rpc-address": &m.BroadcastRPCAddress,
}

// If node is being replaced
if addr, ok := m.ServiceLabels[naming.ReplaceLabel]; ok {
if len(addr) == 0 {
klog.Warningf("Service %q have unexpectedly empty label %q, skipping replace", m.Name, naming.ReplaceLabel)
} else {
args["replace-address-first-boot"] = pointer.Ptr(addr)
}
}
if hostID, ok := m.ServiceLabels[naming.ReplacingNodeHostIDLabel]; ok {
if len(hostID) == 0 {
klog.Warningf("Service %q have unexpectedly empty label %q, skipping replace", m.Name, naming.ReplacingNodeHostIDLabel)
Expand Down
Loading