Skip to content

Commit

Permalink
Refactor LoadBalancerPool to use cloud listing snapshotter
Browse files Browse the repository at this point in the history
  • Loading branch information
rramkumar1 committed Aug 31, 2018
1 parent d84c845 commit 73ed434
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 165 deletions.
4 changes: 2 additions & 2 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (b *Backends) List() ([]interface{}, error) {
return nil, err
}
var ret []interface{}
for _, _ = range backends {
ret = append(ret, true)
for _, x := range backends {
ret = append(ret, x)
}
return ret, nil
}
24 changes: 13 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewLoadBalancerController(
hasSynced: ctx.HasSynced,
nodes: NewNodeController(ctx, instancePool),
instancePool: instancePool,
l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer),
l7Pool: loadbalancers.NewLoadBalancerPool(ctx.Cloud, ctx.ClusterNamer, true),
backendSyncer: backends.NewBackendSyncer(backendPool, healthChecker, ctx.ClusterNamer, ctx.BackendConfigEnabled),
negLinker: backends.NewNEGLinker(backendPool, ctx.Cloud, ctx.ClusterNamer),
igLinker: backends.NewInstanceGroupLinker(instancePool, backendPool, ctx.ClusterNamer),
Expand Down Expand Up @@ -369,9 +369,12 @@ func (lbc *LoadBalancerController) SyncLoadBalancer(state interface{}) error {
}

// Create higher-level LB resources.
if err := lbc.l7Pool.Sync(lb); err != nil {
l7, err := lbc.l7Pool.Sync(lb)
if err != nil {
return err
}
syncState.l7 = l7

return nil
}

