Skip to content

Commit

Permalink
Rewrite L4 healthchecks: Apply review comments
Browse files Browse the repository at this point in the history
EnsureL4HealthCheck: replace lengthy return value list in with named struct
Improve firewall rule comparison
Added debug logs
And many more small ones
  • Loading branch information
cezarygerard committed May 17, 2022
1 parent a10c879 commit abf0c05
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 84 deletions.
24 changes: 17 additions & 7 deletions pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,28 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error {
}

func firewallRuleEqual(a, b *compute.Firewall, skipDescription bool) bool {
fwrEqual := len(a.Allowed) == 1 &&
len(a.Allowed) == len(b.Allowed) &&
a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
utils.EqualStringSets(a.SourceRanges, b.SourceRanges) &&
if len(a.Allowed) != len(b.Allowed) {
return false
}
for i := range a.Allowed {
if !allowRulesEqual(a.Allowed[i], b.Allowed[i]) {
return false
}
}

srcAndTargetEqual := utils.EqualStringSets(a.SourceRanges, b.SourceRanges) &&
utils.EqualStringSets(a.TargetTags, b.TargetTags)

// Don't compare the "description" field for shared firewall rules
if skipDescription {
return fwrEqual
return srcAndTargetEqual
}
return fwrEqual && a.Description == b.Description
return srcAndTargetEqual && a.Description == b.Description
}

func allowRulesEqual(a *compute.FirewallAllowed, b *compute.FirewallAllowed) bool {
return a.IPProtocol == b.IPProtocol &&
utils.EqualStringSets(a.Ports, b.Ports)
}

func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
Expand Down
110 changes: 68 additions & 42 deletions pkg/healthchecks/healthchecks_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,37 @@ const (
)

var (
// instance is a sinngleton instance, created by InitializeL4
// instance is a singleton instance, created by InitializeL4
instance *l4HealthChecks
// mutex for preventing multiple initialization
initLock = &sync.Mutex{}
// instanceLock to prevent duplicate initialization.
instanceLock = &sync.Mutex{}
)

type l4HealthChecks struct {
mutex sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
// Lock access to shared resources - node healthcecks and their firewall rules
sharedResourcesLock sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
}

// InitializeL4 creates singleton instance, must be run before GetL4() func
func InitializeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) {
if instance != nil {
klog.Error("Multiple L4 Healthchecks initialization attempts")
return
}

instanceLock.Lock()
defer instanceLock.Unlock()

if instance == nil {
initLock.Lock()
defer initLock.Unlock()

if instance == nil {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
klog.V(3).Infof("Initialized L4 Healthchecks")
}

}

// FakeL4 creates instance of l4HealthChecks> USe for test only.
Expand All @@ -92,59 +98,75 @@ func GetL4() *l4HealthChecks {
// for the healthcheck. If healthcheck is shared (external traffic policy 'cluster') then firewall rules will be shared
// regardless of scope param.
// If the healthcheck already exists, it is updated as needed.
func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) {
func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult {
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
hcPath, hcPort := helpers.GetServiceHealthCheckPathPort(svc)
klog.V(3).Infof("Ensuring L4 healthcheck: %s and firewall rule %s from service %s/%s, shared: %v.", hcName, hcFwName, svc.Name, svc.Namespace, sharedHC)

if sharedHC {
hcPath, hcPort = gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
// lock out entire EnsureL4HealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
// We need to acquire a controller-wide mutex to ensure that in the case of a healthcheck shared between loadbalancers that the sync of the GCE resources is not performed in parallel.
l4hc.sharedResourcesLock.Lock()
defer l4hc.sharedResourcesLock.Unlock()
}
klog.V(3).Infof("L4 Healthcheck %s, path: %q, port %d", hcName, hcPath, hcPort)

namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type)
if err != nil {
return "", "", "", annotations.HealthcheckResource, err
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.HealthcheckResource,
Err: err,
}
}

klog.V(3).Infof("Healthcheck created, ensuring firewall rule %s", hcFwName)
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return "", "", "", annotations.FirewallForHealthcheckResource, err
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.HealthcheckResource,
Err: err,
}
}
return &EnsureL4HealthCheckResult{
HCName: hcName,
HCLink: hcLink,
HCFirewallRuleName: hcFwName,
}

return hcLink, hcFwName, hcName, "", err
}

// DeleteHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) {
if sharedHC {
// lock out entire DeleteHealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
}

hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

