From 2bd8ec7f82a40327c3b3a24967c621b56450da1a Mon Sep 17 00:00:00 2001 From: Kacper Rzetelski Date: Thu, 14 Dec 2023 10:55:11 +0100 Subject: [PATCH] Determine host for Scylla client using ScyllaCluster expose options --- .../scyllacluster/sync_statefulsets.go | 26 +- pkg/controllerhelpers/scylla.go | 79 ++- pkg/controllerhelpers/scylla_test.go | 468 ++++++++++++++++++ pkg/naming/names.go | 5 + pkg/sidecar/identity/member.go | 86 +--- pkg/sidecar/identity/member_test.go | 76 ++- pkg/sidecar/probes.go | 9 - 7 files changed, 628 insertions(+), 121 deletions(-) diff --git a/pkg/controller/scyllacluster/sync_statefulsets.go b/pkg/controller/scyllacluster/sync_statefulsets.go index 88c2652447d..7a2c167ead5 100644 --- a/pkg/controller/scyllacluster/sync_statefulsets.go +++ b/pkg/controller/scyllacluster/sync_statefulsets.go @@ -145,7 +145,7 @@ func (scc *Controller) beforeUpgrade(ctx context.Context, sc *scyllav1.ScyllaClu klog.V(2).InfoS("Running pre-upgrade hook", "ScyllaCluster", klog.KObj(sc)) defer klog.V(2).InfoS("Finished running pre-upgrade hook", "ScyllaCluster", klog.KObj(sc)) - hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services) + hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services, scc.podLister) if err != nil { return true, err } @@ -184,7 +184,7 @@ func (scc *Controller) afterUpgrade(ctx context.Context, sc *scyllav1.ScyllaClus klog.V(2).InfoS("Running post-upgrade hook", "ScyllaCluster", klog.KObj(sc)) defer klog.V(2).InfoS("Finished running post-upgrade hook", "ScyllaCluster", klog.KObj(sc)) - hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services) + hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services, scc.podLister) if err != nil { return err } @@ -230,8 +230,13 @@ func (scc *Controller) beforeNodeUpgrade(ctx context.Context, sc *scyllav1.Scyll } // Drain the node. + podName := naming.PodNameFromService(svc) + pod, err := scc.podLister.Pods(sc.Namespace).Get(podName) + if err != nil { + return false, fmt.Errorf("can't get pod %q: %w", naming.ManualRef(sc.Namespace, podName), err) + } - host, err := controllerhelpers.GetScyllaIPFromService(svc) + host, err := controllerhelpers.GetScyllaHost(sc, svc, pod) if err != nil { return true, err } @@ -294,7 +299,6 @@ func (scc *Controller) beforeNodeUpgrade(ctx context.Context, sc *scyllav1.Scyll // https://github.com/kubernetes/kubernetes/issues/67250 // Kubernetes can't evict pods when DesiredHealthy == 0 and it's already down, so we need to use DELETE // to succeed even when having just one replica. - podName := svcName klog.V(2).InfoS("Deleting Pod", "ScyllaCluster", klog.KObj(sc), "Pod", naming.ManualRef(sc.Namespace, podName)) err = scc.kubeClient.CoreV1().Pods(sc.Namespace).Delete(ctx, podName, metav1.DeleteOptions{}) if err != nil { @@ -311,7 +315,19 @@ func (scc *Controller) beforeNodeUpgrade(ctx context.Context, sc *scyllav1.Scyll } func (scc *Controller) afterNodeUpgrade(ctx context.Context, sc *scyllav1.ScyllaCluster, sts *appsv1.StatefulSet, ordinal int32, services map[string]*corev1.Service) error { - host, err := controllerhelpers.GetScyllaHost(sts.Name, ordinal, services) + svcName := fmt.Sprintf("%s-%d", sts.Name, ordinal) + svc, ok := services[svcName] + if !ok { + return fmt.Errorf("missing service %q", naming.ManualRef(sc.Namespace, svcName)) + } + + podName := naming.PodNameFromService(svc) + pod, err := scc.podLister.Pods(sc.Namespace).Get(podName) + if err != nil { + return fmt.Errorf("can't get pod %q: %w", naming.ManualRef(sc.Namespace, podName), err) + } + + host, err := controllerhelpers.GetScyllaHost(sc, svc, pod) if err != nil { return err } diff --git a/pkg/controllerhelpers/scylla.go b/pkg/controllerhelpers/scylla.go index 5531e98e550..88d80d4b04c 100644 --- a/pkg/controllerhelpers/scylla.go +++ b/pkg/controllerhelpers/scylla.go @@ -15,46 +15,79 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/errors" + corev1listers "k8s.io/client-go/listers/core/v1" corev1schedulinghelpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/component-helpers/scheduling/corev1/nodeaffinity" ) -func GetScyllaIPFromService(svc *corev1.Service) (string, error) { - if svc.Spec.Type != corev1.ServiceTypeClusterIP { - return "", fmt.Errorf("service %s is of type %q instead of %q", naming.ObjRef(svc), svc.Spec.Type, corev1.ServiceTypeClusterIP) +func GetScyllaHost(sc *scyllav1.ScyllaCluster, svc *corev1.Service, pod *corev1.Pod) (string, error) { + // Assume API's default. + nodeBroadcastAddressType := scyllav1.BroadcastAddressTypeServiceClusterIP + if sc.Spec.ExposeOptions != nil && sc.Spec.ExposeOptions.BroadcastOptions != nil { + nodeBroadcastAddressType = sc.Spec.ExposeOptions.BroadcastOptions.Nodes.Type } - if svc.Spec.ClusterIP == corev1.ClusterIPNone { - return "", fmt.Errorf("service %s doesn't have a ClusterIP", naming.ObjRef(svc)) - } - - return svc.Spec.ClusterIP, nil + return GetScyllaBroadcastAddress(nodeBroadcastAddressType, svc, pod) } -func GetScyllaHost(statefulsetName string, ordinal int32, services map[string]*corev1.Service) (string, error) { - svcName := fmt.Sprintf("%s-%d", statefulsetName, ordinal) - svc, found := services[svcName] - if !found { - return "", fmt.Errorf("missing service %q", svcName) - } +func GetScyllaBroadcastAddress(broadcastAddressType scyllav1.BroadcastAddressType, svc *corev1.Service, pod *corev1.Pod) (string, error) { + switch broadcastAddressType { + case scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress: + if len(svc.Status.LoadBalancer.Ingress) < 1 { + return "", fmt.Errorf("service %q does not have an ingress status", naming.ObjRef(svc)) + } - ip, err := GetScyllaIPFromService(svc) - if err != nil { - return "", err - } + if len(svc.Status.LoadBalancer.Ingress[0].IP) != 0 { + return svc.Status.LoadBalancer.Ingress[0].IP, nil + } + + if len(svc.Status.LoadBalancer.Ingress[0].Hostname) != 0 { + return svc.Status.LoadBalancer.Ingress[0].Hostname, nil + } + + return "", fmt.Errorf("service %q does not have an external address", naming.ObjRef(svc)) + + case scyllav1.BroadcastAddressTypeServiceClusterIP: + if svc.Spec.ClusterIP == corev1.ClusterIPNone { + return "", fmt.Errorf("service %q does not have a ClusterIP address", naming.ObjRef(svc)) + } - return ip, nil + return svc.Spec.ClusterIP, nil + + case scyllav1.BroadcastAddressTypePodIP: + if len(pod.Status.PodIP) == 0 { + return "", fmt.Errorf("pod %q does not have a PodIP address", naming.ObjRef(pod)) + } + + return pod.Status.PodIP, nil + + default: + return "", fmt.Errorf("unsupported broadcast address type: %q", broadcastAddressType) + } } -func GetRequiredScyllaHosts(sc *scyllav1.ScyllaCluster, services map[string]*corev1.Service) ([]string, error) { +func GetRequiredScyllaHosts(sc *scyllav1.ScyllaCluster, services map[string]*corev1.Service, podLister corev1listers.PodLister) ([]string, error) { var hosts []string var errs []error for _, rack := range sc.Spec.Datacenter.Racks { for ord := int32(0); ord < rack.Members; ord++ { - stsName := naming.StatefulSetNameForRack(rack, sc) - host, err := GetScyllaHost(stsName, ord, services) + svcName := naming.MemberServiceName(rack, sc, int(ord)) + svc, exists := services[svcName] + if !exists { + errs = append(errs, fmt.Errorf("service %q does not exist", naming.ManualRef(sc.Namespace, svcName))) + continue + } + + podName := naming.PodNameFromService(svc) + pod, err := podLister.Pods(sc.Namespace).Get(podName) + if err != nil { + errs = append(errs, fmt.Errorf("can't get pod %q: %w", naming.ManualRef(sc.Namespace, podName), err)) + continue + } + + host, err := GetScyllaHost(sc, svc, pod) if err != nil { - errs = append(errs, err) + errs = append(errs, fmt.Errorf("can't get scylla host for service %q: %w", naming.ObjRef(svc), err)) continue } diff --git a/pkg/controllerhelpers/scylla_test.go b/pkg/controllerhelpers/scylla_test.go index 09e3cc06514..d96ed4a78af 100644 --- a/pkg/controllerhelpers/scylla_test.go +++ b/pkg/controllerhelpers/scylla_test.go @@ -1,14 +1,20 @@ package controllerhelpers import ( + "fmt" "reflect" "testing" "time" "github.com/google/go-cmp/cmp" + scyllav1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1" scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1alpha1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" ) func TestIsNodeConfigSelectingNode(t *testing.T) { @@ -313,3 +319,465 @@ func TestFindNodeConfigCondition(t *testing.T) { }) } } + +func TestGetScyllaHost(t *testing.T) { + t.Parallel() + + sc := &scyllav1.ScyllaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster", + }, + Spec: scyllav1.ScyllaClusterSpec{ + Datacenter: scyllav1.DatacenterSpec{ + Name: "us-east1", + Racks: []scyllav1.RackSpec{ + { + Name: "us-east1-b", + Members: 1, + }, + }, + }, + }, + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-0", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Type: corev1.ServiceTypeClusterIP, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-0", + }, + Status: corev1.PodStatus{ + PodIP: "10.1.0.1", + }, + } + + tt := []struct { + name string + scyllaCluster *scyllav1.ScyllaCluster + svc *corev1.Service + pod *corev1.Pod + expected string + expectedError error + }{ + { + name: "service ClusterIP for nil expose options", + scyllaCluster: func() *scyllav1.ScyllaCluster { + sc := sc.DeepCopy() + sc.Spec.ExposeOptions = nil + return sc + }(), + svc: svc, + pod: pod, + expected: "10.0.0.1", + }, + { + name: "service ClusterIP for nil broadcast options", + scyllaCluster: func() *scyllav1.ScyllaCluster { + sc := sc.DeepCopy() + sc.Spec.ExposeOptions = &scyllav1.ExposeOptions{} + return sc + }(), + svc: svc, + pod: pod, + expected: "10.0.0.1", + }, + } + + for i := range tt { + tc := tt[i] + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + actual, err := GetScyllaHost(tc.scyllaCluster, tc.svc, tc.pod) + + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("expected error %#+v, got %#+v", tc.expectedError, err) + } + if !reflect.DeepEqual(actual, tc.expected) { + t.Errorf("expected host %q, got %q", tc.expected, actual) + } + }) + } +} + +func TestGetScyllaNodeBroadcastAddress(t *testing.T) { + t.Parallel() + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-0", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Type: corev1.ServiceTypeClusterIP, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-0", + }, + Status: corev1.PodStatus{ + PodIP: "10.1.0.1", + }, + } + + tt := []struct { + name string + nodeBroadcastAddressType scyllav1.BroadcastAddressType + svc *corev1.Service + pod *corev1.Pod + expected string + expectedError error + }{ + + { + name: "service ClusterIP for ClusterIP broadcast address type", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypeServiceClusterIP, + pod: pod, + svc: svc, + expected: "10.0.0.1", + }, + { + name: "error for ClusterIP broadcast address type and none ClusterIP", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypeServiceClusterIP, + pod: pod, + svc: func() *corev1.Service { + svc := svc.DeepCopy() + + svc.Spec.ClusterIP = corev1.ClusterIPNone + + return svc + }(), + expected: "", + expectedError: fmt.Errorf(`service "simple-cluster-us-east1-us-east1-b-0" does not have a ClusterIP address`), + }, + { + name: "PodIP broadcast address type", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypePodIP, + pod: pod, + svc: svc, + expected: "10.1.0.1", + expectedError: nil, + }, + { + name: "error for PodIP broadcast address type and empty PodIP", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypePodIP, + pod: func() *corev1.Pod { + pod := pod.DeepCopy() + + pod.Status.PodIP = "" + + return pod + }(), + svc: svc, + expected: "", + expectedError: fmt.Errorf(`pod "simple-cluster-us-east1-us-east1-b-0" does not have a PodIP address`), + }, + { + name: "error for broadcast address type service load balancer ingress and no service ingress status", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress, + pod: pod, + svc: func() *corev1.Service { + svc := svc.DeepCopy() + + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{} + + return svc + }(), + expected: "", + expectedError: fmt.Errorf(`service "simple-cluster-us-east1-us-east1-b-0" does not have an ingress status`), + }, + { + name: "ip for broadcast address type service load balancer ingress and non-empty ip in service load balancer status", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress, + pod: pod, + svc: func() *corev1.Service { + svc := svc.DeepCopy() + + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ + { + IP: "10.2.0.1", + Hostname: "test.scylla.com", + }, + } + + return svc + }(), + expected: "10.2.0.1", + expectedError: nil, + }, + { + name: "hostname for broadcast address type service load balancer ingress, empty ip and non-empty hostname in service load balancer status", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress, + pod: pod, + svc: func() *corev1.Service { + svc := svc.DeepCopy() + + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ + { + IP: "", + Hostname: "test.scylla.com", + }, + } + + return svc + }(), + expected: "test.scylla.com", + expectedError: nil, + }, + { + name: "error for broadcast address type service load balancer ingress and no external address in service load balancer status", + nodeBroadcastAddressType: scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress, + pod: pod, + svc: func() *corev1.Service { + svc := svc.DeepCopy() + + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ + { + IP: "", + Hostname: "", + }, + } + + return svc + }(), + expected: "", + expectedError: fmt.Errorf(`service "simple-cluster-us-east1-us-east1-b-0" does not have an external address`), + }, + { + name: "error for unsupported broadcast address type", + nodeBroadcastAddressType: scyllav1.BroadcastAddressType("Unsupported"), + pod: pod, + svc: svc, + expected: "", + expectedError: fmt.Errorf(`unsupported broadcast address type: "Unsupported"`), + }, + } + + for i := range tt { + tc := tt[i] + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + actual, err := GetScyllaBroadcastAddress(tc.nodeBroadcastAddressType, tc.svc, tc.pod) + + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("expected error %#+v, got %#+v", tc.expectedError, err) + } + if !reflect.DeepEqual(actual, tc.expected) { + t.Errorf("expected host %q, got %q", tc.expected, actual) + } + }) + } +} + +func TestGetRequiredScyllaHosts(t *testing.T) { + t.Parallel() + + sc := &scyllav1.ScyllaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster", + Namespace: "test", + }, + Spec: scyllav1.ScyllaClusterSpec{ + Datacenter: scyllav1.DatacenterSpec{ + Name: "us-east1", + Racks: []scyllav1.RackSpec{ + { + Name: "us-east1-b", + Members: 2, + }, + }, + }, + }, + Status: scyllav1.ScyllaClusterStatus{}, + } + + firstPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-0", + Namespace: "test", + }, + Status: corev1.PodStatus{ + PodIP: "10.1.0.1", + }, + } + + secondPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-1", + Namespace: "test", + }, + Status: corev1.PodStatus{ + PodIP: "10.1.0.2", + }, + } + + firstService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-0", + Namespace: "test", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Type: corev1.ServiceTypeClusterIP, + }, + } + + secondService := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "simple-cluster-us-east1-us-east1-b-1", + Namespace: "test", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.0.0.2", + Type: corev1.ServiceTypeClusterIP, + }, + } + + tt := []struct { + name string + sc *scyllav1.ScyllaCluster + services map[string]*corev1.Service + existingPods []*corev1.Pod + expected []string + expectedError error + }{ + { + name: "missing service", + sc: sc, + services: map[string]*corev1.Service{ + "simple-cluster-us-east1-us-east1-b-0": firstService, + }, + existingPods: []*corev1.Pod{ + firstPod, + secondPod, + }, + expected: nil, + expectedError: utilerrors.NewAggregate([]error{ + fmt.Errorf(`service "test/simple-cluster-us-east1-us-east1-b-1" does not exist`), + }), + }, + { + name: "missing pod", + sc: sc, + services: map[string]*corev1.Service{ + "simple-cluster-us-east1-us-east1-b-0": firstService, + "simple-cluster-us-east1-us-east1-b-1": secondService, + }, + existingPods: []*corev1.Pod{ + firstPod, + }, + expected: nil, + expectedError: utilerrors.NewAggregate([]error{ + fmt.Errorf(`can't get pod "test/simple-cluster-us-east1-us-east1-b-1": %w`, apierrors.NewNotFound(corev1.Resource("pod"), "simple-cluster-us-east1-us-east1-b-1")), + }), + }, + { + name: "ClusterIP aggregate", + sc: sc, + services: map[string]*corev1.Service{ + "simple-cluster-us-east1-us-east1-b-0": firstService, + "simple-cluster-us-east1-us-east1-b-1": secondService, + }, + existingPods: []*corev1.Pod{ + firstPod, + secondPod, + }, + expected: []string{ + "10.0.0.1", + "10.0.0.2", + }, + expectedError: nil, + }, + { + name: "PodIP aggregate", + sc: func() *scyllav1.ScyllaCluster { + sc := sc.DeepCopy() + + sc.Spec.ExposeOptions = &scyllav1.ExposeOptions{ + BroadcastOptions: &scyllav1.NodeBroadcastOptions{ + Nodes: scyllav1.BroadcastOptions{ + Type: scyllav1.BroadcastAddressTypePodIP, + }, + }, + } + + return sc + }(), + services: map[string]*corev1.Service{ + "simple-cluster-us-east1-us-east1-b-0": firstService, + "simple-cluster-us-east1-us-east1-b-1": secondService, + }, + existingPods: []*corev1.Pod{ + firstPod, + secondPod, + }, + expected: []string{ + "10.1.0.1", + "10.1.0.2", + }, + expectedError: nil, + }, + { + name: "service missing ClusterIP", + sc: sc, + services: map[string]*corev1.Service{ + "simple-cluster-us-east1-us-east1-b-0": firstService, + "simple-cluster-us-east1-us-east1-b-1": func() *corev1.Service { + svc := secondService.DeepCopy() + + svc.Spec.ClusterIP = corev1.ClusterIPNone + + return svc + }(), + }, + existingPods: []*corev1.Pod{ + firstPod, + secondPod, + }, + expected: nil, + expectedError: utilerrors.NewAggregate([]error{ + fmt.Errorf(`can't get scylla host for service "test/simple-cluster-us-east1-us-east1-b-1": %w`, fmt.Errorf(`service "test/simple-cluster-us-east1-us-east1-b-1" does not have a ClusterIP address`)), + }), + }, + } + + for i := range tt { + tc := tt[i] + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + podCache := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, obj := range tc.existingPods { + err := podCache.Add(obj) + if err != nil { + t.Fatal(err) + } + } + podLister := corev1listers.NewPodLister(podCache) + + actual, err := GetRequiredScyllaHosts(tc.sc, tc.services, podLister) + + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("expected error %#+v, got %#+v", tc.expectedError, err) + } + if !reflect.DeepEqual(actual, tc.expected) { + t.Errorf("expected host %q, got %q", tc.expected, actual) + } + }) + } +} diff --git a/pkg/naming/names.go b/pkg/naming/names.go index c944a296d85..ed6ef4413ef 100644 --- a/pkg/naming/names.go +++ b/pkg/naming/names.go @@ -38,6 +38,11 @@ func ServiceNameFromPod(pod *corev1.Pod) string { return pod.Name } +func PodNameFromService(svc *corev1.Service) string { + // Pod and its corresponding Service have the same name + return svc.Name +} + func AgentAuthTokenSecretName(clusterName string) string { return fmt.Sprintf("%s-auth-token", clusterName) } diff --git a/pkg/sidecar/identity/member.go b/pkg/sidecar/identity/member.go index a00dae51431..5378a377933 100644 --- a/pkg/sidecar/identity/member.go +++ b/pkg/sidecar/identity/member.go @@ -21,13 +21,7 @@ type Member struct { // Name of the Pod Name string // Namespace of the Pod - Namespace string - // PodIP IP of the Pod - PodIP string - // ExternalAddress is an external IP or hostname of a LoadBalancer Service - ExternalAddress string - // ClusterIP of the member's Service - ClusterIP string + Namespace string Rack string RackOrdinal int Datacenter string @@ -52,32 +46,9 @@ func NewMember(service *corev1.Service, pod *corev1.Pod, nodesAddressType, clien return nil, fmt.Errorf("can't get rack ordinal from label %q: %w", rackOrdinalString, err) } - var externalIP string - if service.Spec.Type == corev1.ServiceTypeLoadBalancer { - if len(service.Status.LoadBalancer.Ingress) == 0 { - return nil, fmt.Errorf("LoadBalancer Service %q is missing endpoints in status", naming.ObjRef(service)) - } - - firstIngress := service.Status.LoadBalancer.Ingress[0] - if len(firstIngress.Hostname) != 0 { - externalIP = firstIngress.Hostname - } - - if len(firstIngress.IP) != 0 { - externalIP = firstIngress.IP - } - - if len(externalIP) == 0 { - return nil, fmt.Errorf("service first ingress status has empty external address") - } - } - m := &Member{ Namespace: service.Namespace, Name: service.Name, - PodIP: pod.Status.PodIP, - ClusterIP: service.Spec.ClusterIP, - ExternalAddress: externalIP, Rack: pod.Labels[naming.RackNameLabel], RackOrdinal: rackOrdinal, Datacenter: pod.Labels[naming.DatacenterNameLabel], @@ -88,26 +59,14 @@ func NewMember(service *corev1.Service, pod *corev1.Pod, nodesAddressType, clien NodesBroadcastAddressType: nodesAddressType, } - switch nodesAddressType { - case scyllav1.BroadcastAddressTypePodIP: - m.BroadcastAddress = m.PodIP - case scyllav1.BroadcastAddressTypeServiceClusterIP: - m.BroadcastAddress = m.ClusterIP - case scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress: - m.BroadcastAddress = m.ExternalAddress - default: - return nil, fmt.Errorf("unsupported nodes address type: %q", nodesAddressType) + m.BroadcastAddress, err = controllerhelpers.GetScyllaBroadcastAddress(nodesAddressType, service, pod) + if err != nil { + return nil, fmt.Errorf("can't get node broadcast address: %w", err) } - switch clientAddressType { - case scyllav1.BroadcastAddressTypePodIP: - m.BroadcastRPCAddress = m.PodIP - case scyllav1.BroadcastAddressTypeServiceClusterIP: - m.BroadcastRPCAddress = m.ClusterIP - case scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress: - m.BroadcastRPCAddress = m.ExternalAddress - default: - return nil, fmt.Errorf("unsupported clients address type: %q", clientAddressType) + m.BroadcastRPCAddress, err = controllerhelpers.GetScyllaBroadcastAddress(clientAddressType, service, pod) + if err != nil { + return nil, fmt.Errorf("can't get client broadcast address: %w", err) } return m, nil @@ -173,32 +132,13 @@ func (m *Member) GetSeeds(ctx context.Context, coreClient v1.CoreV1Interface, ex res := make([]string, 0, len(externalSeeds)+1) res = append(res, externalSeeds...) - // Assume nodes share broadcast address type, and it's immutable. - switch m.NodesBroadcastAddressType { - case scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress: - if len(svc.Status.LoadBalancer.Ingress) < 1 { - return nil, fmt.Errorf("service %q does not have ingress status, despite external address being set as broadcasted", naming.ObjRef(svc)) - } - if len(svc.Status.LoadBalancer.Ingress[0].IP) != 0 { - res = append(res, svc.Status.LoadBalancer.Ingress[0].IP) - } else if len(svc.Status.LoadBalancer.Ingress[0].Hostname) != 0 { - res = append(res, svc.Status.LoadBalancer.Ingress[0].Hostname) - } else { - return nil, fmt.Errorf("service %q does not have external address, despite being set as broadcasted", naming.ObjRef(svc)) - } - case scyllav1.BroadcastAddressTypeServiceClusterIP: - if svc.Spec.ClusterIP == corev1.ClusterIPNone { - return nil, fmt.Errorf("service %q does not have ClusterIP address, despite being set as broadcasted", naming.ObjRef(svc)) - } - res = append(res, svc.Spec.ClusterIP) - case scyllav1.BroadcastAddressTypePodIP: - if len(pod.Status.PodIP) == 0 { - return nil, fmt.Errorf("pod %q does not have IP address, despite being set as broadcasted", naming.ObjRef(pod)) - } - res = append(res, pod.Status.PodIP) - default: - return nil, fmt.Errorf("unsupported node broadcast address type %q", m.NodesBroadcastAddressType) + // Assume nodes share broadcast address type and it's immutable. + localSeed, err := controllerhelpers.GetScyllaBroadcastAddress(m.NodesBroadcastAddressType, svc, pod) + if err != nil { + return nil, fmt.Errorf("can't get node broadcast address for service %q: %w", naming.ObjRef(svc), err) } + res = append(res, localSeed) + return res, nil } diff --git a/pkg/sidecar/identity/member_test.go b/pkg/sidecar/identity/member_test.go index e122c406a49..75d6696a417 100644 --- a/pkg/sidecar/identity/member_test.go +++ b/pkg/sidecar/identity/member_test.go @@ -201,13 +201,21 @@ func TestMember_GetSeeds(t *testing.T) { expectSeeds: []string{secondService.Spec.ClusterIP}, }, { - name: "use PodIP from status when node broadcast address type is PodIP", - memberPod: firstPod, + name: "use PodIP from status when node broadcast address type is PodIP", + memberPod: func() *corev1.Pod { + pod := firstPod.DeepCopy() + pod.Status.PodIP = "10.0.0.1" + return pod + }(), memberService: firstService, memberClientsBroadcastType: scyllav1.BroadcastAddressTypeServiceClusterIP, memberNodesBroadcastType: scyllav1.BroadcastAddressTypePodIP, objects: []runtime.Object{ - firstPod, + func() runtime.Object { + pod := firstPod.DeepCopy() + pod.Status.PodIP = "10.0.0.1" + return pod + }(), firstService, func() runtime.Object { pod := secondPod.DeepCopy() @@ -246,17 +254,40 @@ func TestMember_GetSeeds(t *testing.T) { expectSeeds: []string{"1.2.3.4"}, }, { - name: "use preferred IP address from first Service ingress status when node broadcast address type is LoadBalancer Ingress", - memberPod: firstPod, - memberService: firstService, + name: "use preferred IP address from first Service ingress status when node broadcast address type is LoadBalancer Ingress", + memberPod: firstPod, + memberService: func() *corev1.Service { + svc := firstService.DeepCopy() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer = corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + { + Hostname: "first.service.scylladb.com", + }, + }, + } + return svc + }(), memberClientsBroadcastType: scyllav1.BroadcastAddressTypeServiceClusterIP, memberNodesBroadcastType: scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress, objects: []runtime.Object{ firstPod, - firstService, + func() runtime.Object { + svc := firstService.DeepCopy() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer = corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + { + Hostname: "first.service.scylladb.com", + }, + }, + } + return svc + }(), secondPod, func() runtime.Object { svc := secondService.DeepCopy() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer svc.Status.LoadBalancer = corev1.LoadBalancerStatus{ Ingress: []corev1.LoadBalancerIngress{ { @@ -273,17 +304,40 @@ func TestMember_GetSeeds(t *testing.T) { expectSeeds: []string{"1.2.3.4"}, }, { - name: "use hostname from first Service ingress status when node broadcast address type is LoadBalancer Ingress and IP is not available", - memberPod: firstPod, - memberService: firstService, + name: "use hostname from first Service ingress status when node broadcast address type is LoadBalancer Ingress and IP is not available", + memberPod: firstPod, + memberService: func() *corev1.Service { + svc := firstService.DeepCopy() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer = corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + { + Hostname: "first.service.scylladb.com", + }, + }, + } + return svc + }(), memberClientsBroadcastType: scyllav1.BroadcastAddressTypeServiceClusterIP, memberNodesBroadcastType: scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress, objects: []runtime.Object{ firstPod, - firstService, + func() runtime.Object { + svc := firstService.DeepCopy() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer + svc.Status.LoadBalancer = corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{ + { + Hostname: "first.service.scylladb.com", + }, + }, + } + return svc + }(), secondPod, func() runtime.Object { svc := secondService.DeepCopy() + svc.Spec.Type = corev1.ServiceTypeLoadBalancer svc.Status.LoadBalancer = corev1.LoadBalancerStatus{ Ingress: []corev1.LoadBalancerIngress{ { diff --git a/pkg/sidecar/probes.go b/pkg/sidecar/probes.go index c10f5468ee6..efd52051fad 100644 --- a/pkg/sidecar/probes.go +++ b/pkg/sidecar/probes.go @@ -50,15 +50,6 @@ func (p *Prober) isNodeUnderMaintenance() (bool, error) { return hasLabel, nil } -func (p *Prober) getNodeAddress() (string, error) { - svc, err := p.serviceLister.Services(p.namespace).Get(p.serviceName) - if err != nil { - return "", err - } - - return controllerhelpers.GetScyllaIPFromService(svc) -} - func (p *Prober) Readyz(w http.ResponseWriter, req *http.Request) { ctx, ctxCancel := context.WithTimeout(req.Context(), p.timeout) defer ctxCancel()