Skip to content

Commit

Permalink
Filter pods that don't belong to the service in question
Browse files Browse the repository at this point in the history
Filter pods that don't have labels match to its service label selector.
  • Loading branch information
sawsa307 committed Mar 15, 2023
1 parent e0a1233 commit 4d1964a
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 18 deletions.
38 changes: 33 additions & 5 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
apiv1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -292,7 +293,7 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.
}

func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer, servicePortName string,
podLister, nodeLister, serviceLister cache.Indexer, servicePortName string,
networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
targetMap := map[string]negtypes.NetworkEndpointSet{}
endpointPodMap := negtypes.EndpointPodMap{}
Expand All @@ -308,19 +309,21 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
if len(matchPort) == 0 {
continue
}
serviceName := ed.Meta.Labels[discovery.LabelServiceName]
isCustomEPS := ed.Meta.Labels[discovery.LabelManagedBy] != "endpointslice-controller.k8s.io"
for _, endpointAddress := range ed.Addresses {
if endpointAddress.TargetRef == nil {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
dupCount += validateAndAddEndpoints(endpointAddress, zoneGetter, podLister, nodeLister, matchPort, networkEndpointType, targetMap, endpointPodMap)
dupCount += validateAndAddEndpoints(endpointAddress, zoneGetter, podLister, nodeLister, serviceLister, matchPort, serviceName, networkEndpointType, targetMap, endpointPodMap, isCustomEPS)
}
}
return targetMap, endpointPodMap, dupCount, nil
}

// validateAndAddEndpoints fills in missing information and creates network endpoint for each endpoint addresss
func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, matchPort string, endpointType negtypes.NetworkEndpointType, targetMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap) int {
func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, matchPort, serviceName string, endpointType negtypes.NetworkEndpointType, targetMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap, isCustomEPS bool) int {
var dupCount int
for _, address := range ep.Addresses {
key := fmt.Sprintf("%s/%s", ep.TargetRef.Namespace, ep.TargetRef.Name)
Expand All @@ -340,7 +343,7 @@ func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGe
klog.V(2).Infof("Endpoint %q does not have an address %v that matches to the IP(s) of its pod")
continue
}
if !validatePod(pod, nodeLister) {
if !validatePod(pod, nodeLister, serviceLister, serviceName, isCustomEPS) {
klog.V(2).Infof("Endpoint %q does not correspond to a valid pod resource. Skipping", address)
continue
}
Expand Down Expand Up @@ -374,7 +377,8 @@ func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGe
// 1. is in terminal state
// 2. corresponds to a non-existent node
// 3. have an IP that matches to a podIP, but is outside of the node's allocated IP range
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) bool {
// 4. does not belong to the service in question
func validatePod(pod *apiv1.Pod, nodeLister, serviceLister cache.Indexer, serviceName string, isCustomEPS bool) bool {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
Expand All @@ -395,6 +399,19 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) bool {
klog.V(2).Info("Pod %s/%s has an IP %v that is outside of the node's allocated IP range(s) %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.PodIP, node.Spec.PodCIDRs)
return false
}
service := getService(serviceLister, pod.ObjectMeta.Namespace, serviceName)
if service == nil {
klog.V(2).Info("Endpoint does not correspond to an existing service %s, skipping", serviceName)
return false
}
// for custom endpoint slice, we won't check the pod's labels
if isCustomEPS {
return true
}
if !podBelongsToService(pod, service) {
klog.V(2).Info("Pod %s/%s labels %v does not match service %s/%s selector %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Labels, service.Namespace, service.Name, service.Spec.Selector)
return false
}
return true
}

Expand Down Expand Up @@ -436,6 +453,17 @@ func nodeContainsPodIP(node *apiv1.Node, pod *apiv1.Pod) bool {
return false
}

func podBelongsToService(pod *apiv1.Pod, service *apiv1.Service) bool {
podLabels := pod.ObjectMeta.Labels
serviceLabels := service.Spec.Selector
for key, val1 := range serviceLabels {
if val2, contains := podLabels[key]; !contains || val1 != val2 {
return false
}
}
return true
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
Expand Down
153 changes: 140 additions & 13 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,19 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) {
},
})
}
testLabels1 := map[string]string{
"run": "foo",
}
serviceLister := testContext.ServiceInformer.GetIndexer()
serviceLister.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testService,
},
Spec: v1.ServiceSpec{
Selector: testLabels1,
},
})

