Skip to content

Commit

Permalink
Merge pull request #991 from prameshj/ilb-fr
Browse files Browse the repository at this point in the history
Add a controller for handling L4 Internal LoadBalancer services
  • Loading branch information
k8s-ci-robot authored Feb 20, 2020
2 parents 05d9c8d + e2c4c2d commit 444fd52
Show file tree
Hide file tree
Showing 43 changed files with 15,058 additions and 41 deletions.
7 changes: 7 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/flags"
_ "k8s.io/ingress-gce/pkg/klog"
"k8s.io/ingress-gce/pkg/l4"
"k8s.io/ingress-gce/pkg/version"
)

Expand Down Expand Up @@ -217,6 +218,12 @@ func runControllers(ctx *ingctx.ControllerContext) {

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

if flags.F.RunL4Controller {
l4Controller := l4.NewController(ctx, stopCh)
go l4Controller.Run()
klog.V(0).Infof("L4 controller started")
}

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller)

Expand Down
23 changes: 23 additions & 0 deletions pkg/annotations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ func WantsL4ILB(service *v1.Service) (bool, string) {
return false, fmt.Sprintf("Type : %s, LBType : %s", service.Spec.Type, ltype)
}

// OnlyNEGStatusChanged returns true if the only annotation change between the 2 services is the NEG status annotation.
// This will be true if neg annotation was added or removed in the new service.
// Note : This assumes that the annotations in old and new service are different. If they are identical, this will
// return true.
func OnlyNEGStatusChanged(oldService, newService *v1.Service) bool {
return onlyNEGStatusChanged(oldService, newService) && onlyNEGStatusChanged(newService, oldService)
}

// onlyNEGStatusChanged returns true if the NEG Status annotation is the only extra annotation present in the new
// service but not in the old service.
// Note : This assumes that the annotations in old and new service are different. If they are identical, this will
// return true.
func onlyNEGStatusChanged(oldService, newService *v1.Service) bool {
for key, _ := range newService.ObjectMeta.Annotations {
if _, ok := oldService.ObjectMeta.Annotations[key]; !ok {
if key != NEGStatusKey {
return false
}
}
}
return true
}

// ApplicationProtocols returns a map of port (name or number) to the protocol
// on the port.
func (svc *Service) ApplicationProtocols() (map[string]AppProtocol, error) {
Expand Down
97 changes: 97 additions & 0 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -27,6 +29,10 @@ import (
"k8s.io/legacy-cloud-providers/gce"
)

const (
DefaultConnectionDrainingTimeoutSeconds = 30
)

// Backends handles CRUD operations for backends.
type Backends struct {
cloud *gce.Cloud
Expand Down Expand Up @@ -224,3 +230,94 @@ func (b *Backends) List(key *meta.Key, version meta.Version) ([]*composite.Backe
}
return clusterBackends, nil
}

// EnsureL4BackendService creates or updates the backend service with the given name.
func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, version meta.Version) (*composite.BackendService, error) {
klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): checking existing backend service", name, scheme, protocol)
key, err := composite.CreateKey(b.cloud, name, meta.Regional)
if err != nil {
return nil, err
}
bs, err := composite.GetBackendService(b.cloud, key, meta.VersionGA)
if err != nil && !utils.IsNotFoundError(err) {
return nil, err
}
desc, err := utils.MakeL4ILBServiceDescription(nm.String(), "", meta.VersionGA)
if err != nil {
klog.Warningf("EnsureL4BackendService: Failed to generate description for BackendService %s, err %v",
name, err)
}
expectedBS := &composite.BackendService{
Name: name,
Protocol: string(protocol),
Description: desc,
HealthChecks: []string{hcLink},
SessionAffinity: utils.TranslateAffinityType(sessionAffinity),
LoadBalancingScheme: string(scheme),
ConnectionDraining: &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds},
}

