Skip to content

Commit

Permalink
Fix creation of NEGs in the new zone when cluster spans to the new zone
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravkghildiyal committed Aug 26, 2022
1 parent cfcb825 commit 0ddfb88
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 36 deletions.
22 changes: 11 additions & 11 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func NewController(
})
}

nodeEventHandler := cache.ResourceEventHandlerFuncs{
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
Expand All @@ -301,20 +301,20 @@ func NewController(
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
},
}

if negController.runL4 {
nodeEventHandler.UpdateFunc = func(old, cur interface{}) {
UpdateFunc: func(old, cur interface{}) {
oldNode := old.(*apiv1.Node)
currentNode := cur.(*apiv1.Node)
candidateNodeCheck := utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
if candidateNodeCheck(oldNode) != candidateNodeCheck(currentNode) {
logger.Info("Node has changed, enqueueing", "node", klog.KObj(currentNode))

vmIpCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType)
vmIpPortCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType)

if vmIpCandidateNodeCheck(oldNode) != vmIpCandidateNodeCheck(currentNode) ||
vmIpPortCandidateNodeCheck(oldNode) != vmIpPortCandidateNodeCheck(currentNode) {
logger.Info("Node has changed, enqueueing", "node", currentNode.Name)
negController.enqueueNode(currentNode)
}
}
}
nodeInformer.AddEventHandler(nodeEventHandler)
},
})

if enableAsm {
negController.enableASM = enableAsm
Expand Down
66 changes: 42 additions & 24 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ type syncerManager struct {
enableNonGcpMode bool
enableEndpointSlices bool

// zoneMap keeps track of the last set of zones the neg controller
// has seen. zoneMap is protected by the mu mutex.
zoneMap map[string]struct{}

logger klog.Logger

// The zone maps track the last set of zones the NEG controller has seen for
// each NEG type. Both maps are protected by the mu mutex.
vmIpZoneMap map[string]struct{}
vmIpPortZoneMap map[string]struct{}
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
Expand All @@ -117,14 +118,10 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
enableEndpointSlices bool,
logger klog.Logger) *syncerManager {

zones, err := zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
logger.V(3).Info("Unable to initialize zone map in neg manager", "err", err)
}
zoneMap := make(map[string]struct{})
for _, zone := range zones {
zoneMap[zone] = struct{}{}
}
var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
updateZoneMap(&vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), zoneGetter, logger)
updateZoneMap(&vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), zoneGetter, logger)

return &syncerManager{
namer: namer,
recorder: recorder,
Expand All @@ -142,8 +139,9 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
kubeSystemUID: kubeSystemUID,
enableNonGcpMode: enableNonGcpMode,
enableEndpointSlices: enableEndpointSlices,
zoneMap: zoneMap,
logger: logger,
vmIpZoneMap: vmIpZoneMap,
vmIpPortZoneMap: vmIpPortZoneMap,
}
}

Expand Down Expand Up @@ -289,21 +287,40 @@ func (manager *syncerManager) SyncNodes() {
defer manager.mu.Unlock()

// When a zone change occurs (new zone is added or deleted), a sync should be triggered
isZoneChange := manager.updateZoneMap()
isVmIpZoneChange := updateZoneMap(&manager.vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), manager.zoneGetter, manager.logger)
isVmIpPortZoneChange := updateZoneMap(&manager.vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), manager.zoneGetter, manager.logger)

for key, syncer := range manager.syncerMap {
needSync := isZoneChange || key.NegType == negtypes.VmIpEndpointType
if needSync && !syncer.IsStopped() {
syncer.Sync()
if syncer.IsStopped() {
continue
}

switch key.NegType {

case negtypes.VmIpEndpointType:
if isVmIpZoneChange {
syncer.Sync()
}

case negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType:
if isVmIpPortZoneChange {
syncer.Sync()
}

default:
manager.logger.Error(nil, "Not triggering sync for syncer of unknown type", "syncerType", key.NegType)
}
}
}

// updateZoneMap updates the manager's zone map with the current zones and returns true if the
// zones have changed. The caller must obtain mu mutex before calling this function
func (manager *syncerManager) updateZoneMap() bool {
zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
// updateZoneMap updates the existingZoneMap with the latest zones and returns
// true if the zones have changed. The caller must obtain mu mutex of the
// manager before calling this function since it modifies the passed
// existingZoneMap.
func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate utils.NodeConditionPredicate, zoneGetter negtypes.ZoneGetter, logger klog.Logger) bool {
zones, err := zoneGetter.ListZones(candidateNodePredicate)
if err != nil {
manager.logger.Error(err, "Unable to list zones")
logger.Error(err, "Unable to list zones")
return false
}

Expand All @@ -312,8 +329,9 @@ func (manager *syncerManager) updateZoneMap() bool {
newZoneMap[zone] = struct{}{}
}

zoneChange := !reflect.DeepEqual(manager.zoneMap, newZoneMap)
manager.zoneMap = newZoneMap
zoneChange := !reflect.DeepEqual(*existingZoneMap, newZoneMap)
*existingZoneMap = newZoneMap

return zoneChange
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ func TestSyncNodesConditions(t *testing.T) {
},
{
desc: "vm ip neg, zones are the same",
expectSync: true,
expectSync: false,
negType: negtypes.VmIpEndpointType,
},
{
Expand Down
8 changes: 8 additions & 0 deletions pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,14 @@ func EndpointsDataFromEndpointSlices(slices []*discovery.EndpointSlice) []Endpoi
func NodePredicateForEndpointCalculatorMode(mode EndpointsCalculatorMode) utils.NodeConditionPredicate {
	// The calculator mode only decides which NEG type's node predicate is
	// used; the predicate itself comes from NodePredicateForNetworkEndpointType.
	switch {
	case mode == L4ClusterMode, mode == L4LocalMode:
		// Both L4 calculator modes use the VM_IP node predicate.
		return NodePredicateForNetworkEndpointType(VmIpEndpointType)
	default:
		// Every other mode uses the VM_IP_PORT node predicate.
		return NodePredicateForNetworkEndpointType(VmIpPortEndpointType)
	}
}

// NodePredicateForNetworkEndpointType returns the predicate function to select candidate nodes, given the NEG type.
func NodePredicateForNetworkEndpointType(negType NetworkEndpointType) utils.NodeConditionPredicate {
if negType == VmIpEndpointType {
return utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
}
return utils.CandidateNodesPredicate
Expand Down

0 comments on commit 0ddfb88

Please sign in to comment.