Skip to content

Commit

Permalink
Add L4 ILB usage metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
skmatti committed Feb 24, 2020
1 parent 490cb4b commit 973106d
Show file tree
Hide file tree
Showing 8 changed files with 348 additions and 66 deletions.
4 changes: 2 additions & 2 deletions pkg/l4/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
if err := common.EnsureServiceFinalizer(service, common.ILBFinalizerV2, l4c.ctx.KubeClient); err != nil {
return fmt.Errorf("Failed to attach finalizer to service %s/%s, err %v", service.Namespace, service.Name, err)
}
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.ctx.ClusterNamer, l4c.ctx.Recorder(service.Namespace), &l4c.sharedResourcesLock)
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.ctx.ClusterNamer, l4c.ctx.Recorder(service.Namespace), &l4c.sharedResourcesLock, l4c.ctx.ControllerMetrics)
nodeNames, err := utils.GetReadyNodeNames(l4c.nodeLister)
if err != nil {
return err
Expand Down Expand Up @@ -172,7 +172,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}

func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) error {
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.ctx.ClusterNamer, l4c.ctx.Recorder(svc.Namespace), &l4c.sharedResourcesLock)
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.ctx.ClusterNamer, l4c.ctx.Recorder(svc.Namespace), &l4c.sharedResourcesLock, l4c.ctx.ControllerMetrics)
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key)
if err := l4.EnsureInternalLoadBalancerDeleted(svc); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancerFailed", "Error deleting load balancer: %v", err)
Expand Down
37 changes: 29 additions & 8 deletions pkg/loadbalancers/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
Expand All @@ -50,11 +51,13 @@ type L4 struct {
ServicePort utils.ServicePort
NamespacedName types.NamespacedName
sharedResourcesLock *sync.Mutex
// metricsCollector exports L4 ILB service controller usage metrics.
metricsCollector metrics.L4ILBMetricsCollector
}

// NewL4Handler creates a new L4Handler for the given L4 service.
func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer *namer.Namer, recorder record.EventRecorder, lock *sync.Mutex) *L4 {
l := &L4{cloud: cloud, scope: scope, namer: namer, recorder: recorder, Service: service, sharedResourcesLock: lock}
func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer *namer.Namer, recorder record.EventRecorder, lock *sync.Mutex, collector metrics.L4ILBMetricsCollector) *L4 {
l := &L4{cloud: cloud, scope: scope, namer: namer, recorder: recorder, Service: service, sharedResourcesLock: lock, metricsCollector: collector}
l.NamespacedName = types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
l.backendPool = backends.NewPool(l.cloud, l.namer)
l.ServicePort = utils.ServicePort{ID: utils.ServicePortID{Service: l.NamespacedName}, BackendNamer: l.namer,
Expand Down Expand Up @@ -135,13 +138,17 @@ func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) error {
}
err = utils.IgnoreHTTPNotFound(healthchecks.DeleteHealthCheck(l.cloud, hcName))
if err != nil {
// This error will be hit if this is a shared healthcheck.
if utils.IsInUsedByError(err) {
klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName)
return retErr
if !utils.IsInUsedByError(err) {
klog.Errorf("Failed to delete healthcheck for internal loadbalancer service %s, err %v", l.NamespacedName.String(), err)
return err
}
klog.Errorf("Failed to delete healthcheck for internal loadbalancer service %s, err %v", l.NamespacedName.String(), err)
return err
// Ignore deletion error due to health check in use by another resource.
// This will be hit if this is a shared healthcheck.
klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName)
}
if retErr == nil {
klog.V(6).Infof("Service %s deleted, removing its state from metrics cache", l.NamespacedName.String())
l.metricsCollector.DeleteL4ILBService(l.NamespacedName.String())
}
return retErr
}
Expand Down Expand Up @@ -226,5 +233,19 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service)
klog.Errorf("EnsureInternalLoadBalancer: Failed to create forwarding rule - %v", err)
return nil, err
}

var serviceState metrics.L4ILBServiceState
if options.AllowGlobalAccess {
serviceState.EnabledGlobalAccess = true
}
// SubnetName is overrided to nil value if Alpha feature gate for custom subnet
// is not enabled. So, a non empty subnet name at this point implies that the
// feature is in use.
if options.SubnetName != "" {
serviceState.EnabledCustomSubnet = true
}
klog.V(6).Infof("Service %s ensured, updating its state %v in metrics cache", l.NamespacedName.String(), serviceState)
l.metricsCollector.SetL4ILBService(l.NamespacedName.String(), serviceState)

