
Commit

Add v2 frontend namer workflow
skmatti committed Nov 14, 2019
1 parent 4741042 commit a3d1201
Showing 28 changed files with 1,541 additions and 179 deletions.
14 changes: 13 additions & 1 deletion cmd/glbc/main.go
@@ -24,6 +24,8 @@ import (
"time"

flag "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/klog"

@@ -123,6 +125,16 @@ func main() {
klog.V(0).Infof("Cluster name: %+v", namer.UID())
}

var kubeSystemUID types.UID
if flags.F.EnableV2FrontendNamer {
// Get the kube-system namespace UID, which is used by the v2 frontend naming scheme.
kubeSystemNS, err := kubeClient.CoreV1().Namespaces().Get("kube-system", metav1.GetOptions{})
if err != nil {
klog.Fatalf("Error getting kube-system namespace: %v", err)
}
kubeSystemUID = kubeSystemNS.GetUID()
}

cloud := app.NewGCEClient()
defaultBackendServicePort := app.DefaultBackendServicePort(kubeClient)
ctxConfig := ingctx.ControllerContextConfig{
@@ -136,7 +148,7 @@
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
}
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, cloud, namer, ctxConfig)
ctx := ingctx.NewControllerContext(kubeConfig, kubeClient, backendConfigClient, frontendConfigClient, cloud, namer, kubeSystemUID, ctxConfig)
go app.RunHTTPServer(ctx.HealthCheck)

if !flags.F.LeaderElection.LeaderElect {
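The hunk above is the core of this file's change: when the EnableV2FrontendNamer flag is set, main.go looks up the kube-system namespace and records its UID, which is unique per cluster and stable for the cluster's lifetime. Below is a minimal, self-contained sketch of how a v2-style namer could fold that UID into frontend resource names; the helper names, the 8-character hash, and the k8s2- prefix layout are illustrative assumptions, not this repository's actual API.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// clusterUID derives a short, stable cluster identifier from the kube-system
// namespace UID (a plain string stands in for types.UID here).
func clusterUID(kubeSystemUID string) string {
	sum := sha256.Sum256([]byte(kubeSystemUID))
	return hex.EncodeToString(sum[:])[:8]
}

// v2FrontendName composes a cluster-scoped frontend resource name from a
// resource tag (e.g. "um" for URL map), the cluster identifier, and the
// ingress namespace and name.
func v2FrontendName(kubeSystemUID, namespace, name, resource string) string {
	return fmt.Sprintf("k8s2-%s-%s-%s-%s", resource, clusterUID(kubeSystemUID), namespace, name)
}

func main() {
	uid := "2c6d9f0a-0b1c-4d2e-8f30-123456789abc"
	// Prints something like: k8s2-um-5f2b9c1d-default-my-ingress
	fmt.Println(v2FrontendName(uid, "default", "my-ingress", "um"))
}
```

The actual wiring in this commit is the namer.NewFrontendNamerFactory(ctx.ClusterNamer, ctx.KubeSystemUID) call in pkg/controller/controller.go below.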
6 changes: 5 additions & 1 deletion pkg/context/context.go
@@ -20,6 +20,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
informerv1 "k8s.io/client-go/informers/core/v1"
@@ -55,7 +56,8 @@ type ControllerContext struct {

Cloud *gce.Cloud

ClusterNamer *namer.Namer
ClusterNamer *namer.Namer
KubeSystemUID types.UID

ControllerContextConfig
ASMConfigController *cmconfig.ConfigMapConfigController
@@ -100,13 +102,15 @@ func NewControllerContext(
frontendConfigClient frontendconfigclient.Interface,
cloud *gce.Cloud,
namer *namer.Namer,
kubeSystemUID types.UID,
config ControllerContextConfig) *ControllerContext {

context := &ControllerContext{
KubeConfig: kubeConfig,
KubeClient: kubeClient,
Cloud: cloud,
ClusterNamer: namer,
KubeSystemUID: kubeSystemUID,
ControllerContextConfig: config,
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
173 changes: 141 additions & 32 deletions pkg/controller/controller.go
@@ -115,7 +115,7 @@ func NewLoadBalancerController(
hasSynced: ctx.HasSynced,
nodes: NewNodeController(ctx, instancePool),
instancePool: instancePool,
l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer, ctx, namer.NewFrontendNamerFactory(ctx.ClusterNamer)),
l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer, ctx, namer.NewFrontendNamerFactory(ctx.ClusterNamer, ctx.KubeSystemUID)),
backendSyncer: backends.NewBackendSyncer(backendPool, healthChecker, ctx.Cloud),
negLinker: backends.NewNEGLinker(backendPool, negtypes.NewAdapter(ctx.Cloud), ctx.Cloud),
igLinker: backends.NewInstanceGroupLinker(instancePool, backendPool),
@@ -330,9 +330,10 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
// TODO(rramkumar): Do we need deleteAll? Can we get rid of its flag?
if deleteAll {
klog.Infof("Shutting down cluster manager.")
if err := lbc.l7Pool.Shutdown(); err != nil {
if err := lbc.l7Pool.Shutdown(lbc.ctx.Ingresses().List()); err != nil {
return err
}

// The backend pool will also delete instance groups.
return lbc.backendSyncer.Shutdown()
}
@@ -453,29 +454,45 @@ func (lbc *LoadBalancerController) SyncLoadBalancer(state interface{}) error {
return nil
}

// GCLoadBalancers implements Controller.
func (lbc *LoadBalancerController) GCLoadBalancers(toKeep []*v1beta1.Ingress) error {
// Only GCE ingress associated resources are managed by this controller.
GCEIngresses := operator.Ingresses(toKeep).Filter(utils.IsGCEIngress).AsList()
return lbc.l7Pool.GC(common.ToIngressKeys(GCEIngresses))
// GCv1LoadBalancers implements Controller.
func (lbc *LoadBalancerController) GCv1LoadBalancers(toKeep []*v1beta1.Ingress) error {
return lbc.l7Pool.GCv1(common.ToIngressKeys(toKeep))
}

// GCv2LoadBalancer implements Controller.
func (lbc *LoadBalancerController) GCv2LoadBalancer(ing *v1beta1.Ingress) error {
return lbc.l7Pool.GCv2(ing)
}

// MaybeRemoveFinalizers cleans up Finalizers if needed.
func (lbc *LoadBalancerController) MaybeRemoveFinalizers(toCleanup []*v1beta1.Ingress) error {
// EnsureDeleteV1Finalizers implements Controller.
func (lbc *LoadBalancerController) EnsureDeleteV1Finalizers(toCleanup []*v1beta1.Ingress) error {
if !flags.F.FinalizerRemove {
klog.V(4).Infof("Removing finalizers not enabled")
return nil
}
for _, ing := range toCleanup {
ingClient := lbc.ctx.KubeClient.NetworkingV1beta1().Ingresses(ing.Namespace)
if err := common.RemoveFinalizer(ing, ingClient); err != nil {
klog.Errorf("Failed to remove Finalizer from Ingress %s/%s: %v", ing.Namespace, ing.Name, err)
if err := common.EnsureDeleteFinalizer(ing, ingClient, common.FinalizerKey); err != nil {
klog.Errorf("common.EnsureDeleteFinalizer(%q, _, %q) = %v, want nil", common.NamespacedName(ing), common.FinalizerKey, err)
return err
}
}
return nil
}

// EnsureDeleteV2Finalizer implements Controller.
func (lbc *LoadBalancerController) EnsureDeleteV2Finalizer(ing *v1beta1.Ingress) error {
if !flags.F.FinalizerRemove {
klog.V(4).Infof("Removing finalizers not enabled")
return nil
}
ingClient := lbc.ctx.KubeClient.NetworkingV1beta1().Ingresses(ing.Namespace)
if err := common.EnsureDeleteFinalizer(ing, ingClient, common.FinalizerKeyV2); err != nil {
klog.Errorf("common.EnsureDeleteFinalizer(%q, _, %q) = %v, want nil", common.NamespacedName(ing), common.FinalizerKeyV2, err)
}
return nil
}

// PostProcess implements Controller.
func (lbc *LoadBalancerController) PostProcess(state interface{}) error {
// We expect state to be a syncState
@@ -503,45 +520,53 @@ func (lbc *LoadBalancerController) sync(key string) error {

// Snapshot of list of ingresses.
allIngresses := lbc.ctx.Ingresses().List()

var syncErr error
// Perform GC as a deferred function.
defer func() {
// Return immediately if there was an error.
// Note that garbage collection occurs regardless of whether a sync error occurred.
// If an error occurred, it could have been caused by quota issues; therefore,
// garbage collecting now may free up enough quota for the next sync to pass.
if err != nil {
return
}

frontendGcPath, gcFrontends := gcPath(ingExists, ing)
err = lbc.ingSyncer.GC(allIngresses, ing, frontendGcPath, gcFrontends)

if err != nil && syncErr != nil {
syncErr = fmt.Errorf("error during sync %v, error during GC %v", syncErr, err)
}
}()

// Determine if the ingress needs to be GCed.
if !ingExists || utils.NeedsCleanup(ing) {
// GC will find GCE resources that were used for this ingress and delete them.
return lbc.ingSyncer.GC(allIngresses)
// Return immediately so GC is invoked.
return err
}

// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
ing = ing.DeepCopy()
ingClient := lbc.ctx.KubeClient.NetworkingV1beta1().Ingresses(ing.Namespace)
if flags.F.FinalizerAdd {
if err := common.AddFinalizer(ing, ingClient); err != nil {
klog.Errorf("Failed to add Finalizer to Ingress %q: %v", key, err)
return err
}
// Ensure that a finalizer is attached.
if err = lbc.ensureFinalizer(ing); err != nil {
return err
}

// Bootstrap state for GCP sync.
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPort.ID, lbc.ctx.ClusterNamer)

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
return msg
err = fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", err.Error())
return err
}

// Sync GCP resources.
syncState := &syncState{urlMap, ing, nil}
syncErr := lbc.ingSyncer.Sync(syncState)
syncErr = lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
}

// Garbage collection will occur regardless of an error occurring. If an error occurred,
// it could have been caused by quota issues; therefore, garbage collecting now may
// free up enough quota for the next sync to pass.
if gcErr := lbc.ingSyncer.GC(allIngresses); gcErr != nil {
return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
}

return syncErr
}

@@ -649,3 +674,87 @@ func (lbc *LoadBalancerController) ToSvcPorts(ings []*v1beta1.Ingress) []utils.S
}
return knownPorts
}

// defaultFrontendNamingScheme returns the frontend naming scheme for an ingress that does not have a finalizer.
// This is used to add the appropriate finalizer to the ingress.
func (lbc *LoadBalancerController) defaultFrontendNamingScheme(ing *v1beta1.Ingress) (namer.Scheme, error) {
// The ingress frontend naming scheme is determined using the following logic:
// V2 frontend namer is disabled : v1 frontend naming scheme
// V2 frontend namer is enabled
// - VIP does not exist : v2 frontend naming scheme
// - VIP exists
//   - GCE URL Map exists : v1 frontend naming scheme
//   - GCE URL Map does not exist : v2 frontend naming scheme
if !flags.F.EnableV2FrontendNamer {
return namer.V1NamingScheme, nil
}
if !utils.HasVIP(ing) {
return namer.V2NamingScheme, nil
}
urlMapExists, err := lbc.l7Pool.HasUrlMap(ing)
if err != nil {
return "", err
}
if urlMapExists {
return namer.V1NamingScheme, nil
}
return namer.V2NamingScheme, nil
}

// ensureFinalizer ensures that a finalizer is attached.
func (lbc *LoadBalancerController) ensureFinalizer(ing *v1beta1.Ingress) error {
if !flags.F.FinalizerAdd {
klog.V(4).Infof("Adding finalizers not enabled")
return nil
}
ingKey := common.NamespacedName(ing)
if common.HasFinalizer(ing.ObjectMeta) {
klog.V(4).Infof("Finalizer exists for ingress %s", ingKey)
return nil
}
// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
ing = ing.DeepCopy()
ingClient := lbc.ctx.KubeClient.NetworkingV1beta1().Ingresses(ing.Namespace)
namingScheme, err := lbc.defaultFrontendNamingScheme(ing)
if err != nil {
return err
}
finalizerKey, err := namer.FinalizerForNamingScheme(namingScheme)
if err != nil {
return err
}
if err := common.EnsureFinalizer(ing, ingClient, finalizerKey); err != nil {
klog.Errorf("common.EnsureFinalizer(%q, _, %q) = %v, want nil", ingKey, finalizerKey, err)
return err
}
return nil
}

// gcPath returns the naming scheme whose frontend resources need to be cleaned up.
// It also returns a boolean specifying whether frontend resources need to be deleted at all.
// The GC path is:
// If ingress does not exist : v1 frontends and all backends
// If ingress exists
// - Needs cleanup
//   - If v1 naming scheme : v1 frontends and all backends
//   - If v2 naming scheme : v2 frontends and all backends
// - Does not need cleanup
//   - Finalizer enabled : all backends
//   - Finalizer disabled : v1 frontends and all backends
func gcPath(ingExists bool, ing *v1beta1.Ingress) (namer.Scheme, bool) {
// If the ingress does not exist, it is from the pre-finalizer era.
// Run GC via the v1 naming scheme.
if !ingExists {
return namer.V1NamingScheme, true
}
// Determine if the current ingress does not need to be deleted.
if !utils.NeedsCleanup(ing) {
// GC backends only if the current ingress does not need cleanup and finalizers are enabled.
if flags.F.FinalizerAdd {
return "", false
}
return namer.V1NamingScheme, true
}
frontendGcPath := namer.FrontendNamingScheme(ing)
return frontendGcPath, true
}
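The decision table documented on gcPath can be read as a small pure function of three booleans. The sketch below re-implements that branching in isolation so the outcomes are easy to check; the names (gcPathSketch, the scheme constants) and the plain-boolean inputs standing in for flags.F.FinalizerAdd, utils.NeedsCleanup, and namer.FrontendNamingScheme are illustrative simplifications, not the package's API.

```go
package main

import "fmt"

// scheme is a simplified stand-in for namer.Scheme.
type scheme string

const (
	v1Scheme scheme = "v1"
	v2Scheme scheme = "v2"
	noScheme scheme = ""
)

// gcPathSketch mirrors the decision table documented on gcPath: the returned
// scheme says which frontend resources to delete, and the boolean says whether
// any frontend cleanup is needed at all.
func gcPathSketch(ingExists, needsCleanup, finalizerEnabled, usesV2Naming bool) (scheme, bool) {
	if !ingExists {
		// Pre-finalizer era ingress: clean up v1 frontends.
		return v1Scheme, true
	}
	if !needsCleanup {
		if finalizerEnabled {
			// Live ingress with finalizers on: backends-only GC, leave frontends alone.
			return noScheme, false
		}
		return v1Scheme, true
	}
	if usesV2Naming {
		return v2Scheme, true
	}
	return v1Scheme, true
}

func main() {
	fmt.Println(gcPathSketch(false, false, true, false)) // v1 true  — ingress deleted before finalizers existed
	fmt.Println(gcPathSketch(true, true, true, true))    // v2 true  — v2-named ingress being deleted
	fmt.Println(gcPathSketch(true, false, true, false))  // "" false — live ingress, finalizers on
}
```

Note that the boolean only gates frontend cleanup; per the comment table, backend GC happens on every path.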
