Skip to content

Commit

Permalink
Change healthchecksl4 sync method
Browse files Browse the repository at this point in the history
Removed shared global struct, instead only use shared mutex
For fake, return healthchecks instance with independent mutex
  • Loading branch information
panslava committed Sep 28, 2022
1 parent 0a04a59 commit 6c3c4f0
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 68 deletions.
3 changes: 0 additions & 3 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
flag "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecksl4"
"k8s.io/ingress-gce/pkg/ingparams"
"k8s.io/ingress-gce/pkg/l4lb"
"k8s.io/ingress-gce/pkg/psc"
Expand Down Expand Up @@ -280,8 +279,6 @@ func runControllers(ctx *ingctx.ControllerContext) {

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

healthchecksl4.Initialize(ctx.Cloud, ctx)

if flags.F.RunL4Controller {
l4Controller := l4lb.NewILBController(ctx, stopCh)
go l4Controller.Run()
Expand Down
57 changes: 20 additions & 37 deletions pkg/healthchecksl4/healthchecksl4.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/cloud-provider/service/helpers"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/healthchecksprovider"
"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -47,52 +47,36 @@ const (
)

var (
// instanceLock to prevent duplicate initialization.
instanceLock = &sync.Mutex{}
// instance is a singleton instance, created by Initialize
instance *l4HealthChecks
// sharedLock used to prevent race condition between shared health checks and firewalls.
sharedLock = &sync.Mutex{}
)

type l4HealthChecks struct {
// sharedResourceLock serializes operations on the healthcheck and firewall
// resources shared across multiple Services.
sharedResourcesLock sync.Mutex
sharedResourcesLock *sync.Mutex
hcProvider healthChecksProvider
cloud *gce.Cloud
recorderFactory events.RecorderProducer
recorder record.EventRecorder
}

// Initialize creates singleton instance, must be run before GetInstance() func
func Initialize(cloud *gce.Cloud, recorderFactory events.RecorderProducer) {
instanceLock.Lock()
defer instanceLock.Unlock()

if instance != nil {
klog.Error("Multiple L4 Healthchecks initialization attempts")
return
}

instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
func NewL4HealthChecks(cloud *gce.Cloud, recorder record.EventRecorder) *l4HealthChecks {
return &l4HealthChecks{
sharedResourcesLock: sharedLock,
cloud: cloud,
recorder: recorder,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
}
klog.V(3).Infof("Initialized L4 Healthchecks")
}

// Fake creates instance of l4HealthChecks. Use for test only.
func Fake(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4HealthChecks {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
// FakeNotSynced creates instance of l4HealthChecks with independent lock. Use for test only.
func FakeNotSynced(cloud *gce.Cloud, recorder record.EventRecorder) *l4HealthChecks {
return &l4HealthChecks{
sharedResourcesLock: &sync.Mutex{},
cloud: cloud,
recorder: recorder,
hcProvider: healthchecksprovider.NewHealthChecks(cloud, meta.VersionGA),
}
return instance
}

// GetInstance returns singleton instance, must be run after Initialize
func GetInstance() *l4HealthChecks {
return instance
}

// EnsureHealthCheckWithFirewall exist for the L4
Expand Down Expand Up @@ -197,7 +181,7 @@ func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string,
Name: hcFwName,
NodeNames: nodeNames,
}
return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorderFactory.Recorder(svc.Namespace))
return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorder)
}

// DeleteHealthCheckWithFirewall deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
Expand Down Expand Up @@ -285,8 +269,7 @@ func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) err
}
// Suppress Firewall XPN error, as this is no retryable and requires action by security admin
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
l4hc.recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return err
Expand Down
2 changes: 0 additions & 2 deletions pkg/l4lb/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"testing"
"time"

"k8s.io/ingress-gce/pkg/healthchecksl4"
"k8s.io/ingress-gce/pkg/loadbalancers"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
Expand Down Expand Up @@ -80,7 +79,6 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}
healthchecksl4.Fake(ctx.Cloud, ctx)
return NewILBController(ctx, stopCh)
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func newL4NetLBServiceController() *L4NetLBController {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}
healthchecksl4.Fake(ctx.Cloud, ctx)
return NewL4NetLBController(ctx, stopCh)
}

