diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go
index 86955d9c7e..1a53d44b82 100644
--- a/cmd/glbc/main.go
+++ b/cmd/glbc/main.go
@@ -333,6 +333,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
 			ctx.ClusterNamer,
 			flags.F.ResyncPeriod,
 			flags.F.NegGCPeriod,
+			flags.F.NumNegGCWorkers,
 			flags.F.EnableReadinessReflector,
 			flags.F.RunIngressController,
 			flags.F.RunL4Controller,
diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go
index 78b6865c28..039ba1ddb8 100644
--- a/pkg/flags/flags.go
+++ b/pkg/flags/flags.go
@@ -80,6 +80,7 @@ var (
 		IngressClass             string
 		KubeConfigFile           string
 		NegGCPeriod              time.Duration
+		NumNegGCWorkers          int
 		NodePortRanges           PortRanges
 		ResyncPeriod             time.Duration
 		L4NetLBProvisionDeadline time.Duration
@@ -232,6 +233,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
 	flag.StringVar(&F.LeaderElection.LockObjectName, "lock-object-name", F.LeaderElection.LockObjectName, "Define the name of the lock object.")
 	flag.DurationVar(&F.NegGCPeriod, "neg-gc-period", 120*time.Second,
 		`Relist and garbage collect NEGs this often.`)
+	flag.IntVar(&F.NumNegGCWorkers, "num-neg-gc-workers", 10, "Number of goroutines created by NEG garbage collector. This value controls the maximum number of concurrent calls made to the GCE NEG Delete API.")
 	flag.BoolVar(&F.EnableReadinessReflector, "enable-readiness-reflector", true, "Enable NEG Readiness Reflector")
 	flag.BoolVar(&F.FinalizerAdd, "enable-finalizer-add", F.FinalizerAdd,
 		"Enable adding Finalizer to Ingress.")
diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go
index c66c297c9d..c748df1bff 100644
--- a/pkg/neg/controller.go
+++ b/pkg/neg/controller.go
@@ -122,6 +122,7 @@ func NewController(
 	namer negtypes.NetworkEndpointGroupNamer,
 	resyncPeriod time.Duration,
 	gcPeriod time.Duration,
+	numGCWorkers int,
 	enableReadinessReflector bool,
 	runIngress bool,
 	runL4Controller bool,
@@ -166,6 +167,7 @@ func NewController(
 		svcNegInformer.GetIndexer(),
 		syncerMetrics,
 		enableNonGcpMode,
+		numGCWorkers,
 		logger)
 
 	var reflector readiness.Reflector
diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go
index 34a9fbac21..4313c53954 100644
--- a/pkg/neg/controller_test.go
+++ b/pkg/neg/controller_test.go
@@ -132,6 +132,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
 		testContext.NegNamer,
 		testContext.ResyncPeriod,
 		testContext.ResyncPeriod,
+		testContext.NumGCWorkers,
 		// TODO(freehan): enable readiness reflector for unit tests
 		false, // enableReadinessReflector
 		true,  // runIngress
diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go
index ec8ebb4e75..b35d34499e 100644
--- a/pkg/neg/manager.go
+++ b/pkg/neg/manager.go
@@ -95,6 +95,11 @@ type syncerManager struct {
 	// This will make all NEGs created by NEG controller to be NON_GCP_PRIVATE_IP_PORT type.
 	enableNonGcpMode bool
 
+	// Number of goroutines created for NEG garbage collection. This value
+	// controls the maximum number of concurrent calls that can be made to the GCE
+	// NEG Delete API.
+	numGCWorkers int
+
 	logger klog.Logger
 
 	// zone maps keep track of the last set of zones the neg controller has seen
@@ -116,6 +121,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
 	svcNegLister cache.Indexer,
 	syncerMetrics *metrics.SyncerMetrics,
 	enableNonGcpMode bool,
+	numGCWorkers int,
 	logger klog.Logger) *syncerManager {
 
 	var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
@@ -138,6 +144,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
 		svcNegClient:     svcNegClient,
 		kubeSystemUID:    kubeSystemUID,
 		enableNonGcpMode: enableNonGcpMode,
+		numGCWorkers:     numGCWorkers,
 		logger:           logger,
 		vmIpZoneMap:      vmIpZoneMap,
 		vmIpPortZoneMap:  vmIpPortZoneMap,
@@ -528,84 +535,121 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
 	// The worst outcome of the race condition is that neg is deleted in the end but user actually specifies a neg.
 	// This would be resolved (sync neg) when the next endpoint update or resync arrives.
 	// TODO: avoid race condition here
+	var errList []error
 
-	// Deletion candidate NEGs should be deleted from all zones, even ones that currently don't have any Ready nodes.
+	// errListMutex protects writes to errList. It should be acquired when one
+	// expects concurrent writes to errList.
+	var errListMutex sync.Mutex
+
+	// Deletion candidate NEGs should be deleted from all zones, even ones that
+	// currently don't have any Ready nodes.
 	zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
 	if err != nil {
 		errList = append(errList, fmt.Errorf("failed to get zones during garbage collection: %w", err))
 	}
 
-	// deleteNegOrReportErr will attempt to delete the specified NEG resource in the cloud. If an error
-	// occurs, it will report an error as an event on the given CR. If an error does occur, false will
-	// be returned to indicate that the CR should not be deleted.
-	deleteNegOrReportErr := func(name, zone string, cr *negv1beta1.ServiceNetworkEndpointGroup) bool {
-		expectedDesc := &utils.NegDescription{
-			ClusterUID:  string(manager.kubeSystemUID),
-			Namespace:   cr.Namespace,
-			ServiceName: cr.GetLabels()[negtypes.NegCRServiceNameKey],
-			Port:        cr.GetLabels()[negtypes.NegCRServicePortKey],
-		}
-		if err := manager.ensureDeleteNetworkEndpointGroup(name, zone, expectedDesc); err != nil {
-			err = fmt.Errorf("failed to delete NEG %s in %s: %s", name, zone, err)
-			manager.recorder.Eventf(cr, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
-			errList = append(errList, err)
+	deletionCandidatesChan := make(chan *negv1beta1.ServiceNetworkEndpointGroup, len(deletionCandidates))
+	for _, dc := range deletionCandidates {
+		deletionCandidatesChan <- dc
+	}
+	close(deletionCandidatesChan)
 
-			// Error when deleting NEG and return false to indicate not to delete Neg CR
-			return false
-		}
+	wg := sync.WaitGroup{}
+	wg.Add(len(deletionCandidates))
+	for i := 0; i < manager.numGCWorkers; i++ {
+		go func() {
+			for svcNegCR := range deletionCandidatesChan {
+				errs := manager.processNEGDeletionCandidate(svcNegCR, zones)
 
-		return true
-	}
+				errListMutex.Lock()
+				errList = append(errList, errs...)
+				errListMutex.Unlock()
 
-	for _, cr := range deletionCandidates {
-		shouldDeleteNegCR := true
-		deleteByZone := len(cr.Status.NetworkEndpointGroups) == 0
-		manager.logger.V(2).Info("Count of NEG references for deletion candidate", "count", len(cr.Status.NetworkEndpointGroups), "svcneg", klog.KObj(cr))
-		for _, negRef := range cr.Status.NetworkEndpointGroups {
-			resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
-			if err != nil {
-				errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", cr.Namespace, cr.Name, err))
-				deleteByZone = true
-				continue
+				wg.Done()
 			}
+		}()
+	}
+	wg.Wait()
+
+	return utilerrors.NewAggregate(errList)
+}
+
+// processNEGDeletionCandidate attempts to delete `svcNegCR` and all NEGs
+// associated with it. In case when `svcNegCR` does not have ample information
+// about the zones associated with this NEG, it will attempt to delete the NEG
+// from all zones specified through the `zones` slice.
+func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.ServiceNetworkEndpointGroup, zones []string) []error {
+	manager.logger.V(2).Info("Count of NEGs referenced by SvcNegCR", "svcneg", klog.KObj(svcNegCR), "count", len(svcNegCR.Status.NetworkEndpointGroups))
+	var errList []error
+	shouldDeleteNegCR := true
+	deleteByZone := len(svcNegCR.Status.NetworkEndpointGroups) == 0
 
-			shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, cr)
+	for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
+		resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
+		if err != nil {
+			errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", svcNegCR.Namespace, svcNegCR.Name, err))
+			deleteByZone = true
+			continue
 		}
-		if deleteByZone {
-			manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(cr), "cr", cr)
-			for _, zone := range zones {
-				shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(cr.Name, zone, cr)
+		shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList)
+	}
+
+	if deleteByZone {
+		manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR)
+		for _, zone := range zones {
+			shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
+		}
+	}
+
+	if !shouldDeleteNegCR {
+		return errList
+	}
+
+	func() {
+		manager.mu.Lock()
+		defer manager.mu.Unlock()
+
+		// Verify that the NEG is still not wanted before deleting the CR. Mitigates the possibility of the race
+		// condition mentioned in garbageCollectNEGWithCRD()
+		svcKey := getServiceKey(svcNegCR.Namespace, svcNegCR.GetLabels()[negtypes.NegCRServiceNameKey])
+		portInfoMap := manager.svcPortMap[svcKey]
+		for _, portInfo := range portInfoMap {
+			if portInfo.NegName == svcNegCR.Name {
+				manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(svcNegCR))
+				return
 			}
 		}
 
-		if !shouldDeleteNegCR {
-			continue
+		manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
+		if err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger); err != nil {
+			errList = append(errList, err)
 		}
+	}()
 
-		func() {
-			manager.mu.Lock()
-			defer manager.mu.Unlock()
+	return errList
+}
 
-			// Verify that the NEG is still not wanted before deleting the CR. Mitigates the possibility of the race
-			// condition mentioned above
-			svcKey := getServiceKey(cr.Namespace, cr.GetLabels()[negtypes.NegCRServiceNameKey])
-			portInfoMap := manager.svcPortMap[svcKey]
-			for _, portInfo := range portInfoMap {
-				if portInfo.NegName == cr.Name {
-					manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(cr))
-					return
-				}
-			}
+
+// deleteNegOrReportErr will attempt to delete the specified NEG resource in the
+// cloud. Successful deletion is indicated by returning `true` and a failure
+// would return `false`. In addition, if the deletion failed, the error will be
+// reported as an event on the given CR and added to the passed `errList`.
+func (manager *syncerManager) deleteNegOrReportErr(name, zone string, svcNegCR *negv1beta1.ServiceNetworkEndpointGroup, errList *[]error) bool {
+	expectedDesc := &utils.NegDescription{
+		ClusterUID:  string(manager.kubeSystemUID),
+		Namespace:   svcNegCR.Namespace,
+		ServiceName: svcNegCR.GetLabels()[negtypes.NegCRServiceNameKey],
+		Port:        svcNegCR.GetLabels()[negtypes.NegCRServicePortKey],
+	}
+	if err := manager.ensureDeleteNetworkEndpointGroup(name, zone, expectedDesc); err != nil {
+		err = fmt.Errorf("failed to delete NEG %s in %s: %s", name, zone, err)
+		manager.recorder.Eventf(svcNegCR, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
+		*errList = append(*errList, err)
 
-			manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(cr))
-			if err := deleteSvcNegCR(manager.svcNegClient, cr, manager.logger); err != nil {
-				errList = append(errList, err)
-			}
-		}()
+		return false
 	}
-	return utilerrors.NewAggregate(errList)
+	return true
 }
 
 // ensureDeleteNetworkEndpointGroup ensures neg is delete from zone
diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go
index 42b8a4f6fb..c0152a6cd7 100644
--- a/pkg/neg/manager_test.go
+++ b/pkg/neg/manager_test.go
@@ -93,6 +93,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce
 		testContext.SvcNegInformer.GetIndexer(),
 		metrics.FakeSyncerMetrics(),
 		false, //enableNonGcpMode
+		testContext.NumGCWorkers,
 		klog.TODO(),
 	)
 	return manager, testContext.Cloud
diff --git a/pkg/neg/types/testing.go b/pkg/neg/types/testing.go
index 2cea0d3f9a..cf2e94090d 100644
--- a/pkg/neg/types/testing.go
+++ b/pkg/neg/types/testing.go
@@ -40,6 +40,7 @@ const (
 	resyncPeriod  = 1 * time.Second
 	kubeSystemUID = "kube-system-uid"
 	clusterID     = "clusterid"
+	numGCWorkers  = 5
 )
 
 // TestContext provides controller context for testing
@@ -61,6 +62,7 @@ type TestContext struct {
 	KubeSystemUID types.UID
 
 	ResyncPeriod time.Duration
+	NumGCWorkers int
 }
 
 func NewTestContext() *TestContext {
@@ -91,5 +93,6 @@ func NewTestContextWithKubeClient(kubeClient kubernetes.Interface) *TestContext
 		SvcNegInformer: informersvcneg.NewServiceNetworkEndpointGroupInformer(negClient, namespace, resyncPeriod, utils.NewNamespaceIndexer()),
 		KubeSystemUID:  kubeSystemUID,
 		ResyncPeriod:   resyncPeriod,
+		NumGCWorkers:   numGCWorkers,
 	}
 }
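For reference, the change above replaces the sequential per-CR loop with a bounded worker pool: deletion candidates are pushed onto a buffered channel, `numGCWorkers` goroutines drain it, and a mutex guards the shared error slice. Below is a minimal, self-contained Go sketch of that fan-out pattern. The names (`candidate`, `deleteCandidate`, `garbageCollect`) are illustrative placeholders, not identifiers from ingress-gce, and the sketch aggregates errors with `errors.Join` where the real code uses `utilerrors.NewAggregate` and performs the actual deletion through `ensureDeleteNetworkEndpointGroup`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// candidate stands in for a NEG deletion candidate; it is a placeholder type,
// not part of ingress-gce.
type candidate struct {
	name string
	zone string
}

// deleteCandidate is a stand-in for the per-NEG deletion work (the real code
// calls the GCE NEG Delete API here).
func deleteCandidate(c candidate) error {
	if c.zone == "" {
		return fmt.Errorf("no zone recorded for NEG %q", c.name)
	}
	return nil
}

// garbageCollect fans candidates out to numWorkers goroutines so that at most
// numWorkers deletions are in flight at once, and aggregates any errors.
func garbageCollect(candidates []candidate, numWorkers int) error {
	var (
		errList []error
		errMu   sync.Mutex // protects errList against concurrent appends
	)

	// The buffered channel is the work queue; closing it lets workers exit
	// once the queue is drained.
	work := make(chan candidate, len(candidates))
	for _, c := range candidates {
		work <- c
	}
	close(work)

	var wg sync.WaitGroup
	wg.Add(len(candidates)) // count work items, not workers, as in the patch
	for i := 0; i < numWorkers; i++ {
		go func() {
			for c := range work {
				if err := deleteCandidate(c); err != nil {
					errMu.Lock()
					errList = append(errList, err)
					errMu.Unlock()
				}
				wg.Done()
			}
		}()
	}
	wg.Wait()

	return errors.Join(errList...)
}

func main() {
	cands := []candidate{{name: "neg-a", zone: "us-central1-a"}, {name: "neg-b", zone: ""}}
	if err := garbageCollect(cands, 2); err != nil {
		fmt.Println("GC finished with errors:", err)
	}
}
```

As in the patch, the WaitGroup counts work items rather than workers, so `wg.Wait()` returns as soon as every candidate has been processed, independent of how many worker goroutines were started.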