From d05dd7bfc36ff62eb252b1624a3871ca3a5b3b5a Mon Sep 17 00:00:00 2001 From: Alexander Yastrebov Date: Thu, 8 Feb 2024 16:41:27 +0100 Subject: [PATCH] dataclients/kubernetes: change GetEndpointAddresses to return addresses without scheme and port * use -kubernetes-redis-service-port flag to explicitly configure redis port * use separate cluster state cache for addresses since cache key does not contain port * rename helpers protocol argument to scheme This is a breaking change for users that: * use kubernetes dataclient GetEndpointAddresses method * use dynamic redis shards discovery AND use non-standard redis port AND do not specify -kubernetes-redis-service-port For #2476 Signed-off-by: Alexander Yastrebov --- VERSION | 2 +- dataclients/kubernetes/clusterclient.go | 1 + dataclients/kubernetes/clusterstate.go | 30 +++--- dataclients/kubernetes/endpoints.go | 27 +++-- dataclients/kubernetes/endpointslices.go | 13 +-- dataclients/kubernetes/endpointslices_test.go | 98 +++++++++++++------ dataclients/kubernetes/kube.go | 3 +- dataclients/kubernetes/kube_test.go | 5 +- redis_test.go | 2 + routesrv/redishandler.go | 16 +-- routesrv/routesrv.go | 2 +- routesrv/routesrv_test.go | 2 + skipper.go | 12 ++- 13 files changed, 124 insertions(+), 89 deletions(-) diff --git a/VERSION b/VERSION index a8b26f57b4..ef27085c8a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.19 +v0.20 diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go index f09684910d..d4e069ec9c 100644 --- a/dataclients/kubernetes/clusterclient.go +++ b/dataclients/kubernetes/clusterclient.go @@ -595,6 +595,7 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) { routeGroups: routeGroups, services: services, cachedEndpoints: make(map[endpointID][]string), + cachedAddresses: make(map[definitions.ResourceID][]string), enableEndpointSlices: c.enableEndpointSlices, } diff --git a/dataclients/kubernetes/clusterstate.go b/dataclients/kubernetes/clusterstate.go index f12cf82d84..4c21db091b 100644 --- a/dataclients/kubernetes/clusterstate.go +++ b/dataclients/kubernetes/clusterstate.go @@ -19,6 +19,7 @@ type clusterState struct { endpointSlices map[definitions.ResourceID]*skipperEndpointSlice secrets map[definitions.ResourceID]*secret cachedEndpoints map[endpointID][]string + cachedAddresses map[definitions.ResourceID][]string enableEndpointSlices bool } @@ -83,38 +84,35 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin return targets } -// GetEndpointsByName returns the skipper endpoints for kubernetes endpoints or endpointslices. -// This function works only correctly for endpointslices (and likely endpoints) with one port with the same protocol ("TCP", "UDP"). -func (state *clusterState) GetEndpointsByName(namespace, name, protocol, scheme string) []string { - epID := endpointID{ - ResourceID: newResourceID(namespace, name), - Protocol: protocol, - } +// getEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices. +func (state *clusterState) getEndpointAddresses(namespace, name string) []string { + rID := newResourceID(namespace, name) + state.mu.Lock() defer state.mu.Unlock() - if cached, ok := state.cachedEndpoints[epID]; ok { + if cached, ok := state.cachedAddresses[rID]; ok { return cached } - var targets []string + var addresses []string if state.enableEndpointSlices { - if eps, ok := state.endpointSlices[epID.ResourceID]; ok { - targets = eps.targets(protocol, scheme) + if eps, ok := state.endpointSlices[rID]; ok { + addresses = eps.addresses() } else { return nil } } else { - if ep, ok := state.endpoints[epID.ResourceID]; ok { - targets = ep.targets(scheme) + if ep, ok := state.endpoints[rID]; ok { + addresses = ep.addresses() } else { return nil } } - sort.Strings(targets) - state.cachedEndpoints[epID] = targets - return targets + sort.Strings(addresses) + state.cachedAddresses[rID] = addresses + return addresses } // GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices. diff --git a/dataclients/kubernetes/endpoints.go b/dataclients/kubernetes/endpoints.go index b293b41fed..292acbf196 100644 --- a/dataclients/kubernetes/endpoints.go +++ b/dataclients/kubernetes/endpoints.go @@ -22,18 +22,18 @@ type endpointList struct { Items []*endpoint `json:"items"` } -func formatEndpointString(ip, protocol string, port int) string { - return protocol + "://" + net.JoinHostPort(ip, strconv.Itoa(port)) +func formatEndpointString(ip, scheme string, port int) string { + return scheme + "://" + net.JoinHostPort(ip, strconv.Itoa(port)) } -func formatEndpoint(a *address, p *port, protocol string) string { - return formatEndpointString(a.IP, protocol, p.Port) +func formatEndpoint(a *address, p *port, scheme string) string { + return formatEndpointString(a.IP, scheme, p.Port) } -func formatEndpointsForSubsetAddresses(addresses []*address, port *port, protocol string) []string { +func formatEndpointsForSubsetAddresses(addresses []*address, port *port, scheme string) []string { result := make([]string, 0, len(addresses)) for _, address := range addresses { - result = append(result, formatEndpoint(address, port, protocol)) + result = append(result, formatEndpoint(address, port, scheme)) } return result } @@ -54,11 +54,10 @@ func (ep *endpoint) targetsByServicePort(protocol string, servicePort *servicePo return formatEndpointsForSubsetAddresses(s.Addresses, p, protocol) } } - return nil } -func (ep *endpoint) targetsByServiceTarget(protocol string, serviceTarget *definitions.BackendPort) []string { +func (ep *endpoint) targetsByServiceTarget(scheme string, serviceTarget *definitions.BackendPort) []string { portName, named := serviceTarget.Value.(string) portValue, byValue := serviceTarget.Value.(int) for _, s := range ep.Subsets { @@ -69,26 +68,22 @@ func (ep *endpoint) targetsByServiceTarget(protocol string, serviceTarget *defin var result []string for _, a := range s.Addresses { - result = append(result, formatEndpoint(a, p, protocol)) + result = append(result, formatEndpoint(a, p, scheme)) } return result } } - return nil } -func (ep *endpoint) targets(protocol string) []string { +func (ep *endpoint) addresses() []string { result := make([]string, 0) for _, s := range ep.Subsets { - for _, p := range s.Ports { - for _, a := range s.Addresses { - result = append(result, formatEndpoint(a, p, protocol)) - } + for _, a := range s.Addresses { + result = append(result, a.IP) } } - return result } diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go index 18e460fb87..436c3e4a42 100644 --- a/dataclients/kubernetes/endpointslices.go +++ b/dataclients/kubernetes/endpointslices.go @@ -74,20 +74,11 @@ func (eps *skipperEndpointSlice) targetsByServiceTarget(protocol, scheme string, return result } -func (eps *skipperEndpointSlice) targets(protocol, scheme string) []string { +func (eps *skipperEndpointSlice) addresses() []string { result := make([]string, 0, len(eps.Endpoints)) - - var port int - for _, p := range eps.Ports { - if p.Protocol == protocol { - port = p.Port - break - } - } for _, ep := range eps.Endpoints { - result = append(result, formatEndpointString(ep.Address, scheme, port)) + result = append(result, ep.Address) } - return result } diff --git a/dataclients/kubernetes/endpointslices_test.go b/dataclients/kubernetes/endpointslices_test.go index 22b50e567d..d95215cc87 100644 --- a/dataclients/kubernetes/endpointslices_test.go +++ b/dataclients/kubernetes/endpointslices_test.go @@ -1,51 +1,91 @@ package kubernetes import ( - "net/url" - "strconv" "testing" - "github.com/zalando/skipper/dataclients/kubernetes/definitions" + "github.com/stretchr/testify/assert" ) -func TestTargets(t *testing.T) { - want := "http://10.0.0.1:8080" - u, err := url.Parse(want) - if err != nil { - t.Fatalf("Failed to parse: %v", err) - } - port, err := strconv.Atoi(u.Port()) - if err != nil { - t.Fatalf("Failed to parse: %v", err) - } +func TestAddresses(t *testing.T) { + assert.Equal(t, []string{"10.0.0.1"}, (&skipperEndpointSlice{ + Endpoints: []*skipperEndpoint{ + { + Address: "10.0.0.1", + Zone: "zone-1", + }, + }, + Ports: []*endpointSlicePort{ + { + Name: "main", + Port: 8080, + Protocol: "TCP", + }, + }, + }).addresses()) - ses := &skipperEndpointSlice{ - Meta: &definitions.Metadata{ - Namespace: "ns1", - Name: "a-slice", + assert.Equal(t, []string{"10.0.0.1"}, (&skipperEndpointSlice{ + Endpoints: []*skipperEndpoint{ + { + Address: "10.0.0.1", + Zone: "zone-1", + }, }, + Ports: []*endpointSlicePort{ + { + Name: "main", + Port: 8080, + Protocol: "TCP", + }, + { + Name: "support", + Port: 8081, + Protocol: "TCP", + }, + }, + }).addresses()) + + assert.Equal(t, []string{"10.0.0.1", "10.0.0.2"}, (&skipperEndpointSlice{ Endpoints: []*skipperEndpoint{ { - Address: u.Hostname(), + Address: "10.0.0.1", Zone: "zone-1", }, + { + Address: "10.0.0.2", + Zone: "zone-2", + }, }, Ports: []*endpointSlicePort{ { Name: "main", - Port: port, + Port: 8080, Protocol: "TCP", }, }, - } - res := ses.targets("TCP", "http") - if l := len(res); l != 1 { - t.Fatalf("Failed to get same number of results than expected %d, got: %d", 1, l) - } + }).addresses()) - for i := 0; i < len(res); i++ { - if want != res[i] { - t.Fatalf("Failed to get the right target: %s != %s", want, res[i]) - } - } + assert.Equal(t, []string{"10.0.0.1", "10.0.0.2"}, (&skipperEndpointSlice{ + Endpoints: []*skipperEndpoint{ + { + Address: "10.0.0.1", + Zone: "zone-1", + }, + { + Address: "10.0.0.2", + Zone: "zone-2", + }, + }, + Ports: []*endpointSlicePort{ + { + Name: "main", + Port: 8080, + Protocol: "TCP", + }, + { + Name: "support", + Port: 8081, + Protocol: "TCP", + }, + }, + }).addresses()) } diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go index 3500c71de0..7810fe7486 100644 --- a/dataclients/kubernetes/kube.go +++ b/dataclients/kubernetes/kube.go @@ -574,13 +574,14 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters { return filters } +// GetEndpointAddresses returns the list of all addresses for the given service. func (c *Client) GetEndpointAddresses(ns, name string) []string { c.mu.Lock() defer c.mu.Unlock() if c.state == nil { return nil } - return c.state.GetEndpointsByName(ns, name, "TCP", "http") + return c.state.getEndpointAddresses(ns, name) } func compareStringList(a, b []string) []string { diff --git a/dataclients/kubernetes/kube_test.go b/dataclients/kubernetes/kube_test.go index c11a8d15a2..74fd4d5fec 100644 --- a/dataclients/kubernetes/kube_test.go +++ b/dataclients/kubernetes/kube_test.go @@ -599,7 +599,7 @@ func TestGetEndpointAddresses(t *testing.T) { ns := "namespace1" name := "service1" got := client.GetEndpointAddresses(ns, name) - expected := []string{"http://1.1.1.0:8080"} + expected := []string{"1.1.1.0"} if len(got) != len(expected) { t.Fatalf("Failed to get same size: %d != %d", len(expected), len(got)) } @@ -646,7 +646,7 @@ func TestGetEndpointAddresses(t *testing.T) { ns := "namespace1" name := "service1" got := client.GetEndpointAddresses(ns, name) - expected := []string{"http://1.1.1.0:8080"} + expected := []string{"1.1.1.0"} if len(got) != len(expected) { t.Fatalf("Failed to get same size: %d != %d", len(expected), len(got)) } @@ -666,7 +666,6 @@ func TestGetEndpointAddresses(t *testing.T) { t.Fatalf("Failed to get cached result expected %q, got %q", expected[i], got[i]) } } - }) } diff --git a/redis_test.go b/redis_test.go index f42ef2bfdb..b4bff88256 100644 --- a/redis_test.go +++ b/redis_test.go @@ -153,6 +153,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> KubernetesURL: lb.URL, KubernetesRedisServiceNamespace: "skipper", KubernetesRedisServiceName: "redis", + KubernetesRedisServicePort: 6379, KubernetesHealthcheck: true, SourcePollTimeout: 1500 * time.Millisecond, } @@ -355,6 +356,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> KubernetesURL: lb.URL, KubernetesRedisServiceNamespace: "skipper", KubernetesRedisServiceName: "redis", + KubernetesRedisServicePort: 6379, KubernetesHealthcheck: true, SourcePollTimeout: 1500 * time.Millisecond, WaitFirstRouteLoad: true, diff --git a/routesrv/redishandler.go b/routesrv/redishandler.go index 45ad66e715..8caec16a6e 100644 --- a/routesrv/redishandler.go +++ b/routesrv/redishandler.go @@ -3,10 +3,12 @@ package routesrv import ( "encoding/json" "fmt" + "net" "net/http" - "strings" + "strconv" log "github.com/sirupsen/logrus" + "github.com/zalando/skipper" "github.com/zalando/skipper/dataclients/kubernetes" "github.com/zalando/skipper/metrics" ) @@ -38,16 +40,18 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write(address) } -func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) { +func getRedisAddresses(opts *skipper.Options, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) { return func() ([]byte, error) { - a := kdc.GetEndpointAddresses(namespace, name) + a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a) m.UpdateGauge("redis_endpoints", float64(len(a))) - result := RedisEndpoints{} + result := RedisEndpoints{ + Endpoints: make([]RedisEndpoint, len(a)), + } + port := strconv.Itoa(opts.KubernetesRedisServicePort) for i := 0; i < len(a); i++ { - a[i] = strings.TrimPrefix(a[i], "http://") - result.Endpoints = append(result.Endpoints, RedisEndpoint{Address: a[i]}) + result.Endpoints[i].Address = net.JoinHostPort(a[i], port) } data, err := json.Marshal(result) diff --git a/routesrv/routesrv.go b/routesrv/routesrv.go index a4a6aeb1a7..f1cf459aaa 100644 --- a/routesrv/routesrv.go +++ b/routesrv/routesrv.go @@ -104,7 +104,7 @@ func New(opts skipper.Options) (*RouteServer, error) { if err != nil { return nil, err } - rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient, m) + rh.AddrUpdater = getRedisAddresses(&opts, dataclient, m) mux.Handle("/swarm/redis/shards", rh) } diff --git a/routesrv/routesrv_test.go b/routesrv/routesrv_test.go index 0326932519..a85e78720e 100644 --- a/routesrv/routesrv_test.go +++ b/routesrv/routesrv_test.go @@ -252,6 +252,7 @@ func TestRedisEndpointSlices(t *testing.T) { KubernetesURL: ks.URL, KubernetesRedisServiceNamespace: "namespace1", KubernetesRedisServiceName: "service1", + KubernetesRedisServicePort: 6379, KubernetesEnableEndpointslices: true, }) @@ -274,6 +275,7 @@ func TestRedisEndpoints(t *testing.T) { KubernetesURL: ks.URL, KubernetesRedisServiceNamespace: "namespace1", KubernetesRedisServiceName: "service1", + KubernetesRedisServicePort: 6379, }) w := getRedisURLs(rs) diff --git a/skipper.go b/skipper.go index f409012a9b..95f1d5a2f0 100644 --- a/skipper.go +++ b/skipper.go @@ -279,7 +279,7 @@ type Options struct { // KubernetesRedisServiceName to be used to lookup ring shards dynamically KubernetesRedisServiceName string - // *DEPRECATED* KubernetesRedisServicePort is not used anymore + // KubernetesRedisServicePort to be used to lookup ring shards dynamically KubernetesRedisServicePort int // KubernetesForceService overrides the default Skipper functionality to route traffic using Kubernetes Endpoints, @@ -1357,15 +1357,17 @@ func findKubernetesDataclient(dataClients []routing.DataClient) *kubernetes.Clie return kdc } -func getRedisUpdaterFunc(namespace, name string, kdc *kubernetes.Client) func() ([]string, error) { +func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string, error) { // TODO(sszuecs): make sure kubernetes dataclient is already initialized and // has polled the data once or kdc.GetEndpointAdresses should be blocking // call to kubernetes API return func() ([]string, error) { - a := kdc.GetEndpointAddresses(namespace, name) + a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName) log.Debugf("Redis updater called and found %d redis endpoints", len(a)) + + port := strconv.Itoa(opts.KubernetesRedisServicePort) for i := 0; i < len(a); i++ { - a[i] = strings.TrimPrefix(a[i], "http://") + a[i] = net.JoinHostPort(a[i], port) } return a, nil } @@ -1686,7 +1688,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { kdc := findKubernetesDataclient(dataClients) if kdc != nil { - redisOptions.AddrUpdater = getRedisUpdaterFunc(o.KubernetesRedisServiceNamespace, o.KubernetesRedisServiceName, kdc) + redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc) _, err = redisOptions.AddrUpdater() if err != nil { log.Errorf("Failed to update redis address %v", err)