Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable readiness reflector for Standalone NEG #927

Merged
merged 1 commit into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty
return err
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer, false)); err != nil {
if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true)); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have this on the wrong parameter.

return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
}
}
Expand Down
147 changes: 103 additions & 44 deletions pkg/neg/readiness/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package readiness

import (
"fmt"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/composite"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog"
"strconv"
"strings"
"sync"
)

Expand All @@ -48,10 +49,11 @@ func (n negMeta) String() string {

// podStatusPatcher interface allows patching pod status
type podStatusPatcher interface {
// syncPod patches the neg condition in the pod status to be True.
// key is the key to the pod. It is the namespaced name in the format of "namespace/name"
// negName is the name of the NEG resource
syncPod(key, negName string) error
// syncPod syncs the NEG readiness gate condition of the given pod.
// podKey is the key to the pod. It is the namespaced name in the format of "namespace/name"
// neg is the key of the NEG resource
// backendService is the key of the BackendService resource.
syncPod(podKey string, neg, backendService *meta.Key) error
}

// pollTarget is the target for polling
Expand Down Expand Up @@ -135,69 +137,126 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
}
defer p.unMarkPolling(key)

// TODO(freehan): refactor errList from pkg/neg/syncers to be reused here
var errList []error
klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone)
// TODO(freehan): filter the NEs that are in interest once the API supports it
res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion())
if err != nil {
return true, err
}

// Traverse the response and check if the endpoints in interest are HEALTHY
func() {
p.lock.Lock()
defer p.lock.Unlock()
var healthyCount int
for _, r := range res {
healthy, err := p.processHealthStatus(key, r)
if healthy && err == nil {
healthyCount++
}
if err != nil {
errList = append(errList, err)
}
return p.processHealthStatus(key, res)
}

// processHealthStatus updates Pod readiness gates based on the input health status response.
//
// We update the pod (using the patcher) when:
// 1. if the endpoint considered healthy with one of the backend service health check
// 2. if the NEG is not associated with any health checks
// It returns true if retry is needed.
func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
p.lock.Lock()
defer p.lock.Unlock()
klog.V(4).Infof("processHealthStatus(%q, %+v)", key.String(), healthStatuses)

var (
errList []error
// healthChecked indicates whether at least one of the endpoint in response has health status.
// If a NEG is attached to a Backend Service with health check, all endpoints
// in the NEG will be health checked. However, for new endpoints, it may take a while for the
// health check to start. The assumption is that if at least one of the endpoint in a NEG has
// health status, all the endpoints in the NEG should have health status eventually.
healthChecked bool
// patchCount is the count of the pod got patched
patchCount int
unhealthyPods []types.NamespacedName
)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any harm in adding lots of logging at V(3) or above?

glog.V(3).Infof("processHealthStatus(%v, %+v)", ...)
etc...
glog.V(3).Infof("processing healthStatus %v ", ...)

for _, healthStatus := range healthStatuses {
if healthStatus == nil {
klog.Warningf("healthStatus is nil from response %+v", healthStatuses)
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would the healthStatus be nil? Also, you should log a warn or error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a sanity check since the Go Client parsing GCE API response is not bullet proof.
Will log a warning.

}
if healthyCount != len(p.pollMap[key].endpointMap) {
retry = true

if healthStatus.NetworkEndpoint == nil {
klog.Warningf("Health status has nil associated network endpoint: %v", healthStatus)
continue
}
}()
return retry, utilerrors.NewAggregate(errList)
}

// processHealthStatus evaluates the health status of the input network endpoint.
// Assumes p.lock is held when calling this method.
func (p *poller) processHealthStatus(key negMeta, healthStatus *composite.NetworkEndpointWithHealthStatus) (healthy bool, err error) {
ne := negtypes.NetworkEndpoint{
IP: healthStatus.NetworkEndpoint.IpAddress,
Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
Node: healthStatus.NetworkEndpoint.Instance,
healthChecked = healthChecked || hasHealthStatus(healthStatus)

ne := negtypes.NetworkEndpoint{
IP: healthStatus.NetworkEndpoint.IpAddress,
Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
Node: healthStatus.NetworkEndpoint.Instance,
}

podName, ok := p.getPod(key, ne)
if !ok {
// The pod is not in interest. Skip
continue
}

bsKey := getHealthyBackendService(healthStatus)
if bsKey == nil {
unhealthyPods = append(unhealthyPods, podName)
bowei marked this conversation as resolved.
Show resolved Hide resolved
continue
}

err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), bsKey)
if err != nil {
errList = append(errList, err)
continue
}
patchCount++
}
podName, ok := p.getPod(key, ne)
if !ok {
return false, nil

// if the NEG is not health checked, signal the patcher to mark the unhealthy pods to be Ready.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this happen? I think this needs an explanation.
(NEG not healthchecked => unhealthy pod = ready)

// This is most likely due to health check is not configured for the NEG. Hence none of the endpoints
// in the NEG has health status.
if !healthChecked {
for _, podName := range unhealthyPods {
err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), nil)
if err != nil {
errList = append(errList, err)
continue
}
patchCount++
}
}

// If we didn't patch all of the endpoints, we must keep polling for health status
return patchCount < len(p.pollMap[key].endpointMap), utilerrors.NewAggregate(errList)
}

// getHealthyBackendService returns one of the first backend service key where the endpoint is considered healthy.
func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus) *meta.Key {
for _, hs := range healthStatus.Healths {
if hs == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be an Error log.
Also, kind of strange to have to check for nils everywhere

klog.Errorf("Health status is nil in health status of network endpoint %v ", healthStatus)
continue
}
if hs.BackendService == nil {
klog.Warningf("Backend service is nil in health status of network endpoint %v: %v", ne, hs)
klog.Errorf("Backend service is nil in health status of network endpoint %v", healthStatus)
continue
}

// This assumes the ingress backend service uses the NEG naming scheme. Hence the backend service share the same name as NEG.
if strings.Contains(hs.BackendService.BackendService, key.Name) {
if hs.HealthState == healthyState {
healthy = true
err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), key.Name)
return healthy, err
if hs.HealthState == healthyState {
id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
if err != nil {
klog.Errorf("Failed to parse backend service reference from a Network Endpoint health status %v: %v", healthStatus, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continue?

continue
}
if id != nil {
return id.Key
}
}

}
return false, nil
return nil
}

// hasHealthStatus returns true if there is at least 1 health status associated with the endpoint.
func hasHealthStatus(healthStatus *composite.NetworkEndpointWithHealthStatus) bool {
return healthStatus != nil && len(healthStatus.Healths) > 0
}

// getPod returns the namespaced name of a pod corresponds to an endpoint and whether the pod is registered
Expand Down
Loading