Skip to content

Commit

Permalink
Use NEG CRs for NEG Garbage Collection
Browse files Browse the repository at this point in the history
 - Add finalizer to NEG CRs to ensure CR exists until NEG is deleted
 during GC
 - When the NEG CRD is enabled, GC uses the NEG CRs; otherwise it falls back
 to regular GC, which uses the NEG naming scheme to determine which NEGs are
 to be deleted
  • Loading branch information
swetharepakula committed Jul 29, 2020
1 parent d14b9c9 commit a219968
Show file tree
Hide file tree
Showing 3 changed files with 303 additions and 5 deletions.
70 changes: 67 additions & 3 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/klog"
utilpointer "k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -236,10 +237,16 @@ func (manager *syncerManager) GC() error {
manager.garbageCollectSyncer()

// Garbage collect NEGs
if err := manager.garbageCollectNEG(); err != nil {
return fmt.Errorf("failed to garbage collect negs: %v", err)
var err error
if manager.svcNegClient != nil {
err = manager.garbageCollectNEGWithCRD()
} else {
err = manager.garbageCollectNEG()
}
return nil
if err != nil {
err = fmt.Errorf("failed to garbage collect negs: %v", err)
}
return err
}

// ReadinessGateEnabledNegs returns a list of NEGs which has readiness gate enabled for the input pod's namespace and labels.
Expand Down Expand Up @@ -370,6 +377,62 @@ func (manager *syncerManager) garbageCollectNEG() error {
return nil
}

// garbageCollectNEGWithCRD uses the NEG CRs and the svcPortMap to determine which NEGs
// need to be garbage collected. A NEG CR is a deletion candidate when it carries a
// deletion timestamp or when no port in the svcPortMap references its name. Every zonal
// NEG in the cloud whose name matches a candidate is deleted. Afterwards each candidate
// CR has its finalizers cleared (and a deletion timestamp set if it had none); this also
// covers candidate CRs that have no corresponding NEG in the cloud.
//
// A failure on one NEG or CR does not abort the pass. All errors are collected and
// returned as an aggregate, and a CR whose NEG could not be deleted in some zone keeps
// its finalizer so that a later GC pass can retry.
func (manager *syncerManager) garbageCollectNEGWithCRD() error {
	// Start with every known CR as a candidate, keyed by NEG name.
	deletionCandidates := map[string]*negv1beta1.ServiceNetworkEndpointGroup{}
	for _, obj := range manager.svcNegLister.List() {
		neg := obj.(*negv1beta1.ServiceNetworkEndpointGroup)
		deletionCandidates[neg.Name] = neg
	}

	// Drop candidates that are still desired. The closure scopes the lock to the
	// svcPortMap walk so cloud and API calls below run without holding it.
	func() {
		manager.mu.Lock()
		defer manager.mu.Unlock()
		for _, portInfoMap := range manager.svcPortMap {
			for _, portInfo := range portInfoMap {
				// A CR that is already marked for deletion stays a candidate even if desired.
				if neg, ok := deletionCandidates[portInfo.NegName]; ok && neg.GetDeletionTimestamp().IsZero() {
					delete(deletionCandidates, portInfo.NegName)
				}
			}
		}
	}()

	negList, err := manager.cloud.AggregatedListNetworkEndpointGroup(meta.VersionGA)
	if err != nil {
		return fmt.Errorf("failed to retrieve aggregated NEG list: %v", err)
	}

	var errList []error
	// Names of NEGs that could not be deleted in at least one zone.
	failedNegs := map[string]struct{}{}
	for key, neg := range negList {
		if key.Type() != meta.Zonal {
			// covers the case when key.Zone is not populated
			klog.V(4).Infof("Ignoring key %v as it is not zonal", key)
			continue
		}
		if _, ok := deletionCandidates[neg.Name]; !ok {
			continue
		}
		if err := manager.ensureDeleteNetworkEndpointGroup(neg.Name, key.Zone); err != nil {
			errList = append(errList, fmt.Errorf("failed to delete NEG %s in %s: %s", neg.Name, key.Zone, err))
			failedNegs[neg.Name] = struct{}{}
		}
	}

	for name, cached := range deletionCandidates {
		if _, failed := failedNegs[name]; failed {
			// The NEG still exists somewhere; keep the finalizer so the CR outlives it.
			continue
		}
		// Objects returned by the lister are shared with the cache, so mutate a copy.
		cr := cached.DeepCopy()
		cr.Finalizers = []string{}
		if cr.GetDeletionTimestamp().IsZero() {
			ts := metav1.Now()
			cr.SetDeletionTimestamp(&ts)
		}
		if _, err := manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(cr.Namespace).Update(context.Background(), cr, metav1.UpdateOptions{}); err != nil {
			errList = append(errList, err)
		}
	}

	return utilerrors.NewAggregate(errList)
}

