Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readiness reflector #748

Merged
merged 6 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 41 additions & 32 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
Expand All @@ -55,12 +56,10 @@ type Controller struct {
namer negtypes.NetworkEndpointGroupNamer
zoneGetter negtypes.ZoneGetter

ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
endpointSynced cache.InformerSynced
ingressLister cache.Indexer
serviceLister cache.Indexer
client kubernetes.Interface
hasSynced func() bool
ingressLister cache.Indexer
serviceLister cache.Indexer
client kubernetes.Interface

// serviceQueue takes service key as work item. Service key with format "namespace/name".
serviceQueue workqueue.RateLimitingInterface
Expand All @@ -69,6 +68,9 @@ type Controller struct {

// syncTracker tracks the latest time that service and endpoint changes are processed
syncTracker utils.TimeTracker

// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector
}

// NewController returns a network endpoint group controller.
Expand All @@ -90,24 +92,25 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "neg-controller"})

manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType)
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType)
reflector := readiness.NewReadinessReflector(ctx, manager)
manager.reflector = reflector

negController := &Controller{
client: ctx.KubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
ingressSynced: ctx.IngressInformer.HasSynced,
serviceSynced: ctx.ServiceInformer.HasSynced,
endpointSynced: ctx.EndpointInformer.HasSynced,
ingressLister: ctx.IngressInformer.GetIndexer(),
serviceLister: ctx.ServiceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncTracker: utils.NewTimeTracker(),
client: ctx.KubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
hasSynced: ctx.HasSynced,
ingressLister: ctx.IngressInformer.GetIndexer(),
serviceLister: ctx.ServiceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
}

ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -157,14 +160,26 @@ func NewController(
negController.enqueueEndpoint(cur)
},
})

ctx.PodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*apiv1.Pod)
negController.reflector.SyncPod(pod)
},
UpdateFunc: func(old, cur interface{}) {
pod := cur.(*apiv1.Pod)
negController.reflector.SyncPod(pod)
},
})

ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
return negController
}

func (c *Controller) Run(stopCh <-chan struct{}) {
wait.PollUntil(5*time.Second, func() (bool, error) {
klog.V(2).Infof("Waiting for initial sync")
return c.synced(), nil
return c.hasSynced(), nil
}, stopCh)

klog.V(2).Infof("Starting network endpoint group controller")
Expand All @@ -181,7 +196,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
time.Sleep(c.gcPeriod)
wait.Until(c.gc, c.gcPeriod, stopCh)
}()

go c.reflector.Run(stopCh)
<-stopCh
}

Expand Down Expand Up @@ -292,7 +307,7 @@ func (c *Controller) processService(key string) error {
// Only service ports referenced by ingress are synced for NEG
ings := getIngressServicesFromStore(c.ingressLister, service)
ingressSvcPorts := gatherPortMappingUsedByIngress(ings, service)
ingressPortInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressSvcPorts, c.namer)
ingressPortInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressSvcPorts, c.namer, true)
if err := portInfoMap.Merge(ingressPortInfoMap); err != nil {
return fmt.Errorf("failed to merge service ports referenced by ingress (%v): %v", ingressPortInfoMap, err)
}
Expand All @@ -310,7 +325,7 @@ func (c *Controller) processService(key string) error {
return err
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedNegSvcPort, c.namer)); err != nil {
if err := portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedNegSvcPort, c.namer, false)); err != nil {
return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
}
}
Expand Down Expand Up @@ -409,12 +424,6 @@ func (c *Controller) gc() {
}
}

func (c *Controller) synced() bool {
return c.endpointSynced() &&
c.serviceSynced() &&
c.ingressSynced()
}

// gatherPortMappingUsedByIngress returns a map containing port:targetport
// of all service ports of the service that are referenced by ingresses
func gatherPortMappingUsedByIngress(ings []extensions.Ingress, svc *apiv1.Service) negtypes.SvcPortMap {
Expand Down
13 changes: 7 additions & 6 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ func TestGatherPortMappingUsedByIngress(t *testing.T) {
}
}

