Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor syncInternalImpl and toZoneNetworkEndpointMap #2044

Merged
merged 6 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap)
}
sm.syncerStatusMap[key] = syncResult.Result
sm.syncerStatusMap[key] = string(syncResult.Result)
}

// SetSyncerEPMetrics update the endpoint count based on the endpointStat
Expand Down
27 changes: 24 additions & 3 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
return subsetMap, nil, 0, err
}

func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// this should be the same as CalculateEndpoints for L4 ec
subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap)
return subsetMap, nil, err
}

func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand Down Expand Up @@ -166,6 +172,12 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
return subsetMap, nil, 0, err
}

func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// this should be the same as CalculateEndpoints for L4 ec
subsetMap, _, _, err := l.CalculateEndpoints(nil, currentMap)
return subsetMap, nil, err
}

func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand All @@ -176,16 +188,18 @@ type L7EndpointsCalculator struct {
zoneGetter types.ZoneGetter
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of the indexer, use the lister like in the L4 case.

not in this PR, but we should change the podLister to also be the lister instead of the indexer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I'll create a separate PR for that

networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -199,7 +213,14 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType)
result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.podLister, l.servicePortName, l.networkEndpointType)
return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err
}

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType)
return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
Expand Down Expand Up @@ -235,7 +256,7 @@ func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.Endpoints
}

if countFromEndpointData != countFromPodMap {
l.logger.Info("Detected error when comparing endpoint counts", "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return types.ErrEPCountsDiffer
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func TestValidateEndpoints(t *testing.T) {
zoneGetter := negtypes.NewFakeZoneGetter()
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := listers.NewNodeLister(testContext.NodeInformer.GetIndexer())
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

testEndpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{
{
Expand Down
159 changes: 86 additions & 73 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package syncers

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -187,6 +186,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
return NewL7EndpointsCalculator(
zoneGetter,
podLister,
nodeLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down Expand Up @@ -220,17 +220,16 @@ func (s *transactionSyncer) syncInternal() error {
}

func (s *transactionSyncer) syncInternalImpl() error {
if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
Expand All @@ -246,8 +245,6 @@ func (s *transactionSyncer) syncInternalImpl() error {

var targetMap map[string]negtypes.NetworkEndpointSet
var endpointPodMap negtypes.EndpointPodMap
var dupCount int

slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
Expand All @@ -256,37 +253,31 @@ func (s *transactionSyncer) syncInternalImpl() error {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
for i, slice := range slices {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
continue
}
lastSyncTimestamp := negCR.Status.LastSyncTime
epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness)
endpointSlices := convertUntypedToEPS(slices)
s.computeEPSStaleness(endpointSlices)

}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
targetMap, endpointPodMap, err = s.getEndpointsCalculation(endpointsData, currentMap)

if !s.enableDegradedMode && err != nil {
return err
}
err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount)
if err != nil {
// TODO(cheungdavid): return error from ValidateEndpoint after degraded mode is implemented
// for now we don't return error so it won't break the sync
s.setErrorState(s.getErrorStateReason(err))
} else if s.enableDegradedMode {
if !s.inErrorState() && err != nil {
return err // if we encounter an error, we will return and run the next sync in degraded mode
}
degradedTargetMap, degradedPodMap, degradedModeErr := s.endpointsCalculator.CalculateEndpointsDegradedMode(endpointsData, currentMap)
if degradedModeErr != nil {
return degradedModeErr
}
notInDegraded, onlyInDegraded := calculateNetworkEndpointDifference(targetMap, degradedTargetMap)
if s.inErrorState() {
targetMap = degradedTargetMap
endpointPodMap = degradedPodMap
if len(notInDegraded) == 0 && len(onlyInDegraded) == 0 {
s.resetErrorState()
}
}
// TODO(cheungdavid): in the else branch, publish metrics if we don't encounter error and we are not in error state
}
s.logStats(targetMap, "desired NEG endpoints")

Expand Down Expand Up @@ -317,6 +308,23 @@ func (s *transactionSyncer) syncInternalImpl() error {
return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
}

func (s *transactionSyncer) getEndpointsCalculation(
endpointsData []negtypes.EndpointsData,
currentMap map[string]negtypes.NetworkEndpointSet,
) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
targetMap, endpointPodMap, dupCount, err := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return nil, nil, err
}
if s.enableDegradedMode {
err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount)
if err != nil {
return nil, nil, err
}
}
return targetMap, endpointPodMap, nil
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) inErrorState() bool {
return s.errorState != ""
Expand All @@ -327,6 +335,11 @@ func (s *transactionSyncer) setErrorState(errorState string) {
s.errorState = errorState
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) resetErrorState() {
swetharepakula marked this conversation as resolved.
Show resolved Hide resolved
s.errorState = ""
}

// ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones.
func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
var err error
Expand Down Expand Up @@ -376,7 +389,7 @@ func (s *transactionSyncer) getErrorStateReason(err error) string {
return ""
}
if result, contains := negtypes.ErrorStateResult[err]; contains {
return result
return string(result)
}
return ""
}
Expand All @@ -403,8 +416,6 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
for zone, endpointSet := range endpointMap {
if endpointSet.Len() == 0 {
Expand All @@ -428,10 +439,10 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if operation == attachOp {
s.attachNetworkEndpoints(zone, batch, &wg)
s.attachNetworkEndpoints(zone, batch)
}
if operation == detachOp {
s.detachNetworkEndpoints(zone, batch, &wg)
s.detachNetworkEndpoints(zone, batch)
}
}
return nil
Expand All @@ -444,52 +455,25 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
}
go s.collectSyncResult(&wg)
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result
func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
wg.Wait()
s.syncLock.Lock()
defer s.syncLock.Unlock()

var syncResult *negtypes.NegSyncResult
switch s.errorState {
case "":
syncResult = negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
case negtypes.ResultInvalidAPIResponse:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse)
case negtypes.ResultInvalidEPAttach:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPAttach, negtypes.ResultInvalidEPAttach)
case negtypes.ResultInvalidEPDetach:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach)
default:
syncResult = negtypes.NewNegSyncResult(errors.New("Unknown error state value"), negtypes.ResultOtherError)
}

s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult)
}

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap, wg)
go s.operationInternal(attachOp, zone, networkEndpointMap)
}

// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap, wg)
go s.operationInternal(detachOp, zone, networkEndpointMap)
}

// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
defer wg.Done()
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand Down Expand Up @@ -734,6 +718,35 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
}
}

func convertUntypedToEPS(endpointSliceUntyped []interface{}) []*discovery.EndpointSlice {
endpointSlices := make([]*discovery.EndpointSlice, len(endpointSliceUntyped))
for i, slice := range endpointSliceUntyped {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
}
return endpointSlices
}

func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.EndpointSlice) {
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
return
}
lastSyncTimestamp := negCR.Status.LastSyncTime
for _, endpointSlice := range endpointSlices {
epsCreationTimestamp := endpointSlice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointSlice.Namespace, "Name", endpointSlice.Name, "staleness", epsStaleness)
}
}

// getNegFromStore returns the neg associated with the provided namespace and neg name if it exists otherwise throws an error
func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
n, exists, err := svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName))
Expand Down
Loading