Skip to content

Commit

Permalink
Filter pods that don't belong to the service in question
Browse files Browse the repository at this point in the history
Filter out pods whose labels do not match their service's label selector.
  • Loading branch information
sawsa307 committed Apr 17, 2023
1 parent a416fcc commit b96225c
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 23 deletions.
3 changes: 2 additions & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg

// determine the implementation that calculates NEG endpoints on each sync.
epc := negsyncer.GetEndpointsCalculator(
manager.nodeLister,
manager.podLister,
manager.nodeLister,
manager.serviceLister,
manager.zoneGetter,
syncerKey,
portInfo.EpCalculatorMode,
Expand Down
13 changes: 11 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,26 @@ type L7EndpointsCalculator struct {
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
serviceLister cache.Indexer
networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(
zoneGetter types.ZoneGetter,
podLister, nodeLister, serviceLister cache.Indexer,
svcPortName string,
endpointType types.NetworkEndpointType,
logger klog.Logger,
enableDualStackNEG bool,
) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
serviceLister: serviceLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -219,7 +228,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.serviceLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func TestValidateEndpoints(t *testing.T) {
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
serviceLister := testContext.ServiceInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func NewTransactionSyncer(
return syncer
}

func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator {
func GetEndpointsCalculator(podLister, nodeLister, serviceLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator {
serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/")
if syncerKey.NegType == negtypes.VmIpEndpointType {
nodeLister := listers.NewNodeLister(nodeLister)
Expand All @@ -188,6 +188,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
zoneGetter,
podLister,
nodeLister,
serviceLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down
17 changes: 15 additions & 2 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,18 @@ func TestEnableDegradedMode(t *testing.T) {
},
})
}
testLabels := map[string]string{
"run": "foo",
} // this should match to pod labels
s.serviceLister.Add(&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testServiceNamespace,
Name: testServiceName,
},
Spec: corev1.ServiceSpec{
Selector: testLabels,
},
})
for _, eps := range tc.testEndpointSlices {
s.endpointSliceLister.Add(eps)
}
Expand Down Expand Up @@ -2073,7 +2085,7 @@ func TestCollectLabelStats(t *testing.T) {

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)
ts.endpointsCalculator = GetEndpointsCalculator(ts.podLister, ts.nodeLister, ts.serviceLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), ts.enableDualStackNEG)
return negsyncer, ts
}

Expand Down Expand Up @@ -2114,7 +2126,8 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
testContext.NodeInformer.GetIndexer(),
testContext.SvcNegInformer.GetIndexer(),
reflector,
GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG),
GetEndpointsCalculator(testContext.PodInformer.GetIndexer(), testContext.NodeInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(),
fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG),
string(kubeSystemUID),
testContext.SvcNegClient,
metrics.FakeSyncerMetrics(),
Expand Down
34 changes: 31 additions & 3 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func getEndpointPod(

// toZoneNetworkEndpointMapDegradedMode translates the addresses in the endpoints objects into a zone-to-endpoints map, and also returns the count of duplicated endpoints.
// In degraded mode we do not raise an error for misconfigured endpoints; they are filtered out directly instead.
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
Expand All @@ -461,6 +461,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
if len(matchPort) == 0 {
continue
}
serviceName := ed.Meta.Labels[discovery.LabelServiceName]
isCustomEPS := ed.Meta.Labels[discovery.LabelManagedBy] != "endpointslice-controller.k8s.io"
for _, endpointAddress := range ed.Addresses {
if !enableDualStackNEG && endpointAddress.AddressType != discovery.AddressTypeIPv4 {
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
Expand All @@ -471,7 +473,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr)
continue
}
if err := validatePod(pod, nodeLister); err != nil {
if err := validatePod(pod, nodeLister, serviceLister, serviceName, isCustomEPS); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
Expand Down Expand Up @@ -525,7 +527,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
// 1. is in terminal state
// 2. corresponds to a non-existent node
// 3. has an IP that matches a pod IP, but is outside of the node's allocated IP range
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error {
// 4. has labels that do not match its service's label selector (this check is skipped for custom endpoint slices)
func validatePod(pod *apiv1.Pod, nodeLister, serviceLister cache.Indexer, serviceName string, isCustomEPS bool) error {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
Expand All @@ -543,6 +546,18 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) error {
klog.V(2).Info("Pod %s/%s has an IP %v that is outside of the node's allocated IP range(s) %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Status.PodIP, node.Spec.PodCIDRs)
return err
}
service := getService(serviceLister, pod.ObjectMeta.Namespace, serviceName)
if service == nil {
klog.V(2).Info("Endpoint does not correspond to an existing service %s, skipping", serviceName)
return negtypes.ErrEPServiceNotFound
}
if isCustomEPS {
return nil
}
if err = podBelongsToService(pod, service); err != nil {
klog.V(2).Info("Pod %s/%s labels %v does not match service %s/%s selector %v", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Labels, service.Namespace, service.Name, service.Spec.Selector)
return err
}
return nil
}

Expand Down Expand Up @@ -607,6 +622,19 @@ func nodeContainsPodIP(node *apiv1.Node, pod *apiv1.Pod) error {
return negtypes.ErrEPIPOutOfPodCIDR
}

// podBelongsToService verifies that the pod satisfies the service's label
// selector. Every key/value pair in service.Spec.Selector must be present in
// the pod's labels with an identical value; labels on the pod that the
// selector does not mention are ignored.
//
// It returns negtypes.ErrEPPodLabelMismatch on the first selector entry that
// is missing from the pod or carries a different value, and nil otherwise.
// A nil or empty selector has no entries to check, so it yields nil.
func podBelongsToService(pod *apiv1.Pod, service *apiv1.Service) error {
	selector := service.Spec.Selector
	podLabelSet := pod.ObjectMeta.Labels
	for name, want := range selector {
		got, present := podLabelSet[name]
		if !present {
			return negtypes.ErrEPPodLabelMismatch
		}
		if got != want {
			return negtypes.ErrEPPodLabelMismatch
		}
	}
	return nil
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
Expand Down
Loading

0 comments on commit b96225c

Please sign in to comment.