Skip to content

Commit

Permalink
Multi Network support for the ILB L4 controller.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmamczur committed Apr 7, 2023
1 parent b4ffb2b commit df633ce
Show file tree
Hide file tree
Showing 12 changed files with 435 additions and 21 deletions.
10 changes: 8 additions & 2 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"net/http"
"time"

"k8s.io/ingress-gce/pkg/multinetwork"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
api_v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -284,7 +286,7 @@ func (b *Backends) DeleteSignedUrlKey(be *composite.BackendService, keyName stri
}

// EnsureL4BackendService creates or updates the backend service with the given name.
func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName) (*composite.BackendService, error) {
func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinity, scheme string, nm types.NamespacedName, network *multinetwork.NetworkInfo) (*composite.BackendService, error) {
start := time.Now()
klog.V(2).Infof("EnsureL4BackendService(%v, %v, %v): started", name, scheme, protocol)
defer func() {
Expand Down Expand Up @@ -313,6 +315,9 @@ func (b *Backends) EnsureL4BackendService(name, hcLink, protocol, sessionAffinit
SessionAffinity: utils.TranslateAffinityType(sessionAffinity),
LoadBalancingScheme: scheme,
}
if network != nil {
expectedBS.Network = network.NetworkURL
}
if protocol == string(api_v1.ProtocolTCP) {
expectedBS.ConnectionDraining = &composite.ConnectionDraining{DrainingTimeoutSec: DefaultConnectionDrainingTimeoutSeconds}
} else {
Expand Down Expand Up @@ -362,5 +367,6 @@ func backendSvcEqual(a, b *composite.BackendService) bool {
a.Description == b.Description &&
a.SessionAffinity == b.SessionAffinity &&
a.LoadBalancingScheme == b.LoadBalancingScheme &&
utils.EqualStringSets(a.HealthChecks, b.HealthChecks)
utils.EqualStringSets(a.HealthChecks, b.HealthChecks) &&
a.Network == b.Network
}
61 changes: 61 additions & 0 deletions pkg/backends/backends_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package backends

import (
"strings"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/multinetwork"
"k8s.io/ingress-gce/pkg/utils"
namer "k8s.io/ingress-gce/pkg/utils/namer"
)

const (
kubeSystemUID = "ksuid123"
)

func TestEnsureL4BackendService(t *testing.T) {
serviceName := types.NamespacedName{Name: "test-service", Namespace: "test-ns"}
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
l4namer := namer.NewL4Namer(kubeSystemUID, nil)
backendPool := NewPool(fakeGCE, l4namer)

hcLink := l4namer.L4HealthCheck(serviceName.Namespace, serviceName.Name, false)
bsName := l4namer.L4Backend(serviceName.Namespace, serviceName.Name)
network := &multinetwork.NetworkInfo{NetworkURL: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc"}
bs, err := backendPool.EnsureL4BackendService(bsName, hcLink, "TCP", string(v1.ServiceAffinityNone), string(cloud.SchemeInternal), serviceName, network)
if err != nil {
t.Errorf("EnsureL4BackendService failed")
}

if bs.SessionAffinity != strings.ToUpper(string(v1.ServiceAffinityNone)) {
t.Errorf("BackendService.SessionAffinity was not populated correctly want=%q, got=%q", strings.ToUpper(string(v1.ServiceAffinityNone)), bs.SessionAffinity)
}
if bs.Network != network.NetworkURL {
t.Errorf("BackendService.Network was not populated correctly, want=%q, got=%q", network.NetworkURL, bs.Network)
}
if len(bs.HealthChecks) != 1 || bs.HealthChecks[0] != hcLink {
t.Errorf("BackendService.HealthChecks was not populated correctly, want=%q, got=%q", hcLink, bs.HealthChecks)
}
description, err := utils.MakeL4LBServiceDescription(serviceName.String(), "", meta.VersionGA, false, utils.ILB)
if err != nil {
t.Errorf("utils.MakeL4LBServiceDescription() failed %v", err)
}
if bs.Description != description {
t.Errorf("BackendService.Description was not populated correctly, want=%q, got=%q", description, bs.Description)
}
if bs.Protocol != "TCP" {
t.Errorf("BackendService.Protocol was not populated correctly, want=%q, got=%q", "TCP", bs.Protocol)
}
if bs.LoadBalancingScheme != string(cloud.SchemeInternal) {
t.Errorf("BackendService.LoadBalancingScheme was not populated correctly, want=%q, got=%q", string(cloud.SchemeInternal), bs.LoadBalancingScheme)
}
if bs.ConnectionDraining == nil || bs.ConnectionDraining.DrainingTimeoutSec != DefaultConnectionDrainingTimeoutSeconds {
t.Errorf("BackendService.ConnectionDraining was not populated correctly, want=connection draining with %q, got=%q", DefaultConnectionDrainingTimeoutSeconds, bs.ConnectionDraining)
}

}
2 changes: 1 addition & 1 deletion pkg/backends/regional_ig_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func createBackendService(t *testing.T, sp utils.ServicePort, backendPool *Backe
t.Helper()
namespacedName := types.NamespacedName{Name: "service.Name", Namespace: "service.Namespace"}
protocol := string(apiv1.ProtocolTCP)
if _, err := backendPool.EnsureL4BackendService(sp.BackendName(), hcLink, protocol, string(apiv1.ServiceAffinityNone), string(cloud.SchemeExternal), namespacedName); err != nil {
if _, err := backendPool.EnsureL4BackendService(sp.BackendName(), hcLink, protocol, string(apiv1.ServiceAffinityNone), string(cloud.SchemeExternal), namespacedName, nil); err != nil {
t.Fatalf("Error creating backend service %v", err)
}
}
8 changes: 7 additions & 1 deletion pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package firewalls

import (
"k8s.io/ingress-gce/pkg/multinetwork"
"strings"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand All @@ -39,6 +40,7 @@ type FirewallParams struct {
NodeNames []string
Protocol string
L4Type utils.L4LBType
Network *multinetwork.NetworkInfo
}

func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParams, sharedRule bool) error {
Expand All @@ -56,10 +58,14 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
if err != nil {
klog.Warningf("EnsureL4FirewallRule(%v): failed to generate description for L4 %s rule, err: %v", params.Name, params.L4Type.ToString(), err)
}
networkURL := cloud.NetworkURL()
if params.Network != nil {
networkURL = params.Network.NetworkURL
}
expectedFw := &compute.Firewall{
Name: params.Name,
Description: fwDesc,
Network: cloud.NetworkURL(),
Network: networkURL,
SourceRanges: params.SourceRanges,
TargetTags: nodeTags,
Allowed: []*compute.FirewallAllowed{
Expand Down
126 changes: 126 additions & 0 deletions pkg/firewalls/firewalls_l4_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package firewalls

import (
"context"
"testing"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
compute "google.golang.org/api/compute/v1"
"k8s.io/cloud-provider-gcp/providers/gce"
"k8s.io/ingress-gce/pkg/multinetwork"
"k8s.io/ingress-gce/pkg/utils"
)

func TestEnsureL4FirewallRule(t *testing.T) {
firewallDescription, err := utils.MakeL4LBFirewallDescription(utils.ServiceKeyFunc("test-ns", "test-name"), "10.0.0.1", meta.VersionGA, false)
if err != nil {
t.Errorf("Failed making the description, err=%v", err)
}
tests := []struct {
desc string
nsName string
params *FirewallParams
shared bool
want *compute.Firewall
}{
{
desc: "default setup",
nsName: utils.ServiceKeyFunc("test-ns", "test-name"),
params: &FirewallParams{
Name: "test-firewall",
IP: "10.0.0.1",
SourceRanges: []string{
"10.1.2.8/29",
},
DestinationRanges: []string{
"10.1.2.16/29",
},
PortRanges: []string{"8080"},
NodeNames: []string{"k8s-test-node"},
Protocol: "TCP",
L4Type: utils.ILB,
Network: nil,
},
shared: false,
want: &compute.Firewall{
Name: "test-firewall",
Network: "",
SourceRanges: []string{
"10.1.2.8/29",
},
TargetTags: []string{"k8s-test"},
Description: firewallDescription,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: "tcp",
Ports: []string{"8080"},
},
},
},
},
{
desc: "non default network",
nsName: utils.ServiceKeyFunc("test-ns", "test-name"),
params: &FirewallParams{
Name: "test-firewall",
IP: "10.0.0.1",
SourceRanges: []string{
"10.1.2.8/29",
},
PortRanges: []string{"8080"},
NodeNames: []string{"k8s-test-node"},
Protocol: "TCP",
L4Type: utils.ILB,
Network: &multinetwork.NetworkInfo{
NetworkURL: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc",
},
},
shared: false,
want: &compute.Firewall{
Name: "test-firewall",
Network: "https://www.googleapis.com/compute/v1/projects/test-poject/global/networks/test-vpc",
SourceRanges: []string{
"10.1.2.8/29",
},
TargetTags: []string{"k8s-test"},
Description: firewallDescription,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: "tcp",
Ports: []string{"8080"},
},
},
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
// Add some instance to act as the node so that target tags in the firewall can be resolved.
err = fakeGCE.Compute().Instances().Insert(context.Background(),
meta.ZonalKey("k8s-test-node", "us-central1-b"),
&compute.Instance{
Name: "test-node",
Zone: "us-central1-b",
Tags: &compute.Tags{
Items: []string{"k8s-test"},
},
})
if err != nil {
t.Errorf("failed to create instance err=%v", err)
}
if err := EnsureL4FirewallRule(fakeGCE, tc.nsName, tc.params, tc.shared); err != nil {
t.Errorf("EnsureL4FirewallRule() failed, err=%v", err)
}
firewall, err := fakeGCE.GetFirewall(tc.params.Name)
if err != nil {
t.Errorf("failed to get firewall err=%v", err)
}
if diff := cmp.Diff(tc.want, firewall, cmpopts.IgnoreFields(compute.Firewall{}, "SelfLink")); diff != "" {
t.Errorf("EnsureL4FirewallRule() diff -want +got\n%v\n", diff)
}
})
}
}
8 changes: 7 additions & 1 deletion pkg/healthchecksl4/healthchecksl4.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"k8s.io/ingress-gce/pkg/multinetwork"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -62,14 +64,16 @@ type l4HealthChecks struct {
hcProvider healthChecksProvider
cloud *gce.Cloud
recorder record.EventRecorder
network *multinetwork.NetworkInfo
}

func NewL4HealthChecks(cloud *gce.Cloud, recorder record.EventRecorder) *l4HealthChecks {
func NewL4HealthChecks(cloud *gce.Cloud, recorder record.EventRecorder, network *multinetwork.NetworkInfo) *l4HealthChecks {
return &l4HealthChecks{
sharedResourcesLock: sharedLock,
cloud: cloud,
recorder: recorder,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
network: network,
}
}

Expand Down Expand Up @@ -223,6 +227,7 @@ func (l4hc *l4HealthChecks) ensureIPv4Firewall(svc *corev1.Service, namer namer.
Protocol: string(corev1.ProtocolTCP),
Name: hcFwName,
NodeNames: nodeNames,
Network: l4hc.network,
}
err := firewalls.EnsureL4LBFirewallForHc(svc, isSharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorder)
if err != nil {
Expand All @@ -249,6 +254,7 @@ func (l4hc *l4HealthChecks) ensureIPv6Firewall(svc *corev1.Service, namer namer.
Protocol: string(corev1.ProtocolTCP),
Name: ipv6HCFWName,
NodeNames: nodeNames,
Network: l4hc.network,
}
err := firewalls.EnsureL4LBFirewallForHc(svc, isSharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorder)
if err != nil {
Expand Down
27 changes: 21 additions & 6 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
l4metrics "k8s.io/ingress-gce/pkg/l4lb/metrics"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/multinetwork"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
Expand All @@ -56,12 +57,14 @@ const (
type L4Controller struct {
ctx *context.ControllerContext
// kubeClient, needed for attaching finalizer
client kubernetes.Interface
svcQueue utils.TaskQueue
numWorkers int
serviceLister cache.Indexer
nodeLister listers.NodeLister
stopCh chan struct{}
client kubernetes.Interface
svcQueue utils.TaskQueue
numWorkers int
serviceLister cache.Indexer
nodeLister listers.NodeLister
networkLister cache.Indexer
gkeNetworkParamSetLister cache.Indexer
stopCh chan struct{}
// needed for listing the zones in the cluster.
translator *translator.Translator
// needed for linking the NEG with the backend service for each ILB service.
Expand Down Expand Up @@ -100,6 +103,13 @@ func NewILBController(ctx *context.ControllerContext, stopCh chan struct{}) *L4C

l4c.svcQueue = utils.NewPeriodicTaskQueueWithMultipleWorkers("l4", "services", l4c.numWorkers, l4c.sync)

if ctx.NetworkInformer != nil {
l4c.networkLister = ctx.NetworkInformer.GetIndexer()
}
if ctx.GKENetworkParamsInformer != nil {
l4c.gkeNetworkParamSetLister = ctx.GKENetworkParamsInformer.GetIndexer()
}

ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addSvc := obj.(*v1.Service)
Expand Down Expand Up @@ -227,6 +237,10 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service) *load
if err != nil {
return &loadbalancers.L4ILBSyncResult{Error: err}
}
network, err := multinetwork.ServiceNetwork(service, l4c.networkLister, l4c.gkeNetworkParamSetLister, l4c.ctx.Cloud)
if err != nil {
return &loadbalancers.L4ILBSyncResult{Error: err}
}
// Use the same function for both create and updates. If controller crashes and restarts,
// all existing services will show up as Service Adds.
l4ilbParams := &loadbalancers.L4ILBParams{
Expand All @@ -235,6 +249,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(service *v1.Service) *load
Namer: l4c.namer,
Recorder: l4c.ctx.Recorder(service.Namespace),
DualStackEnabled: l4c.enableDualStack,
Network: network,
}
l4 := loadbalancers.NewL4Handler(l4ilbParams)
syncResult := l4.EnsureInternalLoadBalancer(nodeNames, service)
Expand Down
Loading

0 comments on commit df633ce

Please sign in to comment.