Skip to content

Commit

Permalink
Add l4netlb metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kl52752 committed Mar 21, 2022
1 parent a277b80 commit 4a0fc05
Show file tree
Hide file tree
Showing 11 changed files with 583 additions and 56 deletions.
120 changes: 94 additions & 26 deletions pkg/l4lb/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,14 @@ func createLegacyForwardingRule(t *testing.T, svc *api_v1.Service, cloud *gce.Cl
// that the status field is as expected in each case.
func TestProcessCreateOrUpdate(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", err)
}
newSvc := test.NewL4ILBService(false, 8080)
addILBService(l4c, newSvc)
addNEG(l4c, newSvc)
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err != nil {
t.Errorf("Failed to sync newly added service %s, err %v", newSvc.Name, err)
}
Expand All @@ -195,7 +198,11 @@ func TestProcessCreateOrUpdate(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(newSvc, true, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)

// set the TrafficPolicy of the service to Local
newSvc.Spec.ExternalTrafficPolicy = api_v1.ServiceExternalTrafficPolicyTypeLocal
Expand All @@ -210,7 +217,11 @@ func TestProcessCreateOrUpdate(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(newSvc, true, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, UpdateCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr = test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpdateCount: 1, UpperBoundSeconds: 1}, t)
// Remove the Internal LoadBalancer annotation, this should trigger a cleanup.
delete(newSvc.Annotations, gce.ServiceAnnotationLoadBalancerType)
updateILBService(l4c, newSvc)
Expand All @@ -225,7 +236,11 @@ func TestProcessCreateOrUpdate(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(newSvc, false, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, UpdateCount: 1, DeleteCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr = test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpdateCount: 1, DeleteCount: 1, UpperBoundSeconds: 1}, t)
newSvc.DeletionTimestamp = &v1.Time{}
updateILBService(l4c, newSvc)
key, _ := common.KeyFunc(newSvc)
Expand Down Expand Up @@ -299,11 +314,14 @@ func TestProcessUpdateExternalTrafficPolicy(t *testing.T) {

func TestProcessDeletion(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", err)
}
newSvc := test.NewL4ILBService(false, 8080)
addILBService(l4c, newSvc)
addNEG(l4c, newSvc)
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err != nil {
t.Errorf("Failed to sync newly added service %s, err %v", newSvc.Name, err)
}
Expand All @@ -313,7 +331,11 @@ func TestProcessDeletion(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(newSvc, true, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)

// Mark the service for deletion by updating timestamp. Use svc instead of newSvc since that has the finalizer.
newSvc.DeletionTimestamp = &v1.Time{}
Expand All @@ -332,7 +354,11 @@ func TestProcessDeletion(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(newSvc, false, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, DeleteCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr = test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, DeleteCount: 1, UpperBoundSeconds: 1}, t)
deleteILBService(l4c, newSvc)
newSvc, err = l4c.client.CoreV1().Services(newSvc.Namespace).Get(context2.TODO(), newSvc.Name, v1.GetOptions{})
if newSvc != nil {
Expand All @@ -342,12 +368,15 @@ func TestProcessDeletion(t *testing.T) {

func TestProcessCreateLegacyService(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", err)
}
newSvc := test.NewL4ILBService(false, 8080)
// Set the legacy finalizer
newSvc.Finalizers = append(newSvc.Finalizers, common.LegacyILBFinalizer)
addILBService(l4c, newSvc)
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err != nil {
t.Errorf("Failed to sync newly added service %s, err %v", newSvc.Name, err)
}
Expand All @@ -357,12 +386,19 @@ func TestProcessCreateLegacyService(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(svc, false, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{}, t)
}

func TestProcessCreateServiceWithLegacyInternalForwardingRule(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", err)
}
newSvc := test.NewL4ILBService(false, 8080)
addILBService(l4c, newSvc)
// Mimic addition of NEG. This will not actually happen, but this test verifies that sync is skipped
Expand All @@ -372,7 +408,7 @@ func TestProcessCreateServiceWithLegacyInternalForwardingRule(t *testing.T) {
// A service can have the v1 finalizer reset due to a buggy script/manual operation.
// Subsetting controller should only process the service if it doesn't already have a forwarding rule.
createLegacyForwardingRule(t, newSvc, l4c.ctx.Cloud, string(cloud.SchemeInternal))
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err != nil {
t.Errorf("Failed to sync newly added service %s, err %v", newSvc.Name, err)
}
Expand All @@ -382,12 +418,19 @@ func TestProcessCreateServiceWithLegacyInternalForwardingRule(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(svc, false, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{}, t)
}

func TestProcessCreateServiceWithLegacyExternalForwardingRule(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", err)
}
newSvc := test.NewL4ILBService(false, 8080)
addILBService(l4c, newSvc)
// Mimic addition of NEG. This will happen in parallel with ILB sync, by the NEG controller.
Expand All @@ -396,7 +439,7 @@ func TestProcessCreateServiceWithLegacyExternalForwardingRule(t *testing.T) {
// Service processing should succeed in that case. The external forwarding rule will be deleted
// by service controller.
createLegacyForwardingRule(t, newSvc, l4c.ctx.Cloud, string(cloud.SchemeExternal))
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err != nil {
t.Errorf("Failed to sync newly added service %s, err %v", newSvc.Name, err)
}
Expand All @@ -406,12 +449,19 @@ func TestProcessCreateServiceWithLegacyExternalForwardingRule(t *testing.T) {
t.Errorf("Failed to lookup service %s, err: %v", newSvc.Name, err)
}
validateSvcStatus(svc, true, t)
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)
}

func TestProcessUpdateClusterIPToILBService(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics %v", err)
}
clusterSvc := &api_v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: "testsvc",
Expand All @@ -435,7 +485,7 @@ func TestProcessUpdateClusterIPToILBService(t *testing.T) {
t.Errorf("Incorrectly marked service %v as not needing update", newSvc)
}
addNEG(l4c, newSvc)
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err != nil {
t.Errorf("Failed to sync newly updated service %s, err %v", newSvc.Name, err)
}
Expand All @@ -446,7 +496,11 @@ func TestProcessUpdateClusterIPToILBService(t *testing.T) {
}
validateSvcStatus(newSvc, true, t)
// this will be a create metric since an ILB IP is being assigned for the first time.
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 1, UpperBoundSeconds: 1}, t)
}

func TestProcessMultipleServices(t *testing.T) {
Expand All @@ -456,7 +510,10 @@ func TestProcessMultipleServices(t *testing.T) {
for _, onlyLocal := range []bool{true, false} {
t.Run(fmt.Sprintf("L4 with LocalMode=%v", onlyLocal), func(t *testing.T) {
l4c := newServiceController(t, newFakeGCE())
prevMetrics := test.GetL4ILBLatencyMetric(t)
prevMetrics, err := test.GetL4ILBLatencyMetric()
if err != nil {
t.Errorf("Error getting L4 ILB latency metrics %v", err)
}
go l4c.Run()
var svcNames []string
var testNs string
Expand Down Expand Up @@ -490,7 +547,11 @@ func TestProcessMultipleServices(t *testing.T) {
validateSvcStatus(newSvc, true, t)
}
// this will be a create metric since an ILB IP is being assigned for the first time.
prevMetrics.ValidateDiff(test.GetL4ILBLatencyMetric(t), &test.L4LBLatencyMetricInfo{CreateCount: 20, UpperBoundSeconds: 1}, t)
currMetrics, metricErr := test.GetL4ILBLatencyMetric()
if metricErr != nil {
t.Errorf("Error getting L4 ILB latency metrics err: %v", metricErr)
}
prevMetrics.ValidateDiff(currMetrics, &test.L4LBLatencyMetricInfo{CreateCount: 20, UpperBoundSeconds: 1}, t)

})
}
Expand Down Expand Up @@ -539,16 +600,23 @@ func TestProcessServiceWithDelayedNEGAdd(t *testing.T) {

func TestProcessServiceOnError(t *testing.T) {
l4c := newServiceController(t, newFakeGCEWithInsertError())
prevMetrics := test.GetL4ILBErrorMetric(t)
prevMetrics, err := test.GetL4ILBErrorMetric()
if err != nil {
t.Errorf("Error getting L4 ILB error metrics err: %v", err)
}
newSvc := test.NewL4ILBService(false, 8080)
addILBService(l4c, newSvc)
addNEG(l4c, newSvc)
err := l4c.sync(getKeyForSvc(newSvc, t))
err = l4c.sync(getKeyForSvc(newSvc, t))
if err == nil {
t.Fatalf("Failed to generate error when syncing service %s", newSvc.Name)
}
expectMetrics := &test.L4LBErrorMetricInfo{
ByGCEResource: map[string]uint64{annotations.ForwardingRuleResource: 1},
ByErrorType: map[string]uint64{http.StatusText(http.StatusInternalServerError): 1}}
prevMetrics.ValidateDiff(test.GetL4ILBErrorMetric(t), expectMetrics, t)
currMetrics, errMetrics := test.GetL4ILBErrorMetric()
if errMetrics != nil {
t.Errorf("Error getting L4 ILB error metrics err: %v", errMetrics)
}
prevMetrics.ValidateDiff(currMetrics, expectMetrics, t)
}
30 changes: 30 additions & 0 deletions pkg/l4lb/l4netlbcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/backends"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/instances"
l4metrics "k8s.io/ingress-gce/pkg/l4lb/metrics"
"k8s.io/ingress-gce/pkg/loadbalancers"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
Expand Down Expand Up @@ -107,6 +109,7 @@ func NewL4NetLBController(
}
},
})
//TODO change to component name "l4netlb-controller"
ctx.AddHealthCheck("service-controller health", l4netLBc.checkHealth)
return l4netLBc
}
Expand Down Expand Up @@ -290,12 +293,14 @@ func (lc *L4NetLBController) sync(key string) error {
klog.V(3).Infof("Ignoring sync of non-existent service %s", key)
return nil
}
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}.String()
if lc.needsDeletion(svc) {
klog.V(3).Infof("Deleting L4 External LoadBalancer resources for service %s", key)
result := lc.garbageCollectRBSNetLB(key, svc)
if result == nil {
return nil
}
lc.publishMetrics(result, namespacedName)
return result.Error
}

Expand All @@ -305,6 +310,7 @@ func (lc *L4NetLBController) sync(key string) error {
// result will be nil if the service was ignored(due to presence of service controller finalizer).
return nil
}
lc.publishMetrics(result, namespacedName)
return result.Error
}
klog.V(3).Infof("Ignoring sync of service %s, neither delete nor ensure needed.", key)
Expand Down Expand Up @@ -367,6 +373,7 @@ func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4
syncResult.Error = fmt.Errorf("failed to set resource annotations, err: %w", err)
return syncResult
}
syncResult.MetricsState.InSuccess = true
return syncResult
}

