From f56d901060f5497430969bb83a8aec62f05284da Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Thu, 8 Feb 2024 19:13:23 +0100 Subject: [PATCH] skipper: use kubernetes cluster client to discover redis shards Fixes #2476 Signed-off-by: Alexander Yastrebov --- skipper.go | 46 +++++++++++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/skipper.go b/skipper.go index 95f1d5a2f0..767f3b5ee9 100644 --- a/skipper.go +++ b/skipper.go @@ -1357,7 +1357,7 @@ func findKubernetesDataclient(dataClients []routing.DataClient) *kubernetes.Clie return kdc } -func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string, error) { +func getKubernetesDataclientRedisUpdater(opts *Options, kdc *kubernetes.Client) func() ([]string, error) { // TODO(sszuecs): make sure kubernetes dataclient is already initialized and // has polled the data once or kdc.GetEndpointAdresses should be blocking // call to kubernetes API @@ -1365,12 +1365,25 @@ func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) log.Debugf("Redis updater called and found %d redis endpoints", len(a)) - port := strconv.Itoa(opts.KubernetesRedisServicePort) - for i := 0; i < len(a); i++ { - a[i] = net.JoinHostPort(a[i], port) - } - return a, nil + return joinPort(a, opts.KubernetesRedisServicePort), nil + } +} + +func getKubernetesClusterClientRedisUpdater(opts *Options, kc *kubernetes.ClusterClient) func() ([]string, error) { + return func() ([]string, error) { + a, err := kc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) + log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err) + + return joinPort(a, opts.KubernetesRedisServicePort), err + } +} + +func joinPort(addrs []string, port int) []string { + p := strconv.Itoa(port) + for i := 0; i < len(addrs); i++ { + addrs[i] = net.JoinHostPort(addrs[i], p) } + return addrs } type RedisEndpoint struct { @@ -1688,25 +1701,32 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { kdc := findKubernetesDataclient(dataClients) if kdc != nil { - redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc) - _, err = redisOptions.AddrUpdater() + redisOptions.AddrUpdater = getKubernetesDataclientRedisUpdater(&o, kdc) + } else { + cc, err := kubernetes.NewClusterClient(o.KubernetesDataClientOptions()) if err != nil { - log.Errorf("Failed to update redis address %v", err) + log.Errorf("Failed to create kubernetes cluster client: %v", err) return err } - } else { - log.Errorf("Failed to find kubernetes dataclient, but redis shards should be get by kubernetes svc %s/%s", o.KubernetesRedisServiceNamespace, o.KubernetesRedisServiceName) + defer cc.Close() + + redisOptions.AddrUpdater = getKubernetesClusterClientRedisUpdater(&o, cc) + } + + _, err = redisOptions.AddrUpdater() + if err != nil { + log.Errorf("Failed to update redis addresses: %v", err) + return err } } else if redisOptions != nil && o.SwarmRedisEndpointsRemoteURL != "" { log.Infof("Use remote address %s to fetch updates redis shards", o.SwarmRedisEndpointsRemoteURL) redisOptions.AddrUpdater = updateEndpointsFromURL(o.SwarmRedisEndpointsRemoteURL) _, err = redisOptions.AddrUpdater() if err != nil { - log.Errorf("Failed to update redis endpoints from URL %v", err) + log.Errorf("Failed to update redis endpoints from URL: %v", err) return err } } - } var ratelimitRegistry *ratelimit.Registry