Skip to content

Commit

Permalink
Merge pull request #1812 from panslava/change-healthchecks_l4-mutex
Browse files Browse the repository at this point in the history
Change healthchecksl4 sync mechanism
  • Loading branch information
k8s-ci-robot authored Oct 6, 2022
2 parents 0bbfe07 + 5af9549 commit 43e9cf6
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 68 deletions.
3 changes: 0 additions & 3 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
flag "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecksl4"
"k8s.io/ingress-gce/pkg/ingparams"
"k8s.io/ingress-gce/pkg/l4lb"
"k8s.io/ingress-gce/pkg/psc"
Expand Down Expand Up @@ -280,8 +279,6 @@ func runControllers(ctx *ingctx.ControllerContext) {

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

healthchecksl4.Initialize(ctx.Cloud, ctx)

if flags.F.RunL4Controller {
l4Controller := l4lb.NewILBController(ctx, stopCh)
go l4Controller.Run()
Expand Down
57 changes: 20 additions & 37 deletions pkg/healthchecksl4/healthchecksl4.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/cloud-provider/service/helpers"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/healthchecksprovider"
"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -47,52 +47,36 @@ const (
)

var (
// instanceLock to prevent duplicate initialization.
instanceLock = &sync.Mutex{}
// instance is a singleton instance, created by Initialize
instance *l4HealthChecks
// sharedLock used to prevent race condition between shared health checks and firewalls.
sharedLock = &sync.Mutex{}
)

type l4HealthChecks struct {
// sharedResourceLock serializes operations on the healthcheck and firewall
// resources shared across multiple Services.
sharedResourcesLock sync.Mutex
sharedResourcesLock *sync.Mutex
hcProvider healthChecksProvider
cloud *gce.Cloud
recorderFactory events.RecorderProducer
recorder record.EventRecorder
}

// Initialize creates singleton instance, must be run before GetInstance() func
func Initialize(cloud *gce.Cloud, recorderFactory events.RecorderProducer) {
instanceLock.Lock()
defer instanceLock.Unlock()

if instance != nil {
klog.Error("Multiple L4 Healthchecks initialization attempts")
return
}

instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
func NewL4HealthChecks(cloud *gce.Cloud, recorder record.EventRecorder) *l4HealthChecks {
return &l4HealthChecks{
sharedResourcesLock: sharedLock,
cloud: cloud,
recorder: recorder,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
}
klog.V(3).Infof("Initialized L4 Healthchecks")
}

// Fake creates instance of l4HealthChecks. Use for test only.
func Fake(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4HealthChecks {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
// Fake creates instance of l4HealthChecks with independent lock. Use for test only.
func Fake(cloud *gce.Cloud, recorder record.EventRecorder) *l4HealthChecks {
return &l4HealthChecks{
sharedResourcesLock: &sync.Mutex{},
cloud: cloud,
recorder: recorder,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
}
return instance
}

// GetInstance returns singleton instance, must be run after Initialize
func GetInstance() *l4HealthChecks {
return instance
}

// EnsureHealthCheckWithFirewall exist for the L4
Expand Down Expand Up @@ -197,7 +181,7 @@ func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string,
Name: hcFwName,
NodeNames: nodeNames,
}
return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorderFactory.Recorder(svc.Namespace))
return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorder)
}

// DeleteHealthCheckWithFirewall deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
Expand Down Expand Up @@ -285,8 +269,7 @@ func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) err
}
// Suppress Firewall XPN error, as this is no retryable and requires action by security admin
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
l4hc.recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return err
Expand Down
2 changes: 0 additions & 2 deletions pkg/l4lb/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"testing"
"time"

"k8s.io/ingress-gce/pkg/healthchecksl4"
"k8s.io/ingress-gce/pkg/loadbalancers"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
Expand Down Expand Up @@ -80,7 +79,6 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}
healthchecksl4.Fake(ctx.Cloud, ctx)
return NewILBController(ctx, stopCh)
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func newL4NetLBServiceController() *L4NetLBController {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}
healthchecksl4.Fake(ctx.Cloud, ctx)
return NewL4NetLBController(ctx, stopCh)
}

