Skip to content

Commit

Permalink
Refactor syncInternalImpl and toZoneNetworkEndpointMap
Browse files Browse the repository at this point in the history
Refactor syncInternalImpl and toZoneNetworkEndpointMap
  • Loading branch information
sawsa307 committed Mar 28, 2023
1 parent 9eaef22 commit 776bb3f
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap)
}
sm.syncerStatusMap[key] = syncResult.Result
sm.syncerStatusMap[key] = string(syncResult.Result)
}

// SetSyncerEPMetrics update the endpoint count based on the endpointStat
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType, l.lpConfig)
result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType, l.lpConfig)
return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
Expand Down
69 changes: 36 additions & 33 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils/endpointslices"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -105,9 +104,6 @@ type transactionSyncer struct {

// syncCollector collect sync related metrics
syncCollector metrics.SyncerMetricsCollector

// enableDegradedMode indicates whether we do endpoint calculation using degraded mode procedures
enableDegradedMode bool
}

func NewTransactionSyncer(
Expand Down Expand Up @@ -151,7 +147,6 @@ func NewTransactionSyncer(
customName: customName,
errorState: "",
logger: logger,
enableDegradedMode: flags.F.EnableDegradedMode,
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
Expand Down Expand Up @@ -201,17 +196,16 @@ func (s *transactionSyncer) syncInternal() error {
}

func (s *transactionSyncer) syncInternalImpl() error {
if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
Expand All @@ -237,26 +231,12 @@ func (s *transactionSyncer) syncInternalImpl() error {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
endpointSlices := convertUntypedToEPS(slices)
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
for i, slice := range slices {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
continue
}
lastSyncTimestamp := negCR.Status.LastSyncTime
epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness)

if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
} else {
s.computeEPSStaleness(endpointSlices, negCR.Status.LastSyncTime)
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
Expand Down Expand Up @@ -358,7 +338,7 @@ func (s *transactionSyncer) getErrorStateReason(err error) string {
return ""
}
if result, contains := negtypes.ErrorStateResult[err]; contains {
return result
return string(result)
}
return ""
}
Expand Down Expand Up @@ -440,11 +420,11 @@ func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
switch s.errorState {
case "":
syncResult = negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
case negtypes.ResultInvalidAPIResponse:
case string(negtypes.ResultInvalidAPIResponse):
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse)
case negtypes.ResultInvalidEPAttach:
case string(negtypes.ResultInvalidEPAttach):
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPAttach, negtypes.ResultInvalidEPAttach)
case negtypes.ResultInvalidEPDetach:
case string(negtypes.ResultInvalidEPDetach):
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach)
default:
syncResult = negtypes.NewNegSyncResult(errors.New("Unknown error state value"), negtypes.ResultOtherError)
Expand Down Expand Up @@ -716,6 +696,29 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
}
}

func convertUntypedToEPS(endpointSliceUntyped []interface{}) []*discovery.EndpointSlice {
endpointSlices := make([]*discovery.EndpointSlice, len(endpointSliceUntyped))
for i, slice := range endpointSliceUntyped {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
}
return endpointSlices
}

func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.EndpointSlice, lastSyncTimestamp metav1.Time) {
for _, endpointSlice := range endpointSlices {
epsCreationTimestamp := endpointSlice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointSlice.Namespace, "Name", endpointSlice.Name, "staleness", epsStaleness)
}
}

// getNegFromStore returns the neg associated with the provided namespace and neg name if it exists otherwise throws an error
func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
n, exists, err := svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName))
Expand Down
51 changes: 43 additions & 8 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,31 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
return negRef, nil
}

type ZoneNetworkEndpointMapResult struct {
NetworkEndpointSet map[string]negtypes.NetworkEndpointSet
EndpointPodMap negtypes.EndpointPodMap
DupCount int
Misconfig bool
}

// toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, lpConfig negtypes.PodLabelPropagationConfig) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
func toZoneNetworkEndpointMap(
eds []negtypes.EndpointsData,
zoneGetter negtypes.ZoneGetter,
servicePortName string,
networkEndpointType negtypes.NetworkEndpointType,
lpConfig negtypes.PodLabelPropagationConfig,
) (ZoneNetworkEndpointMapResult, error) {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
if eds == nil {
klog.Errorf("Endpoint object is nil")
return zoneNetworkEndpointMap, networkEndpointPodMap, dupCount, nil
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: zoneNetworkEndpointMap,
EndpointPodMap: networkEndpointPodMap,
DupCount: dupCount,
}, nil
}
var foundMatchingPort bool
for _, ed := range eds {
Expand All @@ -250,20 +267,33 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.
continue
}
if endpointAddress.NodeName == nil || len(*endpointAddress.NodeName) == 0 {
klog.V(2).Infof("Detected unexpected error when checking missing nodeName. Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return nil, nil, dupCount, negtypes.ErrEPMissingNodeName
klog.Error("Detected unexpected error when checking nodeName. Endpoint %q in Endpoints %s/%s does not have an associated node", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: nil,
EndpointPodMap: nil,
DupCount: dupCount,
}, negtypes.ErrEPNodeMissing
}
if endpointAddress.TargetRef == nil {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
}
zone, err := zoneGetter.GetZoneForNode(*endpointAddress.NodeName)
if err != nil {
return nil, nil, dupCount, negtypes.ErrNodeNotFound
klog.Error("Detected unexpected error when checking zone. Endpoint %q in Endpoints %s/%s does not correspond to an existing node", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: nil,
EndpointPodMap: nil,
DupCount: dupCount,
}, negtypes.ErrEPNodeNotFound
}
if zone == "" {
klog.V(2).Info("Detected unexpected error when checking missing zone")
return nil, nil, dupCount, negtypes.ErrEPMissingZone
klog.Error("Detected unexpected error when checking zone. Endpoint %q in Endpoints %s/%s correspond to empty zone", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: nil,
EndpointPodMap: nil,
DupCount: dupCount,
}, negtypes.ErrEPZoneMissing
}
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
Expand Down Expand Up @@ -295,7 +325,12 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.
if len(zoneNetworkEndpointMap) == 0 || len(networkEndpointPodMap) == 0 {
klog.V(3).Infof("Generated empty endpoint maps (zoneNetworkEndpointMap: %+v, networkEndpointPodMap: %v) from Endpoints object: %+v", zoneNetworkEndpointMap, networkEndpointPodMap, eds)
}
return zoneNetworkEndpointMap, networkEndpointPodMap, dupCount, nil
return ZoneNetworkEndpointMapResult{
NetworkEndpointSet: zoneNetworkEndpointMap,
EndpointPodMap: networkEndpointPodMap,
DupCount: dupCount,
}, nil

}

func toZoneNetworkEndpointMapDegradedMode(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter,
Expand Down
10 changes: 5 additions & 5 deletions pkg/neg/syncers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,17 +531,17 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) {

// TODO(songrx1997): Add endpoint annotations for the test after calculation code is in
for _, tc := range testCases {
retSet, retMap, _, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, tc.portName, tc.networkEndpointType, negtypes.PodLabelPropagationConfig{})
result, err := toZoneNetworkEndpointMap(negtypes.EndpointsDataFromEndpointSlices(getDefaultEndpointSlices()), zoneGetter, tc.portName, tc.networkEndpointType, negtypes.PodLabelPropagationConfig{})
if err != nil {
t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err)
}

if !reflect.DeepEqual(retSet, tc.endpointSets) {
t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, retSet)
if !reflect.DeepEqual(result.NetworkEndpointSet, tc.endpointSets) {
t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, result.NetworkEndpointSet)
}

if !reflect.DeepEqual(retMap, tc.expectMap) {
t.Errorf("For case %q, expecting endpoint map %v, but got %v.", tc.desc, tc.expectMap, retMap)
if !reflect.DeepEqual(result.EndpointPodMap, tc.expectMap) {
t.Errorf("For case %q, expecting endpoint map %v, but got %v.", tc.desc, tc.expectMap, result.EndpointPodMap)
}
}
}
Expand Down
58 changes: 35 additions & 23 deletions pkg/neg/types/sync_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,53 @@ package types

import "errors"

type Result string

