Skip to content

Commit

Permalink
Refactor syncInternalImpl, toZoneNetworkEndpointMap, and toZoneNetwor…
Browse files Browse the repository at this point in the history
…kEndpointMapDegradedMode.

Refactor syncInternalImpl, toZoneNetworkEndpointMap, and toZoneNetworkEndpointMapDegradedMode.
  • Loading branch information
sawsa307 committed Apr 4, 2023
1 parent 7ba31c9 commit db38c4d
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 149 deletions.
22 changes: 21 additions & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
return subsetMap, nil, 0, err
}

func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// this should be the same as CalculateEndpoints for L4 ec
subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap)
return subsetMap, nil, err
}

func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand Down Expand Up @@ -166,6 +172,12 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
return subsetMap, nil, 0, err
}

func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// this should be the same as CalculateEndpoints for L4 ec
subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap)
return subsetMap, nil, err
}

func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand All @@ -176,16 +188,18 @@ type L7EndpointsCalculator struct {
zoneGetter types.ZoneGetter
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -203,6 +217,12 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _
return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
var str []string
for zone, nodeList := range nodeMap {
Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func TestValidateEndpoints(t *testing.T) {
zoneGetter := negtypes.NewFakeZoneGetter()
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := listers.NewNodeLister(testContext.NodeInformer.GetIndexer())
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

testEndpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{
{
Expand Down
21 changes: 20 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
return NewL7EndpointsCalculator(
zoneGetter,
podLister,
nodeLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down Expand Up @@ -253,8 +254,26 @@ func (s *transactionSyncer) syncInternalImpl() error {

endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, err = s.getEndpointsCalculation(endpointsData, currentMap)
if err != nil {

if !flags.F.EnableDegradedMode && err != nil {
return err
} else if flags.F.EnableDegradedMode {
if !s.inErrorState() && err != nil {
return err // if we encounter an error, we will return and run the next sync in degraded mode
}
degradedTargetMap, degradedPodMap, degradedModeErr := s.endpointsCalculator.CalculateEndpointsDegradedMode(endpointsData, currentMap)
if degradedModeErr != nil {
return degradedModeErr
}
notInDegraded, onlyInDegraded := calculateNetworkEndpointDifference(targetMap, degradedTargetMap)
if s.inErrorState() {
targetMap = degradedTargetMap
endpointPodMap = degradedPodMap
if len(notInDegraded) == 0 && len(onlyInDegraded) == 0 {
s.resetErrorState()
}
}
// TODO(cheungdavid): in the else branch, publish metrics if we don't encounter error and we are not in error state
}
s.logStats(targetMap, "desired NEG endpoints")

Expand Down
103 changes: 47 additions & 56 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,15 @@ func getEndpointPod(
return pod, nil
}

func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer, servicePortName string,
networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
targetMap := map[string]negtypes.NetworkEndpointSet{}
endpointPodMap := negtypes.EndpointPodMap{}
var dupCount int
func toZoneNetworkEndpointMapDegradedMode(
eds []negtypes.EndpointsData,
zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer,
servicePortName string,
networkEndpointType negtypes.NetworkEndpointType,
) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
for _, ed := range eds {
matchPort := ""
for _, port := range ed.Ports {
Expand All @@ -379,58 +382,46 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
if endpointAddress.TargetRef == nil {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
pod, getPodErr := getEndpointPod(endpointAddress, podLister)
if getPodErr != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr)
continue
}
dupCount += validateAndAddEndpoints(endpointAddress, zoneGetter, podLister, nodeLister, matchPort, networkEndpointType, targetMap, endpointPodMap)
}
}
return targetMap, endpointPodMap, dupCount, nil
}

// validateAndAddEndpoints fills in missing information and creates network endpoint for each endpoint addresss
func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, matchPort string, endpointType negtypes.NetworkEndpointType, targetMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap) int {
var dupCount int
for _, address := range ep.Addresses {
key := fmt.Sprintf("%s/%s", ep.TargetRef.Namespace, ep.TargetRef.Name)
obj, exists, err := podLister.GetByKey(key)
if err != nil || !exists {
klog.V(2).Infof("Endpoint %q does not correspond to an existing pod. Skipping", address)
continue
}
pod, ok := obj.(*apiv1.Pod)
if !ok {
klog.V(2).Infof("Endpoint %q does not correspond to a pod object. Skipping", address)
continue
}
if !validatePod(pod, nodeLister) {
klog.V(2).Infof("Endpoint %q does not correspond to a valid pod resource. Skipping", address)
continue
}
nodeName := pod.Spec.NodeName
zone, err := zoneGetter.GetZoneForNode(nodeName)
if err != nil {
klog.V(2).Infof("Endpoint %q does not have valid zone information. Skipping", address)
continue
}
if !validatePod(pod, nodeLister) {
continue
}
nodeName := pod.Spec.NodeName
zone, err := zoneGetter.GetZoneForNode(nodeName)
if err != nil {
klog.V(2).Infof("For endpoint %q in pod %q, its corresponding node %q does not have valid zone information, skipping", endpointAddress.Addresses, pod.ObjectMeta.Name, nodeName)
continue
}
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
}
for _, address := range endpointAddress.Addresses {
networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)

if endpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
nodeName = ""
}
networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName}
if targetMap[zone] == nil {
targetMap[zone] = negtypes.NewNetworkEndpointSet()
}
targetMap[zone].Insert(networkEndpoint)
// increment the count for duplicated endpoint
if _, contains := endpointPodMap[networkEndpoint]; contains {
dupCount += 1
if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains {
// if existing name is alphabetically lower than current one, continue and don't replace
if existingPod.Name < endpointAddress.TargetRef.Name {
klog.Infof("Found duplicate endpoints for %q, save the pod information from the alphabetically higher pod", address)
continue
}
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
}
endpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name}
}
return dupCount
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: zoneNetworkEndpointMap,
EndpointPodMap: networkEndpointPodMap,
}
}

// validatePod checks if this pod is a valid pod resource
Expand All @@ -441,17 +432,17 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) bool {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
klog.V(2).Info("Pod %s/%s is a terminal pod with status %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, phase)
klog.V(2).Infof("Pod %s/%s is a terminal pod with status %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, phase)
return false
}
obj, exists, err := nodeLister.GetByKey(pod.Spec.NodeName)
if err != nil || !exists {
klog.V(2).Info("Pod %s/%s corresponds to a non-existing node %s, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName)
klog.V(2).Infof("Pod %s/%s corresponds to a non-existing node %s, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName)
return false
}
_, isNode := obj.(*apiv1.Node)
if !isNode {
klog.V(2).Info("Pod %s/%s does not correspond to a valid node resource, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
klog.V(2).Infof("Pod %s/%s does not correspond to a valid node resource, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
return false
}
return true
Expand Down
Loading

0 comments on commit db38c4d

Please sign in to comment.