// Create backend service if none was found
if bs == nil {
klog.V(2).Infof("EnsureL4BackendService: creating backend service %v", name)
err := composite.CreateBackendService(b.cloud, key, expectedBS)
if err != nil {
return nil, err
}
klog.V(2).Infof("EnsureL4BackendService: created backend service %v successfully", name)
// We need to perform a GCE call to re-fetch the object we just created
// so that the "Fingerprint" field is filled in. This is needed to update the
// object without error. The lookup is also needed to populate the selfLink.
return composite.GetBackendService(b.cloud, key, meta.VersionGA)
}

if backendSvcEqual(expectedBS, bs) {
return bs, nil
}
if bs.ConnectionDraining != nil && bs.ConnectionDraining.DrainingTimeoutSec > 0 {
// if user overrides this value, continue using that.
expectedBS.ConnectionDraining.DrainingTimeoutSec = bs.ConnectionDraining.DrainingTimeoutSec
}
klog.V(2).Infof("EnsureL4BackendService: updating backend service %v", name)
// Set fingerprint for optimistic locking
expectedBS.Fingerprint = bs.Fingerprint
if err := composite.UpdateBackendService(b.cloud, key, expectedBS); err != nil {
return nil, err
}
klog.V(2).Infof("EnsureL4BackendService: updated backend service %v successfully", name)
return composite.GetBackendService(b.cloud, key, meta.VersionGA)
}

// backendsListEqual asserts that backend lists are equal by group link only
func backendsListEqual(a, b []*composite.Backend) bool {
if len(a) != len(b) {
return false
}
if len(a) == 0 {
return true
}

aSet := sets.NewString()
for _, v := range a {
aSet.Insert(v.Group)
}
bSet := sets.NewString()
for _, v := range b {
bSet.Insert(v.Group)
}

return aSet.Equal(bSet)
}

// backendSvcEqual returns true if the 2 BackendService objects are equal.
// ConnectionDraining timeout is not checked for equality, if user changes
// this timeout and no other backendService parameters change, the backend
// service will not be updated.
func backendSvcEqual(a, b *composite.BackendService) bool {
return a.Protocol == b.Protocol &&
a.Description == b.Description &&
a.SessionAffinity == b.SessionAffinity &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
utils.EqualStringSets(a.HealthChecks, b.HealthChecks) &&
backendsListEqual(a.Backends, b.Backends)
}
7 changes: 6 additions & 1 deletion pkg/backends/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
// FeatureL7ILB defines the feature name of L7 Internal Load Balancer
// L7-ILB Resources are currently alpha and regional
FeatureL7ILB = "L7ILB"
//FeaturePrimaryVMIPNEG defines the feature name of GCE_PRIMARY_VM_IP NEGs which are used for L4 ILB.
FeaturePrimaryVMIPNEG = "PrimaryVMIPNEG"
)

var (
Expand All @@ -46,7 +48,7 @@ var (
}
// TODO: (shance) refactor all scope to be above the serviceport level
scopeToFeatures = map[meta.KeyType][]string{
meta.Regional: []string{FeatureL7ILB},
meta.Regional: []string{FeatureL7ILB, FeaturePrimaryVMIPNEG},
}
)