return &corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{{IP: fr.IPAddress}}}, nil
}
35 changes: 20 additions & 15 deletions pkg/loadbalancers/l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"google.golang.org/api/compute/v1"
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/metrics"
"k8s.io/ingress-gce/pkg/utils"
"reflect"
"strings"
Expand All @@ -45,6 +46,10 @@ const (
// eventMsgFirewallChange = "XPN Firewall change required by network admin"
)

var (
fakeMetricsCollector = metrics.NewControllerMetrics()
)

func getFakeGCECloud(vals gce.TestClusterValues) *gce.Cloud {
fakeGCE := gce.NewFakeGCECloud(vals)
// InsertHook required to assign an IP Address for forwarding rule
Expand All @@ -64,7 +69,7 @@ func TestEnsureInternalBackendServiceUpdates(t *testing.T) {
fakeGCE := getFakeGCECloud(gce.DefaultTestClusterValues())
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
bsName := l.namer.PrimaryIPNEG(l.Service.Namespace, l.Service.Name)
_, err := l.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(svc.Spec.SessionAffinity), string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA)
if err != nil {
Expand Down Expand Up @@ -113,7 +118,7 @@ func TestEnsureInternalLoadBalancer(t *testing.T) {

svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand All @@ -137,7 +142,7 @@ func TestEnsureInternalLoadBalancerTypeChange(t *testing.T) {

svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -170,7 +175,7 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {
fakeGCE := getFakeGCECloud(vals)
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -212,7 +217,7 @@ func TestEnsureInternalLoadBalancerClearPreviousResources(t *testing.T) {
fakeGCE := getFakeGCECloud(vals)
svc := test.NewL4ILBService(true, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -322,7 +327,7 @@ func TestUpdateResourceLinks(t *testing.T) {
fakeGCE := getFakeGCECloud(vals)
svc := test.NewL4ILBService(true, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -399,7 +404,7 @@ func TestEnsureInternalLoadBalancerHealthCheckConfigurable(t *testing.T) {
fakeGCE := getFakeGCECloud(vals)
svc := test.NewL4ILBService(true, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -439,7 +444,7 @@ func TestEnsureInternalLoadBalancerDeleted(t *testing.T) {
nodeNames := []string{"test-node-1"}
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -469,7 +474,7 @@ func TestEnsureInternalLoadBalancerDeletedTwiceDoesNotError(t *testing.T) {
nodeNames := []string{"test-node-1"}
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -504,7 +509,7 @@ func TestEnsureInternalLoadBalancerWithSpecialHealthCheck(t *testing.T) {
nodeNames := []string{"test-node-1"}
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -609,7 +614,7 @@ func TestEnsureInternalLoadBalancerErrors(t *testing.T) {
tc.adjustParams(params)
}
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(params.service, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(params.service, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
//lbName := l.namer.PrimaryIPNEG(params.service.Namespace, params.service.Name)
frName := l.GetFRName()
key, err := composite.CreateKey(l.cloud, frName, meta.Regional)
Expand Down Expand Up @@ -690,7 +695,7 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) {
nodeNames := []string{"test-node-1"}
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -776,7 +781,7 @@ func TestEnsureInternalLoadBalancerDisableGlobalAccess(t *testing.T) {
nodeNames := []string{"test-node-1"}
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -838,7 +843,7 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) {

svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
_, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName)
if err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -930,7 +935,7 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) {
fakeGCE := getFakeGCECloud(vals)
svc := test.NewL4ILBService(false, 8080)
namer := namer_util.NewNamer(clusterUID, "")
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{})
l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}, fakeMetricsCollector)
fwName := l.namer.PrimaryIPNEG(l.Service.Namespace, l.Service.Name)
tc := struct {
Input []int
Expand Down
13 changes: 10 additions & 3 deletions pkg/metrics/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,16 @@ const (
cookieAffinity = feature("CookieAffinity")
customRequestHeaders = feature("CustomRequestHeaders")

standaloneNeg = feature("StandaloneNEG")
ingressNeg = feature("IngressNEG")
asmNeg = feature("AsmNEG")
standaloneNeg = feature("StandaloneNEG")
ingressNeg = feature("IngressNEG")
asmNeg = feature("AsmNEG")
vmPrimaryIpNeg = feature("VmPrimaryIpNEG")
vmPrimaryIpNegLocal = feature("VmPrimaryIpNegLocal")
vmPrimaryIpNegCluster = feature("VmPrimaryIpNegCluster")

l4ILBService = feature("L4ILBService")
l4IlbGlobalAccess = feature("L4ILBGlobalAccess")
l4IlbCustomSubnet = feature("L4ILBCustomSubnet")
)

// featuresForIngress returns the list of features for given ingress.
Expand Down
Loading

0 comments on commit 973106d

Please sign in to comment.