Expand Down Expand Up @@ -873,7 +872,7 @@ func TestHealthCheckWhenExternalTrafficPolicyWasUpdated(t *testing.T) {
// delete shared health check if is created, update service to Cluster and
// check that non-shared health check was created
hcNameShared := lc.namer.L4HealthCheck(svc.Namespace, svc.Name, true)
healthchecksl4.Fake(lc.ctx.Cloud, lc.ctx).DeleteHealthCheckWithFirewall(svc, lc.namer, true, meta.Regional, utils.XLB)
healthchecksl4.FakeNotSynced(lc.ctx.Cloud, lc.ctx.Recorder(svc.Namespace)).DeleteHealthCheckWithFirewall(svc, lc.namer, true, meta.Regional, utils.XLB)
// Update ExternalTrafficPolicy to Cluster check if shared HC was created
err = updateAndAssertExternalTrafficPolicy(newSvc, lc, v1.ServiceExternalTrafficPolicyTypeCluster, hcNameShared)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/loadbalancers/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewL4Handler(params *L4ILBParams) *L4 {
namer: params.Namer,
recorder: params.Recorder,
Service: params.Service,
healthChecks: healthchecksl4.GetInstance(),
healthChecks: healthchecksl4.NewL4HealthChecks(params.Cloud, params.Recorder),
forwardingRules: forwardingrules.New(params.Cloud, meta.VersionGA, scope),
}
l4.NamespacedName = types.NamespacedName{Name: params.Service.Name, Namespace: params.Service.Namespace}
Expand Down
34 changes: 17 additions & 17 deletions pkg/loadbalancers/l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestEnsureInternalBackendServiceUpdates(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

bsName := l4.namer.L4Backend(l4.Service.Namespace, l4.Service.Name)
_, err := l4.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(svc.Spec.SessionAffinity), string(cloud.SchemeInternal), l4.NamespacedName, meta.VersionGA)
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestEnsureInternalLoadBalancer(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestEnsureInternalLoadBalancerTypeChange(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestEnsureInternalLoadBalancerClearPreviousResources(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestUpdateResourceLinks(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestEnsureInternalLoadBalancerHealthCheckConfigurable(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -550,7 +550,7 @@ func TestEnsureInternalLoadBalancerDeleted(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -589,7 +589,7 @@ func TestEnsureInternalLoadBalancerDeletedTwiceDoesNotError(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -716,7 +716,7 @@ func ensureService(fakeGCE *gce.Cloud, namer *namer_util.L4Namer, nodeNames []st
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, zoneName); err != nil {
return nil, nil, &L4ILBSyncResult{Error: fmt.Errorf("Unexpected error when adding nodes %v", err)}
Expand Down Expand Up @@ -748,7 +748,7 @@ func TestEnsureInternalLoadBalancerWithSpecialHealthCheck(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -860,7 +860,7 @@ func TestEnsureInternalLoadBalancerErrors(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

//lbName :=l4.namer.L4Backend(params.service.Namespace, params.service.Name)
frName := l4.GetFRName()
Expand Down Expand Up @@ -950,7 +950,7 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -1039,7 +1039,7 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -1144,7 +1144,7 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

fwName := l4.namer.L4Backend(l4.Service.Namespace, l4.Service.Name)
tc := struct {
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func TestEnsureInternalLoadBalancerModifyProtocol(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

_, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName)
if err != nil {
Expand Down Expand Up @@ -1305,7 +1305,7 @@ func TestEnsureInternalLoadBalancerAllPorts(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4 := NewL4Handler(l4ilbParams)
l4.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4ilbParams.Recorder)

if _, err := test.CreateAndInsertNodes(l4.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/loadbalancers/l4netlb.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewL4NetLB(params *L4NetLBParams) *L4NetLB {
Service: params.Service,
NamespacedName: types.NamespacedName{Name: params.Service.Name, Namespace: params.Service.Namespace},
backendPool: backends.NewPool(params.Cloud, params.Namer),
healthChecks: healthchecksl4.GetInstance(),
healthChecks: healthchecksl4.NewL4HealthChecks(params.Cloud, params.Recorder),
forwardingRules: forwardingrules.New(params.Cloud, meta.VersionGA, meta.Regional),
}
portId := utils.ServicePortID{Service: l4netlb.NamespacedName}
Expand Down
10 changes: 5 additions & 5 deletions pkg/loadbalancers/l4netlb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestEnsureL4NetLoadBalancer(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4netlb := NewL4NetLB(l4NetLBParams)
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4netlb.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4netlb.recorder)

if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestDeleteL4NetLoadBalancer(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4NetLB := NewL4NetLB(l4NetLBParams)
l4NetLB.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4NetLB.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4NetLB.recorder)

if _, err := test.CreateAndInsertNodes(l4NetLB.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -232,7 +232,7 @@ func ensureLoadBalancer(port int, vals gce.TestClusterValues, fakeGCE *gce.Cloud
Recorder: record.NewFakeRecorder(100),
}
l4NetLB := NewL4NetLB(l4NetLBParams)
l4NetLB.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4NetLB.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4NetLB.recorder)

result := l4NetLB.EnsureFrontend(emptyNodes, svc)
if result.Error != nil {
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestMetricsForStandardNetworkTier(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4netlb := NewL4NetLB(l4NetLBParams)
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4netlb.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4netlb.recorder)

if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestEnsureNetLBFirewallDestinations(t *testing.T) {
Recorder: record.NewFakeRecorder(100),
}
l4netlb := NewL4NetLB(l4NetLBParams)
l4netlb.healthChecks = healthchecksl4.Fake(fakeGCE, &test.FakeRecorderSource{})
l4netlb.healthChecks = healthchecksl4.FakeNotSynced(fakeGCE, l4netlb.recorder)

if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil {
t.Errorf("Unexpected error when adding nodes %v", err)
Expand Down

0 comments on commit 6c3c4f0

Please sign in to comment.