Skip to content

Commit

Permalink
Concurrently process deletion candidates within NEG Garbage Collector
Browse files Browse the repository at this point in the history
  • Loading branch information
gauravkghildiyal committed Feb 28, 2023
1 parent 65d33bd commit 4111167
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 46 deletions.
1 change: 1 addition & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
ctx.ClusterNamer,
flags.F.ResyncPeriod,
flags.F.NegGCPeriod,
flags.F.NumNegGCWorkers,
flags.F.EnableReadinessReflector,
flags.F.RunIngressController,
flags.F.RunL4Controller,
Expand Down
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
IngressClass string
KubeConfigFile string
NegGCPeriod time.Duration
NumNegGCWorkers int
NodePortRanges PortRanges
ResyncPeriod time.Duration
L4NetLBProvisionDeadline time.Duration
Expand Down Expand Up @@ -231,6 +232,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.StringVar(&F.LeaderElection.LockObjectName, "lock-object-name", F.LeaderElection.LockObjectName, "Define the name of the lock object.")
flag.DurationVar(&F.NegGCPeriod, "neg-gc-period", 120*time.Second,
`Relist and garbage collect NEGs this often.`)
flag.IntVar(&F.NumNegGCWorkers, "num-neg-gc-workers", 10, "Number of goroutines created by NEG garbage collector. This value controls the maximum number of concurrent calls made to the GCE NEG Delete API.")
flag.BoolVar(&F.EnableReadinessReflector, "enable-readiness-reflector", true, "Enable NEG Readiness Reflector")
flag.BoolVar(&F.FinalizerAdd, "enable-finalizer-add",
F.FinalizerAdd, "Enable adding Finalizer to Ingress.")
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func NewController(
namer negtypes.NetworkEndpointGroupNamer,
resyncPeriod time.Duration,
gcPeriod time.Duration,
numGCWorkers int,
enableReadinessReflector bool,
runIngress bool,
runL4Controller bool,
Expand Down Expand Up @@ -166,6 +167,7 @@ func NewController(
svcNegInformer.GetIndexer(),
syncerMetrics,
enableNonGcpMode,
numGCWorkers,
logger)

var reflector readiness.Reflector
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
testContext.NegNamer,
testContext.ResyncPeriod,
testContext.ResyncPeriod,
testContext.NumGCWorkers,
// TODO(freehan): enable readiness reflector for unit tests
false, // enableReadinessReflector
true, // runIngress
Expand Down
136 changes: 90 additions & 46 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type syncerManager struct {
// This will make all NEGs created by NEG controller to be NON_GCP_PRIVATE_IP_PORT type.
enableNonGcpMode bool

// Number of goroutines created for NEG garbage collection. This value
// controls the maximum number of concurrent calls that can be made to the GCE
// NEG Delete API.
numGCWorkers int

logger klog.Logger

// zone maps keep track of the last set of zones the neg controller has seen
Expand All @@ -116,6 +121,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
svcNegLister cache.Indexer,
syncerMetrics *metrics.SyncerMetrics,
enableNonGcpMode bool,
numGCWorkers int,
logger klog.Logger) *syncerManager {

var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
Expand All @@ -138,6 +144,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
svcNegClient: svcNegClient,
kubeSystemUID: kubeSystemUID,
enableNonGcpMode: enableNonGcpMode,
numGCWorkers: numGCWorkers,
logger: logger,
vmIpZoneMap: vmIpZoneMap,
vmIpPortZoneMap: vmIpPortZoneMap,
Expand Down Expand Up @@ -528,26 +535,68 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
// The worst outcome of the race condition is that a NEG the user actually specified ends up being deleted.
// This would be resolved (sync neg) when the next endpoint update or resync arrives.
// TODO: avoid race condition here

var errList []error
// Deletion candidate NEGs should be deleted from all zones, even ones that currently don't have any Ready nodes.
// errListMutex protects writes to errList. It should be acquired when one
// expects concurrent writes to errList.
var errListMutex sync.Mutex

// Deletion candidate NEGs should be deleted from all zones, even ones that
// currently don't have any Ready nodes.
zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
if err != nil {
errList = append(errList, fmt.Errorf("failed to get zones during garbage collection: %w", err))
}

deletionCandidatesChan := make(chan *negv1beta1.ServiceNetworkEndpointGroup, len(deletionCandidates))
for _, dc := range deletionCandidates {
deletionCandidatesChan <- dc
}
close(deletionCandidatesChan)

wg := sync.WaitGroup{}
wg.Add(len(deletionCandidates))
for i := 0; i < manager.numGCWorkers; i++ {
go func() {
for svcNegCR := range deletionCandidatesChan {
errs := manager.processNEGDeletionCandidate(svcNegCR, zones)

errListMutex.Lock()
errList = append(errList, errs...)
errListMutex.Unlock()

wg.Done()
}
}()
}
wg.Wait()

return utilerrors.NewAggregate(errList)
}

// processNEGDeletionCandidate attempts to delete `svcNegCR` and all NEGs
// associated with it. When `svcNegCR` lists no NEG references, or one of its
// references has a self link that cannot be parsed, it attempts to delete the
// NEG from every zone specified through the `zones` slice.
func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.ServiceNetworkEndpointGroup, zones []string) []error {
manager.logger.V(2).Info("Count of NEGs referenced by SvcNegCR", "svcneg", klog.KObj(svcNegCR), "count", len(svcNegCR.Status.NetworkEndpointGroups))
var errList []error
shouldDeleteNegCR := true
deleteByZone := len(svcNegCR.Status.NetworkEndpointGroups) == 0

// deleteNegOrReportErr will attempt to delete the specified NEG resource in the cloud. If an error
// occurs, it will report an error as an event on the given CR. If an error does occur, false will
// be returned to indicate that the CR should not be deleted.
deleteNegOrReportErr := func(name, zone string, cr *negv1beta1.ServiceNetworkEndpointGroup) bool {
deleteNegOrReportErr := func(name, zone string, svcNegCR *negv1beta1.ServiceNetworkEndpointGroup) bool {
expectedDesc := &utils.NegDescription{
ClusterUID: string(manager.kubeSystemUID),
Namespace: cr.Namespace,
ServiceName: cr.GetLabels()[negtypes.NegCRServiceNameKey],
Port: cr.GetLabels()[negtypes.NegCRServicePortKey],
Namespace: svcNegCR.Namespace,
ServiceName: svcNegCR.GetLabels()[negtypes.NegCRServiceNameKey],
Port: svcNegCR.GetLabels()[negtypes.NegCRServicePortKey],
}
if err := manager.ensureDeleteNetworkEndpointGroup(name, zone, expectedDesc); err != nil {
err = fmt.Errorf("failed to delete NEG %s in %s: %s", name, zone, err)
manager.recorder.Eventf(cr, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
manager.recorder.Eventf(svcNegCR, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
errList = append(errList, err)

// An error occurred while deleting the NEG; return false to indicate that the Neg CR should not be deleted
Expand All @@ -557,55 +606,50 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
return true
}

for _, cr := range deletionCandidates {
shouldDeleteNegCR := true
deleteByZone := len(cr.Status.NetworkEndpointGroups) == 0
manager.logger.V(2).Info("Count of NEG references for deletion candidate", "count", len(cr.Status.NetworkEndpointGroups), "svcneg", klog.KObj(cr))
for _, negRef := range cr.Status.NetworkEndpointGroups {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
if err != nil {
errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", cr.Namespace, cr.Name, err))
deleteByZone = true
continue
}

shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, cr)
for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
if err != nil {
errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", svcNegCR.Namespace, svcNegCR.Name, err))
deleteByZone = true
continue
}

if deleteByZone {
manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(cr), "cr", cr)
for _, zone := range zones {
shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(cr.Name, zone, cr)
}
}
shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR)
}

if !shouldDeleteNegCR {
continue
if deleteByZone {
manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR)
for _, zone := range zones {
shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR)
}
}

func() {
manager.mu.Lock()
defer manager.mu.Unlock()
if !shouldDeleteNegCR {
return errList
}

// Verify that the NEG is still not wanted before deleting the CR. Mitigates the possibility of the race
// condition mentioned above
svcKey := getServiceKey(cr.Namespace, cr.GetLabels()[negtypes.NegCRServiceNameKey])
portInfoMap := manager.svcPortMap[svcKey]
for _, portInfo := range portInfoMap {
if portInfo.NegName == cr.Name {
manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(cr))
return
}
}
func() {
manager.mu.Lock()
defer manager.mu.Unlock()

manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(cr))
if err := deleteSvcNegCR(manager.svcNegClient, cr, manager.logger); err != nil {
errList = append(errList, err)
// Verify that the NEG is still not wanted before deleting the CR. Mitigates the possibility of the race
// condition mentioned above
svcKey := getServiceKey(svcNegCR.Namespace, svcNegCR.GetLabels()[negtypes.NegCRServiceNameKey])
portInfoMap := manager.svcPortMap[svcKey]
for _, portInfo := range portInfoMap {
if portInfo.NegName == svcNegCR.Name {
manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(svcNegCR))
return
}
}()
}
}

