Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a controller for handling L4 Internal LoadBalancer services #991

Merged
merged 3 commits into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/flags"
_ "k8s.io/ingress-gce/pkg/klog"
"k8s.io/ingress-gce/pkg/l4"
"k8s.io/ingress-gce/pkg/version"
)

Expand Down Expand Up @@ -217,6 +218,12 @@ func runControllers(ctx *ingctx.ControllerContext) {

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

if flags.F.RunL4Controller {
l4Controller := l4.NewController(ctx, stopCh)
go l4Controller.Run()
klog.V(0).Infof("L4 controller started")
}

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller)

Expand Down
23 changes: 23 additions & 0 deletions pkg/annotations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ func WantsL4ILB(service *v1.Service) (bool, string) {
return false, fmt.Sprintf("Type : %s, LBType : %s", service.Spec.Type, ltype)
}

// OnlyNEGStatusChanged returns true if the only annotation change between the 2 services is the NEG status annotation.
// This will be true if neg annotation was added or removed in the new service.
// Note : This assumes that the annotations in old and new service are different. If they are identical, this will
// return true.
func OnlyNEGStatusChanged(oldService, newService *v1.Service) bool {
return onlyNEGStatusChanged(oldService, newService) && onlyNEGStatusChanged(newService, oldService)
}

// onlyNEGStatusChanged returns true if the NEG Status annotation is the only extra annotation present in the new
// service but not in the old service.
// Note : This assumes that the annotations in old and new service are different. If they are identical, this will
// return true.
func onlyNEGStatusChanged(oldService, newService *v1.Service) bool {
for key, _ := range newService.ObjectMeta.Annotations {
if _, ok := oldService.ObjectMeta.Annotations[key]; !ok {
if key != NEGStatusKey {
return false
}
}
}
return true
}

// ApplicationProtocols returns a map of port (name or number) to the protocol
// on the port.
func (svc *Service) ApplicationProtocols() (map[string]AppProtocol, error) {
Expand Down
97 changes: 97 additions & 0 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -27,6 +29,10 @@ import (
"k8s.io/legacy-cloud-providers/gce"
)

const (
DefaultConnectionDrainingTimeoutSeconds = 30
)

// Backends handles CRUD operations for backends.
type Backends struct {
cloud *gce.Cloud
Expand Down Expand Up @@ -224,3 +230,94 @@ func (b *Backends) List(key *meta.Key, version meta.Version) ([]*composite.Backe
}
return clusterBackends, nil
}

// EnsureL4BackendService creates or updates the backend service with the given name.
func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, version meta.Version) (*composite.BackendService, error) {
klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): checking existing backend service", name, scheme, protocol)
key, err := composite.CreateKey(b.cloud, name, meta.Regional)
prameshj marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
bs, err := composite.GetBackendService(b.cloud, key, meta.VersionGA)
if err != nil && !utils.IsNotFoundError(err) {
return nil, err
}
desc, err := utils.MakeL4ILBServiceDescription(nm.String(), "", meta.VersionGA)
if err != nil {
klog.Warningf("EnsureL4BackendService: Failed to generate description for BackendService %s, err %v",
name, err)
}
expectedBS := &composite.BackendService{
Name: name,
Protocol: string(protocol),
Description: desc,
HealthChecks: []string{hcLink},
SessionAffinity: utils.TranslateAffinityType(sessionAffinity),
LoadBalancingScheme: string(scheme),
ConnectionDraining: &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds},
}

// Create backend service if none was found
if bs == nil {
klog.V(2).Infof("EnsureL4BackendService: creating backend service %v", name)
err := composite.CreateBackendService(b.cloud, key, expectedBS)
if err != nil {
return nil, err
}
klog.V(2).Infof("EnsureL4BackendService: created backend service %v successfully", name)
// We need to perform a GCE call to re-fetch the object we just created
// so that the "Fingerprint" field is filled in. This is needed to update the
// object without error. The lookup is also needed to populate the selfLink.
return composite.GetBackendService(b.cloud, key, meta.VersionGA)
}

if backendSvcEqual(expectedBS, bs) {
return bs, nil
}
if bs.ConnectionDraining != nil && bs.ConnectionDraining.DrainingTimeoutSec > 0 {
// if user overrides this value, continue using that.
expectedBS.ConnectionDraining.DrainingTimeoutSec = bs.ConnectionDraining.DrainingTimeoutSec
}
klog.V(2).Infof("EnsureL4BackendService: updating backend service %v", name)
// Set fingerprint for optimistic locking
expectedBS.Fingerprint = bs.Fingerprint
if err := composite.UpdateBackendService(b.cloud, key, expectedBS); err != nil {
return nil, err
}
klog.V(2).Infof("EnsureL4BackendService: updated backend service %v successfully", name)
return composite.GetBackendService(b.cloud, key, meta.VersionGA)
}

// backendsListEqual asserts that backend lists are equal by group link only
func backendsListEqual(a, b []*composite.Backend) bool {
if len(a) != len(b) {
return false
}
if len(a) == 0 {
return true
}

aSet := sets.NewString()
for _, v := range a {
aSet.Insert(v.Group)
}
bSet := sets.NewString()
for _, v := range b {
bSet.Insert(v.Group)
}

return aSet.Equal(bSet)
}