// ensureDeleteNetworkEndpointGroup ensures neg is deleted from zone
func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string) error {
_, err := manager.cloud.GetNetworkEndpointGroup(name, zone, meta.VersionGA)
Expand Down Expand Up @@ -416,6 +479,7 @@ func (manager *syncerManager) ensureSvcNegCR(svcKey serviceKey, portInfo negtype
Namespace: svcKey.namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: labels,
Finalizers: []string{common.NegFinalizerKey},
},
}

Expand Down
236 changes: 234 additions & 2 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
negfake "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned/fake"

"k8s.io/ingress-gce/pkg/utils/common"
namer_util "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/legacy-cloud-providers/gce"
)
Expand Down Expand Up @@ -1064,15 +1065,192 @@ func TestNegCRDeletions(t *testing.T) {
}
}

// TestGarbageCollectionNegCrdEnabled exercises manager.GC() with a NEG CR client configured.
// For each case it creates a NEG CR for the candidate NEG (optionally with a deletion
// timestamp), ensures syncers for the desired port 80, optionally creates the candidate
// NEG in both test zones for each endpoint type, and runs GC. It then asserts that no NEG
// named negName remains in either zone, that the candidate CR has no finalizers and a
// deletion timestamp, and that a distinct desired CR keeps its finalizer and has no
// deletion timestamp. Each case runs once with a custom NEG name and once with a
// namer-generated one.
func TestGarbageCollectionNegCrdEnabled(t *testing.T) {
	t.Parallel()

	svc1 := &v1.Service{
		TypeMeta: metav1.TypeMeta{
			Kind:       "Service",
			APIVersion: "v1",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: testServiceNamespace,
			Name:      testServiceName,
		},
	}
	svc1.SetUID("svc-uid")

	svc2 := svc1.DeepCopy()
	svc2.Name = "svc-2"
	svc2.SetUID("svc2-uid")
	port80 := int32(80)
	port81 := int32(81)

	testCases := []struct {
		desc              string
		negsExist         bool
		markedForDeletion bool
		svc               *v1.Service
		svcPort           int32
		desiredPort       bool
	}{
		{desc: "undesired service, undesired port, marked for deletion",
			negsExist:         true,
			markedForDeletion: true,
			svc:               svc2,
			svcPort:           port81,
		},
		{desc: "undesired service, undesired port",
			negsExist:         true,
			markedForDeletion: false,
			svc:               svc2,
			svcPort:           port81,
		},
		{desc: "desired service, desired port, marked for deletion",
			negsExist:         true,
			markedForDeletion: true,
			svc:               svc1,
			svcPort:           port80,
			desiredPort:       true,
		},
		{desc: "desired service, undesired port, marked for deletion",
			negsExist:         true,
			markedForDeletion: true,
			svc:               svc1,
			svcPort:           port81,
		},
		{desc: "desired service, undesired port",
			negsExist:         true,
			markedForDeletion: false,
			svc:               svc1,
			svcPort:           port81,
		},
		{desc: "negs don't exist, undesired service, undesired port, marked for deletion",
			negsExist:         false,
			markedForDeletion: true,
			svc:               svc2,
			svcPort:           port81,
		},
		{desc: "negs don't exist, undesired service, undesired port",
			negsExist:         false,
			markedForDeletion: false,
			svc:               svc2,
			svcPort:           port81,
		},
		{desc: "negs don't exist, desired service, desired port, marked for deletion",
			negsExist:         false,
			markedForDeletion: true,
			svc:               svc1,
			svcPort:           port80,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			// "test" is a custom NEG name; "" is replaced below by a namer-generated name.
			for _, negName := range []string{"test", ""} {
				kubeClient := fake.NewSimpleClientset()
				svcNegClient := negfake.NewSimpleClientset()
				manager := NewTestSyncerManagerWithNegClient(kubeClient, svcNegClient)
				manager.serviceLister.Add(svc1)

				// Create NEG to be GC'ed
				if negName == "" {
					negName = manager.namer.NEG("test", "test", tc.svcPort)
				}

				gcPortInfo := negtypes.PortInfo{PortTuple: negtypes.SvcPortTuple{Port: tc.svcPort}, NegName: negName}
				cr := createNegCR(tc.svc, serviceKey{namespace: testServiceNamespace, name: testServiceName}, gcPortInfo)
				if tc.markedForDeletion {
					now := metav1.Now()
					cr.SetDeletionTimestamp(&now)
				}
				if _, err := manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(cr.Namespace).Create(context2.TODO(), &cr, metav1.CreateOptions{}); err != nil {
					t.Fatalf("failed to create neg cr: %v", err)
				}

				populateSvcNegCache(t, manager, svcNegClient, testServiceNamespace)

				desiredNegName := manager.namer.NEG(testServiceNamespace, testServiceName, port80)
				if tc.desiredPort {
					desiredNegName = negName
				}

				// Initialize NEG that should not be deleted
				ports := make(negtypes.PortInfoMap)
				ports[negtypes.PortInfoMapKey{ServicePort: port80, Subset: ""}] = negtypes.PortInfo{PortTuple: negtypes.SvcPortTuple{Port: port80, TargetPort: "namedport"}, NegName: desiredNegName}
				// Ignore errors caused by having desired NEG CR with deletion timestamp set
				if err := manager.EnsureSyncers(testServiceNamespace, testServiceName, ports); err != nil && !(tc.desiredPort && tc.markedForDeletion) {
					t.Fatalf("Failed to ensure syncer: %v", err)
				}
				if err := waitForNegs(manager.cloud, 2, meta.VersionGA); err != nil {
					t.Errorf("negs not created as expected: %v", err)
				}
				// Refresh the cache so GC sees any CRs created while ensuring syncers.
				populateSvcNegCache(t, manager, svcNegClient, testServiceNamespace)

				version := meta.VersionGA
				for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType, negtypes.VmIpEndpointType} {
					if networkEndpointType == negtypes.VmIpEndpointType {
						version = meta.VersionAlpha
					}

					if tc.negsExist {
						for _, zone := range []string{negtypes.TestZone1, negtypes.TestZone2} {
							manager.cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
								Version:             version,
								Name:                negName,
								NetworkEndpointType: string(networkEndpointType),
							}, zone)
						}
					}

					if err := manager.GC(); err != nil {
						t.Fatalf("Failed to GC: %v", err)
					}

					for _, zone := range []string{negtypes.TestZone1, negtypes.TestZone2} {
						negs, _ := manager.cloud.ListNetworkEndpointGroup(zone, version)
						for _, neg := range negs {
							if neg.Name == negName {
								t.Errorf("Expect NEG %q in zone %q to be GCed.", negName, zone)
							}
						}
					}

					crs := getNegCRs(t, svcNegClient, testServiceNamespace)
					for _, cr := range crs {
						if cr.Name == negName && (len(cr.Finalizers) != 0 || cr.GetDeletionTimestamp().IsZero()) {
							t.Errorf("Expect NEG CR %s to be GCed.", negName)
						}
						if cr.Name == desiredNegName && desiredNegName != negName && (len(cr.Finalizers) == 0 || !cr.GetDeletionTimestamp().IsZero()) {
							t.Errorf("Neg CR %s should not have been GC'ed", desiredNegName)
						}
					}

				}
				// make sure there is no leaking go routine
				manager.StopSyncer(testServiceNamespace, testServiceName)

				// Remove the CRs left over from this iteration.
				crs := getNegCRs(t, svcNegClient, testServiceNamespace)
				for _, cr := range crs {
					manager.svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(cr.Namespace).Delete(context2.TODO(), cr.Name, metav1.DeleteOptions{})
				}
			}
		})
	}

}

