Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataclients/kubernetes: change GetEndpointAddresses to return addresses without scheme and port #2917

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.19
v0.20
1 change: 1 addition & 0 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) {
routeGroups: routeGroups,
services: services,
cachedEndpoints: make(map[endpointID][]string),
cachedAddresses: make(map[definitions.ResourceID][]string),
enableEndpointSlices: c.enableEndpointSlices,
}

Expand Down
30 changes: 14 additions & 16 deletions dataclients/kubernetes/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type clusterState struct {
endpointSlices map[definitions.ResourceID]*skipperEndpointSlice
secrets map[definitions.ResourceID]*secret
cachedEndpoints map[endpointID][]string
cachedAddresses map[definitions.ResourceID][]string
enableEndpointSlices bool
}

Expand Down Expand Up @@ -83,38 +84,35 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin
return targets
}

// GetEndpointsByName returns the skipper endpoints for kubernetes endpoints or endpointslices.
// This function works only correctly for endpointslices (and likely endpoints) with one port with the same protocol ("TCP", "UDP").
func (state *clusterState) GetEndpointsByName(namespace, name, protocol, scheme string) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
}
Comment on lines -89 to -92
Copy link
Member Author

@AlexanderYastrebov AlexanderYastrebov Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was logically wrong because it does not contain target port unlike other places that use state.cachedEndpoints

// getEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices.
func (state *clusterState) getEndpointAddresses(namespace, name string) []string {
rID := newResourceID(namespace, name)

state.mu.Lock()
defer state.mu.Unlock()
if cached, ok := state.cachedEndpoints[epID]; ok {
if cached, ok := state.cachedAddresses[rID]; ok {
Copy link
Member Author

@AlexanderYastrebov AlexanderYastrebov Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can even drop the cache as implementation became straightforward now

return cached
}

var targets []string
var addresses []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targets(protocol, scheme)
if eps, ok := state.endpointSlices[rID]; ok {
addresses = eps.addresses()
} else {
return nil
}
} else {
if ep, ok := state.endpoints[epID.ResourceID]; ok {
targets = ep.targets(scheme)
if ep, ok := state.endpoints[rID]; ok {
addresses = ep.addresses()
} else {
return nil
}
}

sort.Strings(targets)
state.cachedEndpoints[epID] = targets
return targets
sort.Strings(addresses)
state.cachedAddresses[rID] = addresses

return addresses
}

// GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices.
Expand Down
27 changes: 11 additions & 16 deletions dataclients/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ type endpointList struct {
Items []*endpoint `json:"items"`
}

func formatEndpointString(ip, protocol string, port int) string {
return protocol + "://" + net.JoinHostPort(ip, strconv.Itoa(port))
func formatEndpointString(ip, scheme string, port int) string {
Copy link
Member Author

@AlexanderYastrebov AlexanderYastrebov Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not strictly required for this refactoring but I noticed the inconsistency and decided to fix it along.

return scheme + "://" + net.JoinHostPort(ip, strconv.Itoa(port))
}

func formatEndpoint(a *address, p *port, protocol string) string {
return formatEndpointString(a.IP, protocol, p.Port)
func formatEndpoint(a *address, p *port, scheme string) string {
return formatEndpointString(a.IP, scheme, p.Port)
}

func formatEndpointsForSubsetAddresses(addresses []*address, port *port, protocol string) []string {
func formatEndpointsForSubsetAddresses(addresses []*address, port *port, scheme string) []string {
result := make([]string, 0, len(addresses))
for _, address := range addresses {
result = append(result, formatEndpoint(address, port, protocol))
result = append(result, formatEndpoint(address, port, scheme))
}
return result
}
Expand All @@ -54,11 +54,10 @@ func (ep *endpoint) targetsByServicePort(protocol string, servicePort *servicePo
return formatEndpointsForSubsetAddresses(s.Addresses, p, protocol)
}
}

return nil
}

func (ep *endpoint) targetsByServiceTarget(protocol string, serviceTarget *definitions.BackendPort) []string {
func (ep *endpoint) targetsByServiceTarget(scheme string, serviceTarget *definitions.BackendPort) []string {
portName, named := serviceTarget.Value.(string)
portValue, byValue := serviceTarget.Value.(int)
for _, s := range ep.Subsets {
Expand All @@ -69,26 +68,22 @@ func (ep *endpoint) targetsByServiceTarget(protocol string, serviceTarget *defin

var result []string
for _, a := range s.Addresses {
result = append(result, formatEndpoint(a, p, protocol))
result = append(result, formatEndpoint(a, p, scheme))
}

return result
}
}

return nil
}

func (ep *endpoint) targets(protocol string) []string {
func (ep *endpoint) addresses() []string {
result := make([]string, 0)
for _, s := range ep.Subsets {
for _, p := range s.Ports {
for _, a := range s.Addresses {
result = append(result, formatEndpoint(a, p, protocol))
}
for _, a := range s.Addresses {
result = append(result, a.IP)
}
}

return result
}

Expand Down
13 changes: 2 additions & 11 deletions dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,11 @@ func (eps *skipperEndpointSlice) targetsByServiceTarget(protocol, scheme string,
return result
}

func (eps *skipperEndpointSlice) targets(protocol, scheme string) []string {
func (eps *skipperEndpointSlice) addresses() []string {
result := make([]string, 0, len(eps.Endpoints))

var port int
for _, p := range eps.Ports {
if p.Protocol == protocol {
AlexanderYastrebov marked this conversation as resolved.
Show resolved Hide resolved
port = p.Port
break
}
}
for _, ep := range eps.Endpoints {
result = append(result, formatEndpointString(ep.Address, scheme, port))
result = append(result, ep.Address)
}

return result
}

Expand Down
98 changes: 69 additions & 29 deletions dataclients/kubernetes/endpointslices_test.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,91 @@
package kubernetes

import (
"net/url"
"strconv"
"testing"

"github.com/zalando/skipper/dataclients/kubernetes/definitions"
"github.com/stretchr/testify/assert"
)

func TestTargets(t *testing.T) {
want := "http://10.0.0.1:8080"
u, err := url.Parse(want)
if err != nil {
t.Fatalf("Failed to parse: %v", err)
}
port, err := strconv.Atoi(u.Port())
if err != nil {
t.Fatalf("Failed to parse: %v", err)
}
func TestAddresses(t *testing.T) {
assert.Equal(t, []string{"10.0.0.1"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: "10.0.0.1",
Zone: "zone-1",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: 8080,
Protocol: "TCP",
},
},
}).addresses())

ses := &skipperEndpointSlice{
Meta: &definitions.Metadata{
Namespace: "ns1",
Name: "a-slice",
assert.Equal(t, []string{"10.0.0.1"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: "10.0.0.1",
Zone: "zone-1",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: 8080,
Protocol: "TCP",
},
{
Name: "support",
Port: 8081,
Protocol: "TCP",
},
},
}).addresses())

assert.Equal(t, []string{"10.0.0.1", "10.0.0.2"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: u.Hostname(),
Address: "10.0.0.1",
Zone: "zone-1",
},
{
Address: "10.0.0.2",
Zone: "zone-2",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: port,
Port: 8080,
Protocol: "TCP",
},
},
}
res := ses.targets("TCP", "http")
if l := len(res); l != 1 {
t.Fatalf("Failed to get same number of results than expected %d, got: %d", 1, l)
}
}).addresses())

for i := 0; i < len(res); i++ {
if want != res[i] {
t.Fatalf("Failed to get the right target: %s != %s", want, res[i])
}
}
assert.Equal(t, []string{"10.0.0.1", "10.0.0.2"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: "10.0.0.1",
Zone: "zone-1",
},
{
Address: "10.0.0.2",
Zone: "zone-2",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: 8080,
Protocol: "TCP",
},
{
Name: "support",
Port: 8081,
Protocol: "TCP",
},
},
}).addresses())
}
3 changes: 2 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,14 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
return filters
}

