Skip to content

Commit

Permalink
Multi Network support for the ILB L4 controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmamczur committed Mar 16, 2023
1 parent 9cae185 commit d094c18
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 16 deletions.
10 changes: 8 additions & 2 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"net/http"
"time"

"k8s.io/ingress-gce/pkg/multinetwork"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
api_v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -284,7 +286,7 @@ func (b *Backends) DeleteSignedUrlKey(be *composite.BackendService, keyName stri
}

// EnsureL4BackendService creates or updates the backend service with the given name.
func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName) (*composite.BackendService, error) {
func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, network *multinetwork.NetworkInfo) (*composite.BackendService, error) {
start := time.Now()
klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): started", name, scheme, protocol)
defer func() {
Expand Down Expand Up @@ -313,6 +315,9 @@ func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinit
SessionAffinity: utils.TranslateAffinityType(sessionAffinity),
LoadBalancingScheme: scheme,
}
if network != nil {
expectedBS.Network = network.NetworkURL
}
if protocol == string(api_v1.ProtocolTCP) {
expectedBS.ConnectionDraining = &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds}
} else {
Expand Down Expand Up @@ -362,5 +367,6 @@ func backendSvcEqual(a, b *composite.BackendService) bool {
a.Description == b.Description &&
a.SessionAffinity == b.SessionAffinity &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
utils.EqualStringSets(a.HealthChecks, b.HealthChecks)
utils.EqualStringSets(a.HealthChecks, b.HealthChecks) &&
a.Network == b.Network
}
61 changes: 61 additions & 0 deletions pkg/backends/backends_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package backends

import (
"strings"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/multinetwork"
"k8s.io/ingress-gce/pkg/utils"
namer "k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/legacy-cloud-providers/gce"
)

const (
kubeSystemUID = "ksuid123"
)

func TestEnsureL4BackendService(t *testing.T) {
serviceName := types.NamespacedName{Name: "test-service", Namespace: "test-ns"}
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
l4namer := namer.NewL4Namer(kubeSystemUID, nil)
backendPool := NewPool(fakeGCE, l4namer)

hcLink := l4namer.L4HealthCheck(serviceName.Namespace, serviceName.Name, false)
bsName := l4namer.L4Backend(serviceName.Namespace, serviceName.Name)
network := &multinetwork.NetworkInfo{NetworkURL: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc"}
bs, err := backendPool.EnsureL4BackendService(bsName, hcLink, "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), serviceName, network)
if err != nil {
t.Errorf("EnsureL4BackendService failed")
}

if bs.SessionAffinity != strings.ToUpper(string(v1.ServiceAffinityNone)) {
t.Errorf("BackendService.SessionAffinity was not populated correctly want=%q, got=%q", strings.ToUpper(string(v1.ServiceAffinityNone)), bs.SessionAffinity)
}
if bs.Network != network.NetworkURL {
t.Errorf("BackendService.Network was not populated correctly, want=%q, got=%q", network.NetworkURL, bs.Network)
}
if len(bs.HealthChecks) != 1 || bs.HealthChecks[0] != hcLink {
t.Errorf("BackendService.HealthChecks was not populated correctly, want=%q, got=%q", hcLink, bs.HealthChecks)
}
description, err := utils.MakeL4LBServiceDescription(serviceName.String(), "", meta.VersionGA, false, utils.ILB)
if err != nil {
t.Errorf("utils.MakeL4LBServiceDescription() failed %v", err)
}
if bs.Description != description {
t.Errorf("BackendService.Description was not populated correctly, want=%q, got=%q", description, bs.Description)
}
if bs.Protocol != "TCP" {
t.Errorf("BackendService.Protocol was not populated correctly, want=%q, got=%q", "TCP", bs.Protocol)
}
if bs.LoadBalancingScheme != string(cloud.SchemeInternal) {
t.Errorf("BackendService.LoadBalancingScheme was not populated correctly, want=%q, got=%q", string(cloud.SchemeInternal), bs.LoadBalancingScheme)
}
if bs.ConnectionDraining == nil || bs.ConnectionDraining.DrainingTimeoutSec != DefaultConnectionDrainingTimeoutSeconds {
t.Errorf("BackendService.ConnectionDraining was not populated correctly, want=connection draining with %q, got=%q", DefaultConnectionDrainingTimeoutSeconds, bs.ConnectionDraining)
}

}
2 changes: 1 addition & 1 deletion pkg/backends/regional_ig_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func createBackendService(t *testing.T, sp utils.ServicePort, backendPool *Backe
t.Helper()
namespacedName := types.NamespacedName{Name: "service.Name", Namespace: "service.Namespace"}
protocol := string(apiv1.ProtocolTCP)
if _, err := backendPool.EnsureL4BackendService(sp.BackendName(), hcLink, protocol, string(apiv1.ServiceAffinityNone), string(cloud.SchemeExternal), namespacedName); err != nil {
if _, err := backendPool.EnsureL4BackendService(sp.BackendName(), hcLink, protocol, string(apiv1.ServiceAffinityNone), string(cloud.SchemeExternal), namespacedName, nil); err != nil {
t.Fatalf("Error creating backend service %v", err)
}
}
27 changes: 21 additions & 6 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
l4metrics "k8s.io/ingress-gce/pkg/l4lb/metrics"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/multinetwork"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
Expand All @@ -56,12 +57,14 @@ const (
type L4Controller struct {
ctx *context.ControllerContext
// kubeClient, needed for attaching finalizer
client kubernetes.Interface
svcQueue utils.TaskQueue
numWorkers int
serviceLister cache.Indexer
nodeLister listers.NodeLister
stopCh chan struct{}
client kubernetes.Interface
svcQueue utils.TaskQueue
numWorkers int
serviceLister cache.Indexer
nodeLister listers.NodeLister
networkLister cache.Indexer
gkeNetworkParamSetLister cache.Indexer
stopCh chan struct{}
// needed for listing the zones in the cluster.
translator *translator.Translator
// needed for linking the NEG with the backend service for each ILB service.
Expand Down Expand Up @@ -100,6 +103,13 @@ func NewILBController(ctx *context.ControllerContext, stopCh chan struct{}) *L4C

l4c.svcQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("l4", "services", l4c.numWorkers, l4c.sync)

if ctx.NetworkInformer != nil {
l4c.networkLister = ctx.NetworkInformer.GetIndexer()
}
if ctx.GKENetworkParamsInformer != nil {
l4c.gkeNetworkParamSetLister = ctx.GKENetworkParamsInformer.GetIndexer()
}

ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addSvc := obj.(*v1.Service)
Expand Down Expand Up @@ -227,6 +237,10 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service) *load
if err != nil {
return &loadbalancers.L4ILBSyncResult{Error: err}
}
network, err := multinetwork.ServiceNetwork(service, l4c.networkLister, l4c.gkeNetworkParamSetLister, l4c.ctx.Cloud)
if err != nil {
return &loadbalancers.L4ILBSyncResult{Error: err}
}
// Use the same function for both create and updates. If controller crashes and restarts,
// all existing services will show up as Service Adds.
l4ilbParams := &loadbalancers.L4ILBParams{
Expand All @@ -235,6 +249,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service) *load
Namer: l4c.namer,
Recorder: l4c.ctx.Recorder(service.Namespace),
DualStackEnabled: l4c.enableDualStack,
Network: network,
}
l4 := loadbalancers.NewL4Handler(l4ilbParams)
syncResult := l4.EnsureInternalLoadBalancer(nodeNames, service)
Expand Down
6 changes: 5 additions & 1 deletion pkg/loadbalancers/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,18 @@ func (l4 *L4) ensureIPv4ForwardingRule(bsLink string, options gce.ILBOptions, ex
err)
}

networkURL := l4.cloud.NetworkURL()
if l4.network != nil {
networkURL = l4.network.NetworkURL
}
fr := &composite.ForwardingRule{
Name: frName,
IPAddress: ipToUse,
Ports: ports,
IPProtocol: string(protocol),
LoadBalancingScheme: string(cloud.SchemeInternal),
Subnetwork: subnetworkURL,
Network: l4.cloud.NetworkURL(),
Network: networkURL,
NetworkTier: cloud.NetworkTierDefault.ToGCEValue(),
Version: version,
BackendService: bsLink,
Expand Down
10 changes: 9 additions & 1 deletion pkg/loadbalancers/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"k8s.io/ingress-gce/pkg/multinetwork"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -55,6 +57,7 @@ type L4 struct {
forwardingRules ForwardingRulesProvider
healthChecks healthchecksl4.L4HealthChecks
enableDualStack bool
network *multinetwork.NetworkInfo
}

// L4ILBSyncResult contains information about the outcome of an L4 ILB sync. It stores the list of resource name annotations,
Expand All @@ -76,6 +79,7 @@ type L4ILBParams struct {
Namer namer.L4ResourcesNamer
Recorder record.EventRecorder
DualStackEnabled bool
Network *multinetwork.NetworkInfo
}

// NewL4Handler creates a new L4Handler for the given L4 service.
Expand All @@ -90,6 +94,7 @@ func NewL4Handler(params *L4ILBParams) *L4 {
healthChecks: healthchecksl4.NewL4HealthChecks(params.Cloud, params.Recorder),
forwardingRules: forwardingrules.New(params.Cloud, meta.VersionGA, scope),
enableDualStack: params.DualStackEnabled,
network: params.Network,
}
l4.NamespacedName = types.NamespacedName{Name: params.Service.Name, Namespace: params.Service.Namespace}
l4.backendPool = backends.NewPool(l4.cloud, l4.namer)
Expand Down Expand Up @@ -360,7 +365,7 @@ func (l4 *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service
}

// ensure backend service
bs, err := l4.backendPool.EnsureL4BackendService(bsName, hcLink, string(protocol), string(l4.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName)
bs, err := l4.backendPool.EnsureL4BackendService(bsName, hcLink, string(protocol), string(l4.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName, l4.network)
if err != nil {
result.GCEResourceInError = annotations.BackendServiceResource
result.Error = err
Expand Down Expand Up @@ -518,6 +523,9 @@ func (l4 *L4) getServiceSubnetworkURL(options gce.ILBOptions) (string, error) {
if options.SubnetName != "" {
return l4.getSubnetworkURLByName(options.SubnetName)
}
if l4.network != nil {
return l4.network.SubnetworkURL, nil
}
return l4.cloud.SubnetworkURL(), nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/loadbalancers/l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ func TestEnsureInternalBackendServiceUpdates(t *testing.T) {
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

bsName := l4.namer.L4Backend(l4.Service.Namespace, l4.Service.Name)
_, err := l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(svc.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName)
_, err := l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(svc.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName, nil)
if err != nil {
t.Errorf("Failed to ensure backend service %s - err %v", bsName, err)
}

// Update the Internal Backend Service with a new ServiceAffinity
_, err = l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), l4.NamespacedName)
_, err = l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), l4.NamespacedName, nil)
if err != nil {
t.Errorf("Failed to ensure backend service %s - err %v", bsName, err)
}
Expand All @@ -105,7 +105,7 @@ func TestEnsureInternalBackendServiceUpdates(t *testing.T) {
if err != nil {
t.Errorf("Failed to update backend service with new connection draining timeout - err %v", err)
}
bs, err = l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), l4.NamespacedName)
bs, err = l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), l4.NamespacedName, nil)
if err != nil {
t.Errorf("Failed to ensure backend service %s - err %v", bsName, err)
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {
if hcResult.Err != nil {
t.Errorf("Failed to create healthcheck, err %v", hcResult.Err)
}
_, err := l4.backendPool.EnsureL4BackendService(lbName, hcResult.HCLink, "TCP", string(l4.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName)
_, err := l4.backendPool.EnsureL4BackendService(lbName, hcResult.HCLink, "TCP", string(l4.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName, nil)
if err != nil {
t.Errorf("Failed to create backendservice, err %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/loadbalancers/l4netlb.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (l4netlb *L4NetLB) provideBackendService(syncResult *L4NetLBSyncResult, hcL
bsName := l4netlb.namer.L4Backend(l4netlb.Service.Namespace, l4netlb.Service.Name)
servicePorts := l4netlb.Service.Spec.Ports
protocol := utils.GetProtocol(servicePorts)
bs, err := l4netlb.backendPool.EnsureL4BackendService(bsName, hcLink, string(protocol), string(l4netlb.Service.Spec.SessionAffinity), string(cloud.SchemeExternal), l4netlb.NamespacedName)
bs, err := l4netlb.backendPool.EnsureL4BackendService(bsName, hcLink, string(protocol), string(l4netlb.Service.Spec.SessionAffinity), string(cloud.SchemeExternal), l4netlb.NamespacedName, nil)
if err != nil {
syncResult.GCEResourceInError = annotations.BackendServiceResource
syncResult.Error = fmt.Errorf("Failed to ensure backend service %s - %w", bsName, err)
Expand Down

0 comments on commit d094c18

Please sign in to comment.