From 5603304b271e1181809676a8e117d497c922a1a2 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Mon, 4 Nov 2019 14:31:46 -0800 Subject: [PATCH] enable readiness reflector for Standalone NEG --- pkg/neg/controller.go | 2 +- pkg/neg/readiness/poller.go | 147 ++++++++++++++------ pkg/neg/readiness/poller_test.go | 208 +++++++++++++++++++++------- pkg/neg/readiness/reflector.go | 41 ++++-- pkg/neg/readiness/reflector_test.go | 73 +++++++--- pkg/neg/types/mock.go | 9 +- 6 files changed, 345 insertions(+), 135 deletions(-) diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 5b202e27c8..a36ef434e8 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -416,7 +416,7 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty return err } - if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer, false)); err != nil { + if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true)); err != nil { return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err) } } diff --git a/pkg/neg/readiness/poller.go b/pkg/neg/readiness/poller.go index a50d9c1dd4..4a81d73755 100644 --- a/pkg/neg/readiness/poller.go +++ b/pkg/neg/readiness/poller.go @@ -18,6 +18,8 @@ package readiness import ( "fmt" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/cache" @@ -25,7 +27,6 @@ import ( negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog" "strconv" - "strings" "sync" ) @@ -48,10 +49,11 @@ func (n negMeta) String() string { // podStatusPatcher interface allows patching pod status type podStatusPatcher interface { - // syncPod patches the neg condition in the pod status to be True. - // key is the key to the pod. It is the namespaced name in the format of "namespace/name" - // negName is the name of the NEG resource - syncPod(key, negName string) error + // syncPod syncs the NEG readiness gate condition of the given pod. + // podKey is the key to the pod. It is the namespaced name in the format of "namespace/name" + // neg is the key of the NEG resource + // backendService is the key of the BackendService resource. 
+	syncPod(podKey string, neg, backendService *meta.Key) error
 }
 
 // pollTarget is the target for polling
@@ -135,8 +137,6 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
 	}
 	defer p.unMarkPolling(key)
 
-	// TODO(freehan): refactor errList from pkg/neg/syncers to be reused here
-	var errList []error
 	klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone)
 	// TODO(freehan): filter the NEs that are in interest once the API supports it
 	res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion())
@@ -144,60 +144,119 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
 		return true, err
 	}
 
-	// Traverse the response and check if the endpoints in interest are HEALTHY
-	func() {
-		p.lock.Lock()
-		defer p.lock.Unlock()
-		var healthyCount int
-		for _, r := range res {
-			healthy, err := p.processHealthStatus(key, r)
-			if healthy && err == nil {
-				healthyCount++
-			}
-			if err != nil {
-				errList = append(errList, err)
-			}
+	return p.processHealthStatus(key, res)
+}
+
+// processHealthStatus updates Pod readiness gates based on the input health status response.
+//
+// We update the pod (using the patcher) when:
+// 1. the endpoint is considered healthy by at least one of the backend service health checks
+// 2. the NEG is not associated with any health checks
+// It returns true if retry is needed.
+func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	klog.V(4).Infof("processHealthStatus(%q, %+v)", key.String(), healthStatuses)
+
+	var (
+		errList []error
+		// healthChecked indicates whether at least one of the endpoints in the response has health status.
+		// If a NEG is attached to a Backend Service with a health check, all endpoints
+		// in the NEG will be health checked. However, for new endpoints, it may take a while for the
+		// health check to start. The assumption is that if at least one of the endpoints in a NEG has
+		// health status, all the endpoints in the NEG should have health status eventually.
+		healthChecked bool
+		// patchCount is the number of pods that got patched
+		patchCount    int
+		unhealthyPods []types.NamespacedName
+	)
+
+	for _, healthStatus := range healthStatuses {
+		if healthStatus == nil {
+			klog.Warningf("healthStatus is nil from response %+v", healthStatuses)
+			continue
 		}
-		if healthyCount != len(p.pollMap[key].endpointMap) {
-			retry = true
+
+		if healthStatus.NetworkEndpoint == nil {
+			klog.Warningf("Health status has nil associated network endpoint: %v", healthStatus)
+			continue
 		}
-	}()
-	return retry, utilerrors.NewAggregate(errList)
-}
 
-// processHealthStatus evaluates the health status of the input network endpoint.
-// Assumes p.lock is held when calling this method.
-func (p *poller) processHealthStatus(key negMeta, healthStatus *composite.NetworkEndpointWithHealthStatus) (healthy bool, err error) {
-	ne := negtypes.NetworkEndpoint{
-		IP:   healthStatus.NetworkEndpoint.IpAddress,
-		Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
-		Node: healthStatus.NetworkEndpoint.Instance,
+		healthChecked = healthChecked || hasHealthStatus(healthStatus)
+
+		ne := negtypes.NetworkEndpoint{
+			IP:   healthStatus.NetworkEndpoint.IpAddress,
+			Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
+			Node: healthStatus.NetworkEndpoint.Instance,
+		}
+
+		podName, ok := p.getPod(key, ne)
+		if !ok {
+			// The pod is not of interest. Skip
+			continue
+		}
+
+		bsKey := getHealthyBackendService(healthStatus)
+		if bsKey == nil {
+			unhealthyPods = append(unhealthyPods, podName)
+			continue
+		}
+
+		err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), bsKey)
+		if err != nil {
+			errList = append(errList, err)
+			continue
+		}
+		patchCount++
 	}