klog.V(3).Infof("Trying to delete L4 healthcheck: %s and firewall rule %s from service %s/%s, shared: %v", hcName, hcFwName, svc.Name, svc.Namespace, sharedHC)
if sharedHC {
// We need to acquire a controller-wide mutex to ensure that in the case of a healthcheck shared between loadbalancers that the sync of the GCE resources is not performed in parallel.
l4hc.sharedResourcesLock.Lock()
defer l4hc.sharedResourcesLock.Unlock()
}

err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope))
if err != nil {
// Ignore deletion error due to health check in use by another resource.
if !utils.IsInUsedByError(err) {
klog.Errorf("Failed to delete healthcheck for service %s - %v", namespacedName.String(), err)
return annotations.HealthcheckResource, err
}
// Ignore deletion error due to health check in use by another resource.
// This will be hit if this is a shared healthcheck.
klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName)
klog.V(2).Infof("Failed to delete healthcheck %s: shared health check in use.", hcName)
return "", nil
}
// Health check deleted, now delete the firewall rule
return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type)
}

func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(hcName string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
selfLink := ""
key, err := composite.CreateKey(l4hc.cloud, name, scope)
key, err := composite.CreateKey(l4hc.cloud, hcName, scope)
if err != nil {
return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", name, svcName.String())
return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", hcName, svcName.String())
}
hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
if err != nil {
Expand All @@ -156,10 +178,11 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ
if scope == meta.Regional {
region = l4hc.cloud.Region()
}
expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region)
expectedHC := newL4HealthCheck(hcName, svcName, shared, path, port, l4Type, scope, region)

if hc == nil {
// Create the healthcheck
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared)
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v. Expected healthcheck: %v", hcName, svcName, shared, expectedHC)
err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
Expand All @@ -170,20 +193,22 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ
selfLink = hc.SelfLink
if !needToUpdateHealthChecks(hc, expectedHC) {
// nothing to do
klog.V(3).Infof("Healthcheck %v already exists", hcName)
return hc, selfLink, nil
}
mergeHealthChecks(hc, expectedHC)
klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName)
klog.V(2).Infof("Updating healthcheck %s for service %s, updated healthcheck: %v", hcName, svcName, expectedHC)
err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
return expectedHC, selfLink, err
}

// ensureFirewall rule for L4 service.
// The firewall rules are the same for ILB and NetLB that use external traffic policy 'local' (sharedHC == true)
// despite healthchecks have different scopes (global vs regional)
// ensureFirewall rule for `svc`.
//
// L4 ILB and L4 NetLB Services with ExternalTrafficPolicy=Cluster use the same firewall
// rule at global scope.
func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error {
// Add firewall rule for healthchecks to nodes
hcFWRParams := firewalls.FirewallParams{
Expand Down Expand Up @@ -213,9 +238,10 @@ func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcNam
return annotations.HealthcheckResource, err
}
if !safeToDelete {
klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
klog.V(3).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
return "", nil
}
klog.V(3).Infof("Deleting healthcheck firewall rule named: %s", hcFwName)
// Delete healthcheck firewall rule if no healthcheck uses the firewall rule.
err = l4hc.deleteFirewall(hcFwName, svc)
if err != nil {
Expand Down Expand Up @@ -255,7 +281,7 @@ func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) err
return nil
}