testNonExistPort := "non-exists"
testEmptyNamedPort := ""
Expand Down Expand Up @@ -1303,7 +1316,7 @@ func TestToZoneNetworkEndpointMapDegradedMode(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
targetMap, endpointPodMap, _, err := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(testEndpointSlices), fakeZoneGetter, podLister, nodeLister, tc.portName, tc.networkEndpointType)
targetMap, endpointPodMap, _, err := toZoneNetworkEndpointMapDegradedMode(negtypes.EndpointsDataFromEndpointSlices(testEndpointSlices), fakeZoneGetter, podLister, nodeLister, serviceLister, tc.portName, tc.networkEndpointType)
if err != nil {
t.Errorf("expected error=nil, but got %v", err)
}
Expand Down Expand Up @@ -1332,13 +1345,17 @@ func TestValidateAndAddEndpoints(t *testing.T) {
networkEndpointFromEncodedEndpoint("10.100.1.1||instance1||80"): types.NamespacedName{Namespace: testNamespace, Name: "pod1"},
}

testLabels1 := map[string]string{
"run": "foo",
}
fakeZoneGetter := negtypes.NewFakeZoneGetter()
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
podLister.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "pod1",
Labels: testLabels1,
},
Spec: v1.PodSpec{
NodeName: testInstance1,
Expand All @@ -1364,6 +1381,17 @@ func TestValidateAndAddEndpoints(t *testing.T) {
},
})

serviceLister := testContext.ServiceInformer.GetIndexer()
serviceLister.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testService,
},
Spec: v1.ServiceSpec{
Selector: testLabels1,
},
})