-	podName, ok := p.getPod(key, ne)
-	if !ok {
-		return false, nil
+
+	// If the NEG is not health checked, signal the patcher to mark the unhealthy pods as Ready.
+	// This is most likely because no health check is configured for the NEG, hence none of the endpoints
+	// in the NEG has health status.
+	if !healthChecked {
+		for _, podName := range unhealthyPods {
+			err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), nil)
+			if err != nil {
+				errList = append(errList, err)
				continue
+			}
+			patchCount++
+		}
 	}
+	// If we didn't patch all of the endpoints, we must keep polling for health status
+	return patchCount < len(p.pollMap[key].endpointMap), utilerrors.NewAggregate(errList)
+}
+
+// getHealthyBackendService returns the key of the first backend service for which the endpoint is considered healthy.
+func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus) *meta.Key {
 	for _, hs := range healthStatus.Healths {
 		if hs == nil {
+			klog.Errorf("Health status is nil in the health statuses of network endpoint %v", healthStatus)
 			continue
 		}
 		if hs.BackendService == nil {
-			klog.Warningf("Backend service is nil in health status of network endpoint %v: %v", ne, hs)
+			klog.Errorf("Backend service is nil in health status of network endpoint %v", healthStatus)
 			continue
 		}
 
-		// This assumes the ingress backend service uses the NEG naming scheme. Hence the backend service share the same name as NEG.
-		if strings.Contains(hs.BackendService.BackendService, key.Name) {
-			if hs.HealthState == healthyState {
-				healthy = true
-				err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), key.Name)
-				return healthy, err
+		if hs.HealthState == healthyState {
+			id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
+			if err != nil {
+				klog.Errorf("Failed to parse backend service reference from a Network Endpoint health status %v: %v", healthStatus, err)
+				continue
+			}
+			if id != nil {
+				return id.Key
 			}
 		}
 	}
-	return false, nil
+	return nil
+}
+
+// hasHealthStatus returns true if there is at least 1 health status associated with the endpoint.
+func hasHealthStatus(healthStatus *composite.NetworkEndpointWithHealthStatus) bool { + return healthStatus != nil && len(healthStatus.Healths) > 0 } // getPod returns the namespaced name of a pod corresponds to an endpoint and whether the pod is registered diff --git a/pkg/neg/readiness/poller_test.go b/pkg/neg/readiness/poller_test.go index 0af3b43cb4..20756e187b 100644 --- a/pkg/neg/readiness/poller_test.go +++ b/pkg/neg/readiness/poller_test.go @@ -22,16 +22,49 @@ import ( "strconv" "testing" + "fmt" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" "k8s.io/apimachinery/pkg/types" negtypes "k8s.io/ingress-gce/pkg/neg/types" namer_util "k8s.io/ingress-gce/pkg/utils/namer" + "reflect" ) +type testPatcher struct { + count int + lastPod string + lastNegKey *meta.Key + lastBsKey *meta.Key +} + +func (p *testPatcher) syncPod(pod string, negKey, bsKey *meta.Key) error { + p.count++ + p.lastPod = pod + p.lastNegKey = negKey + p.lastBsKey = bsKey + return nil +} + +func (p *testPatcher) Eval(t *testing.T, pod string, negKey, bsKey *meta.Key) { + if p.lastPod != pod { + t.Errorf("expect pod = %q, but got %q", pod, p.lastPod) + } + + if !reflect.DeepEqual(p.lastNegKey, negKey) { + t.Errorf("expect neg key = %v, but got %v", negKey, p.lastNegKey) + } + + if !reflect.DeepEqual(p.lastBsKey, bsKey) { + t.Errorf("expect backend service key = %v, but got %v", bsKey, p.lastBsKey) + } +} + func newFakePoller() *poller { reflector := newTestReadinessReflector(fakeContext()) - return reflector.poller + poller := reflector.poller + poller.patcher = &testPatcher{} + return poller } func TestPollerEndpointRegistrationAndScanForWork(t *testing.T) { @@ -314,6 +347,7 @@ func TestPoll(t *testing.T) { t.Parallel() poller := newFakePoller() + pacherTester := poller.patcher.(*testPatcher) negCloud := poller.negCloud namer := namer_util.NewNamer("clusteruid", "") @@ -329,8 +363,38 @@ func TestPoll(t *testing.T) { ip := "10.1.2.3" port := int64(80) instance := "k8s-node-xxxxxx" + irrelevantEntry := negtypes.NetworkEndpointEntry{ + NetworkEndpoint: &compute.NetworkEndpoint{ + IpAddress: ip, + Port: port, + Instance: "foo-instance", + }, + Healths: []*compute.HealthStatusForNetworkEndpoint{ + { + BackendService: &compute.BackendServiceReference{ + BackendService: negName, + }, + HealthState: "HEALTHY", + }, + }, + } - // mark polling to true + pollAndValidate := func(desc string, expectErr bool, expectRetry bool, expectPatchCount int) { + retry, err := poller.Poll(key) + if expectErr && err == nil { + t.Errorf("For case %q, expect err, but got %v", desc, err) + } else if !expectErr && err != nil { + t.Errorf("For case %q, does not expect err, but got %v", desc, err) + } + if retry != expectRetry { + t.Errorf("For case %q, expect retry = %v, but got %v", desc, expectRetry, retry) + } + if pacherTester.count != expectPatchCount { + t.Errorf("For case %q, expect pacherTester.count = %v, but got %v", desc, expectPatchCount, pacherTester.count) + } + } + + step := "mark polling to true" poller.pollMap[key] = &pollTarget{ endpointMap: negtypes.EndpointPodMap{ negtypes.NetworkEndpoint{IP: ip, Port: strconv.FormatInt(port, 10), Node: instance}: types.NamespacedName{Namespace: ns, Name: podName}, @@ -338,71 +402,109 @@ func TestPoll(t *testing.T) { polling: true, } - retry, err := poller.Poll(key) - if err != nil { - t.Errorf("Does not expect err, but got %v", err) - } - if retry != true { - t.Errorf("Expect retry = true, but got %v", retry) - } + pollAndValidate(step, false, 
true, 0) + pollAndValidate(step, false, true, 0) - // unmark polling + step = "unmark polling" poller.pollMap[key].polling = false - retry, err = poller.Poll(key) - // expect NEG not exist error - if err == nil { - t.Errorf("Expect err, but got %v", err) - } - if retry != true { - t.Errorf("Expect retry = true, but got %v", retry) - } + pollAndValidate(step, true, true, 0) + pollAndValidate(step, true, true, 0) + step = "NEG exists, but with no endpoint" // create NEG, but with no endpoint negCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{Name: negName, Zone: zone, Version: meta.VersionGA}, zone) - retry, err = poller.Poll(key) - if err != nil { - t.Errorf("Does not expect err, but got %v", err) - } - if retry != true { - t.Errorf("Expect retry = true, but got %v", retry) - } + pollAndValidate(step, false, true, 0) + pollAndValidate(step, false, true, 0) - // add NE to the NEG, but NE not healthy - ne := &composite.NetworkEndpoint{ + step = "NE added to the NEG, but NE health status is empty" + ne := &compute.NetworkEndpoint{ IpAddress: ip, Port: port, Instance: instance, } - negCloud.AttachNetworkEndpoints(negName, zone, []*composite.NetworkEndpoint{ne}, meta.VersionGA) - retry, err = poller.Poll(key) - if err != nil { - t.Errorf("Does not expect err, but got %v", err) - } - if retry != true { - t.Errorf("Expect retry = true, but got %v", retry) - } - // add NE with healthy status - negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), negtypes.NetworkEndpointEntry{ - NetworkEndpoint: &compute.NetworkEndpoint{ - IpAddress: ip, - Port: port, - Instance: instance, + negCloud.AttachNetworkEndpoints(negName, zone, []*composite.NetworkEndpoint{{ + IpAddress: ip, + Port: port, + Instance: instance, + }}, meta.VersionGA) + // add NE with empty healthy status + negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), []negtypes.NetworkEndpointEntry{ + { + NetworkEndpoint: ne, + Healths: []*compute.HealthStatusForNetworkEndpoint{}, }, - Healths: []*compute.HealthStatusForNetworkEndpoint{ - { - BackendService: &compute.BackendServiceReference{ - BackendService: negName, + }) + + pollAndValidate(step, false, false, 1) + pollAndValidate(step, false, false, 2) + pacherTester.Eval(t, fmt.Sprintf("%v/%v", ns, podName), meta.ZonalKey(negName, zone), nil) + + step = "NE health status is empty and there are other endpoint with health status in NEG" + negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), []negtypes.NetworkEndpointEntry{ + irrelevantEntry, + { + NetworkEndpoint: ne, + Healths: []*compute.HealthStatusForNetworkEndpoint{}, + }, + }) + pollAndValidate(step, false, true, 2) + pollAndValidate(step, false, true, 2) + + step = "NE has nonhealthy status" + negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), []negtypes.NetworkEndpointEntry{ + { + NetworkEndpoint: ne, + Healths: []*compute.HealthStatusForNetworkEndpoint{ + { + BackendService: &compute.BackendServiceReference{ + BackendService: negName, + }, + HealthState: "UNKNOWN", }, - HealthState: healthyState, }, }, }) - retry, err = poller.Poll(key) - if err != nil { - t.Errorf("Does not expect err, but got %v", err) - } - if retry != false { - t.Errorf("Expect retry = false, but got %v", retry) - } + pollAndValidate(step, false, true, 2) + pollAndValidate(step, false, true, 2) + + step = "NE has nonhealthy status with irrelevant 
entry" + negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), []negtypes.NetworkEndpointEntry{ + irrelevantEntry, + { + NetworkEndpoint: ne, + Healths: []*compute.HealthStatusForNetworkEndpoint{ + { + BackendService: &compute.BackendServiceReference{ + BackendService: negName, + }, + HealthState: "UNKNOWN", + }, + }, + }, + }) + pollAndValidate(step, false, true, 2) + pollAndValidate(step, false, true, 2) + + step = "NE has healthy status" + bsName := "bar" + backendServiceUrl := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/foo/global/backendServices/%v", bsName) + negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), []negtypes.NetworkEndpointEntry{ + { + NetworkEndpoint: ne, + Healths: []*compute.HealthStatusForNetworkEndpoint{ + { + BackendService: &compute.BackendServiceReference{ + BackendService: backendServiceUrl, + }, + HealthState: healthyState, + }, + }, + }, + irrelevantEntry, + }) + pollAndValidate(step, false, false, 3) + pacherTester.Eval(t, fmt.Sprintf("%v/%v", ns, podName), meta.ZonalKey(negName, zone), meta.GlobalKey(bsName)) + pollAndValidate(step, false, false, 4) + pacherTester.Eval(t, fmt.Sprintf("%v/%v", ns, podName), meta.ZonalKey(negName, zone), meta.GlobalKey(bsName)) } diff --git a/pkg/neg/readiness/reflector.go b/pkg/neg/readiness/reflector.go index c232d3d625..11c03b3b7a 100644 --- a/pkg/neg/readiness/reflector.go +++ b/pkg/neg/readiness/reflector.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/wait" @@ -43,6 +44,8 @@ const ( negReadyReason = "LoadBalancerNegReady" // negReadyTimedOutReason is the pod condition reason when timeout is reached but pod is still not healthy in NEG negReadyTimedOutReason = "LoadBalancerNegTimeout" + // negReadyUnhealthCheckedReason is the pod condition reason when pod is in a NEG without associated health checking + negReadyUnhealthCheckedReason = "LoadBalancerNegWithoutHealthCheck" // negNotReadyReason is the pod condition reason when pod is not healthy in NEG negNotReadyReason = "LoadBalancerNegNotReady" // unreadyTimeout is the timeout for health status feedback for pod readiness. If load balancer health @@ -114,7 +117,7 @@ func (r *readinessReflector) processNextWorkItem() bool { } defer r.queue.Done(key) - err := r.syncPod(key.(string), "") + err := r.syncPod(key.(string), nil, nil) r.handleErr(err, key) return true } @@ -137,13 +140,13 @@ func (r *readinessReflector) handleErr(err error, key interface{}) { } // syncPod process pod and patch the NEG readiness condition if needed -// if neg is specified, it means pod is Healthy in the NEG. -func (r *readinessReflector) syncPod(key string, neg string) (err error) { +// if neg and backendService is specified, it means pod is Healthy in the NEG attached to backendService. +func (r *readinessReflector) syncPod(podKey string, neg, backendService *meta.Key) (err error) { // podUpdateLock to ensure there is no race in pod status update r.podUpdateLock.Lock() defer r.podUpdateLock.Unlock() - namespace, name, err := cache.SplitMetaNamespaceKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(podKey) if err != nil { return err } @@ -153,7 +156,7 @@ func (r *readinessReflector) syncPod(key string, neg string) (err error) { return err } if !exists { - klog.V(5).Infof("Pod %q is no longer exists. 
Skipping", key)
+		klog.V(5).Infof("Pod %q no longer exists. Skipping", podKey)
 		return nil
 	}
 
