Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter pods that don't belong to the service in question #1966

Merged
merged 1 commit into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg

// determine the implementation that calculates NEG endpoints on each sync.
epc := negsyncer.GetEndpointsCalculator(
manager.nodeLister,
manager.podLister,
manager.nodeLister,
manager.serviceLister,
manager.zoneGetter,
syncerKey,
portInfo.EpCalculatorMode,
Expand Down
6 changes: 4 additions & 2 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,19 @@ type L7EndpointsCalculator struct {
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
serviceLister cache.Indexer
networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
serviceLister: serviceLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -219,7 +221,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.serviceLister, l.servicePortName, l.networkEndpointType, l.enableDualStackNEG)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func TestValidateEndpoints(t *testing.T) {
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
serviceLister := testContext.ServiceInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func NewTransactionSyncer(
return syncer
}

func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator {
func GetEndpointsCalculator(podLister, nodeLister, serviceLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, mode negtypes.EndpointsCalculatorMode, logger klog.Logger, enableDualStackNEG bool) negtypes.NetworkEndpointsCalculator {
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/")
if syncerKey.NegType == negtypes.VmIpEndpointType {
nodeLister := listers.NewNodeLister(nodeLister)
Expand All @@ -195,6 +195,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
zoneGetter,
podLister,
nodeLister,
serviceLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down
17 changes: 15 additions & 2 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,18 @@ func TestEnableDegradedMode(t *testing.T) {
},
})
}
testLabels := map[string]string{
"run": "foo",
} // this should match to pod labels
s.serviceLister.Add(&corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: testServiceNamespace,
Name: testServiceName,
},
Spec: corev1.ServiceSpec{
Selector: testLabels,
},
})
for _, eps := range tc.testEndpointSlices {
s.endpointSliceLister.Add(eps)
}
Expand Down Expand Up @@ -2080,7 +2092,7 @@ func TestCollectLabelStats(t *testing.T) {

func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, mode negtypes.EndpointsCalculatorMode) (negtypes.NegSyncer, *transactionSyncer) {
negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmIpEndpointType, false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false)
ts.endpointsCalculator = GetEndpointsCalculator(ts.podLister, ts.nodeLister, ts.serviceLister, ts.zoneGetter, ts.NegSyncerKey, mode, klog.TODO(), false)
return negsyncer, ts
}

Expand Down Expand Up @@ -2121,7 +2133,8 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp
testContext.NodeInformer.GetIndexer(),
testContext.SvcNegInformer.GetIndexer(),
reflector,
GetEndpointsCalculator(testContext.NodeInformer.GetIndexer(), testContext.PodInformer.GetIndexer(), fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG),
GetEndpointsCalculator(testContext.PodInformer.GetIndexer(), testContext.NodeInformer.GetIndexer(), testContext.ServiceInformer.GetIndexer(),
fakeZoneGetter, svcPort, mode, klog.TODO(), testContext.EnableDualStackNEG),
string(kubeSystemUID),
testContext.SvcNegClient,
metrics.FakeSyncerMetrics(),
Expand Down
36 changes: 33 additions & 3 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ const (
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
separator = "||"

// managedByEPSControllerValue is a unique value used with LabelManagedBy to indicate
// the EndpointSlice is managed by the endpoint slice controller.
managedByEPSControllerValue = "endpointslice-controller.k8s.io"
)

// encodeEndpoint encodes ip and instance into a single string
Expand Down Expand Up @@ -371,7 +375,7 @@ func getEndpointPod(

// toZoneNetworkEndpointMapDegradedMode translates addresses in the endpoints object into a zone and endpoints map, and also returns the count of duplicated endpoints.
// In degraded mode no error is raised for misconfigured endpoints; they are filtered out directly instead.
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister, serviceLister cache.Indexer, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, enableDualStackNEG bool) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
Expand All @@ -387,6 +391,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
if len(matchPort) == 0 {
continue
}
serviceName := ed.Meta.Labels[discovery.LabelServiceName]
isCustomEPS := ed.Meta.Labels[discovery.LabelManagedBy] != managedByEPSControllerValue
for _, endpointAddress := range ed.Addresses {
if !enableDualStackNEG && endpointAddress.AddressType != discovery.AddressTypeIPv4 {
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
Expand Down Expand Up @@ -424,7 +430,7 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
klog.Errorf("Endpoint %q in Endpoints %s/%s has IP(s) not match to its pod %s: %w, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, pod.Name, err)
continue
}
if err := validatePod(pod, nodeLister, networkEndpoint); err != nil {
if err := validatePod(pod, nodeLister, serviceLister, networkEndpoint, serviceName, isCustomEPS); err != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s correponds to an invalid pod: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, err)
continue
}
Expand Down Expand Up @@ -457,7 +463,8 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
// 1. is in terminal state
// 2. corresponds to a non-existent node
// 3. has an IP that matches a podIP, but is outside of the node's allocated IP range
func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer, networkEndpoint negtypes.NetworkEndpoint) error {
// 4. has labels that do not match its service's label selector
func validatePod(pod *apiv1.Pod, nodeLister, serviceLister cache.Indexer, networkEndpoint negtypes.NetworkEndpoint, serviceName string, isCustomEPS bool) error {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
Expand All @@ -474,6 +481,16 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer, networkEndpoint negty
if err = nodeContainsPodIP(node, networkEndpoint); err != nil {
return err
}
service := getService(serviceLister, pod.ObjectMeta.Namespace, serviceName)
if service == nil {
return negtypes.ErrEPServiceNotFound
}
if isCustomEPS {
return nil
}
if err = podBelongsToService(pod, service); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -563,6 +580,19 @@ func nodeContainsPodIP(node *apiv1.Node, networkEndpoint negtypes.NetworkEndpoin
return nil
}

// podBelongsToService verifies that the pod carries every key/value pair
// listed in the service's label selector.
//
// It returns an error wrapping negtypes.ErrEPPodLabelMismatch as soon as one
// selector entry is either absent from the pod's labels or present with a
// different value. It returns nil when every selector entry is matched, which
// includes the case where the selector has no entries at all.
func podBelongsToService(pod *apiv1.Pod, service *apiv1.Service) error {
	for selectorKey, wantValue := range service.Spec.Selector {
		gotValue, found := pod.ObjectMeta.Labels[selectorKey]
		if found && gotValue == wantValue {
			// This selector entry is satisfied; check the next one.
			continue
		}
		return fmt.Errorf("%w: pod %s/%s has labels not match to its service %s/%s's label selector", negtypes.ErrEPPodLabelMismatch, pod.Namespace, pod.Name, service.Namespace, service.Name)
	}
	return nil
}

// retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map
func retrieveExistingZoneNetworkEndpointMap(negName string, zoneGetter negtypes.ZoneGetter, cloud negtypes.NetworkEndpointGroupCloud, version meta.Version, mode negtypes.EndpointsCalculatorMode) (map[string]negtypes.NetworkEndpointSet, labels.EndpointPodLabelMap, error) {
// Include zones that have non-candidate nodes currently. It is possible that NEGs were created in those zones previously and the endpoints now became non-candidates.
Expand Down
Loading