Multi networking support in the L4 ILB controller #2013

Merged · 1 commit · May 25, 2023
9 changes: 7 additions & 2 deletions pkg/backends/backends.go
@@ -25,6 +25,7 @@ import (
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog/v2"
@@ -284,7 +285,7 @@ func (b *Backends) DeleteSignedUrlKey(be *composite.BackendService, keyName stri
}

// EnsureL4BackendService creates or updates the backend service with the given name.
-func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName) (*composite.BackendService, error) {
+func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, network network.NetworkInfo) (*composite.BackendService, error) {
start := time.Now()
klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): started", name, scheme, protocol)
defer func() {
@@ -313,6 +314,9 @@ func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinit
SessionAffinity: utils.TranslateAffinityType(sessionAffinity),
LoadBalancingScheme: scheme,
}
if !network.IsDefault {
expectedBS.Network = network.NetworkURL
}
if protocol == string(api_v1.ProtocolTCP) {
expectedBS.ConnectionDraining = &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds}
} else {
@@ -362,5 +366,6 @@ func backendSvcEqual(a, b *composite.BackendService) bool {
a.Description == b.Description &&
a.SessionAffinity == b.SessionAffinity &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
-utils.EqualStringSets(a.HealthChecks, b.HealthChecks)
+utils.EqualStringSets(a.HealthChecks, b.HealthChecks) &&
+a.Network == b.Network
}
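The network.NetworkInfo type threaded through this signature is introduced by this PR in pkg/network; its definition is not part of the diff shown here. A minimal sketch inferred from the call sites above and in the tests below (the field set is an assumption — the real type may carry more, such as a subnetwork URL):

// --- illustrative sketch, not part of the diff ---
package network

// NetworkInfo identifies the VPC that a Service's L4 resources target.
type NetworkInfo struct {
	// IsDefault reports whether the Service uses the cluster's default VPC.
	IsDefault bool
	// NetworkURL is the fully qualified selfLink of the VPC network, e.g.
	// "https://www.googleapis.com/compute/v1/projects/p/global/networks/n".
	NetworkURL string
}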
61 changes: 61 additions & 0 deletions pkg/backends/backends_test.go
@@ -0,0 +1,61 @@
package backends

import (
"strings"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
namer "k8s.io/ingress-gce/pkg/utils/namer"
)

const (
kubeSystemUID = "ksuid123"
)

func TestEnsureL4BackendService(t *testing.T) {
serviceName := types.NamespacedName{Name: "test-service", Namespace: "test-ns"}
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
l4namer := namer.NewL4Namer(kubeSystemUID, nil)
backendPool := NewPool(fakeGCE, l4namer)

hcLink := l4namer.L4HealthCheck(serviceName.Namespace, serviceName.Name, false)
bsName := l4namer.L4Backend(serviceName.Namespace, serviceName.Name)
network := network.NetworkInfo{IsDefault: false, NetworkURL: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc"}
bs, err := backendPool.EnsureL4BackendService(bsName, hcLink, "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), serviceName, network)
if err != nil {
t.Errorf("EnsureL4BackendService failed")
}

if bs.SessionAffinity != strings.ToUpper(string(v1.ServiceAffinityNone)) {
t.Errorf("BackendService.SessionAffinity was not populated correctly want=%q, got=%q", strings.ToUpper(string(v1.ServiceAffinityNone)), bs.SessionAffinity)
}
if bs.Network != network.NetworkURL {
t.Errorf("BackendService.Network was not populated correctly, want=%q, got=%q", network.NetworkURL, bs.Network)
}
if len(bs.HealthChecks) != 1 || bs.HealthChecks[0] != hcLink {
t.Errorf("BackendService.HealthChecks was not populated correctly, want=%q, got=%q", hcLink, bs.HealthChecks)
}
description, err := utils.MakeL4LBServiceDescription(serviceName.String(), "", meta.VersionGA, false, utils.ILB)
if err != nil {
t.Errorf("utils.MakeL4LBServiceDescription() failed %v", err)
}
if bs.Description != description {
t.Errorf("BackendService.Description was not populated correctly, want=%q, got=%q", description, bs.Description)
}
if bs.Protocol != "TCP" {
t.Errorf("BackendService.Protocol was not populated correctly, want=%q, got=%q", "TCP", bs.Protocol)
}
if bs.LoadBalancingScheme != string(cloud.SchemeInternal) {
t.Errorf("BackendService.LoadBalancingScheme was not populated correctly, want=%q, got=%q", string(cloud.SchemeInternal), bs.LoadBalancingScheme)
}
if bs.ConnectionDraining == nil || bs.ConnectionDraining.DrainingTimeoutSec != DefaultConnectionDrainingTimeoutSeconds {
t.Errorf("BackendService.ConnectionDraining was not populated correctly, want=connection draining with %q, got=%q", DefaultConnectionDrainingTimeoutSeconds, bs.ConnectionDraining)
}

}
6 changes: 5 additions & 1 deletion pkg/backends/regional_ig_linker_test.go
@@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/instancegroups"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
@@ -229,7 +230,10 @@ func createBackendService(t *testing.T, sp utils.ServicePort, backendPool *Backe
t.Helper()
namespacedName := types.NamespacedName{Name: "service.Name", Namespace: "service.Namespace"}
protocol := string(apiv1.ProtocolTCP)
-if _, err := backendPool.EnsureL4BackendService(sp.BackendName(), hcLink, protocol, string(apiv1.ServiceAffinityNone), string(cloud.SchemeExternal), namespacedName); err != nil {
+serviceAffinityNone := string(apiv1.ServiceAffinityNone)
+schemeExternal := string(cloud.SchemeExternal)
+defaultNetworkInfo := network.NetworkInfo{IsDefault: true}
+if _, err := backendPool.EnsureL4BackendService(sp.BackendName(), hcLink, protocol, serviceAffinityNone, schemeExternal, namespacedName, defaultNetworkInfo); err != nil {
t.Fatalf("Error creating backend service %v", err)
}
}
4 changes: 3 additions & 1 deletion pkg/firewalls/firewalls_l4.go
@@ -25,6 +25,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog/v2"
)
@@ -39,6 +40,7 @@ type FirewallParams struct {
NodeNames []string
Protocol string
L4Type utils.L4LBType
Network network.NetworkInfo
}

func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParams, sharedRule bool) error {
@@ -59,7 +61,7 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
expectedFw := &compute.Firewall{
Name: params.Name,
Description: fwDesc,
-Network: cloud.NetworkURL(),
+Network: params.Network.NetworkURL,
SourceRanges: params.SourceRanges,
TargetTags: nodeTags,
Allowed: []*compute.FirewallAllowed{
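Note the behavioral shift here: the firewall rule's network now comes from the caller through FirewallParams rather than from cloud.NetworkURL(). A caller on the default VPC therefore has to supply the URL itself to preserve the old behavior — a sketch under that assumption (the helper name is hypothetical):

// --- illustrative sketch, not part of the diff ---
package firewalls

import (
	"k8s.io/cloud-provider-gcp/providers/gce"
	"k8s.io/ingress-gce/pkg/network"
)

// defaultNetworkInfo builds the NetworkInfo a default-VPC caller would pass,
// keeping the rule on the network that cloud.NetworkURL() used to supply.
func defaultNetworkInfo(cloud *gce.Cloud) network.NetworkInfo {
	return network.NetworkInfo{
		IsDefault:  true,
		NetworkURL: cloud.NetworkURL(),
	}
}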
131 changes: 131 additions & 0 deletions pkg/firewalls/firewalls_l4_test.go
@@ -0,0 +1,131 @@
package firewalls

import (
"context"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
compute "google.golang.org/api/compute/v1"
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
)

func TestEnsureL4FirewallRule(t *testing.T) {
firewallDescription, err := utils.MakeL4LBFirewallDescription(utils.ServiceKeyFunc("test-ns", "test-name"), "10.0.0.1", meta.VersionGA, false)
if err != nil {
t.Errorf("Failed making the description, err=%v", err)
}
tests := []struct {
desc string
nsName string
params *FirewallParams
shared bool
want *compute.Firewall
}{
{
desc: "default setup",
nsName: utils.ServiceKeyFunc("test-ns", "test-name"),
params: &FirewallParams{
Name: "test-firewall",
IP: "10.0.0.1",
SourceRanges: []string{
"10.1.2.8/29",
},
DestinationRanges: []string{
"10.1.2.16/29",
},
PortRanges: []string{"8080"},
NodeNames: []string{"k8s-test-node"},
Protocol: "TCP",
L4Type: utils.ILB,
Network: network.NetworkInfo{IsDefault: true},
},
shared: false,
want: &compute.Firewall{
Name: "test-firewall",
Network: "",
SourceRanges: []string{
"10.1.2.8/29",
},
TargetTags: []string{"k8s-test"},
Description: firewallDescription,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: "tcp",
Ports: []string{"8080"},
},
},
},
},
{
desc: "non default network",
nsName: utils.ServiceKeyFunc("test-ns", "test-name"),
params: &FirewallParams{
Name: "test-firewall",
IP: "10.0.0.1",
SourceRanges: []string{
"10.1.2.8/29",
},
PortRanges: []string{"8080"},
NodeNames: []string{"k8s-test-node"},
Protocol: "TCP",
L4Type: utils.ILB,
Network: network.NetworkInfo{
IsDefault: false,
NetworkURL: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc",
},
},
shared: false,
want: &compute.Firewall{
Name: "test-firewall",
Network: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc",
SourceRanges: []string{
"10.1.2.8/29",
},
TargetTags: []string{"k8s-test"},
Description: firewallDescription,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: "tcp",
Ports: []string{"8080"},
},
},
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
// Add an instance to act as the node so that the firewall's target tags can be resolved.
createVMInstanceWithTag(t, fakeGCE, "k8s-test")
if err := EnsureL4FirewallRule(fakeGCE, tc.nsName, tc.params, tc.shared); err != nil {
t.Errorf("EnsureL4FirewallRule() failed, err=%v", err)
}
firewall, err := fakeGCE.GetFirewall(tc.params.Name)
if err != nil {
t.Errorf("failed to get firewall err=%v", err)
}
if diff := cmp.Diff(tc.want, firewall, cmpopts.IgnoreFields(compute.Firewall{}, "SelfLink")); diff != "" {
t.Errorf("EnsureL4FirewallRule() diff -want +got\n%v\n", diff)
}
})
}
}

func createVMInstanceWithTag(t *testing.T, fakeGCE *gce.Cloud, tag string) {
err := fakeGCE.Compute().Instances().Insert(context.Background(),
meta.ZonalKey("k8s-test-node", "us-central1-b"),
&compute.Instance{
Name: "test-node",
Zone: "us-central1-b",
Tags: &compute.Tags{
Items: []string{tag},
},
})
if err != nil {
t.Errorf("failed to create instance err=%v", err)
}
}
7 changes: 6 additions & 1 deletion pkg/healthchecksl4/healthchecksl4.go
@@ -32,6 +32,7 @@ import (
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/healthchecksprovider"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog/v2"
@@ -62,14 +63,16 @@ type l4HealthChecks struct {
hcProvider healthChecksProvider
cloud *gce.Cloud
recorder record.EventRecorder
network network.NetworkInfo
}

-func NewL4HealthChecks(cloud *gce.Cloud, recorder record.EventRecorder) *l4HealthChecks {
+func NewL4HealthChecks(cloud *gce.Cloud, recorder record.EventRecorder, network network.NetworkInfo) *l4HealthChecks {
return &l4HealthChecks{
sharedResourcesLock: sharedLock,
cloud: cloud,
recorder: recorder,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
network: network,
}
}

@@ -223,6 +226,7 @@ func (l4hc *l4HealthChecks) ensureIPv4Firewall(svc *corev1.Service, namer namer.
Protocol: string(corev1.ProtocolTCP),
Name: hcFwName,
NodeNames: nodeNames,
Network: l4hc.network,
}
err := firewalls.EnsureL4LBFirewallForHc(svc, isSharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorder)
if err != nil {
@@ -249,6 +253,7 @@ func (l4hc *l4HealthChecks) ensureIPv6Firewall(svc *corev1.Service, namer namer.
Protocol: string(corev1.ProtocolTCP),
Name: ipv6HCFWName,
NodeNames: nodeNames,
Network: l4hc.network,
}
err := firewalls.EnsureL4LBFirewallForHc(svc, isSharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorder)
if err != nil {
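With the constructor change above, callers resolve the Service's network once and the health-check controller stamps it onto every firewall rule it ensures. A minimal wiring sketch (package and function names are illustrative):

// --- illustrative sketch, not part of the diff ---
package example

import (
	"k8s.io/client-go/tools/record"
	"k8s.io/cloud-provider-gcp/providers/gce"
	"k8s.io/ingress-gce/pkg/healthchecksl4"
	"k8s.io/ingress-gce/pkg/network"
)

// wireHealthChecks shows the assumed flow: the Service's network is resolved
// once by the controller, handed to NewL4HealthChecks, and from there copied
// into the FirewallParams of both the IPv4 and IPv6 health-check firewalls.
func wireHealthChecks(cloud *gce.Cloud, rec record.EventRecorder, netInfo network.NetworkInfo) {
	hc := healthchecksl4.NewL4HealthChecks(cloud, rec, netInfo)
	_ = hc // passed to the L4 handlers elsewhere in the controller
}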
27 changes: 21 additions & 6 deletions pkg/l4lb/l4controller.go
@@ -39,6 +39,7 @@ import (
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/ingress-gce/pkg/utils/namer"
@@ -56,12 +57,14 @@ const (
type L4Controller struct {
ctx *context.ControllerContext
// kubeClient, needed for attaching finalizer
-client kubernetes.Interface
-svcQueue utils.TaskQueue
-numWorkers int
-serviceLister cache.Indexer
-nodeLister listers.NodeLister
-stopCh chan struct{}
+client kubernetes.Interface
+svcQueue utils.TaskQueue
+numWorkers int
+serviceLister cache.Indexer
+nodeLister listers.NodeLister
+networkLister cache.Indexer
+gkeNetworkParamSetLister cache.Indexer
+stopCh chan struct{}
// needed for listing the zones in the cluster.
translator *translator.Translator
// needed for linking the NEG with the backend service for each ILB service.
@@ -100,6 +103,13 @@ func NewILBController(ctx *context.ControllerContext, stopCh chan struct{}) *L4C

l4c.svcQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("l4", "services", l4c.numWorkers, l4c.sync)

if ctx.NetworkInformer != nil {
l4c.networkLister = ctx.NetworkInformer.GetIndexer()
}
if ctx.GKENetworkParamsInformer != nil {
l4c.gkeNetworkParamSetLister = ctx.GKENetworkParamsInformer.GetIndexer()
}

ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addSvc := obj.(*v1.Service)
@@ -227,6 +237,10 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service) *load
if err != nil {
return &loadbalancers.L4ILBSyncResult{Error: err}
}
network, err := network.ServiceNetwork(service, l4c.networkLister, l4c.gkeNetworkParamSetLister, l4c.ctx.Cloud, klog.TODO())
Review thread on this line:

Contributor: Why did you need to pass the logger there? Looks weird.

Contributor (author): It was a request from Swetha in the changes for the NEG controller. They have a scoped logger there and want to use it in the code they call. This function is shared by both controllers, so I now have to pass some logger.
if err != nil {
return &loadbalancers.L4ILBSyncResult{Error: err}
}
// Use the same function for both create and updates. If controller crashes and restarts,
// all existing services will show up as Service Adds.
l4ilbParams := &loadbalancers.L4ILBParams{
Expand All @@ -235,6 +249,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service) *load
Namer: l4c.namer,
Recorder: l4c.ctx.Recorder(service.Namespace),
DualStackEnabled: l4c.enableDualStack,
Network: *network,
}
l4 := loadbalancers.NewL4Handler(l4ilbParams)
syncResult := l4.EnsureInternalLoadBalancer(nodeNames, service)
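network.ServiceNetwork is likewise new in this PR and its body is not shown here. A simplified, hypothetical stand-in for what the call in processServiceCreateOrUpdate must do — the annotation key and lookup shortcut are assumptions; the real code resolves a Network CR and its GKENetworkParamSet through the listers passed in:

// --- illustrative sketch, not part of the diff ---
package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/cloud-provider-gcp/providers/gce"
	"k8s.io/ingress-gce/pkg/network"
)

// resolveServiceNetwork is a simplified stand-in for network.ServiceNetwork.
// Assumption: a Service opts into a secondary network via an annotation;
// otherwise it lands on the cluster's default VPC.
func resolveServiceNetwork(svc *v1.Service, cloud *gce.Cloud) *network.NetworkInfo {
	const networkAnnotation = "networking.gke.io/network" // assumed key
	if name, ok := svc.Annotations[networkAnnotation]; ok && name != "" {
		// The real implementation maps `name` to a Network CR and reads the
		// VPC selfLink from the referenced GKENetworkParamSet.
		return &network.NetworkInfo{
			IsDefault:  false,
			NetworkURL: "https://www.googleapis.com/compute/v1/projects/example/global/networks/" + name,
		}
	}
	return &network.NetworkInfo{IsDefault: true, NetworkURL: cloud.NetworkURL()}
}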