@@ -162,18 +165,29 @@ func (r *readinessReflector) syncPod(key string, neg string) (err error) {
 		return nil
 	}
 
-	klog.V(4).Infof("Syncing Pod %q", key)
-	expectedCondition := r.getExpectedNegCondition(pod, neg)
+	klog.V(4).Infof("syncPod(%q, %v, %v)", podKey, neg, backendService)
+	expectedCondition := r.getExpectedNegCondition(pod, neg, backendService)
 	return r.ensurePodNegCondition(pod, expectedCondition)
 }
 
 // getExpectedCondition returns the expected NEG readiness condition for the given pod
-func (r *readinessReflector) getExpectedNegCondition(pod *v1.Pod, neg string) v1.PodCondition {
+func (r *readinessReflector) getExpectedNegCondition(pod *v1.Pod, neg, backendService *meta.Key) v1.PodCondition {
 	expectedCondition := v1.PodCondition{Type: shared.NegReadinessGate}
-	if len(neg) > 0 {
-		expectedCondition.Status = v1.ConditionTrue
-		expectedCondition.Reason = negReadyReason
-		expectedCondition.Message = fmt.Sprintf("Pod has become Healthy in NEG %q. Marking condition %q to True.", neg, shared.NegReadinessGate)
+	if pod == nil {
+		expectedCondition.Message = fmt.Sprintf("Unknown status for unknown pod.")
+		return expectedCondition
+	}
+
+	if neg != nil {
+		if backendService != nil {
+			expectedCondition.Status = v1.ConditionTrue
+			expectedCondition.Reason = negReadyReason
+			expectedCondition.Message = fmt.Sprintf("Pod has become Healthy in NEG %q attached to BackendService %q. Marking condition %q to True.", neg.String(), backendService.String(), shared.NegReadinessGate)
+		} else {
+			expectedCondition.Status = v1.ConditionTrue
+			expectedCondition.Reason = negReadyUnhealthCheckedReason
+			expectedCondition.Message = fmt.Sprintf("Pod is in NEG %q. NEG is not attached to any BackendService with health checking. 
Marking condition %q to True.", neg.String(), shared.NegReadinessGate) + } return expectedCondition } @@ -252,6 +266,9 @@ func (r *readinessReflector) pollNeg(key negMeta) { // ensurePodNegCondition ensures the pod neg condition is as expected // TODO(freehan): also populate lastTransitionTime in the condition func (r *readinessReflector) ensurePodNegCondition(pod *v1.Pod, expectedCondition v1.PodCondition) error { + if pod == nil { + return nil + } // check if it is necessary to patch condition, ok := NegReadinessConditionStatus(pod) if ok && reflect.DeepEqual(expectedCondition, condition) { diff --git a/pkg/neg/readiness/reflector_test.go b/pkg/neg/readiness/reflector_test.go index b2f5ee75c3..5e30c48092 100644 --- a/pkg/neg/readiness/reflector_test.go +++ b/pkg/neg/readiness/reflector_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" apiv1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -84,12 +85,13 @@ func TestSyncPod(t *testing.T) { now := metav1.NewTime(fakeClock.Now()).Rfc3339Copy() for _, tc := range []struct { - desc string - mutateState func() - inputKey string - inputNeg string - expectExists bool - expectPod *v1.Pod + desc string + mutateState func() + inputKey string + inputNeg *meta.Key + inputBackendService *meta.Key + expectExists bool + expectPod *v1.Pod }{ { desc: "empty input", @@ -103,7 +105,7 @@ func TestSyncPod(t *testing.T) { client.CoreV1().Pods(testNamespace).Create(pod) }, inputKey: keyFunc(testNamespace, podName), - inputNeg: "", + inputNeg: nil, expectExists: true, expectPod: generatePod(testNamespace, podName, false, true, true), }, @@ -115,7 +117,7 @@ func TestSyncPod(t *testing.T) { client.CoreV1().Pods(testNamespace).Update(pod) }, inputKey: keyFunc(testNamespace, podName), - inputNeg: "", + inputNeg: nil, expectExists: true, expectPod: generatePod(testNamespace, podName, true, true, true), }, @@ -127,7 +129,7 @@ func TestSyncPod(t *testing.T) { client.CoreV1().Pods(testNamespace).Update(pod) }, inputKey: keyFunc(testNamespace, podName), - inputNeg: "", + inputNeg: nil, expectExists: true, expectPod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -161,7 +163,7 @@ func TestSyncPod(t *testing.T) { testlookUp.readinessGateEnabledNegs = []string{"neg1", "neg2"} }, inputKey: keyFunc(testNamespace, podName), - inputNeg: "", + inputNeg: nil, expectExists: true, expectPod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -186,16 +188,17 @@ func TestSyncPod(t *testing.T) { }, }, { - desc: "need to update pod: pod is healthy in a NEG", + desc: "need to update pod: pod is not attached to health check", mutateState: func() { pod := generatePod(testNamespace, podName, true, false, false) podLister.Update(pod) client.CoreV1().Pods(testNamespace).Update(pod) testlookUp.readinessGateEnabledNegs = []string{"neg1", "neg2"} }, - inputKey: keyFunc(testNamespace, podName), - inputNeg: "neg1", - expectExists: true, + inputKey: keyFunc(testNamespace, podName), + inputNeg: meta.ZonalKey("neg1", "zone1"), + inputBackendService: nil, + expectExists: true, expectPod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, @@ -210,9 +213,9 @@ func TestSyncPod(t *testing.T) { Conditions: []v1.PodCondition{ { Type: shared.NegReadinessGate, - Reason: negReadyReason, + Reason: negReadyUnhealthCheckedReason, Status: v1.ConditionTrue, - Message: fmt.Sprintf("Pod has become Healthy in NEG %q. 
Marking condition %q to True.", "neg1", shared.NegReadinessGate), + Message: fmt.Sprintf("Pod is in NEG %q. NEG is not attached to any BackendService with health checking. Marking condition %q to True.", meta.ZonalKey("neg1", "zone1").String(), shared.NegReadinessGate), }, }, }, @@ -229,7 +232,7 @@ func TestSyncPod(t *testing.T) { fakeClock.Step(unreadyTimeout) }, inputKey: keyFunc(testNamespace, podName), - inputNeg: "", + inputNeg: nil, expectExists: true, expectPod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -254,9 +257,43 @@ func TestSyncPod(t *testing.T) { }, }, }, + { + desc: "need to update pod: pod is healthy in NEG ", + mutateState: func() { + pod := generatePod(testNamespace, podName, true, false, false) + podLister.Update(pod) + client.CoreV1().Pods(testNamespace).Update(pod) + testlookUp.readinessGateEnabledNegs = []string{"neg1", "neg2"} + }, + inputKey: keyFunc(testNamespace, podName), + inputNeg: meta.ZonalKey("neg1", "zone1"), + inputBackendService: meta.GlobalKey("k8s-backendservice"), + expectExists: true, + expectPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: podName, + }, + Spec: v1.PodSpec{ + ReadinessGates: []v1.PodReadinessGate{ + {ConditionType: shared.NegReadinessGate}, + }, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: shared.NegReadinessGate, + Reason: negReadyReason, + Status: v1.ConditionTrue, + Message: fmt.Sprintf("Pod has become Healthy in NEG %q attached to BackendService %q. Marking condition %q to True.", meta.ZonalKey("neg1", "zone1").String(), meta.GlobalKey("k8s-backendservice").String(), shared.NegReadinessGate), + }, + }, + }, + }, + }, } { tc.mutateState() - err := testReadinessReflector.syncPod(tc.inputKey, tc.inputNeg) + err := testReadinessReflector.syncPod(tc.inputKey, tc.inputNeg, tc.inputBackendService) if err != nil { t.Errorf("For test case %q, expect err to be nil, but got %v", tc.desc, err) } diff --git a/pkg/neg/types/mock.go b/pkg/neg/types/mock.go index dbf3bc4aba..2ac4ab48d6 100644 --- a/pkg/neg/types/mock.go +++ b/pkg/neg/types/mock.go @@ -37,13 +37,8 @@ type NetworkEndpointEntry struct { type NetworkEndpointStore map[meta.Key][]NetworkEndpointEntry -func (s NetworkEndpointStore) AddNetworkEndpointHealthStatus(key meta.Key, entry NetworkEndpointEntry) { - v, ok := s[key] - if !ok { - v = []NetworkEndpointEntry{} - } - v = append(v, entry) - s[key] = v +func (s NetworkEndpointStore) AddNetworkEndpointHealthStatus(key meta.Key, entries []NetworkEndpointEntry) { + s[key] = entries } // GetNetworkEndpointStore is a helper function to access the NetworkEndpointStore of the mock NEG cloud