Expand Down Expand Up @@ -400,16 +403,12 @@ func (lbc *LoadBalancerController) PostProcess(state interface{}) error {

// Get the loadbalancer and update the ingress status.
ing := syncState.ing
k, err := utils.KeyFunc(ing)
if err != nil {
return fmt.Errorf("cannot get key for Ingress %s/%s: %v", ing.Namespace, ing.Name, err)
}

l7, err := lbc.l7Pool.Get(k)
if err != nil {
return fmt.Errorf("unable to get loadbalancer: %v", err)
if syncState.l7 == nil {
return fmt.Errorf("sync state does not contain L7 spec")
}
if err := lbc.updateIngressStatus(l7, ing); err != nil {

if err := lbc.updateIngressStatus(syncState.l7, ing); err != nil {
return fmt.Errorf("update ingress status error: %v", err)
}
return nil
Expand Down Expand Up @@ -449,7 +448,10 @@ func (lbc *LoadBalancerController) sync(key string) error {

// Bootstrap state for GCP sync.
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
syncState := &syncState{urlMap, ing}
syncState := &syncState{
urlMap: urlMap,
ing: ing,
}
if errs != nil {
return fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newLoadBalancerController() *LoadBalancerController {
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer)
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(loadbalancers.NewFakeLoadBalancers(clusterUID, namer), namer)
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(loadbalancers.NewFakeLoadBalancers(clusterUID, namer), namer, false)
lbc.instancePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}})

lbc.hasSynced = func() bool { return true }
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controller

import (
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/utils"

extensions "k8s.io/api/extensions/v1beta1"
Expand All @@ -31,5 +32,6 @@ type gcState struct {
// syncState is used by the controller to maintain state for routines that sync GCP resources of an Ingress.
type syncState struct {
urlMap *utils.GCEURLMap
l7 *loadbalancers.L7
ing *extensions.Ingress
}
6 changes: 4 additions & 2 deletions pkg/loadbalancers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ type LoadBalancers interface {

// LoadBalancerPool is an interface to manage the cloud resources associated
// with a gce loadbalancer.
// TODO(rramkumar): Break up this interface into 2: Pool & Syncer.
type LoadBalancerPool interface {
Get(name string) (*L7, error)
// Note: Get is currrently only used for testing.
Get(name string) bool
Delete(name string) error
Sync(ri *L7RuntimeInfo) error
Sync(ri *L7RuntimeInfo) (*L7, error)
GC(names []string) error
Shutdown() error
}
93 changes: 42 additions & 51 deletions pkg/loadbalancers/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,68 +176,59 @@ func (l *L7) GetIP() string {
return ""
}

// Cleanup deletes resources specific to this l7 in the right order.
// Cleanup deletes resources specific to this l7 in the right order given a base name.
// forwarding rule -> target proxy -> url map
// This leaves backends and health checks, which are shared across loadbalancers.
func (l *L7) Cleanup() error {
if l.fw != nil {
glog.V(2).Infof("Deleting global forwarding rule %v", l.fw.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteGlobalForwardingRule(l.fw.Name)); err != nil {
return err
}
l.fw = nil
func Cleanup(name string, cloud LoadBalancers, namer *utils.Namer) error {
fwName := namer.ForwardingRule(name, "HTTP")
glog.V(2).Infof("Deleting global forwarding rule %v", fwName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteGlobalForwardingRule(fwName)); err != nil {
return err
}
if l.fws != nil {
glog.V(2).Infof("Deleting global forwarding rule %v", l.fws.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteGlobalForwardingRule(l.fws.Name)); err != nil {
return err
}
l.fws = nil
fwsName := namer.ForwardingRule(name, "HTTPS")
glog.V(2).Infof("Deleting global forwarding rule %v", fwsName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteGlobalForwardingRule(fwsName)); err != nil {
return err
}
if l.ip != nil {
glog.V(2).Infof("Deleting static IP %v(%v)", l.ip.Name, l.ip.Address)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteGlobalAddress(l.ip.Name)); err != nil {
return err
}
l.ip = nil

staticIPName := fwName
glog.V(2).Infof("Deleting static IP %v", staticIPName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteGlobalAddress(staticIPName)); err != nil {
return err
}
if l.tps != nil {
glog.V(2).Infof("Deleting target https proxy %v", l.tps.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteTargetHttpsProxy(l.tps.Name)); err != nil {
return err
}
l.tps = nil

tpsName := namer.TargetProxy(name, "HTTPS")
glog.V(2).Infof("Deleting target https proxy %v", tpsName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteTargetHttpsProxy(tpsName)); err != nil {
return err
}
// Delete the SSL cert if it is from a secret, not referencing a pre-created GCE cert.
if len(l.sslCerts) != 0 && l.runtimeInfo.TLSName == "" {
var certErr error
for _, cert := range l.sslCerts {
glog.V(2).Infof("Deleting sslcert %s", cert.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteSslCertificate(cert.Name)); err != nil {
glog.Errorf("Old cert delete failed - %v", err)
certErr = err
}

}
l.sslCerts = nil
if certErr != nil {
return certErr
}
certs, err := cloud.ListSslCertificates()
if err != nil {
return err
}
if l.tp != nil {
glog.V(2).Infof("Deleting target http proxy %v", l.tp.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteTargetHttpProxy(l.tp.Name)); err != nil {
return err
for _, c := range certs {
// Delete the SSL cert if it is from a secret, not referencing a pre-created GCE cert.
if namer.IsCertUsedForLB(name, c.Name) {
glog.V(2).Infof("Deleting sslcert %s", c.Name)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteSslCertificate(c.Name)); err != nil {
return err
}
}
l.tp = nil
}
if l.um != nil {
glog.V(2).Infof("Deleting url map %v", l.um.Name)
if err := utils.IgnoreHTTPNotFound(l.cloud.DeleteUrlMap(l.um.Name)); err != nil {
return err
}
l.um = nil

tpName := namer.TargetProxy(name, "HTTP")
glog.V(2).Infof("Deleting target http proxy %v", tpName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteTargetHttpProxy(tpName)); err != nil {
return err
}

umName := namer.UrlMap(name)
glog.V(2).Infof("Deleting url map %v", umName)
if err := utils.IgnoreHTTPNotFound(cloud.DeleteUrlMap(umName)); err != nil {
return err
}

return nil
}

Expand Down
90 changes: 53 additions & 37 deletions pkg/loadbalancers/l7s.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package loadbalancers

import (
"fmt"
"reflect"
"time"

"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"

"k8s.io/apimachinery/pkg/util/sets"

Expand All @@ -43,39 +44,45 @@ func (l *L7s) Namer() *utils.Namer {
// NewLoadBalancerPool returns a new loadbalancer pool.
// - cloud: implements LoadBalancers. Used to sync L7 loadbalancer resources
// with the cloud.
func NewLoadBalancerPool(cloud LoadBalancers, namer *utils.Namer) LoadBalancerPool {
return &L7s{cloud, storage.NewInMemoryPool(), namer}
func NewLoadBalancerPool(cloud LoadBalancers, namer *utils.Namer, resyncWithCloud bool) LoadBalancerPool {
l7Pool := &L7s{
cloud: cloud,
namer: namer,
}
if !resyncWithCloud {
l7Pool.snapshotter = storage.NewInMemoryPool()
}
keyFunc := func(i interface{}) (string, error) {
um := i.(*compute.UrlMap)
if !namer.NameBelongsToCluster(um.Name) {
return "", fmt.Errorf("unrecognized name %v", um.Name)
}
// Scrub out the UrlMap prefix of the name to get the base LB name.
return namer.ScrubUrlMapPrefix(um.Name), nil
}
l7Pool.snapshotter = storage.NewCloudListingPool("loadbalancers", keyFunc, l7Pool, 30*time.Second)
return l7Pool
}

// Get returns the loadbalancer by name.
func (l *L7s) Get(name string) (*L7, error) {
// Get implements LoadBalancerPool.
// Note: This is currently only used for testing.
func (l *L7s) Get(name string) bool {
name = l.namer.LoadBalancer(name)
lb, exists := l.snapshotter.Get(name)
if !exists {
return nil, fmt.Errorf("loadbalancer %v not in pool", name)
}
return lb.(*L7), nil
_, exists := l.snapshotter.Get(name)
return exists
}

// Sync a load balancer with the given runtime info from the controller.
func (l *L7s) Sync(ri *L7RuntimeInfo) error {
// Sync implements LoadBalancerPool.
func (l *L7s) Sync(ri *L7RuntimeInfo) (*L7, error) {
name := l.namer.LoadBalancer(ri.Name)

lb, _ := l.Get(name)
if lb == nil {
glog.V(3).Infof("Creating l7 %v", name)
lb = &L7{
runtimeInfo: ri,
Name: l.namer.LoadBalancer(ri.Name),
cloud: l.cloud,
namer: l.namer,
}
} else {
if !reflect.DeepEqual(lb.runtimeInfo, ri) {
glog.V(3).Infof("LB %v runtime info changed, old %+v new %+v", lb.Name, lb.runtimeInfo, ri)
lb.runtimeInfo = ri
}
glog.V(3).Infof("Sync: LB %s", name)
lb := &L7{
runtimeInfo: ri,
Name: l.namer.LoadBalancer(ri.Name),
cloud: l.cloud,
namer: l.namer,
}

// Add the lb to the pool, in case we create an UrlMap but run out
// of quota in creating the ForwardingRule we still need to cleanup
// the UrlMap during GC.
Expand All @@ -86,28 +93,24 @@ func (l *L7s) Sync(ri *L7RuntimeInfo) error {
// make it exist we need to create a collection of gce resources, done
// through the edge hop.
if err := lb.edgeHop(); err != nil {
return err
return lb, err
}

return nil
return lb, nil
}

// Delete deletes a load balancer by name.
// Delete implements LoadBalancerPool.
func (l *L7s) Delete(name string) error {
name = l.namer.LoadBalancer(name)
lb, err := l.Get(name)
if err != nil {
return err
}
glog.V(3).Infof("Deleting lb %v", name)
if err := lb.Cleanup(); err != nil {
if err := Cleanup(name, l.cloud, l.namer); err != nil {
return err
}
l.snapshotter.Delete(name)
return nil
}

// GC garbage collects loadbalancers not in the input list.
// GC implements LoadBalancerPool.
func (l *L7s) GC(names []string) error {
glog.V(4).Infof("GC(%v)", names)

Expand All @@ -131,11 +134,24 @@ func (l *L7s) GC(names []string) error {
return nil
}

// Shutdown logs whether or not the pool is empty.
// Shutdown implemented LoadBalancerPool.
func (l *L7s) Shutdown() error {
if err := l.GC([]string{}); err != nil {
return err
}
glog.V(2).Infof("Loadbalancer pool shutdown.")
return nil
}

// List lists all loadbalancers via listing all URLMap's.
func (l *L7s) List() ([]interface{}, error) {
urlMaps, err := l.cloud.ListUrlMaps()
if err != nil {
return nil, err
}
var ret []interface{}
for _, x := range urlMaps {
ret = append(ret, x)
}
return ret, nil
}
Loading

0 comments on commit 73ed434

Please sign in to comment.