Skip to content

Commit

Permalink
skipper: discover redis shards without fetching full cluster state
Browse files Browse the repository at this point in the history
Add LoadEndpointAddresses to kubernetes dataclient that is
similar to caching GetEndpointAddresses but does not rely on
previous call to Load or LoadAll to allow discovery of
redis shards without fetching full cluster state.

Fixes #2476

Signed-off-by: Alexander Yastrebov <alexander.yastrebov@zalando.de>
  • Loading branch information
AlexanderYastrebov committed Feb 14, 2024
1 parent ecb8c0b commit 7f9f986
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 104 deletions.
26 changes: 26 additions & 0 deletions dataclients/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,29 @@ func TestClientGetEndpointAddresses(t *testing.T) {
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
})
}

func TestClientLoadEndpointAddresses(t *testing.T) {
t.Run("from endpoints", func(t *testing.T) {
client := newTestClient(t,
kubernetes.Options{},
"testdata/ingressV1/ingress-data/lb-target-multi.yaml",
)

addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
assert.NoError(t, err)
assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)
})

t.Run("from endpointslices", func(t *testing.T) {
client := newTestClient(t,
kubernetes.Options{
KubernetesEnableEndpointslices: true,
},
"testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-ready.yaml",
)

addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
assert.NoError(t, err)
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
})
}
38 changes: 38 additions & 0 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
}
log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))

return mapEndpointSlices(&endpointSlices), nil
}

func mapEndpointSlices(endpointSlices *endpointSliceList) map[definitions.ResourceID]*skipperEndpointSlice {
mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
for _, endpointSlice := range endpointSlices.Items {
resID := endpointSlice.ToResourceID() // service resource ID
Expand Down Expand Up @@ -550,6 +554,40 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
result[resID].Endpoints = append(result[resID].Endpoints, o)
}
}
return result
}

// LoadEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices API.
func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) {
var result []string
if c.enableEndpointSlices {
url := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) +
toLabelSelectorQuery(map[string]string{endpointSliceServiceNameLabel: name})

var endpointSlices endpointSliceList
if err := c.getJSON(url, &endpointSlices); err != nil {
return nil, fmt.Errorf("requesting endpointslices for %s/%s failed: %v", namespace, name, err)
}

mapped := mapEndpointSlices(&endpointSlices)
if len(mapped) != 1 {
return nil, fmt.Errorf("unexpected number of endpoint slices for %s/%s: %d", namespace, name, len(mapped))
}

for _, eps := range mapped {
result = eps.addresses()
break
}
} else {
url := fmt.Sprintf(EndpointsNamespaceFmt, namespace) + "/" + name

var ep endpoint
if err := c.getJSON(url, &ep); err != nil {
return nil, fmt.Errorf("requesting endpoints for %s/%s failed: %v", namespace, name, err)
}
result = ep.addresses()
}
sort.Strings(result)

return result, nil
}
Expand Down
4 changes: 3 additions & 1 deletion dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/zalando/skipper/dataclients/kubernetes/definitions"
)

const endpointSliceServiceNameLabel = "kubernetes.io/service-name"

// There are [1..N] Kubernetes endpointslices created for a single Kubernetes service.
// Kubernetes endpointslices of a given service can have duplicates with different states.
// Therefore Kubernetes endpointslices need to be de-duplicated before usage.
Expand Down Expand Up @@ -97,7 +99,7 @@ type endpointSlice struct {

// ToResourceID returns the same string for a group endpointlisces created for the same svc
func (eps *endpointSlice) ToResourceID() definitions.ResourceID {
svcName := eps.Meta.Labels["kubernetes.io/service-name"]
svcName := eps.Meta.Labels[endpointSliceServiceNameLabel]
namespace := eps.Meta.Namespace
return newResourceID(namespace, svcName)
}
Expand Down
8 changes: 7 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
return filters
}

// GetEndpointAddresses returns the list of all addresses for the given service.
// GetEndpointAddresses returns the list of all addresses for the given service
// loaded by previous call to LoadAll or LoadUpdate.
func (c *Client) GetEndpointAddresses(ns, name string) []string {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -584,6 +585,11 @@ func (c *Client) GetEndpointAddresses(ns, name string) []string {
return c.state.getEndpointAddresses(ns, name)
}

// LoadEndpointAddresses returns the list of all addresses for the given service.
func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) {
return c.ClusterClient.loadEndpointAddresses(namespace, name)
}

func compareStringList(a, b []string) []string {
c := make([]string, 0)
for i := len(a) - 1; i >= 0; i-- {
Expand Down
Loading

0 comments on commit 7f9f986

Please sign in to comment.