func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
func newL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
httpSettings := composite.HTTPHealthCheck{
Port: int64(port),
RequestPath: path,
Expand All @@ -282,7 +308,7 @@ func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, pa

// mergeHealthChecks reconciles HealthCheck config to be no smaller than
// the default values. newHC is assumed to have defaults,
// since it is created by the NewL4HealthCheck call.
// since it is created by the newL4HealthCheck call.
// E.g. old health check interval is 2s, new has the default of 8.
// The HC interval will be reconciled to 8 seconds.
// If the existing health check values are larger than the default interval,
Expand Down
8 changes: 4 additions & 4 deletions pkg/healthchecks/healthchecks_l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMergeHealthChecks(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
// healthcheck intervals and thresholds are common for Global and Regional healthchecks. Hence testing only Global case.
wantHC := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
wantHC := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
hc := &composite.HealthCheck{
CheckIntervalSec: tc.checkIntervalSec,
TimeoutSec: tc.timeoutSec,
Expand Down Expand Up @@ -97,8 +97,8 @@ func TestCompareHealthChecks(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
// healthcheck intervals and thresholds are common for Global and Regional healthchecks. Hence testing only Global case.
hc := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
wantHC := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
hc := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
wantHC := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
if tc.modifier != nil {
tc.modifier(hc)
}
Expand All @@ -120,7 +120,7 @@ func TestCreateHealthCheck(t *testing.T) {
{meta.Global, ""},
{meta.Regional, "us-central1"},
} {
hc := NewL4HealthCheck("hc", namespaceName, false, "/", 12345, utils.ILB, v.scope, v.region)
hc := newL4HealthCheck("hc", namespaceName, false, "/", 12345, utils.ILB, v.scope, v.region)
if hc.Region != v.region {
t.Errorf("HealthCheck Region mismatch! %v != %v", hc.Region, v.region)
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/healthchecks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
)
Expand Down Expand Up @@ -57,3 +58,19 @@ type HealthChecker interface {
Delete(name string, scope meta.KeyType) error
Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error)
}

// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services
type L4HealthChecks interface {
// EnsureL4HealthCheck creates health check (and firewall rule) for l4 service
EnsureL4HealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service
DeleteHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error)
}

type EnsureL4HealthCheckResult struct {
HCName string
HCLink string
HCFirewallRuleName string
GceResourceInError string
Err error
}
11 changes: 0 additions & 11 deletions pkg/loadbalancers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ package loadbalancers

import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
)

// LoadBalancerPool is an interface to manage the cloud resources associated
Expand All @@ -40,11 +37,3 @@ type LoadBalancerPool interface {
// HasUrlMap returns true if an URL map exists in GCE for given ingress.
HasUrlMap(ing *v1.Ingress) (bool, error)
}

// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services
type L4HealthChecks interface {
// EnsureL4HealthCheck creates health check (and firewall rule) for l4 service
EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error)
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service
DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error)
}
16 changes: 8 additions & 8 deletions pkg/loadbalancers/l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type L4 struct {
Service *corev1.Service
ServicePort utils.ServicePort
NamespacedName types.NamespacedName
l4HealthChecks L4HealthChecks
l4HealthChecks healthchecks.L4HealthChecks
}

// L4ILBSyncResult contains information about the outcome of an L4 ILB sync. It stores the list of resource name annotations,
Expand Down Expand Up @@ -215,14 +215,14 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service)

// create healthcheck
sharedHC := !helpers.RequestsOnlyLocalTraffic(l.Service)
hcLink, hcFwName, hcName, resourceInErr, err := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, nodeNames)
hcResult := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, nodeNames)

if err != nil {
result.GCEResourceInError = resourceInErr
result.Error = err
if hcResult.Err != nil {
result.GCEResourceInError = hcResult.GceResourceInError
result.Error = hcResult.Err
return result
}
result.Annotations[annotations.HealthcheckKey] = hcName
result.Annotations[annotations.HealthcheckKey] = hcResult.HCName

_, portRanges, _, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports)

Expand All @@ -247,7 +247,7 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service)
return result
}
result.Annotations[annotations.FirewallRuleKey] = name
result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcFwName
result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcResult.HCFirewallRuleName

// Check if protocol has changed for this service. In this case, forwarding rule should be deleted before
// the backend service can be updated.
Expand All @@ -265,7 +265,7 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service)
}

// ensure backend service
bs, err := l.backendPool.EnsureL4BackendService(name, hcLink, string(protocol), string(l.Service.Spec.SessionAffinity),
bs, err := l.backendPool.EnsureL4BackendService(name, hcResult.HCLink, string(protocol), string(l.Service.Spec.SessionAffinity),
string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA)
if err != nil {
result.GCEResourceInError = annotations.BackendServiceResource
Expand Down
8 changes: 4 additions & 4 deletions pkg/loadbalancers/l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) {

// Create the expected resources necessary for an Internal Load Balancer
sharedHC := !servicehelper.RequestsOnlyLocalTraffic(svc)
hcLink, _, _, _, err := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, []string{})
hcResult := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, []string{})

if err != nil {
t.Errorf("Failed to create healthcheck, err %v", err)
if hcResult.Err != nil {
t.Errorf("Failed to create healthcheck, err %v", hcResult.Err)
}
_, err = l.backendPool.EnsureL4BackendService(lbName, hcLink, "TCP", string(l.Service.Spec.SessionAffinity),
_, err := l.backendPool.EnsureL4BackendService(lbName, hcResult.HCLink, "TCP", string(l.Service.Spec.SessionAffinity),
string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA)
if err != nil {
t.Errorf("Failed to create backendservice, err %v", err)
Expand Down
Loading

0 comments on commit abf0c05

Please sign in to comment.