[Cherry pick #1976] Concurrently process deletion candidates within NEG Garbage Collector #2002

Merged
1 change: 1 addition & 0 deletions cmd/glbc/main.go
@@ -333,6 +333,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
ctx.ClusterNamer,
flags.F.ResyncPeriod,
flags.F.NegGCPeriod,
+ flags.F.NumNegGCWorkers,
flags.F.EnableReadinessReflector,
flags.F.RunIngressController,
flags.F.RunL4Controller,
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
@@ -80,6 +80,7 @@ var (
IngressClass string
KubeConfigFile string
NegGCPeriod time.Duration
+ NumNegGCWorkers int
NodePortRanges PortRanges
ResyncPeriod time.Duration
L4NetLBProvisionDeadline time.Duration
@@ -232,6 +233,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.StringVar(&F.LeaderElection.LockObjectName, "lock-object-name", F.LeaderElection.LockObjectName, "Define the name of the lock object.")
flag.DurationVar(&F.NegGCPeriod, "neg-gc-period", 120*time.Second,
`Relist and garbage collect NEGs this often.`)
+ flag.IntVar(&F.NumNegGCWorkers, "num-neg-gc-workers", 10, "Number of goroutines created by NEG garbage collector. This value controls the maximum number of concurrent calls made to the GCE NEG Delete API.")
flag.BoolVar(&F.EnableReadinessReflector, "enable-readiness-reflector", true, "Enable NEG Readiness Reflector")
flag.BoolVar(&F.FinalizerAdd, "enable-finalizer-add",
F.FinalizerAdd, "Enable adding Finalizer to Ingress.")
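The help text for --num-neg-gc-workers states that the worker count caps the number of concurrent GCE NEG Delete API calls. Below is a minimal, self-contained sketch (illustrative only, not code from this PR; all names are made up) showing why a fixed pool of goroutines draining a shared channel bounds in-flight work to the pool size:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	const numWorkers = 10 // stands in for the --num-neg-gc-workers value
	candidates := make(chan string, 50)
	for i := 0; i < 50; i++ {
		candidates <- fmt.Sprintf("neg-%d", i)
	}
	close(candidates)

	var inFlight, peak int64
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			for range candidates {
				cur := atomic.AddInt64(&inFlight, 1)
				// Record the peak number of concurrent "delete calls" observed.
				for {
					prev := atomic.LoadInt64(&peak)
					if cur <= prev || atomic.CompareAndSwapInt64(&peak, prev, cur) {
						break
					}
				}
				time.Sleep(10 * time.Millisecond) // stand-in for one NEG Delete API call
				atomic.AddInt64(&inFlight, -1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("peak concurrency: %d (never exceeds %d workers)\n", peak, numWorkers)
}

Running the sketch prints a peak concurrency no greater than numWorkers, which is the property the flag description relies on.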
2 changes: 2 additions & 0 deletions pkg/neg/controller.go
@@ -122,6 +122,7 @@ func NewController(
namer negtypes.NetworkEndpointGroupNamer,
resyncPeriod time.Duration,
gcPeriod time.Duration,
+ numGCWorkers int,
enableReadinessReflector bool,
runIngress bool,
runL4Controller bool,
@@ -166,6 +167,7 @@ func NewController(
svcNegInformer.GetIndexer(),
syncerMetrics,
enableNonGcpMode,
+ numGCWorkers,
logger)

var reflector readiness.Reflector
1 change: 1 addition & 0 deletions pkg/neg/controller_test.go
@@ -132,6 +132,7 @@ func newTestControllerWithParamsAndContext(kubeClient kubernetes.Interface, test
testContext.NegNamer,
testContext.ResyncPeriod,
testContext.ResyncPeriod,
+ testContext.NumGCWorkers,
// TODO(freehan): enable readiness reflector for unit tests
false, // enableReadinessReflector
true, // runIngress
156 changes: 100 additions & 56 deletions pkg/neg/manager.go
@@ -95,6 +95,11 @@ type syncerManager struct {
// This will make all NEGs created by NEG controller to be NON_GCP_PRIVATE_IP_PORT type.
enableNonGcpMode bool

+ // Number of goroutines created for NEG garbage collection. This value
+ // controls the maximum number of concurrent calls that can be made to the GCE
+ // NEG Delete API.
+ numGCWorkers int

logger klog.Logger

// zone maps keep track of the last set of zones the neg controller has seen
@@ -116,6 +121,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
svcNegLister cache.Indexer,
syncerMetrics *metrics.SyncerMetrics,
enableNonGcpMode bool,
+ numGCWorkers int,
logger klog.Logger) *syncerManager {

var vmIpZoneMap, vmIpPortZoneMap map[string]struct{}
@@ -138,6 +144,7 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer,
svcNegClient: svcNegClient,
kubeSystemUID: kubeSystemUID,
enableNonGcpMode: enableNonGcpMode,
+ numGCWorkers: numGCWorkers,
logger: logger,
vmIpZoneMap: vmIpZoneMap,
vmIpPortZoneMap: vmIpPortZoneMap,
@@ -528,84 +535,121 @@ func (manager *syncerManager) garbageCollectNEGWithCRD() error {
	// The worst outcome of the race condition is that neg is deleted in the end but user actually specifies a neg.
	// This would be resolved (sync neg) when the next endpoint update or resync arrives.
	// TODO: avoid race condition here

	var errList []error
-	// Deletion candidate NEGs should be deleted from all zones, even ones that currently don't have any Ready nodes.
+	// errListMutex protects writes to errList. It should be acquired when one
+	// expects concurrent writes to errList.
+	var errListMutex sync.Mutex
+
+	// Deletion candidate NEGs should be deleted from all zones, even ones that
+	// currently don't have any Ready nodes.
	zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate)
	if err != nil {
		errList = append(errList, fmt.Errorf("failed to get zones during garbage collection: %w", err))
	}

-	// deleteNegOrReportErr will attempt to delete the specified NEG resource in the cloud. If an error
-	// occurs, it will report an error as an event on the given CR. If an error does occur, false will
-	// be returned to indicate that the CR should not be deleted.
-	deleteNegOrReportErr := func(name, zone string, cr *negv1beta1.ServiceNetworkEndpointGroup) bool {
-		expectedDesc := &utils.NegDescription{
-			ClusterUID:  string(manager.kubeSystemUID),
-			Namespace:   cr.Namespace,
-			ServiceName: cr.GetLabels()[negtypes.NegCRServiceNameKey],
-			Port:        cr.GetLabels()[negtypes.NegCRServicePortKey],
-		}
-		if err := manager.ensureDeleteNetworkEndpointGroup(name, zone, expectedDesc); err != nil {
-			err = fmt.Errorf("failed to delete NEG %s in %s: %s", name, zone, err)
-			manager.recorder.Eventf(cr, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
-			errList = append(errList, err)
-
-			// Error when deleting NEG and return false to indicate not to delete Neg CR
-			return false
-		}
-
-		return true
-	}
-
-	for _, cr := range deletionCandidates {
-		shouldDeleteNegCR := true
-		deleteByZone := len(cr.Status.NetworkEndpointGroups) == 0
-		manager.logger.V(2).Info("Count of NEG references for deletion candidate", "count", len(cr.Status.NetworkEndpointGroups), "svcneg", klog.KObj(cr))
-		for _, negRef := range cr.Status.NetworkEndpointGroups {
-			resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
-			if err != nil {
-				errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", cr.Namespace, cr.Name, err))
-				deleteByZone = true
-				continue
-			}
-
-			shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, cr)
-		}
-
-		if deleteByZone {
-			manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(cr), "cr", cr)
-			for _, zone := range zones {
-				shouldDeleteNegCR = shouldDeleteNegCR && deleteNegOrReportErr(cr.Name, zone, cr)
-			}
-		}
-
-		if !shouldDeleteNegCR {
-			continue
-		}
-
-		func() {
-			manager.mu.Lock()
-			defer manager.mu.Unlock()
-
-			// Verify that the NEG is still not wanted before deleting the CR. Mitigates the possibility of the race
-			// condition mentioned above
-			svcKey := getServiceKey(cr.Namespace, cr.GetLabels()[negtypes.NegCRServiceNameKey])
-			portInfoMap := manager.svcPortMap[svcKey]
-			for _, portInfo := range portInfoMap {
-				if portInfo.NegName == cr.Name {
-					manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(cr))
-					return
-				}
-			}
-
-			manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(cr))
-			if err := deleteSvcNegCR(manager.svcNegClient, cr, manager.logger); err != nil {
-				errList = append(errList, err)
-			}
-		}()
-	}
-
-	return utilerrors.NewAggregate(errList)
+	deletionCandidatesChan := make(chan *negv1beta1.ServiceNetworkEndpointGroup, len(deletionCandidates))
+	for _, dc := range deletionCandidates {
+		deletionCandidatesChan <- dc
+	}
+	close(deletionCandidatesChan)
+
+	wg := sync.WaitGroup{}
+	wg.Add(len(deletionCandidates))
+	for i := 0; i < manager.numGCWorkers; i++ {
+		go func() {
+			for svcNegCR := range deletionCandidatesChan {
+				errs := manager.processNEGDeletionCandidate(svcNegCR, zones)
+
+				errListMutex.Lock()
+				errList = append(errList, errs...)
+				errListMutex.Unlock()
+
+				wg.Done()
+			}
+		}()
+	}
+	wg.Wait()
+
+	return utilerrors.NewAggregate(errList)
+}
+
+// processNEGDeletionCandidate attempts to delete `svcNegCR` and all NEGs
+// associated with it. In case when `svcNegCR` does not have ample information
+// about the zones associated with this NEG, it will attempt to delete the NEG
+// from all zones specified through the `zones` slice.
+func (manager *syncerManager) processNEGDeletionCandidate(svcNegCR *negv1beta1.ServiceNetworkEndpointGroup, zones []string) []error {
+	manager.logger.V(2).Info("Count of NEGs referenced by SvcNegCR", "svcneg", klog.KObj(svcNegCR), "count", len(svcNegCR.Status.NetworkEndpointGroups))
+	var errList []error
+	shouldDeleteNegCR := true
+	deleteByZone := len(svcNegCR.Status.NetworkEndpointGroups) == 0
+
+	for _, negRef := range svcNegCR.Status.NetworkEndpointGroups {
+		resourceID, err := cloud.ParseResourceURL(negRef.SelfLink)
+		if err != nil {
+			errList = append(errList, fmt.Errorf("failed to parse selflink for neg cr %s/%s: %s", svcNegCR.Namespace, svcNegCR.Name, err))
+			deleteByZone = true
+			continue
+		}
+
+		shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(resourceID.Key.Name, resourceID.Key.Zone, svcNegCR, &errList)
+	}
+
+	if deleteByZone {
+		manager.logger.V(2).Info("Deletion candidate has 0 NEG reference", "svcneg", klog.KObj(svcNegCR), "svcNegCR", svcNegCR)
+		for _, zone := range zones {
+			shouldDeleteNegCR = shouldDeleteNegCR && manager.deleteNegOrReportErr(svcNegCR.Name, zone, svcNegCR, &errList)
+		}
+	}
+
+	if !shouldDeleteNegCR {
+		return errList
+	}
+
+	func() {
+		manager.mu.Lock()
+		defer manager.mu.Unlock()
+
+		// Verify that the NEG is still not wanted before deleting the CR. Mitigates the possibility of the race
+		// condition mentioned in garbageCollectNEGWithCRD()
+		svcKey := getServiceKey(svcNegCR.Namespace, svcNegCR.GetLabels()[negtypes.NegCRServiceNameKey])
+		portInfoMap := manager.svcPortMap[svcKey]
+		for _, portInfo := range portInfoMap {
+			if portInfo.NegName == svcNegCR.Name {
+				manager.logger.V(2).Info("NEG CR is still desired, skipping deletion", "svcneg", klog.KObj(svcNegCR))
+				return
+			}
+		}
+
+		manager.logger.V(2).Info("Deleting NEG CR", "svcneg", klog.KObj(svcNegCR))
+		if err := deleteSvcNegCR(manager.svcNegClient, svcNegCR, manager.logger); err != nil {
+			errList = append(errList, err)
+		}
+	}()
+
+	return errList
+}
+
+// deleteNegOrReportErr will attempt to delete the specified NEG resource in the
+// cloud. Successful deletion is indicated by returning `true` and a failure
+// would return `false`. In addition, if the deletion failed, the error will be
+// reported as an event on the given CR and added to the passed `errList`.
+func (manager *syncerManager) deleteNegOrReportErr(name, zone string, svcNegCR *negv1beta1.ServiceNetworkEndpointGroup, errList *[]error) bool {
+	expectedDesc := &utils.NegDescription{
+		ClusterUID:  string(manager.kubeSystemUID),
+		Namespace:   svcNegCR.Namespace,
+		ServiceName: svcNegCR.GetLabels()[negtypes.NegCRServiceNameKey],
+		Port:        svcNegCR.GetLabels()[negtypes.NegCRServicePortKey],
+	}
+	if err := manager.ensureDeleteNetworkEndpointGroup(name, zone, expectedDesc); err != nil {
+		err = fmt.Errorf("failed to delete NEG %s in %s: %s", name, zone, err)
+		manager.recorder.Eventf(svcNegCR, v1.EventTypeWarning, negtypes.NegGCError, err.Error())
+		*errList = append(*errList, err)
+
+		return false
+	}
+
+	return true
}

// ensureDeleteNetworkEndpointGroup ensures neg is delete from zone
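Taken together, the new garbage-collection flow in manager.go is a bounded fan-out: pre-load a buffered channel with every deletion candidate, close it, start numGCWorkers goroutines that drain it, and aggregate per-candidate errors under a mutex. Below is a condensed, illustrative sketch of that pattern (the candidate type and process function are stand-ins for *negv1beta1.ServiceNetworkEndpointGroup and manager.processNEGDeletionCandidate; this is not the controller's code):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type candidate struct{ name string }

// process stands in for per-candidate NEG deletion and returns any errors hit.
func process(c candidate) []error {
	if c.name == "neg-3" {
		return []error{errors.New("failed to delete " + c.name)}
	}
	return nil
}

func garbageCollect(candidates []candidate, numWorkers int) error {
	var errList []error
	var errListMutex sync.Mutex // protects errList across workers

	// Pre-load and close a buffered channel so workers exit once it is drained.
	ch := make(chan candidate, len(candidates))
	for _, c := range candidates {
		ch <- c
	}
	close(ch)

	var wg sync.WaitGroup
	wg.Add(len(candidates)) // wait for every candidate, not every worker
	for i := 0; i < numWorkers; i++ {
		go func() {
			for c := range ch {
				errs := process(c)

				errListMutex.Lock()
				errList = append(errList, errs...)
				errListMutex.Unlock()

				wg.Done()
			}
		}()
	}
	wg.Wait()

	return errors.Join(errList...) // the real code uses utilerrors.NewAggregate
}

func main() {
	cands := []candidate{{"neg-1"}, {"neg-2"}, {"neg-3"}, {"neg-4"}}
	fmt.Println(garbageCollect(cands, 2))
}

Buffering the channel to len(candidates) and closing it up front means the producer never blocks and each worker's range loop exits once the queue is drained; calling wg.Add(len(candidates)) with one wg.Done() per candidate makes wg.Wait() wait on the work items rather than on the workers themselves.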
1 change: 1 addition & 0 deletions pkg/neg/manager_test.go
@@ -93,6 +93,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce
testContext.SvcNegInformer.GetIndexer(),
metrics.FakeSyncerMetrics(),
false, //enableNonGcpMode
+ testContext.NumGCWorkers,
klog.TODO(),
)
return manager, testContext.Cloud
3 changes: 3 additions & 0 deletions pkg/neg/types/testing.go
@@ -40,6 +40,7 @@ const (
resyncPeriod = 1 * time.Second
kubeSystemUID = "kube-system-uid"
clusterID = "clusterid"
+ numGCWorkers = 5
)

// TestContext provides controller context for testing
@@ -61,6 +62,7 @@ type TestContext struct {

KubeSystemUID types.UID
ResyncPeriod time.Duration
+ NumGCWorkers int
}

func NewTestContext() *TestContext {
@@ -91,5 +93,6 @@ func NewTestContextWithKubeClient(kubeClient kubernetes.Interface) *TestContext
SvcNegInformer: informersvcneg.NewServiceNetworkEndpointGroupInformer(negClient, namespace, resyncPeriod, utils.NewNamespaceIndexer()),
KubeSystemUID: kubeSystemUID,
ResyncPeriod: resyncPeriod,
+ NumGCWorkers: numGCWorkers,
}
}