// backendSvcEqual returns true if the 2 BackendService objects are equal.
// ConnectionDraining timeout is not checked for equality, if user changes
// this timeout and no other backendService parameters change, the backend
// service will not be updated.
func backendSvcEqual(a, b *composite.BackendService) bool {
return a.Protocol == b.Protocol &&
a.Description == b.Description &&
a.SessionAffinity == b.SessionAffinity &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
utils.EqualStringSets(a.HealthChecks, b.HealthChecks) &&
backendsListEqual(a.Backends, b.Backends)
}
7 changes: 6 additions & 1 deletion pkg/backends/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
// FeatureL7ILB defines the feature name of L7 Internal Load Balancer
// L7-ILB Resources are currently alpha and regional
FeatureL7ILB = "L7ILB"
//FeaturePrimaryVMIPNEG defines the feature name of GCE_PRIMARY_VM_IP NEGs which are used for L4 ILB.
FeaturePrimaryVMIPNEG = "PrimaryVMIPNEG"
)

var (
Expand All @@ -46,7 +48,7 @@ var (
}
// TODO: (shance) refactor all scope to be above the serviceport level
scopeToFeatures = map[meta.KeyType][]string{
meta.Regional: []string{FeatureL7ILB},
meta.Regional: []string{FeatureL7ILB, FeaturePrimaryVMIPNEG},
}
)

Expand All @@ -68,6 +70,9 @@ func featuresFromServicePort(sp *utils.ServicePort) []string {
if sp.NEGEnabled {
features = append(features, FeatureNEG)
}
if sp.PrimaryIPNEGEnabled {
features = append(features, FeaturePrimaryVMIPNEG)
}
if sp.L7ILBEnabled {
features = append(features, FeatureL7ILB)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/backends/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package backends

import (
"fmt"
"strings"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -169,6 +170,12 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
// gc deletes the provided backends
func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets.String) error {
for _, be := range backends {
// Skip L4 LB backend services
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment:
backendSyncer currently only GC backend services for L7 XLB/ILB.
L4 LB is GC as part of the deletion follow as there is no shared backend services among L4 ILBs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// backendSyncer currently only GC backend services for L7 XLB/ILB.
// L4 LB is GC as part of the deletion flow as there is no shared backend services among L4 ILBs.
if strings.Contains(be.Description, utils.L4ILBServiceDescKey) {
continue
}
var key *meta.Key
name := be.Name
scope, err := composite.ScopeFromSelfLink(be.SelfLink)
Expand All @@ -181,7 +188,6 @@ func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets
if knownPorts.Has(key.String()) {
continue
}

klog.V(2).Infof("GCing backendService for port %s", name)
err = s.backendPool.Delete(name, be.Version, scope)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (p *portset) check(fakeGCE *gce.Cloud) error {
return fmt.Errorf("backend for port %+v should exist, but got: %v", sp.NodePort, err)
}
} else {
if bs, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp)); !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
bs, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp))
if err == nil || !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
if sp.PrimaryIPNEGEnabled {
// It is expected that these Backends should not get cleaned up in the GC loop.
continue
}
return fmt.Errorf("backend for port %+v should not exist, but got %v", sp, bs)
}
}
Expand Down Expand Up @@ -333,7 +338,7 @@ func TestGC(t *testing.T) {
}
}

// Test GC with both ELB and ILBs
// Test GC with both ELB and ILBs. Add in an L4 ILB NEG which should not be deleted as part of GC.
func TestGCMixed(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE)
Expand All @@ -345,6 +350,7 @@ func TestGCMixed(t *testing.T) {
{NodePort: 84, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true, BackendNamer: defaultNamer},
{NodePort: 85, Protocol: annotations.ProtocolHTTPS, NEGEnabled: true, L7ILBEnabled: true, BackendNamer: defaultNamer},
{NodePort: 86, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true, BackendNamer: defaultNamer},
{ID: utils.ServicePortID{Service: types.NamespacedName{Name: "testsvc"}}, PrimaryIPNEGEnabled: true, BackendNamer: defaultNamer},
}
ps := newPortset(svcNodePorts)
if err := ps.add(svcNodePorts); err != nil {
Expand Down
23 changes: 18 additions & 5 deletions pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,20 @@ import (
"k8s.io/ingress-gce/pkg/backendconfig"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/errors"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/utils"
namer_util "k8s.io/ingress-gce/pkg/utils/namer"
)

const (
// DefaultHost is the host used if none is specified. It is a valid value
// for the "Host" field recognized by GCE.
DefaultHost = "*"

// DefaultPath is the path used if none is specified. It is a valid path
// recognized by GCE.
DefaultPath = "/*"
)

// getServicePortParams allows for passing parameters to getServicePort()
type getServicePortParams struct {
isL7ILB bool
Expand Down Expand Up @@ -206,14 +215,14 @@ func (t *Translator) TranslateIngress(ing *v1beta1.Ingress, systemDefaultBackend
// sent to one of the last backend in the rules list.
path := p.Path
if path == "" {
path = loadbalancers.DefaultPath
path = DefaultPath
}
pathRules = append(pathRules, utils.PathRule{Path: path, Backend: *svcPort})
}
}
host := rule.Host
if host == "" {
host = loadbalancers.DefaultHost
host = DefaultHost
}
urlMap.PutPathRulesForHost(host, pathRules)
}
Expand Down Expand Up @@ -266,9 +275,13 @@ func (t *Translator) GetZoneForNode(name string) (string, error) {

// ListZones returns a list of zones this Kubernetes cluster spans.
func (t *Translator) ListZones() ([]string, error) {
zones := sets.String{}
nodeLister := t.ctx.NodeInformer.GetIndexer()
readyNodes, err := listers.NewNodeLister(nodeLister).ListWithPredicate(utils.GetNodeConditionPredicate())
return t.listZonesWithLister(listers.NewNodeLister(nodeLister))
}

func (t *Translator) listZonesWithLister(lister listers.NodeLister) ([]string, error) {
zones := sets.String{}
readyNodes, err := lister.ListWithPredicate(utils.GetNodeConditionPredicate())
if err != nil {
return zones.List(), err
}
Expand Down
Loading