Skip to content

Commit

Permalink
Filter pods that don't belong to the service in question
Browse files Browse the repository at this point in the history
Filter pods that don't have labels match to its service label selector.
  • Loading branch information
sawsa307 committed Apr 26, 2023
1 parent d202d16 commit daf13f4
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg

// determine the implementation that calculates NEG endpoints on each sync.
epc := negsyncer.GetEndpointsCalculator(
manager.nodeLister,
manager.podLister,
manager.nodeLister,
manager.serviceLister,
manager.zoneGetter,
syncerKey,
portInfo.EpCalculatorMode,
Expand Down
6 changes: 4 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,19 @@ type L7EndpointsCalculator struct {
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
serviceLister cache.Indexer
networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
serviceLister: serviceLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -219,7 +221,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.serviceLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func TestValidateEndpoints(t *testing.T) {
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
serviceLister := testContext.ServiceInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewTransactionSyncer(
return syncer
}

func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator {
func GetEndpointsCalculator(podLister, nodeLister, serviceLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator {
serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/")
if syncerKey.NegType == negtypes.VmIpEndpointType {
nodeLister := listers.NewNodeLister(nodeLister)
Expand All @@ -195,6 +195,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
zoneGetter,
podLister,
nodeLister,
serviceLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down
17 changes: 15 additions & 2 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,18 @@ func TestEnableDegradedMode(t *testing.T) {
},
})
}
testLabels := map[string]string{
"run": "foo",
} // this should match to pod labels
s.serviceLister.Add(&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testServiceNamespace,
Name: testServiceName,
},
Spec: corev1.ServiceSpec{
Selector: testLabels,
},
})
for _, eps := range tc.testEndpointSlices {
s.endpointSliceLister.Add(eps)
}
Expand Down Expand Up @@ -2080,7 +2092,7 @@ func TestCollectLabelStats(t *testing.T) {

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.podLister, ts.nodeLister, ts.serviceLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false)
return negsyncer, ts
}

Expand Down Expand Up @@ -2121,7 +2133,8 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
testContext.NodeInformer.GetIndexer(),
testContext.SvcNegInformer.GetIndexer(),
reflector,
GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG),
GetEndpointsCalculator(testContext.PodInformer.GetIndexer(), testContext.NodeInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(),
fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG),
string(kubeSystemUID),
testContext.SvcNegClient,
metrics.FakeSyncerMetrics(),
Expand Down
32 changes: 29 additions & 3 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func getEndpointPod(

// toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints
// we will not raise error in degraded mode for misconfigured endpoints, instead they will be filtered directly
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
Expand All @@ -387,6 +387,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
if len(matchPort) == 0 {
continue
}
serviceName := ed.Meta.Labels[discovery.LabelServiceName]
isCustomEPS := ed.Meta.Labels[discovery.LabelManagedBy] != "endpointslice-controller.k8s.io"
for _, endpointAddress := range ed.Addresses {
if !enableDualStackNEG && endpointAddress.AddressType != discovery.AddressTypeIPv4 {
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
Expand Down Expand Up @@ -424,7 +426,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
klog.Errorf("Endpoint %q in Endpoints %s/%s has IP(s) not match to its pod %s: %w, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, pod.Name, err)
continue
}
if err := validatePod(pod, nodeLister, networkEndpoint); err != nil {
if err := validatePod(pod, nodeLister, serviceLister, networkEndpoint, serviceName, isCustomEPS); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
Expand Down Expand Up @@ -456,7 +458,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
// 1. is in terminal state
// 2. corresponds to a non-existent node
// 3. have an IP that matches to a podIP, but is outside of the node's allocated IP range
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer, networkEndpoint negtypes.NetworkEndpoint) error {
// 4. has labels not matching to its service's label selector
func validatePod(pod *apiv1.Pod, nodeLister, serviceLister cache.Indexer, networkEndpoint negtypes.NetworkEndpoint, serviceName string, isCustomEPS bool) error {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
Expand All @@ -473,6 +476,16 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer, networkEndpoint negty
if err = nodeContainsPodIP(node, networkEndpoint); err != nil {
return err
}
service := getService(serviceLister, pod.ObjectMeta.Namespace, serviceName)
if service == nil {
return negtypes.ErrEPServiceNotFound
}
if isCustomEPS {
return nil
}
if err = podBelongsToService(pod, service); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -561,6 +574,19 @@ func nodeContainsPodIP(node *apiv1.Node, networkEndpoint negtypes.NetworkEndpoin
return nil
}

// podBelongsToService checks the pod's labels
// and return error if any label specified in the service's label selector is not in the pod's labels
func podBelongsToService(pod *apiv1.Pod, service *apiv1.Service) error {
podLabels := pod.ObjectMeta.Labels
serviceLabels := service.Spec.Selector
for key, val1 := range serviceLabels {
if val2, contains := podLabels[key]; !contains || val1 != val2 {
return fmt.Errorf("%w: pod %s has labels not match to its service %s's label selector", negtypes.ErrEPPodLabelMismatch, pod.Name, service.Name)
}
}
return nil
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
Expand Down
Loading

0 comments on commit daf13f4

Please sign in to comment.