Skip to content

Commit

Permalink
skipper: use kubernetes cluster client to discover redis shards
Browse files Browse the repository at this point in the history
Fixes #2476

Signed-off-by: Alexander Yastrebov <alexander.yastrebov@zalando.de>
  • Loading branch information
AlexanderYastrebov committed Feb 12, 2024
1 parent f919253 commit f56d901
Showing 1 changed file with 33 additions and 13 deletions.
46 changes: 33 additions & 13 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,20 +1357,33 @@ func findKubernetesDataclient(dataClients []routing.DataClient) *kubernetes.Clie
return kdc
}

func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string, error) {
func getKubernetesDataclientRedisUpdater(opts *Options, kdc *kubernetes.Client) func() ([]string, error) {
// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
// has polled the data once or kdc.GetEndpointAdresses should be blocking
// call to kubernetes API
return func() ([]string, error) {
a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("Redis updater called and found %d redis endpoints", len(a))

port := strconv.Itoa(opts.KubernetesRedisServicePort)
for i := 0; i < len(a); i++ {
a[i] = net.JoinHostPort(a[i], port)
}
return a, nil
return joinPort(a, opts.KubernetesRedisServicePort), nil
}
}

func getKubernetesClusterClientRedisUpdater(opts *Options, kc *kubernetes.ClusterClient) func() ([]string, error) {
return func() ([]string, error) {
a, err := kc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err)

return joinPort(a, opts.KubernetesRedisServicePort), err
}
}

func joinPort(addrs []string, port int) []string {
p := strconv.Itoa(port)
for i := 0; i < len(addrs); i++ {
addrs[i] = net.JoinHostPort(addrs[i], p)
}
return addrs
}

type RedisEndpoint struct {
Expand Down Expand Up @@ -1688,25 +1701,32 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {

kdc := findKubernetesDataclient(dataClients)
if kdc != nil {
redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc)
_, err = redisOptions.AddrUpdater()
redisOptions.AddrUpdater = getKubernetesDataclientRedisUpdater(&o, kdc)
} else {
cc, err := kubernetes.NewClusterClient(o.KubernetesDataClientOptions())
if err != nil {
log.Errorf("Failed to update redis address %v", err)
log.Errorf("Failed to create kubernetes cluster client: %v", err)
return err
}
} else {
log.Errorf("Failed to find kubernetes dataclient, but redis shards should be get by kubernetes svc %s/%s", o.KubernetesRedisServiceNamespace, o.KubernetesRedisServiceName)
defer cc.Close()

redisOptions.AddrUpdater = getKubernetesClusterClientRedisUpdater(&o, cc)
}

_, err = redisOptions.AddrUpdater()
if err != nil {
log.Errorf("Failed to update redis addresses: %v", err)
return err
}
} else if redisOptions != nil && o.SwarmRedisEndpointsRemoteURL != "" {
log.Infof("Use remote address %s to fetch updates redis shards", o.SwarmRedisEndpointsRemoteURL)
redisOptions.AddrUpdater = updateEndpointsFromURL(o.SwarmRedisEndpointsRemoteURL)
_, err = redisOptions.AddrUpdater()
if err != nil {
log.Errorf("Failed to update redis endpoints from URL %v", err)
log.Errorf("Failed to update redis endpoints from URL: %v", err)
return err
}
}

}

var ratelimitRegistry *ratelimit.Registry
Expand Down

0 comments on commit f56d901

Please sign in to comment.