// Check that NEG CR Conditions exist and are in the expected condition
func checkNegCR(t *testing.T, neg *negv1beta1.ServiceNetworkEndpointGroup, svcKey serviceKey, svcUID apitypes.UID, expectedInfo negtypes.PortInfo) {

if neg.GetNamespace() != svcKey.namespace {
t.Errorf("neg namespace is %s, expected %s", neg.GetNamespace(), svcKey.namespace)
}

// TODO Add check for finalizer after Neg CRD Garbage Collection is implemented.

//check labels
labels := neg.GetLabels()
if len(labels) != 3 {
Expand Down Expand Up @@ -1109,6 +1287,15 @@ func checkNegCR(t *testing.T, neg *negv1beta1.ServiceNetworkEndpointGroup, svcKe
t.Errorf("Expected neg owner ref not block owner deltion")
}
}

finalizers := neg.GetFinalizers()
if len(finalizers) != 1 {
t.Errorf("Expected neg to have one finalizer, has %d", len(finalizers))
} else {
if finalizers[0] != common.NegFinalizerKey {
t.Errorf("Expected neg to have finalizer %s, but found %s", common.NegFinalizerKey, finalizers[0])
}
}
}

// populateSyncerManager for testing
Expand Down Expand Up @@ -1272,7 +1459,52 @@ func createNegCR(service *v1.Service, svcKey serviceKey, portInfo negtypes.PortI
Namespace: svcKey.namespace,
OwnerReferences: []metav1.OwnerReference{*ownerReference},
Labels: labels,
Finalizers: []string{common.NegFinalizerKey},
ResourceVersion: "rv",
},
}
}