return utilerrors.NewAggregate(errList)
manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
if err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger); err != nil {
errList = append(errList, err)
}
}()

return errList
}

// ensureDeleteNetworkEndpointGroup ensures neg is deleted from zone
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce
testContext.SvcNegInformer.GetIndexer(),
metrics.FakeSyncerMetrics(),
false, //enableNonGcpMode
testContext.NumGCWorkers,
klog.TODO(),
)
return manager, testContext.Cloud
Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/types/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
resyncPeriod = 1 * time.Second
kubeSystemUID = "kube-system-uid"
clusterID = "clusterid"
numGCWorkers = 5
)

// TestContext provides controller context for testing
Expand All @@ -61,6 +62,7 @@ type TestContext struct {

KubeSystemUID types.UID
ResyncPeriod time.Duration
NumGCWorkers int
}

func NewTestContext() *TestContext {
Expand Down Expand Up @@ -91,5 +93,6 @@ func NewTestContextWithKubeClient(kubeClient kubernetes.Interface) *TestContext
SvcNegInformer: informersvcneg.NewServiceNetworkEndpointGroupInformer(negClient, namespace, resyncPeriod, utils.NewNamespaceIndexer()),
KubeSystemUID: kubeSystemUID,
ResyncPeriod: resyncPeriod,
NumGCWorkers: numGCWorkers,
}
}

0 comments on commit 4111167

Please sign in to comment.