Skip to content

Commit

Permalink
Merge pull request #1754 from gauravkghildiyal/negZone
Browse files Browse the repository at this point in the history
Fix creation of NEGs in the new zone when cluster spans to the new zone
  • Loading branch information
k8s-ci-robot authored Aug 26, 2022
2 parents cfcb825 + 0ddfb88 commit 2b5cafd
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 36 deletions.
22 changes: 11 additions & 11 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func NewController(
})
}

nodeEventHandler := cache.ResourceEventHandlerFuncs{
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
Expand All @@ -301,20 +301,20 @@ func NewController(
node := obj.(*apiv1.Node)
negController.enqueueNode(node)
},
}

if negController.runL4 {
nodeEventHandler.UpdateFunc = func(old, cur interface{}) {
UpdateFunc: func(old, cur interface{}) {
oldNode := old.(*apiv1.Node)
currentNode := cur.(*apiv1.Node)
candidateNodeCheck := utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
if candidateNodeCheck(oldNode) != candidateNodeCheck(currentNode) {
logger.Info("Node has changed, enqueueing", "node", klog.KObj(currentNode))

vmIpCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType)
vmIpPortCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType)

if vmIpCandidateNodeCheck(oldNode) != vmIpCandidateNodeCheck(currentNode) ||
vmIpPortCandidateNodeCheck(oldNode) != vmIpPortCandidateNodeCheck(currentNode) {
logger.Info("Node has changed, enqueueing", "node", currentNode.Name)
negController.enqueueNode(currentNode)
}
}
}
nodeInformer.AddEventHandler(nodeEventHandler)
},
})

if enableAsm {
negController.enableASM = enableAsm
Expand Down
66 changes: 42 additions & 24 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ type syncerManager struct {
enableNonGcpMode bool
enableEndpointSlices bool

// zoneMap keeps track of the last set of zones the neg controller
// has seen. zoneMap is protected by the mu mutex.
zoneMap map[string]struct{}

logger klog.Logger

// zone maps keep track of the last set of zones the neg controller has seen
// for their respective NEG types. zone maps are protected by the mu mutex.
vmIpZoneMap map[string]struct{}
vmIpPortZoneMap map[string]struct{}
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
Expand All @@ -117,14 +118,10 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
enableEndpointSlices bool,
logger klog.Logger) *syncerManager {

zones, err := zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
logger.V(3).Info("Unable to initialize zone map in neg manager", "err", err)
}
zoneMap := make(map[string]struct{})
for _, zone := range zones {
zoneMap[zone] = struct{}{}
}
var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
updateZoneMap(&vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), zoneGetter, logger)
updateZoneMap(&vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), zoneGetter, logger)

return &syncerManager{
namer: namer,
recorder: recorder,
Expand All @@ -142,8 +139,9 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
kubeSystemUID: kubeSystemUID,
enableNonGcpMode: enableNonGcpMode,
enableEndpointSlices: enableEndpointSlices,
zoneMap: zoneMap,
logger: logger,
vmIpZoneMap: vmIpZoneMap,
vmIpPortZoneMap: vmIpPortZoneMap,
}
}

Expand Down Expand Up @@ -289,21 +287,40 @@ func (manager *syncerManager) SyncNodes() {
defer manager.mu.Unlock()

// When a zone change occurs (new zone is added or deleted), a sync should be triggered
isZoneChange := manager.updateZoneMap()
isVmIpZoneChange := updateZoneMap(&manager.vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), manager.zoneGetter, manager.logger)
isVmIpPortZoneChange := updateZoneMap(&manager.vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), manager.zoneGetter, manager.logger)

for key, syncer := range manager.syncerMap {
needSync := isZoneChange || key.NegType == negtypes.VmIpEndpointType
if needSync && !syncer.IsStopped() {
syncer.Sync()
if syncer.IsStopped() {
continue
}

switch key.NegType {

case negtypes.VmIpEndpointType:
if isVmIpZoneChange {
syncer.Sync()
}

case negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType:
if isVmIpPortZoneChange {
syncer.Sync()
}

default:
manager.logger.Error(nil, "Not triggering sync for syncer of unknown type", "syncerType", key.NegType)
}
}
}

// updateZoneMap updates the manager's zone map with the current zones and returns true if the
// zones have changed. The caller must obtain mu mutex before calling this function
func (manager *syncerManager) updateZoneMap() bool {
zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
// updateZoneMap updates the existingZoneMap with the latest zones and returns
// true if the zones have changed. The caller must obtain mu mutex of the
// manager before calling this function since it modifies the passed
// existingZoneMap.
func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate utils.NodeConditionPredicate, zoneGetter negtypes.ZoneGetter, logger klog.Logger) bool {
zones, err := zoneGetter.ListZones(candidateNodePredicate)
if err != nil {
manager.logger.Error(err, "Unable to list zones")
logger.Error(err, "Unable to list zones")
return false
}

Expand All @@ -312,8 +329,9 @@ func (manager *syncerManager) updateZoneMap() bool {
newZoneMap[zone] = struct{}{}
}

zoneChange := !reflect.DeepEqual(manager.zoneMap, newZoneMap)
manager.zoneMap = newZoneMap
zoneChange := !reflect.DeepEqual(*existingZoneMap, newZoneMap)
*existingZoneMap = newZoneMap

return zoneChange
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ func TestSyncNodesConditions(t *testing.T) {
},
{
desc: "vm ip neg, zones are the same",
expectSync: true,
expectSync: false,
negType: negtypes.VmIpEndpointType,
},
{
Expand Down
8 changes: 8 additions & 0 deletions pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,14 @@ func EndpointsDataFromEndpointSlices(slices []*discovery.EndpointSlice) []Endpoi
func NodePredicateForEndpointCalculatorMode(mode EndpointsCalculatorMode) utils.NodeConditionPredicate {
// VM_IP NEGs can include unready and upgrading nodes.
if mode == L4ClusterMode || mode == L4LocalMode {
return NodePredicateForNetworkEndpointType(VmIpEndpointType)
}
return NodePredicateForNetworkEndpointType(VmIpPortEndpointType)
}

// NodePredicateForNetworkEndpointType returns the predicate function to select candidate nodes, given the NEG type.
func NodePredicateForNetworkEndpointType(negType NetworkEndpointType) utils.NodeConditionPredicate {
if negType == VmIpEndpointType {
return utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes
}
return utils.CandidateNodesPredicate
Expand Down

0 comments on commit 2b5cafd

Please sign in to comment.