Skip to content

Commit

Permalink
Refactor syncInternalImpl and toZoneNetworkEndpointMap
Browse files Browse the repository at this point in the history
Refactor syncInternalImpl and toZoneNetworkEndpointMap
  • Loading branch information
sawsa307 committed Mar 30, 2023
1 parent 58f3b96 commit 397f4f2
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 280 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap)
}
sm.syncerStatusMap[key] = syncResult.Result
sm.syncerStatusMap[key] = string(syncResult.Result)
}

// SetSyncerEPMetrics update the endpoint count based on the endpointStat
Expand Down
3 changes: 2 additions & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType)
result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.podLister, l.servicePortName, l.networkEndpointType)
return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
Expand Down
111 changes: 55 additions & 56 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils/endpointslices"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -106,12 +105,6 @@ type transactionSyncer struct {

// syncCollector collect sync related metrics
syncCollector metrics.SyncerMetricsCollector

// enableDegradedMode indicates whether we do endpoint calculation using degraded mode procedures
enableDegradedMode bool

// podLabelPropagationConfig configures the pod label to be propagated to NEG endpoints
podLabelPropagationConfig labels.PodLabelPropagationConfig
}

func NewTransactionSyncer(
Expand All @@ -137,27 +130,25 @@ func NewTransactionSyncer(

// TransactionSyncer implements the syncer core
ts := &transactionSyncer{
NegSyncerKey: negSyncerKey,
needInit: true,
transactions: NewTransactionTable(),
nodeLister: nodeLister,
podLister: podLister,
serviceLister: serviceLister,
endpointSliceLister: endpointSliceLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
syncCollector: syncerMetrics,
customName: customName,
errorState: "",
logger: logger,
enableDegradedMode: flags.F.EnableDegradedMode,
podLabelPropagationConfig: lpConfig,
NegSyncerKey: negSyncerKey,
needInit: true,
transactions: NewTransactionTable(),
nodeLister: nodeLister,
podLister: podLister,
serviceLister: serviceLister,
endpointSliceLister: endpointSliceLister,
svcNegLister: svcNegLister,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
endpointsCalculator: epc,
reflector: reflector,
kubeSystemUID: kubeSystemUID,
svcNegClient: svcNegClient,
syncCollector: syncerMetrics,
customName: customName,
errorState: "",
logger: logger,
}
// Syncer implements life cycle logic
syncer := newSyncer(negSyncerKey, serviceLister, recorder, ts, logger)
Expand Down Expand Up @@ -207,17 +198,16 @@ func (s *transactionSyncer) syncInternal() error {
}

func (s *transactionSyncer) syncInternalImpl() error {
if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
Expand All @@ -243,26 +233,12 @@ func (s *transactionSyncer) syncInternalImpl() error {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
endpointSlices := convertUntypedToEPS(slices)
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
for i, slice := range slices {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
continue
}
lastSyncTimestamp := negCR.Status.LastSyncTime
epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness)

if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
} else {
s.computeEPSStaleness(endpointSlices, negCR.Status.LastSyncTime)
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
Expand Down Expand Up @@ -364,7 +340,7 @@ func (s *transactionSyncer) getErrorStateReason(err error) string {
return ""
}
if result, contains := negtypes.ErrorStateResult[err]; contains {
return result
return string(result)
}
return ""
}
Expand Down Expand Up @@ -446,11 +422,11 @@ func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
switch s.errorState {
case "":
syncResult = negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
case negtypes.ResultInvalidAPIResponse:
case string(negtypes.ResultInvalidAPIResponse):
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse)
case negtypes.ResultInvalidEPAttach:
case string(negtypes.ResultInvalidEPAttach):
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPAttach, negtypes.ResultInvalidEPAttach)
case negtypes.ResultInvalidEPDetach:
case string(negtypes.ResultInvalidEPDetach):
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach)
default:
syncResult = negtypes.NewNegSyncResult(errors.New("Unknown error state value"), negtypes.ResultOtherError)
Expand Down Expand Up @@ -722,6 +698,29 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
}
}

func convertUntypedToEPS(endpointSliceUntyped []interface{}) []*discovery.EndpointSlice {
endpointSlices := make([]*discovery.EndpointSlice, len(endpointSliceUntyped))
for i, slice := range endpointSliceUntyped {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
}
return endpointSlices
}

func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.EndpointSlice, lastSyncTimestamp metav1.Time) {
for _, endpointSlice := range endpointSlices {
epsCreationTimestamp := endpointSlice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointSlice.Namespace, "Name", endpointSlice.Name, "staleness", epsStaleness)
}
}

// getNegFromStore returns the neg associated with the provided namespace and neg name if it exists otherwise throws an error
func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
n, exists, err := svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName))
Expand Down
Loading

0 comments on commit 397f4f2

Please sign in to comment.