Skip to content

Commit

Permalink
Determine host for Scylla client using ScyllaCluster expose options
Browse files Browse the repository at this point in the history
  • Loading branch information
rzetelskik committed Dec 20, 2023
1 parent f5e3ea2 commit 2bd8ec7
Show file tree
Hide file tree
Showing 7 changed files with 628 additions and 121 deletions.
26 changes: 21 additions & 5 deletions pkg/controller/scyllacluster/sync_statefulsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (scc *Controller) beforeUpgrade(ctx context.Context, sc *scyllav1.ScyllaClu
klog.V(2).InfoS("Running pre-upgrade hook", "ScyllaCluster", klog.KObj(sc))
defer klog.V(2).InfoS("Finished running pre-upgrade hook", "ScyllaCluster", klog.KObj(sc))

hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services)
hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services, scc.podLister)
if err != nil {
return true, err
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func (scc *Controller) afterUpgrade(ctx context.Context, sc *scyllav1.ScyllaClus
klog.V(2).InfoS("Running post-upgrade hook", "ScyllaCluster", klog.KObj(sc))
defer klog.V(2).InfoS("Finished running post-upgrade hook", "ScyllaCluster", klog.KObj(sc))

hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services)
hosts, err := controllerhelpers.GetRequiredScyllaHosts(sc, services, scc.podLister)
if err != nil {
return err
}
Expand Down Expand Up @@ -230,8 +230,13 @@ func (scc *Controller) beforeNodeUpgrade(ctx context.Context, sc *scyllav1.Scyll
}

// Drain the node.
podName := naming.PodNameFromService(svc)
pod, err := scc.podLister.Pods(sc.Namespace).Get(podName)
if err != nil {
return false, fmt.Errorf("can't get pod %q: %w", naming.ManualRef(sc.Namespace, podName), err)
}

host, err := controllerhelpers.GetScyllaIPFromService(svc)
host, err := controllerhelpers.GetScyllaHost(sc, svc, pod)
if err != nil {
return true, err
}
Expand Down Expand Up @@ -294,7 +299,6 @@ func (scc *Controller) beforeNodeUpgrade(ctx context.Context, sc *scyllav1.Scyll
// https://github.com/kubernetes/kubernetes/issues/67250
// Kubernetes can't evict pods when DesiredHealthy == 0 and it's already down, so we need to use DELETE
// to succeed even when having just one replica.
podName := svcName
klog.V(2).InfoS("Deleting Pod", "ScyllaCluster", klog.KObj(sc), "Pod", naming.ManualRef(sc.Namespace, podName))
err = scc.kubeClient.CoreV1().Pods(sc.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil {
Expand All @@ -311,7 +315,19 @@ func (scc *Controller) beforeNodeUpgrade(ctx context.Context, sc *scyllav1.Scyll
}

func (scc *Controller) afterNodeUpgrade(ctx context.Context, sc *scyllav1.ScyllaCluster, sts *appsv1.StatefulSet, ordinal int32, services map[string]*corev1.Service) error {
host, err := controllerhelpers.GetScyllaHost(sts.Name, ordinal, services)
svcName := fmt.Sprintf("%s-%d", sts.Name, ordinal)
svc, ok := services[svcName]
if !ok {
return fmt.Errorf("missing service %q", naming.ManualRef(sc.Namespace, svcName))
}

podName := naming.PodNameFromService(svc)
pod, err := scc.podLister.Pods(sc.Namespace).Get(podName)
if err != nil {
return fmt.Errorf("can't get pod %q: %w", naming.ManualRef(sc.Namespace, podName), err)
}

host, err := controllerhelpers.GetScyllaHost(sc, svc, pod)
if err != nil {
return err
}
Expand Down
79 changes: 56 additions & 23 deletions pkg/controllerhelpers/scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,79 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/errors"
corev1listers "k8s.io/client-go/listers/core/v1"
corev1schedulinghelpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
)

func GetScyllaIPFromService(svc *corev1.Service) (string, error) {
if svc.Spec.Type != corev1.ServiceTypeClusterIP {
return "", fmt.Errorf("service %s is of type %q instead of %q", naming.ObjRef(svc), svc.Spec.Type, corev1.ServiceTypeClusterIP)
func GetScyllaHost(sc *scyllav1.ScyllaCluster, svc *corev1.Service, pod *corev1.Pod) (string, error) {
// Assume API's default.
nodeBroadcastAddressType := scyllav1.BroadcastAddressTypeServiceClusterIP
if sc.Spec.ExposeOptions != nil && sc.Spec.ExposeOptions.BroadcastOptions != nil {
nodeBroadcastAddressType = sc.Spec.ExposeOptions.BroadcastOptions.Nodes.Type
}

if svc.Spec.ClusterIP == corev1.ClusterIPNone {
return "", fmt.Errorf("service %s doesn't have a ClusterIP", naming.ObjRef(svc))
}

return svc.Spec.ClusterIP, nil
return GetScyllaBroadcastAddress(nodeBroadcastAddressType, svc, pod)
}

func GetScyllaHost(statefulsetName string, ordinal int32, services map[string]*corev1.Service) (string, error) {
svcName := fmt.Sprintf("%s-%d", statefulsetName, ordinal)
svc, found := services[svcName]
if !found {
return "", fmt.Errorf("missing service %q", svcName)
}
func GetScyllaBroadcastAddress(broadcastAddressType scyllav1.BroadcastAddressType, svc *corev1.Service, pod *corev1.Pod) (string, error) {
switch broadcastAddressType {
case scyllav1.BroadcastAddressTypeServiceLoadBalancerIngress:
if len(svc.Status.LoadBalancer.Ingress) < 1 {
return "", fmt.Errorf("service %q does not have an ingress status", naming.ObjRef(svc))
}

ip, err := GetScyllaIPFromService(svc)
if err != nil {
return "", err
}
if len(svc.Status.LoadBalancer.Ingress[0].IP) != 0 {
return svc.Status.LoadBalancer.Ingress[0].IP, nil
}

if len(svc.Status.LoadBalancer.Ingress[0].Hostname) != 0 {
return svc.Status.LoadBalancer.Ingress[0].Hostname, nil
}

return "", fmt.Errorf("service %q does not have an external address", naming.ObjRef(svc))

case scyllav1.BroadcastAddressTypeServiceClusterIP:
if svc.Spec.ClusterIP == corev1.ClusterIPNone {
return "", fmt.Errorf("service %q does not have a ClusterIP address", naming.ObjRef(svc))
}

return ip, nil
return svc.Spec.ClusterIP, nil

case scyllav1.BroadcastAddressTypePodIP:
if len(pod.Status.PodIP) == 0 {
return "", fmt.Errorf("pod %q does not have a PodIP address", naming.ObjRef(pod))
}

return pod.Status.PodIP, nil

default:
return "", fmt.Errorf("unsupported broadcast address type: %q", broadcastAddressType)
}
}

func GetRequiredScyllaHosts(sc *scyllav1.ScyllaCluster, services map[string]*corev1.Service) ([]string, error) {
func GetRequiredScyllaHosts(sc *scyllav1.ScyllaCluster, services map[string]*corev1.Service, podLister corev1listers.PodLister) ([]string, error) {
var hosts []string
var errs []error
for _, rack := range sc.Spec.Datacenter.Racks {
for ord := int32(0); ord < rack.Members; ord++ {
stsName := naming.StatefulSetNameForRack(rack, sc)
host, err := GetScyllaHost(stsName, ord, services)
svcName := naming.MemberServiceName(rack, sc, int(ord))
svc, exists := services[svcName]
if !exists {
errs = append(errs, fmt.Errorf("service %q does not exist", naming.ManualRef(sc.Namespace, svcName)))
continue
}

podName := naming.PodNameFromService(svc)
pod, err := podLister.Pods(sc.Namespace).Get(podName)
if err != nil {
errs = append(errs, fmt.Errorf("can't get pod %q: %w", naming.ManualRef(sc.Namespace, podName), err))
continue
}

host, err := GetScyllaHost(sc, svc, pod)
if err != nil {
errs = append(errs, err)
errs = append(errs, fmt.Errorf("can't get scylla host for service %q: %w", naming.ObjRef(svc), err))
continue
}

Expand Down
Loading

0 comments on commit 2bd8ec7

Please sign in to comment.