// getNegCRs returns a list of NEG CRs under the specified namespace using the svcNegClient provided.
// A failed list call aborts the test immediately, because there is no list to return items from.
func getNegCRs(t *testing.T, svcNegClient svcnegclient.Interface, namespace string) []negv1beta1.ServiceNetworkEndpointGroup {
	t.Helper()
	crs, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).List(context2.TODO(), metav1.ListOptions{})
	if err != nil {
		t.Fatalf("failed to get neg crs: %v", err)
	}
	return crs.Items
}

// populateSvcNegCache lists all the NEG CRs under the specified namespace using the provided
// svcNegClient and adds each of them to the manager's svcNeg cache.
// A failed list call aborts the test immediately, because there are no items to add.
func populateSvcNegCache(t *testing.T, manager *syncerManager, svcNegClient svcnegclient.Interface, namespace string) {
	t.Helper()
	crs, err := svcNegClient.NetworkingV1beta1().ServiceNetworkEndpointGroups(namespace).List(context2.TODO(), metav1.ListOptions{})
	if err != nil {
		t.Fatalf("failed to get neg crs: %v", err)
	}
	for i := range crs.Items {
		// Copy the element so the cache holds its own object rather than a pointer into the list.
		negCR := crs.Items[i]
		manager.svcNegLister.Add(&negCR)
	}
}

// waitForNegs polls the provided cloud object until the aggregated NEG list for the given
// version contains exactly numNegs entries. It returns an error if that count is not
// reached within 500 milliseconds, or as soon as a list call fails.
func waitForNegs(cloud negtypes.NetworkEndpointGroupCloud, numNegs int, version meta.Version) error {
	timeout := time.NewTimer(500 * time.Millisecond)
	defer timeout.Stop()
	// Poll on a short interval instead of spinning, so the syncer goroutines get CPU time.
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		negs, err := cloud.AggregatedListNetworkEndpointGroup(version)
		if err != nil {
			return err
		}
		if len(negs) == numNegs {
			return nil
		}

		select {
		case <-timeout.C:
			return fmt.Errorf("negs have not been created")
		case <-poll.C:
		}
	}
}
2 changes: 2 additions & 0 deletions pkg/utils/common/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
LegacyILBFinalizer = "gke.networking.io/l4-ilb-v1"
// ILBFinalizerV2 is the finalizer used by newer controllers that implement Internal LoadBalancer services.
ILBFinalizerV2 = "gke.networking.io/l4-ilb-v2"
// NegFinalizerKey is the finalizer used by neg controller to ensure NEG CRs are deleted after corresponding negs are deleted
NegFinalizerKey = "networking.gke.io/neg-finalizer"
)

// IsDeletionCandidate is true if the passed in meta contains an ingress finalizer.
Expand Down

0 comments on commit a219968

Please sign in to comment.