Expand Down Expand Up @@ -416,3 +423,26 @@ func (lc *L4NetLBController) garbageCollectRBSNetLB(key string, svc *v1.Service)
lc.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted L4 External LoadBalancer")
return result
}

// publishMetrics sets controller metrics for NetLB services and pushes NetLB metrics based on sync type.
func (lc *L4NetLBController) publishMetrics(result *loadbalancers.L4NetLBSyncResult, namespacedName string) {
if result == nil {
return
}
switch result.SyncType {
case loadbalancers.SyncTypeCreate, loadbalancers.SyncTypeUpdate:
klog.V(4).Infof("External L4 Loadbalancer for Service %s ensured, updating its state %v in metrics cache", namespacedName, result.MetricsState)
lc.ctx.ControllerMetrics.SetL4NetLBService(namespacedName, result.MetricsState)
l4metrics.PublishNetLBSyncMetrics(result.Error == nil, result.SyncType, result.GCEResourceInError, utils.GetErrorType(result.Error), result.StartTime)

case loadbalancers.SyncTypeDelete:
// if service is successfully deleted, remove it from cache
if result.Error == nil {
klog.V(4).Infof("External L4 Loadbalancer for Service %s deleted, removing its state from metrics cache", namespacedName)
lc.ctx.ControllerMetrics.DeleteL4NetLBService(namespacedName)
}
l4metrics.PublishNetLBSyncMetrics(result.Error == nil, result.SyncType, result.GCEResourceInError, utils.GetErrorType(result.Error), result.StartTime)
default:
klog.Warningf("Unknown sync type %q, skipping metrics", result.SyncType)
}
}
Loading

0 comments on commit 4a0fc05

Please sign in to comment.