// TODO(freehan): include test cases with different ReadinessGate setup
func TestSyncNegAnnotation(t *testing.T) {
t.Parallel()
// TODO: test that c.serviceLister.Update is called whenever the annotation
Expand All @@ -375,21 +376,21 @@ func TestSyncNegAnnotation(t *testing.T) {
}{
{
desc: "apply new annotation with no previous annotation",
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 443: "other_port"}, namer),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 443: "other_port"}, namer, false),
},
{
desc: "same annotation applied twice",
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
},
{
desc: "apply new annotation and override previous annotation",
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{3000: "6000", 4000: "8000"}, namer),
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{3000: "6000", 4000: "8000"}, namer, false),
},
{
desc: "remove previous annotation",
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
},
{
desc: "remove annotation with no previous annotation",
Expand Down
77 changes: 70 additions & 7 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"fmt"
"sync"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/readiness"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog"
Expand All @@ -34,6 +37,10 @@ type serviceKey struct {
name string
}

func (k serviceKey) Key() string {
return fmt.Sprintf("%s/%s", k.namespace, k.name)
}

// syncerManager contains all the active syncer goroutines and manage their lifecycle.
type syncerManager struct {
negSyncerType NegSyncerType
Expand All @@ -43,6 +50,7 @@ type syncerManager struct {
cloud negtypes.NetworkEndpointGroupCloud
zoneGetter negtypes.ZoneGetter

podLister cache.Indexer
serviceLister cache.Indexer
endpointLister cache.Indexer

Expand All @@ -54,21 +62,24 @@ type syncerManager struct {
svcPortMap map[serviceKey]negtypes.PortInfoMap
// syncerMap stores the NEG syncer
// key consists of service namespace, name and targetPort. Value is the corresponding syncer.
syncerMap map[negsyncer.NegSyncerKey]negtypes.NegSyncer
syncerMap map[negtypes.NegSyncerKey]negtypes.NegSyncer
// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager {
func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager {
klog.V(2).Infof("NEG controller will use NEG syncer type: %q", negSyncerType)
return &syncerManager{
negSyncerType: negSyncerType,
namer: namer,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
svcPortMap: make(map[serviceKey]negtypes.PortInfoMap),
syncerMap: make(map[negsyncer.NegSyncerKey]negtypes.NegSyncer),
syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer),
}
}

Expand All @@ -82,6 +93,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
currentPorts = make(negtypes.PortInfoMap)
}

// TODO(freehan): change ignore ReadinessGate bool changes
// If the service port has NEG enabled, due to configuration changes,
// readinessGate may be turn on or off for the same service port,
// The current logic will result in syncer being recreated simply because readiness gate setting changed.
removes := currentPorts.Difference(newPorts)
adds := newPorts.Difference(currentPorts)

Expand All @@ -100,7 +115,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
for svcPort, portInfo := range adds {
syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, portInfo.TargetPort)]
if !ok {
syncerKey := negsyncer.NegSyncerKey{
syncerKey := negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: svcPort,
Expand All @@ -114,8 +129,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
manager.recorder,
manager.cloud,
manager.zoneGetter,
manager.podLister,
manager.serviceLister,
manager.endpointLister,
manager.reflector,
)
} else {
// Use batch syncer by default
Expand All @@ -139,7 +156,6 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
}
}
}

return utilerrors.NewAggregate(errList)
}

Expand Down Expand Up @@ -198,6 +214,53 @@ func (manager *syncerManager) GC() error {
return nil
}

// ReadinessGateEnabledNegs returns a list of NEGs which has readiness gate enabled for the input pod's namespace and labels.
func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabels map[string]string) []string {
manager.mu.Lock()
defer manager.mu.Unlock()
ret := sets.NewString()
for svcKey, portMap := range manager.svcPortMap {
if svcKey.namespace != namespace {
continue
}

obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
if err != nil {
klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err)
continue
}

if !exists {
continue
}

service := obj.(*v1.Service)

if service.Spec.Selector == nil {
// services with nil selectors match nothing, not everything.
continue
}

selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
if selector.Matches(labels.Set(podLabels)) {
ret = ret.Union(portMap.NegsWithReadinessGate())
}
}
return ret.List()
}

// ReadinessGateEnabled returns true if the NEG requires readiness feedback
func (manager *syncerManager) ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool {
manager.mu.Lock()
defer manager.mu.Unlock()
if v, ok := manager.svcPortMap[serviceKey{namespace: syncerKey.Namespace, name: syncerKey.Name}]; ok {
if info, ok := v[syncerKey.Port]; ok {
return info.ReadinessGate
}
}
return false
}

// garbageCollectSyncer removes stopped syncer from syncerMap
func (manager *syncerManager) garbageCollectSyncer() {
manager.mu.Lock()
Expand Down Expand Up @@ -262,8 +325,8 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
func getSyncerKey(namespace, name string, port int32, targetPort string) negsyncer.NegSyncerKey {
return negsyncer.NegSyncerKey{
func getSyncerKey(namespace, name string, port int32, targetPort string) negtypes.NegSyncerKey {
return negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: port,
Expand Down
Loading