testCases := []struct {
desc string
ep negtypes.AddressData
Expand Down Expand Up @@ -1491,7 +1519,7 @@ func TestValidateAndAddEndpoints(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
targetMap := map[string]negtypes.NetworkEndpointSet{}
endpointPodMap := negtypes.EndpointPodMap{}
validateAndAddEndpoints(tc.ep, fakeZoneGetter, podLister, nodeLister, matchPort, tc.endpointType, targetMap, endpointPodMap)
validateAndAddEndpoints(tc.ep, fakeZoneGetter, podLister, nodeLister, serviceLister, matchPort, testService, tc.endpointType, targetMap, endpointPodMap, false)

if !reflect.DeepEqual(targetMap, tc.expectedEndpointMap) {
t.Errorf("degraded mode endpoint set is not calculated correctly:\ngot %+v,\n expected %+v", targetMap, tc.expectedEndpointMap)
Expand Down Expand Up @@ -1522,17 +1550,38 @@ func TestValidatePod(t *testing.T) {
PodCIDRs: []string{"2001:db8::/48", "10.100.1.0/24"},
},
})
testLabels1 := map[string]string{
"run": "foo",
}
testLabels2 := map[string]string{
"run": "bar",
}

serviceLister := testContext.ServiceInformer.GetIndexer()
serviceLister.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: testService,
},
Spec: v1.ServiceSpec{
Selector: testLabels1,
},
})
testServiceNameNotFound := "foo"
testCases := []struct {
desc string
pod *v1.Pod
expect bool
desc string
pod *v1.Pod
serviceName string
isCustomEPS bool
expect bool
}{
{
desc: "a valid pod with IPv4 address and phase running",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "pod1",
Labels: testLabels1,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Expand All @@ -1542,7 +1591,9 @@ func TestValidatePod(t *testing.T) {
NodeName: instance1,
},
},
expect: true,
serviceName: testService,
isCustomEPS: false,
expect: true,
},
{
desc: "a valid pod with IPv6 address and phase running",
Expand All @@ -1559,7 +1610,9 @@ func TestValidatePod(t *testing.T) {
NodeName: instance1,
},
},
expect: true,
serviceName: testService,
isCustomEPS: false,
expect: true,
},
{
desc: "a terminal pod with phase failed",
Expand All @@ -1576,7 +1629,9 @@ func TestValidatePod(t *testing.T) {
NodeName: instance1,
},
},
expect: false,
serviceName: testService,
isCustomEPS: false,
expect: false,
},
{
desc: "a terminal pod with phase succeeded",
Expand All @@ -1593,7 +1648,9 @@ func TestValidatePod(t *testing.T) {
NodeName: instance1,
},
},
expect: false,
serviceName: testService,
isCustomEPS: false,
expect: false,
},
{
desc: "a pod from non-existent node",
Expand All @@ -1610,7 +1667,9 @@ func TestValidatePod(t *testing.T) {
NodeName: testNodeNonExistent,
},
},
expect: false,
serviceName: testService,
isCustomEPS: false,
expect: false,
},
{
desc: "a pod with IPv4 IP adress outside of the node's allocated pod range",
Expand All @@ -1627,7 +1686,9 @@ func TestValidatePod(t *testing.T) {
NodeName: instance1,
},
},
expect: false,
serviceName: testService,
isCustomEPS: false,
expect: false,
},
{
desc: "a pod with IPv6 IP address outside of the node's allocated pod range",
Expand All @@ -1644,19 +1705,84 @@ func TestValidatePod(t *testing.T) {
NodeName: instance1,
},
},
expect: false,
serviceName: testService,
isCustomEPS: false,
expect: false,
},
{
desc: "a pod with non-existing service name",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "pod6",
Labels: testLabels2,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: testPodIPv4,
},
Spec: v1.PodSpec{
NodeName: instance1,
},
},
serviceName: testServiceNameNotFound,
isCustomEPS: false,
expect: false,
},
{
desc: "a pod referenced by non-custom endpoint slice, with labels not matching to service's label selector ",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "pod7",
Labels: testLabels2,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: testPodIPv4,
},
Spec: v1.PodSpec{
NodeName: instance1,
},
},
serviceName: testService,
isCustomEPS: false,
expect: false,
},
{
desc: "a pod referenced by a custom endpoint slice, with labels not matching to service's label selector",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "pod7",
Labels: testLabels2,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: testPodIPv4,
},
Spec: v1.PodSpec{
NodeName: instance1,
},
},
serviceName: testService,
isCustomEPS: true, // for custom endpoint slice, we won't check the pod's labels
expect: true,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
if got := validatePod(tc.pod, nodeLister); got != tc.expect {
if got := validatePod(tc.pod, nodeLister, serviceLister, testService, tc.isCustomEPS); got != tc.expect {
t.Errorf("validatePod() = %t, expected %t", got, tc.expect)
}
})
}
}

func addPodsToLister(podLister cache.Indexer, endpointSlices []*discovery.EndpointSlice) {
testLabels1 := map[string]string{
"run": "foo",
}
for _, eps := range endpointSlices {
addressType := eps.AddressType
for _, ep := range eps.Endpoints {
Expand All @@ -1675,6 +1801,7 @@ func addPodsToLister(podLister cache.Indexer, endpointSlices []*discovery.Endpoi
ObjectMeta: metav1.ObjectMeta{
Namespace: podNamespace,
Name: podName,
Labels: testLabels1,
},
Spec: v1.PodSpec{
NodeName: *ep.NodeName,
Expand Down

0 comments on commit 4d1964a

Please sign in to comment.