enable readiness reflector for Standalone NEG
freehan committed Nov 12, 2019
1 parent 011cb28 commit cd2dce2
Showing 6 changed files with 345 additions and 134 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/controller.go
@@ -416,7 +416,7 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty
         return err
     }

-    if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer, false)); err != nil {
+    if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true)); err != nil {
         return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
     }
 }
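The functional change above is the flipped readinessGate argument: ports exposed as standalone NEGs now produce port info with the readiness gate enabled, so the readiness reflector starts tracking their pods. A minimal sketch of the call, reusing the names from the hunk above (error handling elided):

    // readinessGate=true: pods backing these standalone NEG ports will carry
    // the NEG readiness gate and be patched by the readiness reflector.
    portInfo := negtypes.NewPortInfoMap(
        name.Namespace,    // service namespace
        name.Name,         // service name
        exposedNegSvcPort, // service ports exposed as standalone NEGs
        c.namer,           // NEG namer
        true,              // readinessGate (was false)
    )
    err := portInfoMap.Merge(portInfo)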
146 changes: 103 additions & 43 deletions pkg/neg/readiness/poller.go
@@ -18,14 +18,15 @@ package readiness

 import (
     "fmt"
+    "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
+    "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
     "k8s.io/apimachinery/pkg/types"
     utilerrors "k8s.io/apimachinery/pkg/util/errors"
     "k8s.io/client-go/tools/cache"
     "k8s.io/ingress-gce/pkg/composite"
     negtypes "k8s.io/ingress-gce/pkg/neg/types"
     "k8s.io/klog"
     "strconv"
-    "strings"
     "sync"
 )

@@ -48,10 +49,11 @@ func (n negMeta) String() string {

 // podStatusPatcher interface allows patching pod status
 type podStatusPatcher interface {
-    // syncPod patches the neg condition in the pod status to be True.
-    // key is the key to the pod. It is the namespaced name in the format of "namespace/name"
-    // negName is the name of the NEG resource
-    syncPod(key, negName string) error
+    // syncPod syncs the NEG readiness gate condition of the given pod.
+    // podKey is the key of the pod, i.e. its namespaced name in the format "namespace/name".
+    // neg is the key of the NEG resource.
+    // backendService is the key of the BackendService resource.
+    syncPod(podKey string, neg, backendService *meta.Key) error
 }

 // pollTarget is the target for polling
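The interface change above means a patcher now receives resource keys instead of bare names, and a nil backendService is meaningful (see processHealthStatus below: it signals that the NEG has no health check attached). For intuition, a minimal fake patcher for tests might look as follows; recordingPatcher and its fields are hypothetical, not code from this commit:

    // recordingPatcher is a hypothetical podStatusPatcher that records calls
    // instead of patching pod status.
    type recordingPatcher struct {
        calls []string
    }

    func (r *recordingPatcher) syncPod(podKey string, neg, backendService *meta.Key) error {
        // backendService is nil when the NEG is not attached to any
        // health-checked backend service; the pod should still be marked
        // Ready in that case.
        bs := "<none>"
        if backendService != nil {
            bs = backendService.String()
        }
        r.calls = append(r.calls, fmt.Sprintf("pod=%s neg=%s bs=%s", podKey, neg.String(), bs))
        return nil
    }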
@@ -135,69 +137,127 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
     }
     defer p.unMarkPolling(key)

-    // TODO(freehan): refactor errList from pkg/neg/syncers to be reused here
-    var errList []error
     klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone)
     // TODO(freehan): filter the NEs that are in interest once the API supports it
     res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion())
     if err != nil {
         return true, err
     }

-    // Traverse the response and check if the endpoints in interest are HEALTHY
-    func() {
-        p.lock.Lock()
-        defer p.lock.Unlock()
-        var healthyCount int
-        for _, r := range res {
-            healthy, err := p.processHealthStatus(key, r)
-            if healthy && err == nil {
-                healthyCount++
-            }
+    return p.processHealthStatus(key, res)
+}

+// processHealthStatus updates Pod readiness gates based on the input health status response.
+//
+// The pod is updated (via the patcher) when either:
+// 1. the endpoint is considered healthy by at least one backend service health check, or
+// 2. the NEG is not associated with any health check.
+// It returns true if a retry is needed.
+func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
+    p.lock.Lock()
+    defer p.lock.Unlock()
+    klog.V(4).Infof("processHealthStatus(%q, %+v)", key.String(), healthStatuses)
+
+    var (
+        errList []error
+        // healthChecked indicates whether at least one endpoint in the response has health status.
+        // If a NEG is attached to a backend service with a health check, all endpoints in the
+        // NEG will be health checked. However, for new endpoints, it may take a while for the
+        // health check to start. The assumption is that if at least one endpoint in a NEG has
+        // health status, all endpoints in the NEG will eventually have health status.
+        healthChecked bool
+        // patchCount is the number of pods that were patched
+        patchCount    int
+        unhealthyPods []types.NamespacedName
+    )
+
+    for _, healthStatus := range healthStatuses {
+        if healthStatus == nil {
+            klog.Warningf("healthStatus is nil from response %+v", healthStatuses)
+            continue
+        }
+
+        if healthStatus.NetworkEndpoint == nil {
+            klog.Warningf("Health status has nil associated network endpoint: %v", healthStatus)
+            continue
+        }
+
+        healthChecked = healthChecked || hasHealthStatus(healthStatus)
+
+        ne := negtypes.NetworkEndpoint{
+            IP:   healthStatus.NetworkEndpoint.IpAddress,
+            Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
+            Node: healthStatus.NetworkEndpoint.Instance,
+        }

+        podName, ok := p.getPod(key, ne)
+        if !ok {
+            // The pod is not of interest; skip it.
+            continue
+        }

+        bsKey := getHealthyBackendService(healthStatus)
+        if bsKey == nil {
+            unhealthyPods = append(unhealthyPods, podName)
+            continue
+        } else {
+            err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), bsKey)
+            if err != nil {
+                errList = append(errList, err)
+                continue
+            } else {
+                patchCount++
+            }
+        }
+    }
-        if healthyCount != len(p.pollMap[key].endpointMap) {
-            retry = true
-        }
-    }()
-    return retry, utilerrors.NewAggregate(errList)
-}

-// processHealthStatus evaluates the health status of the input network endpoint.
-// Assumes p.lock is held when calling this method.
-func (p *poller) processHealthStatus(key negMeta, healthStatus *composite.NetworkEndpointWithHealthStatus) (healthy bool, err error) {
-    ne := negtypes.NetworkEndpoint{
-        IP:   healthStatus.NetworkEndpoint.IpAddress,
-        Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
-        Node: healthStatus.NetworkEndpoint.Instance,
-    }
-    podName, ok := p.getPod(key, ne)
-    if !ok {
-        return false, nil
-    }

+    // If the NEG is not health checked, signal the patcher to mark the unhealthy pods as Ready.
+    // This most likely means no health check is configured for the NEG, hence none of the
+    // endpoints in the NEG has health status.
+    if !healthChecked {
+        for _, podName := range unhealthyPods {
+            err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), nil)
+            if err != nil {
+                errList = append(errList, err)
+            } else {
+                patchCount++
+            }
+        }
+    }

+    // If we didn't patch all of the endpoints, we must keep polling for health status.
+    return patchCount < len(p.pollMap[key].endpointMap), utilerrors.NewAggregate(errList)
+}

+// getHealthyBackendService returns the key of the first backend service for which the endpoint is considered healthy.
+func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus) *meta.Key {
     for _, hs := range healthStatus.Healths {
         if hs == nil {
             klog.Errorf("Health status is nil in health status of network endpoint %v", healthStatus)
             continue
         }
         if hs.BackendService == nil {
-            klog.Warningf("Backend service is nil in health status of network endpoint %v: %v", ne, hs)
+            klog.Errorf("Backend service is nil in health status of network endpoint %v", healthStatus)
             continue
         }

-        // This assumes the ingress backend service uses the NEG naming scheme. Hence the backend service share the same name as NEG.
-        if strings.Contains(hs.BackendService.BackendService, key.Name) {
-            if hs.HealthState == healthyState {
-                healthy = true
-                err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), key.Name)
-                return healthy, err
-            }
-        }
+        if hs.HealthState == healthyState {
+            id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
+            if err != nil {
+                klog.Errorf("Failed to parse backend service reference from a Network Endpoint health status %v: %v", healthStatus, err)
+                continue
+            }
+            if id != nil {
+                return id.Key
+            }
+        }
     }
-    return false, nil
+    return nil
 }

+// hasHealthStatus returns true if there is at least one health status associated with the endpoint.
+func hasHealthStatus(healthStatus *composite.NetworkEndpointWithHealthStatus) bool {
+    return healthStatus != nil && len(healthStatus.Healths) > 0
+}

 // getPod returns the namespaced name of a pod corresponding to an endpoint and whether the pod is registered
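Two reading aids for the new logic above, offered as sketches rather than code from this commit. First, the patch/retry decision in processHealthStatus, reduced to a hypothetical pure function; it deliberately ignores patch errors and endpoints whose pod is not tracked:

    // needsRetry is a hypothetical reduction of the return value of
    // processHealthStatus: keep polling until every tracked endpoint's
    // pod has been patched.
    func needsRetry(trackedEndpoints, endpointsWithHealthyBS int, anyHealthStatus bool) bool {
        patched := endpointsWithHealthyBS
        if !anyHealthStatus {
            // No endpoint has health status, i.e. no health check is attached
            // to the NEG: all pods are patched Ready unconditionally.
            patched = trackedEndpoints
        }
        return patched < trackedEndpoints
    }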
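Second, getHealthyBackendService now derives the backend service key from the resource URL carried in the health status via cloud.ParseResourceURL, instead of the old strings.Contains match against the NEG name. A runnable sketch with a made-up URL:

    package main

    import (
        "fmt"

        "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
    )

    func main() {
        // Made-up backend service URL, for illustration only.
        url := "https://www.googleapis.com/compute/v1/projects/my-project/global/backendServices/my-bs"
        id, err := cloud.ParseResourceURL(url)
        if err != nil {
            fmt.Println("parse error:", err)
            return
        }
        // id.Key is the *meta.Key that getHealthyBackendService returns;
        // here it is a global key named "my-bs".
        fmt.Println(id.Key)
    }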