Expand All @@ -68,6 +70,9 @@ func featuresFromServicePort(sp *utils.ServicePort) []string {
if sp.NEGEnabled {
features = append(features, FeatureNEG)
}
if sp.PrimaryIPNEGEnabled {
features = append(features, FeaturePrimaryVMIPNEG)
}
if sp.L7ILBEnabled {
features = append(features, FeatureL7ILB)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/backends/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package backends

import (
"fmt"
"strings"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -169,6 +170,12 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
// gc deletes the provided backends
func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets.String) error {
for _, be := range backends {
// Skip L4 LB backend services
// backendSyncer currently only GC backend services for L7 XLB/ILB.
// L4 LB is GC as part of the deletion flow as there is no shared backend services among L4 ILBs.
if strings.Contains(be.Description, utils.L4ILBServiceDescKey) {
continue
}
var key *meta.Key
name := be.Name
scope, err := composite.ScopeFromSelfLink(be.SelfLink)
Expand All @@ -181,7 +188,6 @@ func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets
if knownPorts.Has(key.String()) {
continue
}

klog.V(2).Infof("GCing backendService for port %s", name)
err = s.backendPool.Delete(name, be.Version, scope)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (p *portset) check(fakeGCE *gce.Cloud) error {
return fmt.Errorf("backend for port %+v should exist, but got: %v", sp.NodePort, err)
}
} else {
if bs, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp)); !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
bs, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp))
if err == nil || !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
if sp.PrimaryIPNEGEnabled {
// It is expected that these Backends should not get cleaned up in the GC loop.
continue
}
return fmt.Errorf("backend for port %+v should not exist, but got %v", sp, bs)
}
}
Expand Down Expand Up @@ -333,7 +338,7 @@ func TestGC(t *testing.T) {
}
}

// Test GC with both ELB and ILBs
// Test GC with both ELB and ILBs. Add in an L4 ILB NEG which should not be deleted as part of GC.
func TestGCMixed(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE)
Expand All @@ -345,6 +350,7 @@ func TestGCMixed(t *testing.T) {
{NodePort: 84, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true, BackendNamer: defaultNamer},
{NodePort: 85, Protocol: annotations.ProtocolHTTPS, NEGEnabled: true, L7ILBEnabled: true, BackendNamer: defaultNamer},
{NodePort: 86, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true, BackendNamer: defaultNamer},
{ID: utils.ServicePortID{Service: types.NamespacedName{Name: "testsvc"}}, PrimaryIPNEGEnabled: true, BackendNamer: defaultNamer},
}
ps := newPortset(svcNodePorts)
if err := ps.add(svcNodePorts); err != nil {
Expand Down
23 changes: 18 additions & 5 deletions pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,20 @@ import (
"k8s.io/ingress-gce/pkg/backendconfig"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/errors"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/utils"
namer_util "k8s.io/ingress-gce/pkg/utils/namer"
)

const (
// DefaultHost is the host used if none is specified. It is a valid value
// for the "Host" field recognized by GCE.
DefaultHost = "*"

// DefaultPath is the path used if none is specified. It is a valid path
// recognized by GCE.
DefaultPath = "/*"
)

// getServicePortParams allows for passing parameters to getServicePort()
type getServicePortParams struct {
isL7ILB bool
Expand Down Expand Up @@ -206,14 +215,14 @@ func (t *Translator) TranslateIngress(ing *v1beta1.Ingress, systemDefaultBackend
// sent to one of the last backend in the rules list.
path := p.Path
if path == "" {
path = loadbalancers.DefaultPath
path = DefaultPath
}
pathRules = append(pathRules, utils.PathRule{Path: path, Backend: *svcPort})
}
}
host := rule.Host
if host == "" {
host = loadbalancers.DefaultHost
host = DefaultHost
}
urlMap.PutPathRulesForHost(host, pathRules)
}
Expand Down Expand Up @@ -266,9 +275,13 @@ func (t *Translator) GetZoneForNode(name string) (string, error) {

// ListZones returns a list of zones this Kubernetes cluster spans.
func (t *Translator) ListZones() ([]string, error) {
zones := sets.String{}
nodeLister := t.ctx.NodeInformer.GetIndexer()
readyNodes, err := listers.NewNodeLister(nodeLister).ListWithPredicate(utils.GetNodeConditionPredicate())
return t.listZonesWithLister(listers.NewNodeLister(nodeLister))
}

func (t *Translator) listZonesWithLister(lister listers.NodeLister) ([]string, error) {
zones := sets.String{}
readyNodes, err := lister.ListWithPredicate(utils.GetNodeConditionPredicate())
if err != nil {
return zones.List(), err
}
Expand Down
Loading

0 comments on commit 444fd52

Please sign in to comment.