// GetEndpointAddresses returns the list of all addresses for the given service.
func (c *Client) GetEndpointAddresses(ns, name string) []string {
c.mu.Lock()
defer c.mu.Unlock()
if c.state == nil {
return nil
}
return c.state.GetEndpointsByName(ns, name, "TCP", "http")
return c.state.getEndpointAddresses(ns, name)
}

func compareStringList(a, b []string) []string {
Expand Down
5 changes: 2 additions & 3 deletions dataclients/kubernetes/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func TestGetEndpointAddresses(t *testing.T) {
ns := "namespace1"
name := "service1"
got := client.GetEndpointAddresses(ns, name)
expected := []string{"http://1.1.1.0:8080"}
expected := []string{"1.1.1.0"}
if len(got) != len(expected) {
t.Fatalf("Failed to get same size: %d != %d", len(expected), len(got))
}
Expand Down Expand Up @@ -646,7 +646,7 @@ func TestGetEndpointAddresses(t *testing.T) {
ns := "namespace1"
name := "service1"
got := client.GetEndpointAddresses(ns, name)
expected := []string{"http://1.1.1.0:8080"}
expected := []string{"1.1.1.0"}
if len(got) != len(expected) {
t.Fatalf("Failed to get same size: %d != %d", len(expected), len(got))
}
Expand All @@ -666,7 +666,6 @@ func TestGetEndpointAddresses(t *testing.T) {
t.Fatalf("Failed to get cached result expected %q, got %q", expected[i], got[i])
}
}

})
}

Expand Down
2 changes: 2 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
KubernetesURL: lb.URL,
KubernetesRedisServiceNamespace: "skipper",
KubernetesRedisServiceName: "redis",
KubernetesRedisServicePort: 6379,
KubernetesHealthcheck: true,
SourcePollTimeout: 1500 * time.Millisecond,
}
Expand Down Expand Up @@ -355,6 +356,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") ->
KubernetesURL: lb.URL,
KubernetesRedisServiceNamespace: "skipper",
KubernetesRedisServiceName: "redis",
KubernetesRedisServicePort: 6379,
KubernetesHealthcheck: true,
SourcePollTimeout: 1500 * time.Millisecond,
WaitFirstRouteLoad: true,
Expand Down
16 changes: 10 additions & 6 deletions routesrv/redishandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package routesrv
import (
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"strconv"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper"
"github.com/zalando/skipper/dataclients/kubernetes"
"github.com/zalando/skipper/metrics"
)
Expand Down Expand Up @@ -38,16 +40,18 @@ func (rh *RedisHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Write(address)
}

func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) {
func getRedisAddresses(opts *skipper.Options, kdc *kubernetes.Client, m metrics.Metrics) func() ([]byte, error) {
return func() ([]byte, error) {
a := kdc.GetEndpointAddresses(namespace, name)
a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("Redis updater called and found %d redis endpoints: %v", len(a), a)
m.UpdateGauge("redis_endpoints", float64(len(a)))

result := RedisEndpoints{}
result := RedisEndpoints{
Endpoints: make([]RedisEndpoint, len(a)),
}
port := strconv.Itoa(opts.KubernetesRedisServicePort)
for i := 0; i < len(a); i++ {
a[i] = strings.TrimPrefix(a[i], "http://")
Copy link
Member Author

@AlexanderYastrebov AlexanderYastrebov Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result.Endpoints = append(result.Endpoints, RedisEndpoint{Address: a[i]})
result.Endpoints[i].Address = net.JoinHostPort(a[i], port)
}

data, err := json.Marshal(result)
Expand Down
2 changes: 1 addition & 1 deletion routesrv/routesrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func New(opts skipper.Options) (*RouteServer, error) {
if err != nil {
return nil, err
}
rh.AddrUpdater = getRedisAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName, dataclient, m)
rh.AddrUpdater = getRedisAddresses(&opts, dataclient, m)
mux.Handle("/swarm/redis/shards", rh)
}

Expand Down
Loading
Loading