Expand Down Expand Up @@ -873,7 +872,7 @@ func TestHealthCheckWhenExternalTrafficPolicyWasUpdated(t *testing.T) {
// delete shared health check if is created, update service to Cluster and
// check that non-shared health check was created
hcNameShared := lc.namer.L4HealthCheck(svc.Namespace, svc.Name, true)
healthchecksl4.Fake(lc.ctx.Cloud, lc.ctx).DeleteHealthCheckWithFirewall(svc, lc.namer, true, meta.Regional, utils.XLB)
healthchecksl4.Fake(lc.ctx.Cloud, lc.ctx.Recorder(svc.Namespace)).DeleteHealthCheckWithFirewall(svc, lc.namer, true, meta.Regional, utils.XLB)
// Update ExternalTrafficPolicy to Cluster check if shared HC was created
err = updateAndAssertExternalTrafficPolicy(newSvc, lc, v1.ServiceExternalTrafficPolicyTypeCluster, hcNameShared)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/loadbalancers/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewL4Handler(params *L4ILBParams) *L4 {
namer: params.Namer,
recorder: params.Recorder,
Service: params.Service,
healthChecks: healthchecksl4.GetInstance(),
healthChecks: healthchecksl4.NewL4HealthChecks(params.Cloud, params.Recorder),
forwardingRules: forwardingrules.New(params.Cloud, meta.VersionGA, scope),
}
l4.NamespacedName = types.NamespacedName{Name: params.Service.Name, Namespace: params.Service.Namespace}
Expand Down
34 changes: 17 additions & 17 deletions pkg/loadbalancers/l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestEnsureInternalBackendServiceUpdates(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

bsName := l4.namer.L4Backend(l4.Service.Namespace, l4.Service.Name)
_, err := l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(svc.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName, meta.VersionGA)
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestEnsureInternalLoadBalancer(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestEnsureInternalLoadBalancerTypeChange(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestEnsureInternalLoadBalancerClearPreviousResources(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestUpdateResourceLinks(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestEnsureInternalLoadBalancerHealthCheckConfigurable(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -550,7 +550,7 @@ func TestEnsureInternalLoadBalancerDeleted(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -589,7 +589,7 @@ func TestEnsureInternalLoadBalancerDeletedTwiceDoesNotError(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -716,7 +716,7 @@ func ensureService(fakeGCE *gce.Cloud, namer *namer_util.L4Namer, nodeNames []st
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, zoneName); err != nil {
return nil, nil, &L4ILBSyncResult{Error: fmt.Errorf("Unexpected error when adding nodes %v", err)}
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestEnsureInternalLoadBalancerWithSpecialHealthCheck(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -860,7 +860,7 @@ func TestEnsureInternalLoadBalancerErrors(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

//lbName :=l4.namer.L4Backend(params.service.Namespace, params.service.Name)
frName := l4.GetFRName()
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -1039,7 +1039,7 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -1144,7 +1144,7 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

fwName := l4.namer.L4Backend(l4.Service.Namespace, l4.Service.Name)
tc := struct {
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func TestEnsureInternalLoadBalancerModifyProtocol(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -1305,7 +1305,7 @@ func TestEnsureInternalLoadBalancerAllPorts(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.Fake(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/loadbalancers/l4netlb.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewL4NetLB(params *L4NetLBParams) *L4NetLB {
Service: params.Service,
NamespacedName: types.NamespacedName{Name: params.Service.Name, Namespace: params.Service.Namespace},
backendPool: backends.NewPool(params.Cloud, params.Namer),
healthChecks: healthchecksl4.GetInstance(),
healthChecks: healthchecksl4.NewL4HealthChecks(params.Cloud, params.Recorder),
forwardingRules: forwardingrules.New(params.Cloud, meta.VersionGA, meta.Regional),
}
return l4netlb
Expand Down
10 changes: 5 additions & 5 deletions pkg/loadbalancers/l4netlb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestEnsureL4NetLoadBalancer(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4netlb := NewL4NetLB(l4NetLBParams)
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, l4netlb.recorder)

if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestDeleteL4NetLoadBalancer(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4NetLB := NewL4NetLB(l4NetLBParams)
l4NetLB.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4NetLB.healthChecks = healthchecksl4.Fake(fakeGCE, l4NetLB.recorder)

if _, err := test.CreateAndInsertNodes(l4NetLB.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -232,7 +232,7 @@ func ensureLoadBalancer(port int, vals gce.TestClusterValues, fakeGCE *gce.Cloud
Recorder: record.NewFakeRecorder(100),
}
l4NetLB := NewL4NetLB(l4NetLBParams)
l4NetLB.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4NetLB.healthChecks = healthchecksl4.Fake(fakeGCE, l4NetLB.recorder)

result := l4NetLB.EnsureFrontend(emptyNodes, svc)
if result.Error != nil {
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestMetricsForStandardNetworkTier(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4netlb := NewL4NetLB(l4NetLBParams)
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, l4netlb.recorder)

if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestEnsureNetLBFirewallDestinations(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4netlb := NewL4NetLB(l4NetLBParams)
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, l4netlb.recorder)

if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down

0 comments on commit 43e9cf6

Please sign in to comment.