const (
ResultEPCountsDiffer = "EPCountsDiffer"
ResultEPMissingNodeName = "EPMissingNodeName"
ResultNodeNotFound = "NodeNotFound"
ResultEPMissingZone = "EPMissingZone"
ResultEPSEndpointCountZero = "EPSEndpointCountZero"
ResultEPCalculationCountZero = "EPCalculationCountZero"
ResultInvalidAPIResponse = "InvalidAPIResponse"
ResultInvalidEPAttach = "InvalidEPAttach"
ResultInvalidEPDetach = "InvalidEPDetach"
ResultEPCountsDiffer = Result("EPCountsDiffer")
ResultEPNodeMissing = Result("EPNodeMissing")
ResultEPNodeNotFound = Result("EPNodeNotFound")
ResultEPPodMissing = Result("EPPodMissing")
ResultEPPodNotFound = Result("EPPodNotFound")
ResultTypeAssertionToPod = Result("TypeAssertionToPod")
ResultEPZoneMissing = Result("EPZoneMissing")
ResultEPSEndpointCountZero = Result("EPSEndpointCountZero")
ResultEPCalculationCountZero = Result("EPCalculationCountZero")
ResultInvalidAPIResponse = Result("InvalidAPIResponse")
ResultInvalidEPAttach = Result("InvalidEPAttach")
ResultInvalidEPDetach = Result("InvalidEPDetach")

// these results have their own errors
ResultNegNotFound = "NegNotFound"
ResultCurrentEPNotFound = "CurrentEPNotFound"
ResultEPSNotFound = "EPSNotFound"
ResultOtherError = "OtherError"
ResultInProgress = "InProgress"
ResultSuccess = "Success"
ResultNegNotFound = Result("NegNotFound")
ResultCurrentNegEPNotFound = Result("CurrentNegEPNotFound")
ResultEPSNotFound = Result("EPSNotFound")
ResultOtherError = Result("OtherError")
ResultInProgress = Result("InProgress")
ResultSuccess = Result("Success")
)

var (
ErrEPCountsDiffer = errors.New("endpoint counts from endpointData and endpointPodMap differ")
ErrEPMissingNodeName = errors.New("endpoint has empty nodeName field")
ErrNodeNotFound = errors.New("failed to retrieve associated zone of node")
ErrEPMissingZone = errors.New("endpoint has empty zone field")
ErrEPNodeMissing = errors.New("endpoint has missing nodeName field")
ErrEPNodeNotFound = errors.New("endpoint corresponds to an non-existing node")
ErrEPPodMissing = errors.New("endpoint has missing pod field")
ErrEPPodNotFound = errors.New("endpoint corresponds to an non-existing pod")
ErrTypeAssertionToPod = errors.New("type assertion to pod failed")
ErrEPZoneMissing = errors.New("endpoint has missing zone field")
ErrEPSEndpointCountZero = errors.New("endpoint count from endpointData cannot be zero")
ErrEPCalculationCountZero = errors.New("endpoint count from endpointPodMap cannot be zero")
ErrInvalidAPIResponse = errors.New("received response error doesn't match googleapi.Error type")
ErrInvalidEPAttach = errors.New("endpoint information for attach operation is incorrect")
ErrInvalidEPDetach = errors.New("endpoint information for detach operation is incorrect")

// use this map for conversion between errors and sync results
ErrorStateResult = map[error]string{
ErrEPMissingNodeName: ResultEPMissingNodeName,
ErrEPMissingZone: ResultEPMissingZone,
ErrorStateResult = map[error]Result{
ErrEPNodeMissing: ResultEPNodeMissing,
ErrEPNodeNotFound: ResultEPNodeNotFound,
ErrEPPodMissing: ResultEPPodMissing,
ErrEPPodNotFound: ResultEPPodNotFound,
ErrTypeAssertionToPod: ResultTypeAssertionToPod,
ErrEPZoneMissing: ResultEPZoneMissing,
ErrEPCalculationCountZero: ResultEPCalculationCountZero,
ErrEPSEndpointCountZero: ResultEPSEndpointCountZero,
ErrEPCountsDiffer: ResultEPCountsDiffer,
Expand All @@ -61,10 +73,10 @@ var (

type NegSyncResult struct {
Error error
Result string
Result Result
}

func NewNegSyncResult(err error, result string) *NegSyncResult {
func NewNegSyncResult(err error, result Result) *NegSyncResult {
return &NegSyncResult{
Error: err,
Result: result,
Expand Down

0 comments on commit 776bb3f

Please sign in to comment.