From f018e565fef8473c7ceabb83dd72ac2e19197d42 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 15 May 2019 18:13:22 -0700 Subject: [PATCH 1/6] initial commit of readiness reflector --- pkg/neg/readiness/interface.go | 53 +++++++ pkg/neg/readiness/poller.go | 236 +++++++++++++++++++++++++++++++ pkg/neg/readiness/reflector.go | 248 +++++++++++++++++++++++++++++++++ pkg/neg/readiness/utils.go | 141 +++++++++++++++++++ 4 files changed, 678 insertions(+) create mode 100644 pkg/neg/readiness/interface.go create mode 100644 pkg/neg/readiness/poller.go create mode 100644 pkg/neg/readiness/reflector.go create mode 100644 pkg/neg/readiness/utils.go diff --git a/pkg/neg/readiness/interface.go b/pkg/neg/readiness/interface.go new file mode 100644 index 0000000000..4c5f928ad8 --- /dev/null +++ b/pkg/neg/readiness/interface.go @@ -0,0 +1,53 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readiness + +import ( + "k8s.io/api/core/v1" + negtypes "k8s.io/ingress-gce/pkg/neg/types" +) + +// Reflector defines the interaction between readiness reflector and other NEG controller components +type Reflector interface { + // Run starts the reflector. + // Closing stopCh will signal the reflector to stop running. + Run(stopCh <-chan struct{}) + // SyncPod signals the reflector to evaluate pod and patch pod status if needed. + SyncPod(pod *v1.Pod) + // CommitPods signals the reflector that pods has been added to and NEG it is time to poll the NEG health status + // syncerKey is the key to uniquely identify the NEG syncer + // negName is the name of the network endpoint group (NEG) in the zone (e.g. k8s1-1234567-namespace-name-80-1234567) + // zone is the corresponding zone of the NEG resource (e.g. us-central1-b) + // endpointMap contains mapping from all network endpoints to pods which have been added into the NEG + CommitPods(syncerKey negtypes.NegSyncerKey, negName string, zone string, endpointMap negtypes.EndpointPodMap) +} + +// NegLookup defines an interface for looking up pod membership. +type NegLookup interface { + // ReadinessGateEnabledNegs returns a list of NEGs which has readiness gate enabled for the input pod's namespace and labels. + ReadinessGateEnabledNegs(namespace string, labels map[string]string) []string + // ReadinessGateEnabled returns true if the NEG requires readiness feedback + ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool +} + +type NoopReflector struct{} + +func (*NoopReflector) Run(<-chan struct{}) {} + +func (*NoopReflector) SyncPod(*v1.Pod) {} + +func (*NoopReflector) CommitPods(negtypes.NegSyncerKey, string, string, negtypes.EndpointPodMap) {} diff --git a/pkg/neg/readiness/poller.go b/pkg/neg/readiness/poller.go new file mode 100644 index 0000000000..2f9b934a60 --- /dev/null +++ b/pkg/neg/readiness/poller.go @@ -0,0 +1,236 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readiness + +import ( + "fmt" + computebeta "google.golang.org/api/compute/v0.beta" + "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/tools/cache" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/klog" + "strconv" + "strings" + "sync" +) + +const ( + healthyState = "HEALTHY" +) + +// negMeta references a GCE NEG resource +type negMeta struct { + SyncerKey negtypes.NegSyncerKey + // Name is the name of the NEG + Name string + // Zone is the zone of the NEG resource + Zone string +} + +func (n negMeta) String() string { + return fmt.Sprintf("%s-%s-%s", n.SyncerKey.String(), n.Name, n.Zone) +} + +// podStatusPatcher interface allows patching pod status +type podStatusPatcher interface { + // syncPod patches the neg condition in the pod status to be True. + // key is the key to the pod. It is the namespaced name in the format of "namespace/name" + // negName is the name of the NEG resource + syncPod(key, negName string) error +} + +// pollTarget is the target for polling +type pollTarget struct { + // endpointMap maps network endpoint to namespaced name of pod + endpointMap negtypes.EndpointPodMap + // polling indicates if the NEG is being polled + polling bool +} + +// poller tracks the negs and corresponding targets needed to be polled. +type poller struct { + lock sync.Mutex + // pollMap contains negs and corresponding targets needed to be polled. + // all operations(read, write) to the pollMap are lock protected. + pollMap map[negMeta]*pollTarget + + podLister cache.Indexer + lookup NegLookup + patcher podStatusPatcher + negCloud negtypes.NetworkEndpointGroupCloud +} + +func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud) *poller { + return &poller{ + pollMap: make(map[negMeta]*pollTarget), + podLister: podLister, + lookup: lookup, + patcher: patcher, + negCloud: negCloud, + } +} + +// RegisterNegEndpoints registered the endpoints that needed to be poll for the NEG with lock +func (p *poller) RegisterNegEndpoints(key negMeta, endpointMap negtypes.EndpointPodMap) { + p.lock.Lock() + defer p.lock.Unlock() + p.registerNegEndpoints(key, endpointMap) +} + +// registerNegEndpoints registered the endpoints that needed to be poll for the NEG +// It returns false if there is no endpoints needed to be polled, returns true if otherwise. +// Assumes p.lock is held when calling this method. 
+func (p *poller) registerNegEndpoints(key negMeta, endpointMap negtypes.EndpointPodMap) bool { + endpointsToPoll := needToPoll(key.SyncerKey, endpointMap, p.lookup, p.podLister) + if len(endpointsToPoll) == 0 { + delete(p.pollMap, key) + return false + } + + if v, ok := p.pollMap[key]; ok { + v.endpointMap = endpointsToPoll + } else { + p.pollMap[key] = &pollTarget{endpointMap: endpointsToPoll} + } + return true +} + +// ScanForWork returns the list of NEGs that should be polled +func (p *poller) ScanForWork() []negMeta { + p.lock.Lock() + defer p.lock.Unlock() + var ret []negMeta + for key, target := range p.pollMap { + if target.polling { + continue + } + if p.registerNegEndpoints(key, target.endpointMap) { + ret = append(ret, key) + } + } + return ret +} + +// Poll polls a NEG and returns error plus whether retry is needed +// This function is threadsafe. +func (p *poller) Poll(key negMeta) (retry bool, err error) { + if !p.markPolling(key) { + klog.V(4).Infof("NEG %q in zone %q as is already being polled or no longer needed to be polled.", key.Name, key.Zone) + return true, nil + } + defer p.unMarkPolling(key) + + // TODO(freehan): refactor errList from pkg/neg/syncers to be reused here + var errList []error + klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone) + // TODO(freehan): filter the NEs that are in interest once the API supports it + res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true) + if err != nil { + return true, err + } + + // Traverse the response and check if the endpoints in interest are HEALTHY + func() { + p.lock.Lock() + defer p.lock.Unlock() + var healthyCount int + for _, r := range res { + healthy, err := p.processHealthStatus(key, r) + if healthy && err == nil { + healthyCount++ + } + if err != nil { + errList = append(errList, err) + } + } + if healthyCount != len(p.pollMap[key].endpointMap) { + retry = true + } + }() + return retry, utilerrors.NewAggregate(errList) +} + +// processHealthStatus evaluates the health status of the input network endpoint. +// Assumes p.lock is held when calling this method. +func (p *poller) processHealthStatus(key negMeta, healthStatus *computebeta.NetworkEndpointWithHealthStatus) (healthy bool, err error) { + ne := negtypes.NetworkEndpoint{ + IP: healthStatus.NetworkEndpoint.IpAddress, + Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10), + Node: healthStatus.NetworkEndpoint.Instance, + } + podName, ok := p.getPod(key, ne) + if !ok { + return false, nil + } + + for _, hs := range healthStatus.Healths { + if hs == nil { + continue + } + if hs.BackendService == nil { + klog.Warningf("Backend service is nil in health status of network endpoint %v: %v", ne, hs) + continue + } + + // This assumes the ingress backend service uses the NEG naming scheme. Hence the backend service share the same name as NEG. + if strings.Contains(hs.BackendService.BackendService, key.Name) { + if hs.HealthState == healthyState { + healthy = true + err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), key.Name) + return healthy, err + } + } + + } + return false, nil +} + +// getPod returns the namespaced name of a pod corresponds to an endpoint and whether the pod is registered +// Assumes p.lock is held when calling this method. 
+func (p *poller) getPod(key negMeta, endpoint negtypes.NetworkEndpoint) (namespacedName types.NamespacedName, exists bool) { + t, ok := p.pollMap[key] + if !ok { + return types.NamespacedName{}, false + } + ret, ok := t.endpointMap[endpoint] + return ret, ok +} + +// markPolling returns true if the NEG is successfully marked as polling +func (p *poller) markPolling(key negMeta) bool { + p.lock.Lock() + defer p.lock.Unlock() + t, ok := p.pollMap[key] + if !ok { + return false + } + if t.polling { + return false + } + t.polling = true + return true +} + +// unMarkPolling unmarks the NEG +func (p *poller) unMarkPolling(key negMeta) { + p.lock.Lock() + defer p.lock.Unlock() + if t, ok := p.pollMap[key]; ok { + t.polling = false + } +} diff --git a/pkg/neg/readiness/reflector.go b/pkg/neg/readiness/reflector.go new file mode 100644 index 0000000000..caf44b2ef9 --- /dev/null +++ b/pkg/neg/readiness/reflector.go @@ -0,0 +1,248 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readiness + +import ( + "sync" + "time" + + "fmt" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/ingress-gce/pkg/context" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/ingress-gce/pkg/neg/types/shared" + "k8s.io/klog" + "reflect" +) + +const ( + maxRetries = 15 + negReadyReason = "LoadBalancerNegReady" + negNotReadyReason = "LoadBalancerNegNotReady" +) + +// readinessReflector implements the Reflector interface +type readinessReflector struct { + // podUpdateLock ensures that at any time there is only one + podUpdateLock sync.Mutex + client kubernetes.Interface + + // pollerLock ensures there is only poll + pollerLock sync.Mutex + poller *poller + + podLister cache.Indexer + lookup NegLookup + + eventBroadcaster record.EventBroadcaster + eventRecorder record.EventRecorder + + queue workqueue.RateLimitingInterface +} + +func NewReadinessReflector(cc *context.ControllerContext, lookup NegLookup) Reflector { + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(klog.Infof) + broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ + Interface: cc.KubeClient.CoreV1().Events(""), + }) + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "neg-readiness-reflector"}) + reflector := &readinessReflector{ + client: cc.KubeClient, + podLister: cc.PodInformer.GetIndexer(), + lookup: lookup, + eventBroadcaster: broadcaster, + eventRecorder: recorder, + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + } + poller := NewPoller(cc.PodInformer.GetIndexer(), lookup, reflector, cc.Cloud) + reflector.poller = poller + return reflector +} + +func (r *readinessReflector) Run(stopCh <-chan struct{}) { + defer r.queue.ShutDown() + klog.V(2).Infof("Starting NEG 
readiness reflector") + defer klog.V(2).Infof("Shutting down NEG readiness reflector") + + go wait.Until(r.worker, time.Second, stopCh) + <-stopCh +} + +func (r *readinessReflector) worker() { + for r.processNextWorkItem() { + } +} + +func (r *readinessReflector) processNextWorkItem() bool { + key, quit := r.queue.Get() + if quit { + return false + } + defer r.queue.Done(key) + + err := r.syncPod(key.(string), "") + r.handleErr(err, key) + return true +} + +// handleErr handles errors from syncPod +func (r *readinessReflector) handleErr(err error, key interface{}) { + if err == nil { + r.queue.Forget(key) + return + } + + if r.queue.NumRequeues(key) < maxRetries { + klog.V(2).Infof("Error syncing pod %q, retrying. Error: %v", key, err) + r.queue.AddRateLimited(key) + return + } + + klog.Warningf("Dropping pod %q out of the queue: %v", key, err) + r.queue.Forget(key) +} + +// syncPod process pod and patch the NEG readiness condition if needed +// if neg is specified, it means pod is Healthy in the NEG. +func (r *readinessReflector) syncPod(key string, neg string) (err error) { + // podUpdateLock to ensure there is no race in pod status update + r.podUpdateLock.Lock() + defer r.podUpdateLock.Unlock() + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + + pod, exists, err := getPodFromStore(r.podLister, namespace, name) + if err != nil { + return err + } + if !exists { + klog.V(5).Infof("Pod %q is no longer exists. Skipping", key) + return nil + } + + // This is to prevent if the pod got updated after being added to the queue + if !needToProcess(pod) { + return nil + } + + klog.V(4).Infof("Syncing Pod %q", key) + expectedCondition := v1.PodCondition{Type: shared.NegReadinessGate} + var message, reason string + + if len(neg) > 0 { + expectedCondition.Status = v1.ConditionTrue + reason = negReadyReason + message = fmt.Sprintf("Pod has become Healthy in NEG %q. Marking condition %q to True.", neg, shared.NegReadinessGate) + } else { + negs := r.lookup.ReadinessGateEnabledNegs(pod.Namespace, pod.Labels) + // mark pod as ready if it belongs to no NEGs + if len(negs) == 0 { + expectedCondition.Status = v1.ConditionTrue + reason = negReadyReason + message = fmt.Sprintf("Pod does not belong to any NEG. Marking condition %q to True.", shared.NegReadinessGate) + } else { + // do not patch condition status in this case to prevent race condition: + // 1. poller marks a pod ready + // 2. syncPod gets call and does not retrieve the updated pod spec with true neg readiness condition + // 3. 
syncPod patches the neg readiness condition to be false + reason = negNotReadyReason + message = fmt.Sprintf("Waiting for pod to become healthy in at least one of the NEG(s): %v", negs) + } + } + expectedCondition.Reason = reason + expectedCondition.Message = message + return r.ensurePodNegCondition(pod, expectedCondition) +} + +// SyncPod filter the pods that needed to be processed and put it into queue +func (r *readinessReflector) SyncPod(pod *v1.Pod) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pod) + if err != nil { + klog.Errorf("Failed to generate pod key: %v", err) + return + } + + if !needToProcess(pod) { + klog.V(6).Infof("Skip processing pod %q", key) + } + r.queue.Add(key) +} + +// CommitPods registers the current network endpoints in a NEG and starts polling them if needed +func (r *readinessReflector) CommitPods(syncerKey negtypes.NegSyncerKey, negName string, zone string, endpointMap negtypes.EndpointPodMap) { + key := negMeta{ + SyncerKey: syncerKey, + Name: negName, + Zone: zone, + } + r.poller.RegisterNegEndpoints(key, endpointMap) + r.poll() +} + +// poll spins off go routines to poll NEGs +func (r *readinessReflector) poll() { + r.pollerLock.Lock() + defer r.pollerLock.Unlock() + for _, key := range r.poller.ScanForWork() { + go r.pollNeg(key) + } +} + +// pollNeg polls a NEG +func (r *readinessReflector) pollNeg(key negMeta) { + klog.V(4).Infof("Polling NEG %q", key.String()) + retry, err := r.poller.Poll(key) + if err != nil { + klog.Errorf("Failed to poll %q: %v", key, err) + } + if retry { + r.poll() + } +} + +// ensurePodNegCondition ensures the pod neg condition is as expected +// TODO(freehan): also populate lastTransitionTime in the condition +func (r *readinessReflector) ensurePodNegCondition(pod *v1.Pod, expectedCondition v1.PodCondition) error { + // check if it is necessary to patch + condition, ok := NegReadinessConditionStatus(pod) + if ok && reflect.DeepEqual(expectedCondition, condition) { + klog.V(4).Infof("NEG condition for pod %s/%s is expected, skip patching", pod.Namespace, pod.Name) + return nil + } + + // calculate patch bytes, send patch and record event + oldStatus := pod.Status.DeepCopy() + SetNegReadinessConditionStatus(pod, expectedCondition) + patchBytes, err := preparePatchBytesforPodStatus(*oldStatus, pod.Status) + if err != nil { + return fmt.Errorf("failed to prepare patch bytes for pod %v: %v", pod, err) + } + r.eventRecorder.Eventf(pod, v1.EventTypeNormal, expectedCondition.Reason, expectedCondition.Message) + _, _, err = patchPodStatus(r.client, pod.Namespace, pod.Name, patchBytes) + return err +} diff --git a/pkg/neg/readiness/utils.go b/pkg/neg/readiness/utils.go new file mode 100644 index 0000000000..91902c09f3 --- /dev/null +++ b/pkg/neg/readiness/utils.go @@ -0,0 +1,141 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package readiness + +import ( + "fmt" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/ingress-gce/pkg/neg/types/shared" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/klog" +) + +// NegReadinessConditionStatus return (cond, true) if neg condition exists, otherwise (_, false) +func NegReadinessConditionStatus(pod *v1.Pod) (negCondition v1.PodCondition, negConditionStatus bool) { + for _, condition := range pod.Status.Conditions { + if condition.Type == shared.NegReadinessGate { + return condition, true + } + } + return v1.PodCondition{}, false +} + +// evalNegReadinessGate returns if the pod readiness gate includes the NEG readiness condition and the condition status is true +func evalNegReadinessGate(pod *v1.Pod) (negReady bool, readinessGateExists bool) { + for _, gate := range pod.Spec.ReadinessGates { + if gate.ConditionType == shared.NegReadinessGate { + readinessGateExists = true + } + } + if condition, ok := NegReadinessConditionStatus(pod); ok { + if condition.Status == v1.ConditionTrue { + negReady = true + } + } + return negReady, readinessGateExists +} + +func keyFunc(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +// getPodFromStore return (pod, exists, nil) if it is able to successfully retrieve it from podLister. +func getPodFromStore(podLister cache.Indexer, namespace, name string) (pod *v1.Pod, exists bool, err error) { + if podLister == nil { + return nil, false, fmt.Errorf("podLister is nil") + } + key := keyFunc(namespace, name) + obj, exists, err := podLister.GetByKey(key) + if err != nil { + return nil, false, fmt.Errorf("failed to retrieve pod %q from store: %v", key, err) + } + + if !exists { + return nil, false, nil + } + + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, false, fmt.Errorf("Failed to convert obj type %T to *v1.Pod", obj) + } + return pod, true, nil +} + +// SetNegReadinessConditionStatus sets the status of the NEG readiness condition +func SetNegReadinessConditionStatus(pod *v1.Pod, condition v1.PodCondition) { + for i, cond := range pod.Status.Conditions { + if cond.Type == shared.NegReadinessGate { + pod.Status.Conditions[i] = condition + return + } + } + pod.Status.Conditions = append(pod.Status.Conditions, condition) +} + +// patchPodStatus patches pod status with given patchBytes +func patchPodStatus(c clientset.Interface, namespace, name string, patchBytes []byte) (*v1.Pod, []byte, error) { + updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) + } + return updatedPod, patchBytes, nil +} + +// preparePatchBytesforPodStatus generates patch bytes based on the old and new pod status +func preparePatchBytesforPodStatus(oldPodStatus, newPodStatus v1.PodStatus) ([]byte, error) { + patchBytes, err := utils.StrategicMergePatchBytes(v1.Pod{Status: oldPodStatus}, v1.Pod{Status: newPodStatus}, v1.Pod{}) + return patchBytes, err +} + +// needToPoll filter out the network endpoint that needs to be polled based on the following conditions: +// 1. neg syncer has readiness gate enabled +// 2. the pod exists +// 3. the pod has neg readiness gate +// 4. 
the pod's neg readiness condition is not True
+func needToPoll(syncerKey negtypes.NegSyncerKey, endpointMap negtypes.EndpointPodMap, lookup NegLookup, podLister cache.Indexer) negtypes.EndpointPodMap {
+	if !lookup.ReadinessGateEnabled(syncerKey) {
+		return negtypes.EndpointPodMap{}
+	}
+	removeIrrelevantEndpoints(endpointMap, podLister)
+	return endpointMap
+}
+
+// removeIrrelevantEndpoints filters out the endpoints that do not need health status polling from the input endpoint map
+func removeIrrelevantEndpoints(endpointMap negtypes.EndpointPodMap, podLister cache.Indexer) {
+	for endpoint, namespacedName := range endpointMap {
+		pod, exists, err := getPodFromStore(podLister, namespacedName.Namespace, namespacedName.Name)
+		if err != nil {
+			klog.Warningf("Failed to retrieve pod %q from store: %v", namespacedName.String(), err)
+		}
+		if err == nil && exists && needToProcess(pod) {
+			continue
+		}
+		delete(endpointMap, endpoint)
+	}
+}
+
+// needToProcess checks if the pod needs to be processed by the readiness reflector.
+// If the pod has the neg readiness gate and its condition is False, then return true.
+func needToProcess(pod *v1.Pod) bool {
+	negConditionReady, readinessGateExists := evalNegReadinessGate(pod)
+	return readinessGateExists && !negConditionReady
+}

From 4e7766c65036b6f5cf49bd00748d344a63ec7bbf Mon Sep 17 00:00:00 2001
From: Minhan Xia
Date: Wed, 15 May 2019 18:15:08 -0700
Subject: [PATCH 2/6] modify common NEG types to accommodate readiness gate

---
 pkg/neg/types/types.go      | 63 ++++++++++++++++++++++++++++++++-----
 pkg/neg/types/types_test.go | 52 +++++++++++++++---------------
 2 files changed, 82 insertions(+), 33 deletions(-)

diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go
index 9fca1eca8b..fcfeb44535 100644
--- a/pkg/neg/types/types.go
+++ b/pkg/neg/types/types.go
@@ -18,9 +18,12 @@ package types
 
 import (
 	"fmt"
-	"k8s.io/ingress-gce/pkg/annotations"
 	"reflect"
 	"strconv"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/ingress-gce/pkg/annotations"
 )
 
 // SvcPortMap is a map of ServicePort:TargetPort
@@ -33,34 +36,48 @@ type PortInfo struct {
 	TargetPort string
 	// NegName is the name of the NEG
 	NegName string
+	// ReadinessGate indicates if the NEG associated with the port has NEG readiness gate enabled
+	// This is enabled when the service port is referenced by an ingress.
+	// If the service port is only exposed as a standalone NEG, it should not be enabled.
+ ReadinessGate bool } // PortInfoMap is a map of ServicePort:PortInfo type PortInfoMap map[int32]PortInfo -func NewPortInfoMap(namespace, name string, svcPortMap SvcPortMap, namer NetworkEndpointGroupNamer) PortInfoMap { +func NewPortInfoMap(namespace, name string, svcPortMap SvcPortMap, namer NetworkEndpointGroupNamer, readinessGate bool) PortInfoMap { ret := PortInfoMap{} for svcPort, targetPort := range svcPortMap { ret[svcPort] = PortInfo{ - TargetPort: targetPort, - NegName: namer.NEG(namespace, name, svcPort), + TargetPort: targetPort, + NegName: namer.NEG(namespace, name, svcPort), + ReadinessGate: readinessGate, } } return ret } // Merge merges p2 into p1 PortInfoMap -// It assumes the same key will have the same PortInfo +// It assumes the same key (service port) will have the same target port and negName // If not, it will throw error +// If a key in p1 or p2 has readiness gate enabled, the merged port info will also has readiness gate enabled func (p1 PortInfoMap) Merge(p2 PortInfoMap) error { var err error for svcPort, portInfo := range p2 { + mergedInfo := PortInfo{} if existingPortInfo, ok := p1[svcPort]; ok { - if !reflect.DeepEqual(existingPortInfo, portInfo) { - return fmt.Errorf("key %d in PortInfoMaps has different values. Existing value %v while new value: %v", svcPort, existingPortInfo, portInfo) + if existingPortInfo.TargetPort != portInfo.TargetPort { + return fmt.Errorf("for service port %d, target port in existing map is %q, but the merge map has %q", svcPort, existingPortInfo.TargetPort, portInfo.TargetPort) + } + if existingPortInfo.NegName != portInfo.NegName { + return fmt.Errorf("for service port %d, NEG name in existing map is %q, but the merge map has %q", svcPort, existingPortInfo.NegName, portInfo.NegName) } + mergedInfo.ReadinessGate = existingPortInfo.ReadinessGate } - p1[svcPort] = portInfo + mergedInfo.TargetPort = portInfo.TargetPort + mergedInfo.NegName = portInfo.NegName + mergedInfo.ReadinessGate = mergedInfo.ReadinessGate || portInfo.ReadinessGate + p1[svcPort] = mergedInfo } return err } @@ -87,3 +104,33 @@ func (p1 PortInfoMap) ToPortNegMap() annotations.PortNegMap { } return ret } + +// NegsWithReadinessGate returns the NegNames which has readiness gate enabled +func (p1 PortInfoMap) NegsWithReadinessGate() sets.String { + ret := sets.NewString() + for _, info := range p1 { + if info.ReadinessGate { + ret.Insert(info.NegName) + } + } + return ret +} + +// NegSyncerKey includes information to uniquely identify a NEG syncer +type NegSyncerKey struct { + // Namespace of service + Namespace string + // Name of service + Name string + // Service port + Port int32 + // Service target port + TargetPort string +} + +func (key NegSyncerKey) String() string { + return fmt.Sprintf("%s/%s-%v/%s", key.Namespace, key.Name, key.Port, key.TargetPort) +} + +// EndpointPodMap is a map from network endpoint to a namespaced name of a pod +type EndpointPodMap map[NetworkEndpoint]types.NamespacedName diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index be512e3776..859a2cc352 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -33,6 +33,7 @@ func (*negNamer) IsNEG(name string) bool { return false } +// TODO(freehan): include test cases with different ReadinessGate setup func TestPortInfoMapMerge(t *testing.T) { namer := &negNamer{} namespace := "namespace" @@ -54,22 +55,22 @@ func TestPortInfoMapMerge(t *testing.T) { { "empty map union a non-empty map is the non-empty map", PortInfoMap{}, - NewPortInfoMap(namespace, 
name, SvcPortMap{80: "namedport", 443: "3000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), false, }, { "union of two non-empty maps", - NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8080: "9000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8080: "9000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, namer, false), false, }, { "error on inconsistent value", - NewPortInfoMap(namespace, name, SvcPortMap{80: "3000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8000: "9000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "3000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8000: "9000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, namer, false), true, }, } @@ -94,6 +95,7 @@ func TestPortInfoMapMerge(t *testing.T) { } } +// TODO(freehan): include test cases with different ReadinessGate setup func TestPortInfoMapDifference(t *testing.T) { namer := &negNamer{} namespace := "namespace" @@ -113,44 +115,44 @@ func TestPortInfoMapDifference(t *testing.T) { { "empty map difference a non-empty map is empty map", PortInfoMap{}, - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), PortInfoMap{}, }, { "non-empty map difference a non-empty map is the non-empty map", - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), PortInfoMap{}, - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), }, { "difference of two non-empty maps with the same elements", - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000"}, namer, false), PortInfoMap{}, }, { "difference of two non-empty maps with no elements in common returns p1", - NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8080: "9000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8080: "9000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer, 
false), }, { "difference of two non-empty maps with elements in common", - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8080: "9000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "3000", 5000: "6000", 8080: "9000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 8080: "9000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{443: "3000", 5000: "6000"}, namer, false), }, { "difference of two non-empty maps with a key in common but different in value", - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "8080", 8080: "9000"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "8080", 8080: "9000"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport"}, namer, false), }, { "difference of two non-empty maps with 2 keys in common but different in values", - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "8443"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "8080", 443: "9443"}, namer), - NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "8443"}, namer), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "8443"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "8080", 443: "9443"}, namer, false), + NewPortInfoMap(namespace, name, SvcPortMap{80: "namedport", 443: "8443"}, namer, false), }, } From fa03585d59c04aa441899cd0a995b82d2cbb0236 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 15 May 2019 18:16:19 -0700 Subject: [PATCH 3/6] include non-terminating pods to add in NEG --- pkg/neg/syncers/utils.go | 94 +++++++++++++++++++++++++---------- pkg/neg/syncers/utils_test.go | 3 +- 2 files changed, 69 insertions(+), 28 deletions(-) diff --git a/pkg/neg/syncers/utils.go b/pkg/neg/syncers/utils.go index de8748a629..c85bc8c870 100644 --- a/pkg/neg/syncers/utils.go +++ b/pkg/neg/syncers/utils.go @@ -23,7 +23,9 @@ import ( "time" "google.golang.org/api/compute/v0.beta" + "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" @@ -43,18 +45,6 @@ const ( negIPPortNetworkEndpointType = "GCE_VM_IP_PORT" ) -// NegSyncerKey includes information to uniquely identify a NEG -type NegSyncerKey struct { - Namespace string - Name string - Port int32 - TargetPort string -} - -func (key NegSyncerKey) String() string { - return fmt.Sprintf("%s/%s-%v/%s", key.Namespace, key.Name, key.Port, key.TargetPort) -} - // encodeEndpoint encodes ip and instance into a single string func encodeEndpoint(ip, instance, port string) string { return strings.Join([]string{ip, instance, port}, separator) @@ -170,11 +160,12 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService } // toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map -func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.ZoneGetter, targetPort string) (map[string]negtypes.NetworkEndpointSet, error) { +func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, 
zoneGetter negtypes.ZoneGetter, targetPort string, podLister cache.Indexer) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) { zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{} + networkEndpointPodMap := negtypes.EndpointPodMap{} if endpoints == nil { klog.Errorf("Endpoint object is nil") - return zoneNetworkEndpointMap, nil + return zoneNetworkEndpointMap, networkEndpointPodMap, nil } targetPortNum, _ := strconv.Atoi(targetPort) for _, subset := range endpoints.Subsets { @@ -202,22 +193,43 @@ func toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints, zoneGetter negtypes.Zo if len(matchPort) == 0 { continue } - for _, address := range subset.Addresses { - if address.NodeName == nil { - klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", address.IP, endpoints.Namespace, endpoints.Name) - continue - } - zone, err := zoneGetter.GetZoneForNode(*address.NodeName) - if err != nil { - return nil, err - } - if zoneNetworkEndpointMap[zone] == nil { - zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet() + + // processAddressFunc adds the qualified endpoints from the input list into the endpointSet group by zone + processAddressFunc := func(addresses []v1.EndpointAddress, includeAllEndpoints bool) error { + for _, address := range addresses { + if address.NodeName == nil { + klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", address.IP, endpoints.Namespace, endpoints.Name) + continue + } + if address.TargetRef == nil { + klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", address.IP, endpoints.Namespace, endpoints.Name) + continue + } + zone, err := zoneGetter.GetZoneForNode(*address.NodeName) + if err != nil { + return fmt.Errorf("failed to retrieve associated zone of node %q: %v", *address.NodeName, err) + } + if zoneNetworkEndpointMap[zone] == nil { + zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet() + } + + if includeAllEndpoints || shouldPodBeInNeg(podLister, address.TargetRef.Namespace, address.TargetRef.Name) { + networkEndpoint := negtypes.NetworkEndpoint{IP: address.IP, Port: matchPort, Node: *address.NodeName} + zoneNetworkEndpointMap[zone].Insert(networkEndpoint) + networkEndpointPodMap[networkEndpoint] = types.NamespacedName{Namespace: address.TargetRef.Namespace, Name: address.TargetRef.Name} + } } - zoneNetworkEndpointMap[zone].Insert(negtypes.NetworkEndpoint{IP: address.IP, Port: matchPort, Node: *address.NodeName}) + return nil + } + + if err := processAddressFunc(subset.Addresses, true); err != nil { + return nil, nil, err + } + if err := processAddressFunc(subset.NotReadyAddresses, false); err != nil { + return nil, nil, err } } - return zoneNetworkEndpointMap, nil + return zoneNetworkEndpointMap, networkEndpointPodMap, nil } // retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map @@ -265,3 +277,31 @@ func makeEndpointBatch(endpoints negtypes.NetworkEndpointSet) (map[negtypes.Netw } return endpointBatch, nil } + +func keyFunc(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +} + +// shouldPodBeInNeg returns true if pod is not in graceful termination state +func shouldPodBeInNeg(podLister cache.Indexer, namespace, name string) bool { + key := keyFunc(namespace, name) + obj, exists, err := podLister.GetByKey(key) + if err != nil { + klog.Errorf("Failed to retrieve pod %s from pod lister: %v", key, err) 
+ return false + } + if !exists { + return false + } + pod, ok := obj.(*v1.Pod) + if !ok { + klog.Errorf("Failed to convert obj %s to v1.Pod. The object type is %T", key, obj) + return false + } + + // if pod has DeletionTimestamp, that means pod is in graceful termination state. + if pod.DeletionTimestamp != nil { + return false + } + return true +} diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 812b48b1f9..290cf60465 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -282,6 +282,7 @@ func TestNetworkEndpointCalculateDifference(t *testing.T) { } } +// TODO(freehan): add test cases with Endpoints with NotReady addresses func TestToZoneNetworkEndpointMapUtil(t *testing.T) { zoneGetter := negtypes.NewFakeZoneGetter() testCases := []struct { @@ -310,7 +311,7 @@ func TestToZoneNetworkEndpointMapUtil(t *testing.T) { } for _, tc := range testCases { - res, _ := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort) + res, _, _ := toZoneNetworkEndpointMap(getDefaultEndpoint(), zoneGetter, tc.targetPort, nil) if !reflect.DeepEqual(res, tc.expect) { t.Errorf("Expect %v, but got %v.", tc.expect, res) From f41819f7403eec8e06dd42d840353fd6012848f5 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 15 May 2019 18:17:15 -0700 Subject: [PATCH 4/6] move NegSyncerKey to pkg/neg/types --- pkg/neg/syncers/batch.go | 6 ++--- pkg/neg/syncers/batch_test.go | 46 +++++++++++++++++++++------------- pkg/neg/syncers/syncer.go | 5 ++-- pkg/neg/syncers/syncer_test.go | 2 +- 4 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pkg/neg/syncers/batch.go b/pkg/neg/syncers/batch.go index 30e55f4cf8..1e7a698aab 100644 --- a/pkg/neg/syncers/batch.go +++ b/pkg/neg/syncers/batch.go @@ -23,7 +23,7 @@ import ( "sync" "time" - compute "google.golang.org/api/compute/v0.beta" + "google.golang.org/api/compute/v0.beta" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/clock" @@ -39,7 +39,7 @@ import ( // batchSyncer handles synchorizing NEGs for one service port. It handles sync, resync and retry on error. // It syncs NEG in batch and waits for all operation to complete before continue to the next batch. 
type batchSyncer struct { - NegSyncerKey + negtypes.NegSyncerKey negName string serviceLister cache.Indexer @@ -59,7 +59,7 @@ type batchSyncer struct { retryCount int } -func NewBatchSyncer(svcPort NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *batchSyncer { +func NewBatchSyncer(svcPort negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *batchSyncer { klog.V(2).Infof("New syncer for service %s/%s Port %s NEG %q", svcPort.Namespace, svcPort.Name, svcPort.TargetPort, networkEndpointGroupName) return &batchSyncer{ NegSyncerKey: svcPort, diff --git a/pkg/neg/syncers/batch_test.go b/pkg/neg/syncers/batch_test.go index 3dd5d2e4cb..8334b0234c 100644 --- a/pkg/neg/syncers/batch_test.go +++ b/pkg/neg/syncers/batch_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -41,7 +42,7 @@ func NewTestSyncer() *batchSyncer { DefaultBackendSvcPortID: defaultBackend, } context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) - svcPort := NegSyncerKey{ + svcPort := negtypes.NegSyncerKey{ Namespace: testServiceNamespace, Name: testServiceName, Port: 80, @@ -257,20 +258,24 @@ func getDefaultEndpoint() *apiv1.Endpoints { { Addresses: []apiv1.EndpointAddress{ { - IP: "10.100.1.1", - NodeName: &instance1, + IP: "10.100.1.1", + NodeName: &instance1, + TargetRef: &v1.ObjectReference{}, }, { - IP: "10.100.1.2", - NodeName: &instance1, + IP: "10.100.1.2", + NodeName: &instance1, + TargetRef: &v1.ObjectReference{}, }, { - IP: "10.100.2.1", - NodeName: &instance2, + IP: "10.100.2.1", + NodeName: &instance2, + TargetRef: &v1.ObjectReference{}, }, { - IP: "10.100.3.1", - NodeName: &instance3, + IP: "10.100.3.1", + NodeName: &instance3, + TargetRef: &v1.ObjectReference{}, }, }, Ports: []apiv1.EndpointPort{ @@ -280,16 +285,19 @@ func getDefaultEndpoint() *apiv1.Endpoints { Protocol: apiv1.ProtocolTCP, }, }, + NotReadyAddresses: []apiv1.EndpointAddress{}, }, { Addresses: []apiv1.EndpointAddress{ { - IP: "10.100.2.2", - NodeName: &instance2, + IP: "10.100.2.2", + NodeName: &instance2, + TargetRef: &v1.ObjectReference{}, }, { - IP: "10.100.4.1", - NodeName: &instance4, + IP: "10.100.4.1", + NodeName: &instance4, + TargetRef: &v1.ObjectReference{}, }, }, Ports: []apiv1.EndpointPort{ @@ -299,16 +307,19 @@ func getDefaultEndpoint() *apiv1.Endpoints { Protocol: apiv1.ProtocolTCP, }, }, + NotReadyAddresses: []apiv1.EndpointAddress{}, }, { Addresses: []apiv1.EndpointAddress{ { - IP: "10.100.3.2", - NodeName: &instance3, + IP: "10.100.3.2", + NodeName: &instance3, + TargetRef: &v1.ObjectReference{}, }, { - IP: "10.100.4.2", - NodeName: &instance4, + IP: "10.100.4.2", + NodeName: &instance4, + TargetRef: &v1.ObjectReference{}, }, }, Ports: []apiv1.EndpointPort{ @@ -318,6 +329,7 @@ func getDefaultEndpoint() *apiv1.Endpoints { Protocol: apiv1.ProtocolTCP, }, }, + NotReadyAddresses: []apiv1.EndpointAddress{}, }, }, } diff --git a/pkg/neg/syncers/syncer.go b/pkg/neg/syncers/syncer.go index fff59a687c..00a3b289ee 100644 --- a/pkg/neg/syncers/syncer.go +++ b/pkg/neg/syncers/syncer.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" 
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog" ) @@ -36,7 +37,7 @@ type syncerCore interface { // It handles state transitions and backoff retry operations. type syncer struct { // metadata - NegSyncerKey + negtypes.NegSyncerKey negName string // NEG sync function @@ -57,7 +58,7 @@ type syncer struct { backoff backoffHandler } -func newSyncer(negSyncerKey NegSyncerKey, networkEndpointGroupName string, serviceLister cache.Indexer, recorder record.EventRecorder, core syncerCore) *syncer { +func newSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, serviceLister cache.Indexer, recorder record.EventRecorder, core syncerCore) *syncer { return &syncer{ NegSyncerKey: negSyncerKey, negName: networkEndpointGroupName, diff --git a/pkg/neg/syncers/syncer_test.go b/pkg/neg/syncers/syncer_test.go index 9c1eda3be4..6db85186a8 100644 --- a/pkg/neg/syncers/syncer_test.go +++ b/pkg/neg/syncers/syncer_test.go @@ -65,7 +65,7 @@ func newSyncerTester() *syncerTester { DefaultBackendSvcPortID: defaultBackend, } context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) - negSyncerKey := NegSyncerKey{ + negSyncerKey := negtypes.NegSyncerKey{ Namespace: testServiceNamespace, Name: testServiceName, Port: 80, From e1bdd068cacfb76b7dcf2d8cf5cec7a010d94642 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 15 May 2019 18:17:53 -0700 Subject: [PATCH 5/6] adapt NEG controller and Syncer Manager to accomendate readiness reflector --- pkg/neg/controller.go | 73 ++++++++++++++++++++---------------- pkg/neg/controller_test.go | 13 ++++--- pkg/neg/manager.go | 77 ++++++++++++++++++++++++++++++++++---- pkg/neg/manager_test.go | 22 +++++++---- 4 files changed, 132 insertions(+), 53 deletions(-) diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 90382798a3..757f078c34 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/neg/metrics" + "k8s.io/ingress-gce/pkg/neg/readiness" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" "k8s.io/klog" @@ -55,12 +56,10 @@ type Controller struct { namer negtypes.NetworkEndpointGroupNamer zoneGetter negtypes.ZoneGetter - ingressSynced cache.InformerSynced - serviceSynced cache.InformerSynced - endpointSynced cache.InformerSynced - ingressLister cache.Indexer - serviceLister cache.Indexer - client kubernetes.Interface + hasSynced func() bool + ingressLister cache.Indexer + serviceLister cache.Indexer + client kubernetes.Interface // serviceQueue takes service key as work item. Service key with format "namespace/name". serviceQueue workqueue.RateLimitingInterface @@ -69,6 +68,9 @@ type Controller struct { // syncTracker tracks the latest time that service and endpoint changes are processed syncTracker utils.TimeTracker + + // reflector handles NEG readiness gate and conditions for pods in NEG. + reflector readiness.Reflector } // NewController returns a network endpoint group controller. 
@@ -90,24 +92,25 @@ func NewController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "neg-controller"}) - manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType) + manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType) + reflector := readiness.NewReadinessReflector(ctx, manager) + manager.reflector = reflector negController := &Controller{ - client: ctx.KubeClient, - manager: manager, - resyncPeriod: resyncPeriod, - gcPeriod: gcPeriod, - recorder: recorder, - zoneGetter: zoneGetter, - namer: namer, - ingressSynced: ctx.IngressInformer.HasSynced, - serviceSynced: ctx.ServiceInformer.HasSynced, - endpointSynced: ctx.EndpointInformer.HasSynced, - ingressLister: ctx.IngressInformer.GetIndexer(), - serviceLister: ctx.ServiceInformer.GetIndexer(), - serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - syncTracker: utils.NewTimeTracker(), + client: ctx.KubeClient, + manager: manager, + resyncPeriod: resyncPeriod, + gcPeriod: gcPeriod, + recorder: recorder, + zoneGetter: zoneGetter, + namer: namer, + hasSynced: ctx.HasSynced, + ingressLister: ctx.IngressInformer.GetIndexer(), + serviceLister: ctx.ServiceInformer.GetIndexer(), + serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + syncTracker: utils.NewTimeTracker(), + reflector: reflector, } ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -157,6 +160,18 @@ func NewController( negController.enqueueEndpoint(cur) }, }) + + ctx.PodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*apiv1.Pod) + negController.reflector.SyncPod(pod) + }, + UpdateFunc: func(old, cur interface{}) { + pod := cur.(*apiv1.Pod) + negController.reflector.SyncPod(pod) + }, + }) + ctx.AddHealthCheck("neg-controller", negController.IsHealthy) return negController } @@ -164,7 +179,7 @@ func NewController( func (c *Controller) Run(stopCh <-chan struct{}) { wait.PollUntil(5*time.Second, func() (bool, error) { klog.V(2).Infof("Waiting for initial sync") - return c.synced(), nil + return c.hasSynced(), nil }, stopCh) klog.V(2).Infof("Starting network endpoint group controller") @@ -181,7 +196,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { time.Sleep(c.gcPeriod) wait.Until(c.gc, c.gcPeriod, stopCh) }() - + go c.reflector.Run(stopCh) <-stopCh } @@ -292,7 +307,7 @@ func (c *Controller) processService(key string) error { // Only service ports referenced by ingress are synced for NEG ings := getIngressServicesFromStore(c.ingressLister, service) ingressSvcPorts := gatherPortMappingUsedByIngress(ings, service) - ingressPortInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressSvcPorts, c.namer) + ingressPortInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressSvcPorts, c.namer, true) if err := portInfoMap.Merge(ingressPortInfoMap); err != nil { return fmt.Errorf("failed to merge service ports referenced by ingress (%v): %v", ingressPortInfoMap, err) } @@ -310,7 +325,7 @@ func (c *Controller) processService(key string) error { return err } - if err := 
portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedNegSvcPort, c.namer)); err != nil { + if err := portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedNegSvcPort, c.namer, false)); err != nil { return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err) } } @@ -409,12 +424,6 @@ func (c *Controller) gc() { } } -func (c *Controller) synced() bool { - return c.endpointSynced() && - c.serviceSynced() && - c.ingressSynced() -} - // gatherPortMappingUsedByIngress returns a map containing port:targetport // of all service ports of the service that are referenced by ingresses func gatherPortMappingUsedByIngress(ings []extensions.Ingress, svc *apiv1.Service) negtypes.SvcPortMap { diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 99f6ea471d..5f0b676e19 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -357,6 +357,7 @@ func TestGatherPortMappingUsedByIngress(t *testing.T) { } } +// TODO(freehan): include test cases with different ReadinessGate setup func TestSyncNegAnnotation(t *testing.T) { t.Parallel() // TODO: test that c.serviceLister.Update is called whenever the annotation @@ -375,21 +376,21 @@ func TestSyncNegAnnotation(t *testing.T) { }{ { desc: "apply new annotation with no previous annotation", - portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 443: "other_port"}, namer), + portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 443: "other_port"}, namer, false), }, { desc: "same annotation applied twice", - previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer), - portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer), + previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false), + portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false), }, { desc: "apply new annotation and override previous annotation", - previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer), - portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{3000: "6000", 4000: "8000"}, namer), + previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false), + portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{3000: "6000", 4000: "8000"}, namer, false), }, { desc: "remove previous annotation", - previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer), + previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false), }, { desc: "remove annotation with no previous annotation", diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 441ca371f2..3854c24651 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -20,10 +20,13 @@ import ( "fmt" "sync" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress-gce/pkg/neg/readiness" negsyncer 
"k8s.io/ingress-gce/pkg/neg/syncers" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog" @@ -34,6 +37,10 @@ type serviceKey struct { name string } +func (k serviceKey) Key() string { + return fmt.Sprintf("%s/%s", k.namespace, k.name) +} + // syncerManager contains all the active syncer goroutines and manage their lifecycle. type syncerManager struct { negSyncerType NegSyncerType @@ -43,6 +50,7 @@ type syncerManager struct { cloud negtypes.NetworkEndpointGroupCloud zoneGetter negtypes.ZoneGetter + podLister cache.Indexer serviceLister cache.Indexer endpointLister cache.Indexer @@ -54,10 +62,12 @@ type syncerManager struct { svcPortMap map[serviceKey]negtypes.PortInfoMap // syncerMap stores the NEG syncer // key consists of service namespace, name and targetPort. Value is the corresponding syncer. - syncerMap map[negsyncer.NegSyncerKey]negtypes.NegSyncer + syncerMap map[negtypes.NegSyncerKey]negtypes.NegSyncer + // reflector handles NEG readiness gate and conditions for pods in NEG. + reflector readiness.Reflector } -func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager { +func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager { klog.V(2).Infof("NEG controller will use NEG syncer type: %q", negSyncerType) return &syncerManager{ negSyncerType: negSyncerType, @@ -65,10 +75,11 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record. recorder: recorder, cloud: cloud, zoneGetter: zoneGetter, + podLister: podLister, serviceLister: serviceLister, endpointLister: endpointLister, svcPortMap: make(map[serviceKey]negtypes.PortInfoMap), - syncerMap: make(map[negsyncer.NegSyncerKey]negtypes.NegSyncer), + syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer), } } @@ -82,6 +93,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg currentPorts = make(negtypes.PortInfoMap) } + // TODO(freehan): change ignore ReadinessGate bool changes + // If the service port has NEG enabled, due to configuration changes, + // readinessGate may be turn on or off for the same service port, + // The current logic will result in syncer being recreated simply because readiness gate setting changed. 
 	removes := currentPorts.Difference(newPorts)
 	adds := newPorts.Difference(currentPorts)
 
@@ -100,7 +115,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
 	for svcPort, portInfo := range adds {
 		syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, portInfo.TargetPort)]
 		if !ok {
-			syncerKey := negsyncer.NegSyncerKey{
+			syncerKey := negtypes.NegSyncerKey{
 				Namespace:  namespace,
 				Name:       name,
 				Port:       svcPort,
@@ -114,8 +129,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
 					manager.recorder,
 					manager.cloud,
 					manager.zoneGetter,
+					manager.podLister,
 					manager.serviceLister,
 					manager.endpointLister,
+					manager.reflector,
 				)
 			} else {
 				// Use batch syncer by default
@@ -139,7 +156,6 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
 			}
 		}
 	}
-
 	return utilerrors.NewAggregate(errList)
 }
 
@@ -198,6 +214,53 @@ func (manager *syncerManager) GC() error {
 	return nil
 }
 
+// ReadinessGateEnabledNegs returns a list of NEGs which has readiness gate enabled for the input pod's namespace and labels.
+func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabels map[string]string) []string {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
+	ret := sets.NewString()
+	for svcKey, portMap := range manager.svcPortMap {
+		if svcKey.namespace != namespace {
+			continue
+		}
+
+		obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
+		if err != nil {
+			klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err)
+			continue
+		}
+
+		if !exists {
+			continue
+		}
+
+		service := obj.(*v1.Service)
+
+		if service.Spec.Selector == nil {
+			// services with nil selectors match nothing, not everything.
+			continue
+		}
+
+		selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
+		if selector.Matches(labels.Set(podLabels)) {
+			ret = ret.Union(portMap.NegsWithReadinessGate())
+		}
+	}
+	return ret.List()
+}
+
+// ReadinessGateEnabled returns true if the NEG requires readiness feedback
+func (manager *syncerManager) ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
+	if v, ok := manager.svcPortMap[serviceKey{namespace: syncerKey.Namespace, name: syncerKey.Name}]; ok {
+		if info, ok := v[syncerKey.Port]; ok {
+			return info.ReadinessGate
+		}
+	}
+	return false
+}
+
 // garbageCollectSyncer removes stopped syncer from syncerMap
 func (manager *syncerManager) garbageCollectSyncer() {
 	manager.mu.Lock()
@@ -262,8 +325,8 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
 }
 
 // getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
-func getSyncerKey(namespace, name string, port int32, targetPort string) negsyncer.NegSyncerKey {
-	return negsyncer.NegSyncerKey{
+func getSyncerKey(namespace, name string, port int32, targetPort string) negtypes.NegSyncerKey {
+	return negtypes.NegSyncerKey{
 		Namespace:  namespace,
 		Name:       name,
 		Port:       port,
diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go
index bce526a720..96bd9cd9f6 100644
--- a/pkg/neg/manager_test.go
+++ b/pkg/neg/manager_test.go
@@ -20,7 +20,7 @@ import (
 	"testing"
 	"time"
 
-	compute "google.golang.org/api/compute/v0.beta"
+	"google.golang.org/api/compute/v0.beta"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -29,7 +29,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
 	"k8s.io/ingress-gce/pkg/context"
-	"k8s.io/ingress-gce/pkg/neg/syncers"
+	"k8s.io/ingress-gce/pkg/neg/readiness"
 	"k8s.io/ingress-gce/pkg/neg/types"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
 	"k8s.io/ingress-gce/pkg/utils"
@@ -48,18 +48,23 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
 		DefaultBackendSvcPortID: defaultBackend,
 	}
 	context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig)
+
 	manager := newSyncerManager(
 		namer,
 		record.NewFakeRecorder(100),
 		negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
 		negtypes.NewFakeZoneGetter(),
+		context.PodInformer.GetIndexer(),
 		context.ServiceInformer.GetIndexer(),
 		context.EndpointInformer.GetIndexer(),
 		transactionSyncer,
 	)
+	//TODO(freehan): use real readiness reflector for unit test
+	manager.reflector = &readiness.NoopReflector{}
 	return manager
 }
 
+// TODO(freehan): include test cases with different ReadinessGate setup
 func TestEnsureAndStopSyncer(t *testing.T) {
 	t.Parallel()
 
@@ -68,14 +73,14 @@ func TestEnsureAndStopSyncer(t *testing.T) {
 	testCases := []struct {
 		namespace string
 		name      string
 		ports     types.SvcPortMap
 		stop      bool
-		expect    []syncers.NegSyncerKey // keys of running syncers
+		expect    []negtypes.NegSyncerKey // keys of running syncers
 	}{
 		{
 			"ns1",
 			"n1",
 			types.SvcPortMap{1000: "80", 2000: "443"},
 			false,
-			[]syncers.NegSyncerKey{
+			[]negtypes.NegSyncerKey{
 				getSyncerKey("ns1", "n1", 1000, "80"),
 				getSyncerKey("ns1", "n1", 2000, "443"),
 			},
@@ -85,7 +90,7 @@
 			"n1",
 			types.SvcPortMap{3000: "80", 4000: "namedport"},
 			false,
-			[]syncers.NegSyncerKey{
+			[]negtypes.NegSyncerKey{
 				getSyncerKey("ns1", "n1", 3000, "80"),
 				getSyncerKey("ns1", "n1", 4000, "namedport"),
 			},
@@ -95,7 +100,7 @@
 			"n1",
 			types.SvcPortMap{3000: "80"},
 			false,
-			[]syncers.NegSyncerKey{
+			[]negtypes.NegSyncerKey{
 				getSyncerKey("ns1", "n1", 3000, "80"),
 				getSyncerKey("ns1", "n1", 4000, "namedport"),
 				getSyncerKey("ns2", "n1", 3000, "80"),
@@ -106,7 +111,7 @@
 			"n1",
 			types.SvcPortMap{},
 			true,
-			[]syncers.NegSyncerKey{
+			[]negtypes.NegSyncerKey{
 				getSyncerKey("ns2", "n1", 3000, "80"),
 			},
 		},
@@ -117,7 +122,8 @@
 		if tc.stop {
 			manager.StopSyncer(tc.namespace, tc.name)
 		} else {
-			portInfoMap := negtypes.NewPortInfoMap(tc.namespace, tc.name, tc.ports, namer)
+			// TODO(freehan): include test cases with different ReadinessGate setup
+			portInfoMap := negtypes.NewPortInfoMap(tc.namespace, tc.name, tc.ports, namer, false)
 			if err := manager.EnsureSyncers(tc.namespace, tc.name, portInfoMap); err != nil {
 				t.Errorf("Failed to ensure syncer %s/%s-%v: %v", tc.namespace, tc.name, tc.ports, err)
 			}

From 5de93fde0add46df393a9f13de94b3773f3708fa Mon Sep 17 00:00:00 2001
From: Minhan Xia
Date: Wed, 15 May 2019 18:18:22 -0700
Subject: [PATCH 6/6] adapt transaction NEG syncer to feedback into readiness reflector

---
 pkg/neg/syncers/transaction.go      | 49 +++++++++++++++++++++--------
 pkg/neg/syncers/transaction_test.go | 27 ++++++++++------
 2 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go
index 72998a63a8..673c29b0a3 100644
--- a/pkg/neg/syncers/transaction.go
+++ b/pkg/neg/syncers/transaction.go
@@ -27,13 +27,14 @@ import (
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/ingress-gce/pkg/neg/readiness"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
 	"k8s.io/klog"
 )
 
 type transactionSyncer struct {
 	// metadata
-	NegSyncerKey
+	negtypes.NegSyncerKey
 	negName string
 
 	// syncer provides syncer life cycle interfaces
@@ -50,6 +51,7 @@ type transactionSyncer struct {
 	// transactions stores each transaction
 	transactions networkEndpointTransactionTable
 
+	podLister      cache.Indexer
 	serviceLister  cache.Indexer
 	endpointLister cache.Indexer
 	recorder       record.EventRecorder
@@ -58,20 +60,25 @@ type transactionSyncer struct {
 	// retry handles back off retry for NEG API operations
 	retry retryHandler
+
+	// reflector handles NEG readiness gate and conditions for pods in NEG.
+	reflector readiness.Reflector
 }
 
-func NewTransactionSyncer(negSyncerKey NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) negtypes.NegSyncer {
+func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer {
 	// TransactionSyncer implements the syncer core
 	ts := &transactionSyncer{
 		NegSyncerKey:   negSyncerKey,
 		negName:        networkEndpointGroupName,
 		needInit:       true,
 		transactions:   NewTransactionTable(),
+		podLister:      podLister,
 		serviceLister:  serviceLister,
 		endpointLister: endpointLister,
 		recorder:       recorder,
 		cloud:          cloud,
 		zoneGetter:     zoneGetter,
+		reflector:      reflector,
 	}
 	// Syncer implements life cycle logic
 	syncer := newSyncer(negSyncerKey, networkEndpointGroupName, serviceLister, recorder, ts)
@@ -123,7 +130,7 @@ func (s *transactionSyncer) syncInternal() error {
 		return nil
 	}
 
-	targetMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort)
+	targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.TargetPort, s.podLister)
 	if err != nil {
 		return err
 	}
@@ -140,12 +147,18 @@ func (s *transactionSyncer) syncInternal() error {
 	reconcileTransactions(targetMap, s.transactions)
 	// Calculate the endpoints to add and delete to transform the current state to desire state
 	addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap)
+	// Calculate Pods that are already in the NEG
+	_, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap)
 	// Filter out the endpoints with existing transaction
 	// This mostly happens when transaction entry require reconciliation but the transaction is still progress
 	// e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N.
 	// This ensures the endpoint that requires reconciliation to wait till the existing transaction to complete.
 	filterEndpointByTransaction(addEndpoints, s.transactions)
 	filterEndpointByTransaction(removeEndpoints, s.transactions)
+	// filter out the endpoints that are in transaction
+	filterEndpointByTransaction(committedEndpoints, s.transactions)
+
+	s.commitPods(committedEndpoints, endpointPodMap)
 
 	if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
 		klog.V(4).Infof("No endpoint change for %s/%s, skip syncing NEG.", s.Namespace, s.Name)
@@ -274,8 +287,6 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[
 	// If any transaction needs reconciliation, trigger resync.
 	// needRetry indicates if the transaction needs to backoff and retry
 	needRetry := false
-	// needSync indicates if the transaction needs to trigger resync immediately
-	needSync := false
 
 	if err != nil {
 		// Trigger NEG initialization if error occurs
@@ -286,31 +297,43 @@ func (s *transactionSyncer) commitTransaction(err error, networkEndpointMap map[
 
 	for networkEndpoint := range networkEndpointMap {
 		entry, ok := s.transactions.Get(networkEndpoint)
+		// clear transaction
 		if !ok {
 			klog.Errorf("Endpoint %q was not found in the transaction table.", networkEndpoint)
-			needSync = true
 			continue
 		}
+		// TODO: Remove NeedReconcile in the transaction entry (freehan)
 		if entry.NeedReconcile == true {
 			klog.Errorf("Endpoint %q in NEG %q need to be reconciled.", networkEndpoint, s.NegSyncerKey.String())
-			needSync = true
 		}
 		s.transactions.Delete(networkEndpoint)
 	}
 
-	if needSync {
-		s.syncer.Sync()
-		return
-	}
-
 	if needRetry {
 		if retryErr := s.retry.Retry(); retryErr != nil {
 			s.recordEvent(apiv1.EventTypeWarning, "RetryFailed", fmt.Sprintf("Failed to retry NEG sync for %q: %v", s.NegSyncerKey.String(), retryErr))
 		}
 		return
 	}
 
-	s.retry.Reset()
+	// always trigger Sync to commit pods
+	s.syncer.Sync()
+}
+
+// commitPods groups the endpoints by zone and signals the readiness reflector to poll pods of the NEG
+func (s *transactionSyncer) commitPods(endpointMap map[string]negtypes.NetworkEndpointSet, endpointPodMap negtypes.EndpointPodMap) {
+	for zone, endpointSet := range endpointMap {
+		zoneEndpointMap := negtypes.EndpointPodMap{}
+		for _, endpoint := range endpointSet.List() {
+			podName, ok := endpointPodMap[endpoint]
+			if !ok {
+				klog.Warningf("Endpoint %v is not included in the endpointPodMap %v", endpoint, endpointPodMap)
+				continue
+			}
+			zoneEndpointMap[endpoint] = podName
+		}
+		s.reflector.CommitPods(s.NegSyncerKey, s.negName, zone, zoneEndpointMap)
+	}
 }
 
 // filterEndpointByTransaction removes the all endpoints from endpoint map if they exists in the transaction table
diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go
index 26c9097760..e7a9e6f363 100644
--- a/pkg/neg/syncers/transaction_test.go
+++ b/pkg/neg/syncers/transaction_test.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
 	"k8s.io/ingress-gce/pkg/context"
+	readiness "k8s.io/ingress-gce/pkg/neg/readiness"
 	negtypes "k8s.io/ingress-gce/pkg/neg/types"
 	"k8s.io/ingress-gce/pkg/utils"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
@@ -169,6 +170,7 @@ func TestTransactionSyncNetworkEndpoints(t *testing.T) {
 	}
 }
 
+// TODO(freehan): instead of only checking the sync count, also check the retry count
 func TestCommitTransaction(t *testing.T) {
 	t.Parallel()
 	s, transactionSyncer := newTestTransactionSyncer(gce.NewFakeGCECloud(gce.DefaultTestClusterValues()))
@@ -194,7 +196,7 @@
 			map[negtypes.NetworkEndpoint]*compute.NetworkEndpoint{},
 			func() networkEndpointTransactionTable { return NewTransactionTable() },
 			func() networkEndpointTransactionTable { return NewTransactionTable() },
-			0,
+			1,
 			false,
 		},
 		{
@@ -207,7 +209,7 @@
 				return table
 			},
 			func() networkEndpointTransactionTable { return NewTransactionTable() },
-			0,
+			2,
 			false,
 		},
 		{
@@ -221,7 +223,7 @@
 				return table
 			},
 			func() networkEndpointTransactionTable { return NewTransactionTable() },
-			0,
+			3,
 			false,
 		},
 		{
@@ -240,7 +242,7 @@
 				generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp, NeedReconcile: false}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080")
 				return table
 			},
-			0,
+			4,
 			false,
 		},
 		{
@@ -257,7 +259,7 @@
 				generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp, NeedReconcile: false}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080")
 				return table
 			},
-			1,
+			5,
 			true,
 		},
 		{
@@ -276,7 +278,7 @@
 				generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp, NeedReconcile: false}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080")
 				return table
 			},
-			2,
+			6,
 			true,
 		},
 		{
@@ -289,7 +291,7 @@
 				return table
 			},
 			func() networkEndpointTransactionTable { return NewTransactionTable() },
-			3,
+			7,
 			false,
 		},
 		{
@@ -308,7 +310,7 @@
 				generateTransaction(table, transactionEntry{Zone: testZone2, Operation: attachOp, NeedReconcile: false}, net.ParseIP("1.1.3.1"), 10, testInstance3, "8080")
 				return table
 			},
-			4,
+			8,
 			false,
 		},
 	}
@@ -836,20 +838,25 @@ func newTestTransactionSyncer(fakeGCE *gce.Cloud) (negtypes.NegSyncer, *transact
 		DefaultBackendSvcPortID: defaultBackend,
 	}
 	context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig)
-	svcPort := NegSyncerKey{
+	svcPort := negtypes.NegSyncerKey{
 		Namespace:  testNamespace,
 		Name:       testService,
 		Port:       80,
 		TargetPort: "8080",
 	}
 
+	// TODO(freehan): use real readiness reflector
+	reflector := &readiness.NoopReflector{}
+
 	negsyncer := NewTransactionSyncer(svcPort,
 		testNegName,
 		record.NewFakeRecorder(100),
 		fakeGCE,
 		negtypes.NewFakeZoneGetter(),
+		context.PodInformer.GetIndexer(),
 		context.ServiceInformer.GetIndexer(),
-		context.EndpointInformer.GetIndexer())
+		context.EndpointInformer.GetIndexer(),
+		reflector)
 	transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer)
 	return negsyncer, transactionSyncer
 }
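
The sketch below is not part of the patch series; it is a minimal illustration of how the new surface area above could be wired together. It assumes it sits next to the manager in pkg/neg (so the unexported syncerManager and getSyncerKey are in scope), that manager and namer are constructed the way NewTestSyncerManager constructs them, and the namespace, service name, ports, and pod labels are made-up example values.

// Hypothetical illustration only (not part of the patches above); assumes it
// compiles inside pkg/neg so that syncerManager and getSyncerKey are visible.
func ensureWithReadinessGate(manager *syncerManager, namer negtypes.NetworkEndpointGroupNamer) error {
	// The new trailing bool on NewPortInfoMap marks the NEGs of these service
	// ports as requiring readiness feedback (ReadinessGate = true).
	portInfoMap := negtypes.NewPortInfoMap("ns1", "svc1", negtypes.SvcPortMap{80: "8080"}, namer, true)
	if err := manager.EnsureSyncers("ns1", "svc1", portInfoMap); err != nil {
		return err
	}

	// The readiness reflector can then ask which NEGs gate readiness for a pod
	// with the given labels, and whether a particular syncer needs feedback.
	_ = manager.ReadinessGateEnabledNegs("ns1", map[string]string{"app": "svc1"})
	_ = manager.ReadinessGateEnabled(getSyncerKey("ns1", "svc1", 80, "8080"))
	return nil
}

Passing true is what makes ReadinessGateEnabled report true for the matching syncer key, which is how the manager tells the reflector that a NEG requires readiness feedback.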