Skip to content

Commit

Permalink
Refactor degraded mode in syncInternalImpl and toZoneNetworkEndpointM…
Browse files Browse the repository at this point in the history
…apDegradedMode.

Refactor degraded mode in syncInternalImpl and toZoneNetworkEndpointMapDegradedMode.
  • Loading branch information
sawsa307 committed Apr 5, 2023
1 parent a502236 commit b79ead5
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 149 deletions.
22 changes: 21 additions & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
return subsetMap, nil, 0, err
}

func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// this should be the same as CalculateEndpoints for L4 ec
subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap)
return subsetMap, nil, err
}

func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand Down Expand Up @@ -166,6 +172,12 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
return subsetMap, nil, 0, err
}

func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// this should be the same as CalculateEndpoints for L4 ec
subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap)
return subsetMap, nil, err
}

func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand All @@ -176,16 +188,18 @@ type L7EndpointsCalculator struct {
zoneGetter types.ZoneGetter
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -203,6 +217,12 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _
return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
var str []string
for zone, nodeList := range nodeMap {
Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func TestValidateEndpoints(t *testing.T) {
zoneGetter := negtypes.NewFakeZoneGetter()
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := listers.NewNodeLister(testContext.NodeInformer.GetIndexer())
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

testEndpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{
{
Expand Down
21 changes: 20 additions & 1 deletion pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
return NewL7EndpointsCalculator(
zoneGetter,
podLister,
nodeLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down Expand Up @@ -257,8 +258,26 @@ func (s *transactionSyncer) syncInternalImpl() error {

endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, err = s.getEndpointsCalculation(endpointsData, currentMap)
if err != nil {

if !s.enableDegradedMode && err != nil {
return err
} else if s.enableDegradedMode {
if !s.inErrorState() && err != nil {
return err // if we encounter an error, we will return and run the next sync in degraded mode
}
degradedTargetMap, degradedPodMap, degradedModeErr := s.endpointsCalculator.CalculateEndpointsDegradedMode(endpointsData, currentMap)
if degradedModeErr != nil {
return degradedModeErr
}
notInDegraded, onlyInDegraded := calculateNetworkEndpointDifference(targetMap, degradedTargetMap)
if s.inErrorState() {
targetMap = degradedTargetMap
endpointPodMap = degradedPodMap
if len(notInDegraded) == 0 && len(onlyInDegraded) == 0 {
s.resetErrorState()
}
}
// TODO(cheungdavid): in the else branch, publish metrics if we don't encounter error and we are not in error state
}
s.logStats(targetMap, "desired NEG endpoints")

Expand Down
112 changes: 56 additions & 56 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/record"
negv1beta1 "k8s.io/ingress-gce/pkg/apis/svcneg/v1beta1"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/flags"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -277,6 +278,14 @@ func toZoneNetworkEndpointMap(
// pod is used for label propagation
_, getPodErr := getEndpointPod(endpointAddress, podLister)
if getPodErr != nil {
if flags.F.EnableDegradedMode {
klog.Errorf("Detected unexpected error when getting pod, err: %v", getPodErr)
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: nil,
EndpointPodMap: nil,
DupCount: dupCount,
}, getPodErr
}
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
Expand Down Expand Up @@ -356,12 +365,15 @@ func getEndpointPod(
return pod, nil
}

func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer, servicePortName string,
networkEndpointType negtypes.NetworkEndpointType) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
targetMap := map[string]negtypes.NetworkEndpointSet{}
endpointPodMap := negtypes.EndpointPodMap{}
var dupCount int
func toZoneNetworkEndpointMapDegradedMode(
eds []negtypes.EndpointsData,
zoneGetter negtypes.ZoneGetter,
podLister, nodeLister cache.Indexer,
servicePortName string,
networkEndpointType negtypes.NetworkEndpointType,
) ZoneNetworkEndpointMapResult {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
for _, ed := range eds {
matchPort := ""
for _, port := range ed.Ports {
Expand All @@ -378,58 +390,46 @@ func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGett
klog.Infof("Skipping non IPv4 address in degraded mode: %q, in endpoint slice %s/%s", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
if endpointAddress.TargetRef == nil {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
pod, getPodErr := getEndpointPod(endpointAddress, podLister)
if getPodErr != nil {
klog.Errorf("Endpoint %q in Endpoints %s/%s receives error when getting pod, err: %v, skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name, getPodErr)
continue
}
dupCount += validateAndAddEndpoints(endpointAddress, zoneGetter, podLister, nodeLister, matchPort, networkEndpointType, targetMap, endpointPodMap)
}
}
return targetMap, endpointPodMap, dupCount, nil
}

// validateAndAddEndpoints fills in missing information and creates network endpoint for each endpoint addresss
func validateAndAddEndpoints(ep negtypes.AddressData, zoneGetter negtypes.ZoneGetter, podLister, nodeLister cache.Indexer, matchPort string, endpointType negtypes.NetworkEndpointType, targetMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap) int {
var dupCount int
for _, address := range ep.Addresses {
key := fmt.Sprintf("%s/%s", ep.TargetRef.Namespace, ep.TargetRef.Name)
obj, exists, err := podLister.GetByKey(key)
if err != nil || !exists {
klog.V(2).Infof("Endpoint %q does not correspond to an existing pod. Skipping", address)
continue
}
pod, ok := obj.(*apiv1.Pod)
if !ok {
klog.V(2).Infof("Endpoint %q does not correspond to a pod object. Skipping", address)
continue
}
if !validatePod(pod, nodeLister) {
klog.V(2).Infof("Endpoint %q does not correspond to a valid pod resource. Skipping", address)
continue
}
nodeName := pod.Spec.NodeName
zone, err := zoneGetter.GetZoneForNode(nodeName)
if err != nil {
klog.V(2).Infof("Endpoint %q does not have valid zone information. Skipping", address)
continue
}
if !validatePod(pod, nodeLister) {
continue
}
nodeName := pod.Spec.NodeName
zone, err := zoneGetter.GetZoneForNode(nodeName)
if err != nil {
klog.V(2).Infof("For endpoint %q in pod %q, its corresponding node %q does not have valid zone information, skipping", endpointAddress.Addresses, pod.ObjectMeta.Name, nodeName)
continue
}
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
}
for _, address := range endpointAddress.Addresses {
networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName}
if networkEndpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
networkEndpoint.Node = ""
}
zoneNetworkEndpointMap[zone].Insert(networkEndpoint)

if endpointType == negtypes.NonGCPPrivateEndpointType {
// Non-GCP network endpoints don't have associated nodes.
nodeName = ""
}
networkEndpoint := negtypes.NetworkEndpoint{IP: address, Port: matchPort, Node: nodeName}
if targetMap[zone] == nil {
targetMap[zone] = negtypes.NewNetworkEndpointSet()
}
targetMap[zone].Insert(networkEndpoint)
// increment the count for duplicated endpoint
if _, contains := endpointPodMap[networkEndpoint]; contains {
dupCount += 1
if existingPod, contains := networkEndpointPodMap[networkEndpoint]; contains {
// if existing name is alphabetically lower than current one, continue and don't replace
if existingPod.Name < endpointAddress.TargetRef.Name {
klog.Infof("Found duplicate endpoints for %q, save the pod information from the alphabetically higher pod", address)
continue
}
}
networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: endpointAddress.TargetRef.Namespace, Name: endpointAddress.TargetRef.Name}
}
}
endpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: ep.TargetRef.Namespace, Name: ep.TargetRef.Name}
}
return dupCount
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: zoneNetworkEndpointMap,
EndpointPodMap: networkEndpointPodMap,
}
}

// validatePod checks if this pod is a valid pod resource
Expand All @@ -440,17 +440,17 @@ func validatePod(pod *apiv1.Pod, nodeLister cache.Indexer) bool {
// Terminal Pod means a pod is in PodFailed or PodSucceeded phase
phase := pod.Status.Phase
if phase == apiv1.PodFailed || phase == apiv1.PodSucceeded {
klog.V(2).Info("Pod %s/%s is a terminal pod with status %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, phase)
klog.V(2).Infof("Pod %s/%s is a terminal pod with status %v, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, phase)
return false
}
obj, exists, err := nodeLister.GetByKey(pod.Spec.NodeName)
if err != nil || !exists {
klog.V(2).Info("Pod %s/%s corresponds to a non-existing node %s, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName)
klog.V(2).Infof("Pod %s/%s corresponds to a non-existing node %s, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name, pod.Spec.NodeName)
return false
}
_, isNode := obj.(*apiv1.Node)
if !isNode {
klog.V(2).Info("Pod %s/%s does not correspond to a valid node resource, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
klog.V(2).Infof("Pod %s/%s does not correspond to a valid node resource, skipping", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
return false
}
return true
Expand Down
Loading

0 comments on commit b79ead5

Please sign in to comment.