diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index afe328de38..88e8a91bc4 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -219,7 +219,7 @@ func runControllers(ctx *ingctx.ControllerContext) { fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values()) // TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController. - negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector) + negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller) go negController.Run(stopCh) klog.V(0).Infof("negController started") diff --git a/pkg/annotations/service.go b/pkg/annotations/service.go index 87b5386205..c133081c32 100644 --- a/pkg/annotations/service.go +++ b/pkg/annotations/service.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "k8s.io/api/core/v1" + "k8s.io/legacy-cloud-providers/gce" ) const ( @@ -162,6 +163,22 @@ func FromService(obj *v1.Service) *Service { return &Service{obj.Annotations} } +// WantsL4ILB checks if the given service requires L4 ILB. +// the function returns a boolean as well as the loadbalancer type(string). +func WantsL4ILB(service *v1.Service) (bool, string) { + if service == nil { + return false, "" + } + if service.Spec.Type != v1.ServiceTypeLoadBalancer { + return false, fmt.Sprintf("Type : %s", service.Spec.Type) + } + ltype := gce.GetLoadBalancerAnnotationType(service) + if ltype == gce.LBTypeInternal { + return true, fmt.Sprintf("Type : %s, LBType : %s", service.Spec.Type, ltype) + } + return false, fmt.Sprintf("Type : %s, LBType : %s", service.Spec.Type, ltype) +} + // ApplicationProtocols returns a map of port (name or number) to the protocol // on the port. func (svc *Service) ApplicationProtocols() (map[string]AppProtocol, error) { diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index ac7c7cbfc3..0c8540cc1b 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -90,6 +90,8 @@ var ( EnableNonGCPMode bool EnableDeleteUnusedFrontends bool EnableV2FrontendNamer bool + RunIngressController bool + RunL4Controller bool LeaderElection LeaderElectionConfiguration }{} @@ -210,6 +212,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 flag.BoolVar(&F.EnableNonGCPMode, "enable-non-gcp-mode", false, "Set to true when running on a non-GCP cluster.") flag.BoolVar(&F.EnableDeleteUnusedFrontends, "enable-delete-unused-frontends", false, "Enable deleting unused gce frontend resources.") flag.BoolVar(&F.EnableV2FrontendNamer, "enable-v2-frontend-namer", false, "Enable v2 ingress frontend naming policy.") + flag.BoolVar(&F.RunIngressController, "run-ingress-controller", true, `Optional, whether or not to run IngressController as part of glbc. If set to false, ingress resources will not be processed. Only the L4 Service controller will be run, if that flag is set to true.`) + flag.BoolVar(&F.RunL4Controller, "run-l4-controller", false, `Optional, whether or not to run L4 Service Controller as part of glbc. 
If set to true, services of Type:LoadBalancer with Internal annotation will be processed by this controller.`) } type RateLimitSpecs struct { diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 975d8ebe01..310618b02c 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + "k8s.io/cloud-provider/service/helpers" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller/translator" @@ -78,18 +79,25 @@ type Controller struct { serviceQueue workqueue.RateLimitingInterface // endpointQueue takes endpoint key as work item. Endpoint key with format "namespace/name". endpointQueue workqueue.RateLimitingInterface + // nodeQueue takes node name as work item. + nodeQueue workqueue.RateLimitingInterface // destinationRuleQueue takes Istio DestinationRule key as work item. DestinationRule key with format "namespace/name" destinationRuleQueue workqueue.RateLimitingInterface // syncTracker tracks the latest time that service and endpoint changes are processed syncTracker utils.TimeTracker + // nodeSyncTracker tracks the latest time that node changes are processed + nodeSyncTracker utils.TimeTracker // reflector handles NEG readiness gate and conditions for pods in NEG. reflector readiness.Reflector // collector collects NEG usage metrics collector usage.NegMetricsCollector + + // runL4 indicates whether to run NEG controller that processes L4 ILB services + runL4 bool } // NewController returns a network endpoint group controller. @@ -101,6 +109,8 @@ func NewController( resyncPeriod time.Duration, gcPeriod time.Duration, enableReadinessReflector bool, + runIngress bool, + runL4Controller bool, ) *Controller { // init event recorder // TODO: move event recorder initializer to main. Reuse it among controllers. 
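Illustrative usage sketch (not part of the patch; the service name is hypothetical): with the WantsL4ILB helper added in pkg/annotations/service.go above, a Service is treated as an L4 ILB candidate only when it is of Type:LoadBalancer and carries the internal load balancer annotation, e.g.:

	// assumes the gce, annotations, v1 and metav1 packages already referenced in this diff
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "ilb-svc", // hypothetical name
			Annotations: map[string]string{gce.ServiceAnnotationLoadBalancerType: string(gce.LBTypeInternal)},
		},
		Spec: v1.ServiceSpec{Type: v1.ServiceTypeLoadBalancer},
	}
	wantsILB, _ := annotations.WantsL4ILB(svc) // true for this service; false without the annotation or for other service types

With -run-l4-controller=true the NEG controller merges GCE_VM_PRIMARY_IP port info for such services; with -run-ingress-controller=false, ingress-driven NEG processing is skipped entirely.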
@@ -112,7 +122,7 @@ func NewController( recorder := eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "neg-controller"}) - manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer()) + manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), ctx.NodeInformer.GetIndexer()) var reflector readiness.Reflector if enableReadinessReflector { reflector = readiness.NewReadinessReflector(ctx, manager) @@ -135,43 +145,57 @@ func NewController( serviceLister: ctx.ServiceInformer.GetIndexer(), serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), syncTracker: utils.NewTimeTracker(), reflector: reflector, collector: ctx.ControllerMetrics, + runL4: runL4Controller, } - ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - addIng := obj.(*v1beta1.Ingress) - if !utils.IsGLBCIngress(addIng) { - klog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", common.NamespacedName(addIng), annotations.IngressClassKey) - return - } - negController.enqueueIngressServices(addIng) - }, - DeleteFunc: func(obj interface{}) { - delIng := obj.(*v1beta1.Ingress) - if !utils.IsGLBCIngress(delIng) { - klog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", common.NamespacedName(delIng), annotations.IngressClassKey) - return - } - negController.enqueueIngressServices(delIng) - }, - UpdateFunc: func(old, cur interface{}) { - oldIng := cur.(*v1beta1.Ingress) - curIng := cur.(*v1beta1.Ingress) - if !utils.IsGLBCIngress(curIng) { - klog.V(4).Infof("Ignoring update for ingress %v based on annotation %v", common.NamespacedName(curIng), annotations.IngressClassKey) - return - } - keys := gatherIngressServiceKeys(oldIng) - keys = keys.Union(gatherIngressServiceKeys(curIng)) - for _, key := range keys.List() { - negController.enqueueService(cache.ExplicitKey(key)) - } - }, - }) + if runIngress { + ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addIng := obj.(*v1beta1.Ingress) + if !utils.IsGLBCIngress(addIng) { + klog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", common.NamespacedName(addIng), annotations.IngressClassKey) + return + } + negController.enqueueIngressServices(addIng) + }, + DeleteFunc: func(obj interface{}) { + delIng := obj.(*v1beta1.Ingress) + if !utils.IsGLBCIngress(delIng) { + klog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", common.NamespacedName(delIng), annotations.IngressClassKey) + return + } + negController.enqueueIngressServices(delIng) + }, + UpdateFunc: func(old, cur interface{}) { + oldIng := cur.(*v1beta1.Ingress) + curIng := cur.(*v1beta1.Ingress) + if !utils.IsGLBCIngress(curIng) { + klog.V(4).Infof("Ignoring update for ingress %v based on annotation %v", common.NamespacedName(curIng), annotations.IngressClassKey) + return + } + keys := gatherIngressServiceKeys(oldIng) + keys = keys.Union(gatherIngressServiceKeys(curIng)) + for _, key := range keys.List() { + negController.enqueueService(cache.ExplicitKey(key)) + } + }, + }) + ctx.PodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + 
AddFunc: func(obj interface{}) { + pod := obj.(*apiv1.Pod) + negController.reflector.SyncPod(pod) + }, + UpdateFunc: func(old, cur interface{}) { + pod := cur.(*apiv1.Pod) + negController.reflector.SyncPod(pod) + }, + }) + } ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: negController.enqueueService, DeleteFunc: negController.enqueueService, @@ -188,16 +212,18 @@ func NewController( }, }) - ctx.PodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*apiv1.Pod) - negController.reflector.SyncPod(pod) - }, - UpdateFunc: func(old, cur interface{}) { - pod := cur.(*apiv1.Pod) - negController.reflector.SyncPod(pod) - }, - }) + if negController.runL4 { + ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node := obj.(*apiv1.Node) + negController.enqueueNode(node) + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*apiv1.Node) + negController.enqueueNode(node) + }, + }) + } if ctx.EnableASMConfigMap { cmconfig := ctx.ASMConfigController.GetConfig() @@ -234,6 +260,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go wait.Until(c.serviceWorker, time.Second, stopCh) go wait.Until(c.endpointWorker, time.Second, stopCh) + go wait.Until(c.nodeWorker, time.Second, stopCh) go func() { // Wait for gcPeriod to run the first GC // This is to make sure that all services are fully processed before running GC. @@ -245,6 +272,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } func (c *Controller) IsHealthy() error { + // log the last node sync + klog.V(5).Infof("Last node sync was at %v", c.nodeSyncTracker.Get()) // check if last seen service and endpoint processing is more than an hour ago if c.syncTracker.Get().Before(time.Now().Add(-time.Hour)) { msg := fmt.Sprintf("NEG controller has not processed any service "+ @@ -276,6 +305,29 @@ func (c *Controller) endpointWorker() { } } +func (c *Controller) nodeWorker() { + for { + func() { + key, quit := c.nodeQueue.Get() + if quit { + return + } + c.processNode() + c.nodeQueue.Done(key) + }() + } +} + +// processNode finds the related syncers and signal it to sync +// use a semaphore approach where all vm_ip syncers can wake up. 
+func (c *Controller) processNode() { + defer func() { + now := c.nodeSyncTracker.Track() + metrics.LastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) + }() + c.manager.SyncNodes() +} + // processEndpoint finds the related syncers and signal it to sync func (c *Controller) processEndpoint(key string) { defer func() { @@ -356,7 +408,11 @@ func (c *Controller) processService(key string) error { if err := svcPortInfoMap.Merge(csmSVCPortInfoMap); err != nil { return fmt.Errorf("failed to merge CSM service PortInfoMap: %v, error: %v", csmSVCPortInfoMap, err) } - + if c.runL4 { + if err := c.mergeVmPrimaryIpNEGsPortInfo(service, types.NamespacedName{Namespace: namespace, Name: name}, svcPortInfoMap); err != nil { + return err + } + } if len(svcPortInfoMap) != 0 || len(destinationRulesPortInfoMap) != 0 { klog.V(2).Infof("Syncing service %q", key) if err = c.syncNegStatusAnnotation(namespace, name, svcPortInfoMap); err != nil { @@ -402,7 +458,7 @@ func (c *Controller) mergeIngressPortInfo(service *apiv1.Service, name types.Nam return nil } -// mergeStandaloneNEGsPortInfo merge Sandaloon NEG PortInfo into portInfoMap +// mergeStandaloneNEGsPortInfo merge Standalone NEG PortInfo into portInfoMap func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap) error { negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation() if err != nil { @@ -437,6 +493,20 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty return nil } +// mergeVmPrimaryIpNEGsPortInfo merges the PortInfo for ILB services using GCE_VM_PRIMARY_IP NEGs into portInfoMap +func (c *Controller) mergeVmPrimaryIpNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap) error { + if wantsILB, _ := annotations.WantsL4ILB(service); !wantsILB { + return nil + } + if utils.IsLegacyL4ILBService(service) { + msg := fmt.Sprintf("Ignoring ILB Service %s, namespace %s as it contains legacy resources created by service controller", service.Name, service.Namespace) + klog.Warning(msg) + c.recorder.Eventf(service, apiv1.EventTypeWarning, "ProcessServiceFailed", msg) + } + return portInfoMap.Merge(negtypes.NewPortInfoMapForPrimaryIPNEG(name.Namespace, name.Name, c.namer, + !helpers.RequestsOnlyLocalTraffic(service))) +} + // mergeDefaultBackendServicePortInfoMap merge the PortInfoMap for the default backend service into portInfoMap // The default backend service needs special handling since it is not explicitly referenced // in the ingress spec. 
It is either inferred and then managed by the controller, or
@@ -613,6 +683,15 @@ func (c *Controller) enqueueEndpoint(obj interface{}) {
	c.endpointQueue.Add(key)
}
+func (c *Controller) enqueueNode(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		klog.Errorf("Failed to generate node key: %v", err)
+		return
+	}
+	c.nodeQueue.Add(key)
+}
+
func (c *Controller) enqueueService(obj interface{}) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go
index f0dde5ec18..c64f251a9a 100644
--- a/pkg/neg/controller_test.go
+++ b/pkg/neg/controller_test.go
@@ -113,6 +113,8 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
		1*time.Second,
		// TODO(freehan): enable readiness reflector for unit tests
		false,
+		true,
+		false,
	)
	return controller
}
@@ -278,6 +280,31 @@ func TestEnableNEGServiceWithIngress(t *testing.T) {
	validateServiceStateAnnotation(t, svc, svcPorts, controller.namer)
}
+// TestEnableNEGServiceWithL4ILB tests an L4 ILB service with NEGs enabled.
+func TestEnableNEGServiceWithL4ILB(t *testing.T) {
+	controller := newTestController(fake.NewSimpleClientset())
+	controller.runL4 = true
+	defer controller.stop()
+	for _, randomize := range []bool{false, true} {
+		controller.serviceLister.Add(newTestILBService(controller, !randomize, 80))
+		svcClient := controller.client.CoreV1().Services(testServiceNamespace)
+		svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName)
+		err := controller.processService(svcKey)
+		if err != nil {
+			t.Fatalf("Failed to process service: %v", err)
+		}
+		svc, err := svcClient.Get(testServiceName, metav1.GetOptions{})
+		if err != nil {
+			t.Fatalf("Service was not created successfully, err: %v", err)
+		}
+		validateSyncers(t, controller, 1, false)
+		expectedPortInfoMap := negtypes.NewPortInfoMapForPrimaryIPNEG(testServiceNamespace, testServiceName,
+			controller.namer, randomize)
+		validateSyncerManagerWithPortInfoMap(t, controller, testServiceNamespace, testServiceName, expectedPortInfoMap)
+		validateServiceAnnotationWithPortInfoMap(t, svc, expectedPortInfoMap)
+	}
+}
+
// TestEnableNEGServiceWithILBIngress tests ILB service with NEG enabled
func TestEnableNEGServiceWithILBIngress(t *testing.T) {
	// Not running in parallel since enabling global flag
@@ -1069,6 +1096,28 @@ func getTestSvcPortTuple(svcPort int32) negtypes.SvcPortTuple {
	return negtypes.SvcPortTuple{}
}
+func newTestILBService(c *Controller, onlyLocal bool, port int) *apiv1.Service {
+	svc := &apiv1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        testServiceName,
+			Namespace:   testServiceNamespace,
+			Annotations: map[string]string{gce.ServiceAnnotationLoadBalancerType: string(gce.LBTypeInternal)},
+		},
+		Spec: apiv1.ServiceSpec{
+			Type: apiv1.ServiceTypeLoadBalancer,
+			Ports: []apiv1.ServicePort{
+				{Name: "testport", Port: int32(port)},
+			},
+		},
+	}
+	if onlyLocal {
+		svc.Spec.ExternalTrafficPolicy = apiv1.ServiceExternalTrafficPolicyTypeLocal
+	}
+
+	c.client.CoreV1().Services(testServiceNamespace).Create(svc)
+	return svc
+}
+
func newTestService(c *Controller, negIngress bool, negSvcPorts []int32) *apiv1.Service {
	svcAnnotations := map[string]string{}
	if negIngress || len(negSvcPorts) > 0 {
diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go
index 30b4491116..63dd7b9293 100644
--- a/pkg/neg/manager.go
+++ b/pkg/neg/manager.go
@@ -51,6 +51,7 @@ type syncerManager struct {
	cloud
negtypes.NetworkEndpointGroupCloud zoneGetter negtypes.ZoneGetter + nodeLister cache.Indexer podLister cache.Indexer serviceLister cache.Indexer endpointLister cache.Indexer @@ -68,12 +69,13 @@ type syncerManager struct { reflector readiness.Reflector } -func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncerManager { +func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister, serviceLister, endpointLister, nodeLister cache.Indexer) *syncerManager { return &syncerManager{ namer: namer, recorder: recorder, cloud: cloud, zoneGetter: zoneGetter, + nodeLister: nodeLister, podLister: podLister, serviceLister: serviceLister, endpointLister: endpointLister, @@ -116,6 +118,9 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg syncerKey := getSyncerKey(namespace, name, svcPort, portInfo) syncer, ok := manager.syncerMap[syncerKey] if !ok { + // determine the implementation that calculates NEG endpoints on each sync. + epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter, + syncerKey, portInfo.RandomizeEndpoints) syncer = negsyncer.NewTransactionSyncer( syncerKey, portInfo.NegName, @@ -125,7 +130,9 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg manager.podLister, manager.serviceLister, manager.endpointLister, + manager.nodeLister, manager.reflector, + epc, ) manager.syncerMap[syncerKey] = syncer } @@ -171,6 +178,18 @@ func (manager *syncerManager) Sync(namespace, name string) { } } +// SyncNodes signals all GCE_VM_PRIMARY_IP syncers to sync. +// Only these use nodes selected at random as endpoints and hence need to sync upon node updates. 
+func (manager *syncerManager) SyncNodes() { + manager.mu.Lock() + defer manager.mu.Unlock() + for key, syncer := range manager.syncerMap { + if key.NegType == negtypes.VmPrimaryIpEndpointType && !syncer.IsStopped() { + syncer.Sync() + } + } +} + // ShutDown signals all syncers to stop func (manager *syncerManager) ShutDown() { manager.mu.Lock() @@ -316,6 +335,9 @@ func getSyncerKey(namespace, name string, servicePortKey negtypes.PortInfoMapKey if flags.F.EnableNonGCPMode { networkEndpointType = negtypes.NonGCPPrivateEndpointType } + if portInfo.PortTuple.Empty() { + networkEndpointType = negtypes.VmPrimaryIpEndpointType + } return negtypes.NegSyncerKey{ Namespace: namespace, diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 68cdc8c591..cc9a866f8c 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -84,6 +84,7 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager { context.PodInformer.GetIndexer(), context.ServiceInformer.GetIndexer(), context.EndpointInformer.GetIndexer(), + context.NodeInformer.GetIndexer(), ) manager.reflector = readiness.NewReadinessReflector(context, manager) return manager @@ -216,6 +217,20 @@ func TestEnsureAndStopSyncer(t *testing.T) { }, expectEnsureError: false, }, + { + desc: "add a new l4 ilb port for ns2/n1 service", + namespace: svcNamespace1, + name: svcName, + stop: false, + portInfoMap: negtypes.NewPortInfoMap(svcNamespace1, svcName, types.NewSvcPortTupleSet(negtypes.SvcPortTuple{Name: portName2, Port: 3000, TargetPort: "80"}, negtypes.SvcPortTuple{Name: portName0, Port: 4000, TargetPort: "bar"}, negtypes.SvcPortTuple{Name: string(negtypes.VmPrimaryIpEndpointType), Port: 0}), namer, true), + expectInternals: map[negtypes.NegSyncerKey]bool{ + getSyncerKey(svcNamespace2, svcName, negtypes.PortInfoMapKey{ServicePort: 3000, Subset: ""}, negtypes.PortInfo{PortTuple: negtypes.SvcPortTuple{Port: 3000, TargetPort: "80"}}): false, + getSyncerKey(svcNamespace1, svcName, negtypes.PortInfoMapKey{ServicePort: 3000, Subset: ""}, negtypes.PortInfo{PortTuple: negtypes.SvcPortTuple{Name: portName2, Port: 3000, TargetPort: "80"}}): true, + getSyncerKey(svcNamespace1, svcName, negtypes.PortInfoMapKey{ServicePort: 4000, Subset: ""}, negtypes.PortInfo{PortTuple: negtypes.SvcPortTuple{Name: portName0, Port: 4000, TargetPort: "bar"}}): true, + getSyncerKey(svcNamespace1, svcName, negtypes.PortInfoMapKey{ServicePort: 0, Subset: ""}, negtypes.PortInfo{PortTuple: negtypes.SvcPortTuple{Name: string(negtypes.VmPrimaryIpEndpointType), Port: 0}}): true, + }, + expectEnsureError: false, + }, } for _, tc := range testCases { @@ -349,15 +364,19 @@ func TestGarbageCollectionNEG(t *testing.T) { t.Fatalf("Failed to ensure syncer: %v", err) } - for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType} { + version := meta.VersionGA + for _, networkEndpointType := range []negtypes.NetworkEndpointType{negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType, negtypes.VmPrimaryIpEndpointType} { + if networkEndpointType == negtypes.VmPrimaryIpEndpointType { + version = meta.VersionAlpha + } negName := manager.namer.NEG("test", "test", 80) manager.cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ - Version: meta.VersionGA, + Version: version, Name: negName, NetworkEndpointType: string(networkEndpointType), }, negtypes.TestZone1) manager.cloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{ - Version: meta.VersionGA, + Version: 
version, Name: negName, NetworkEndpointType: string(networkEndpointType), }, negtypes.TestZone2)
@@ -366,7 +385,7 @@ func TestGarbageCollectionNEG(t *testing.T) {
		t.Fatalf("Failed to GC: %v", err)
	}
	for _, zone := range []string{negtypes.TestZone1, negtypes.TestZone2} {
-		negs, _ := manager.cloud.ListNetworkEndpointGroup(zone, meta.VersionGA)
+		negs, _ := manager.cloud.ListNetworkEndpointGroup(zone, version)
		for _, neg := range negs {
			if neg.Name == negName {
				t.Errorf("Expect NEG %q in zone %q to be GCed.", negName, zone)
diff --git a/pkg/neg/syncers/endpoints_calculator.go b/pkg/neg/syncers/endpoints_calculator.go
new file mode 100644
index 0000000000..44534b91bb
--- /dev/null
+++ b/pkg/neg/syncers/endpoints_calculator.go
@@ -0,0 +1,210 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package syncers
+
+import (
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/ingress-gce/pkg/neg/types"
+	"k8s.io/ingress-gce/pkg/utils"
+	"k8s.io/klog"
+)
+
+// LocalL4ILBEndpointsCalculator implements the NetworkEndpointsCalculator interface.
+// It exposes methods to calculate Network endpoints for VM_PRIMARY_IP NEGs when the service
+// uses "ExternalTrafficPolicy: Local" mode.
+// In this mode, the endpoints of the NEG are calculated by listing the nodes that host the service endpoints (pods)
+// for the given service. These candidate nodes are picked as is, if the count is less than the subset size limit (250).
+// Otherwise, a subset of nodes is selected.
+// For example, in a cluster with nodes node1...node50, if nodes node10 to node45 run the pods for a given ILB service,
+// all of these nodes - node10, node11 ... node45 - will be part of the subset.
+type LocalL4ILBEndpointsCalculator struct {
+	nodeLister      listers.NodeLister
+	zoneGetter      types.ZoneGetter
+	subsetSizeLimit int
+	svcId           string
+}
+
+func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string) *LocalL4ILBEndpointsCalculator {
+	return &LocalL4ILBEndpointsCalculator{nodeLister: nodeLister, zoneGetter: zoneGetter, subsetSizeLimit: maxSubsetSizeLocal, svcId: svcId}
+}
+
+// Mode indicates the mode that the EndpointsCalculator is operating in.
+func (l *LocalL4ILBEndpointsCalculator) Mode() types.EndpointsCalculatorMode {
+	return types.L4LocalMode
+}
+
+// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
+func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(ep *v1.Endpoints, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
+	// List all nodes where the service endpoints are running. Get a subset of the desired count.
+	zoneNodeMap := make(map[string][]*v1.Node)
+	nodeNames := sets.String{}
+	numEndpoints := 0
+	for _, curEp := range ep.Subsets {
+		for _, addr := range curEp.Addresses {
+			if addr.NodeName == nil {
+				klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", addr.IP, ep.Namespace, ep.Name)
+				continue
+			}
+			if addr.TargetRef == nil {
+				klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", addr.IP, ep.Namespace, ep.Name)
+				continue
+			}
+			numEndpoints++
+			if nodeNames.Has(*addr.NodeName) {
+				continue
+			}
+			nodeNames.Insert(*addr.NodeName)
+			node, err := l.nodeLister.Get(*addr.NodeName)
+			if err != nil {
+				klog.Errorf("failed to retrieve node object for %q: %v", *addr.NodeName, err)
+				continue
+			}
+			zone, err := l.zoneGetter.GetZoneForNode(node.Name)
+			if err != nil {
+				klog.Errorf("Unable to find zone for node %s, err %v, skipping", node.Name, err)
+				continue
+			}
+			zoneNodeMap[zone] = append(zoneNodeMap[zone], node)
+		}
+	}
+	if numEndpoints == 0 {
+		// TODO verify the behavior seen by a client when accessing an ILB whose NEGs have no endpoints.
+		return nil, nil, nil
+	}
+	// This denotes zones where the endpoint pods are running
+	numZones := len(zoneNodeMap)
+	perZoneCount := l.getPerZoneSubsetCount(numZones, numEndpoints)
+	// Compute the networkEndpoints, with the endpointSet in each zone being at most `perZoneCount` in size.
+	// TODO fix this logic to pick up to a total of l.subsetSizeLimit if there are more than perZoneCount nodes in one
+	// zone and fewer in another.
+	subsetMap, err := getSubsetPerZone(zoneNodeMap, perZoneCount, l.svcId, currentMap)
+	return subsetMap, nil, err
+}
+
+// getPerZoneSubsetCount returns the max size limit of each zonal NEG, given the number of zones and service endpoints.
+// The subset size will be proportional to the number of endpoints, as long as the endpoint count is within the limit.
+func (l *LocalL4ILBEndpointsCalculator) getPerZoneSubsetCount(numZones, numEndpoints int) int {
+	if numZones == 0 {
+		return 0
+	}
+	// Dividing by numZones can cause an off-by-one error depending on the numZones value.
+	// For instance, 250/3 = 83, 83*3 = 249, i.e. 250 - 1
+	if numEndpoints > l.subsetSizeLimit {
+		return l.subsetSizeLimit / numZones
+	}
+	// If there are 2 endpoints and 3 zones, we want to pick at least one per zone.
+	if numEndpoints > 0 && numEndpoints < numZones {
+		return 1
+	}
+	return numEndpoints / numZones
+}
+
+// ClusterL4ILBEndpointsCalculator implements the NetworkEndpointsCalculator interface.
+// It exposes methods to calculate Network endpoints for VM_PRIMARY_IP NEGs when the service
+// uses "ExternalTrafficPolicy: Cluster" mode. This is the default mode.
+// In this mode, the endpoints of the NEG are calculated by selecting nodes at random. Up to 25 nodes
+// (the subset size limit in this mode) are selected.
+type ClusterL4ILBEndpointsCalculator struct {
+	// nodeLister is used for listing all the nodes in the cluster when calculating the subset.
+	nodeLister listers.NodeLister
+	// zoneGetter looks up the zone for a given node when calculating subsets.
+	zoneGetter types.ZoneGetter
+	// subsetSizeLimit is the max value of the subset size in this mode.
+	subsetSizeLimit int
+	// svcId is the unique identifier for the service, which is used as a salt when hashing node names.
+ svcId string +} + +func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string) *ClusterL4ILBEndpointsCalculator { + return &ClusterL4ILBEndpointsCalculator{nodeLister: nodeLister, zoneGetter: zoneGetter, + subsetSizeLimit: maxSubsetSizeDefault, svcId: svcId} +} + +// Mode indicates the mode that the EndpointsCalculator is operating in. +func (l *ClusterL4ILBEndpointsCalculator) Mode() types.EndpointsCalculatorMode { + return types.L4ClusterMode +} + +// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. +func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(ep *v1.Endpoints, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) { + // In this mode, any of the cluster nodes can be part of the subset, whether or not a matching pod runs on it. + nodes, _ := l.nodeLister.ListWithPredicate(utils.GetNodeConditionPredicate()) + + nodeZoneMap := make(map[string][]*v1.Node) + for _, node := range nodes { + zone, err := l.zoneGetter.GetZoneForNode(node.Name) + if err != nil { + klog.Errorf("Unable to find zone for node %s, err %v, skipping", node.Name, err) + continue + } + nodeZoneMap[zone] = append(nodeZoneMap[zone], node) + } + numZones := len(nodeZoneMap) + // This value is always SubsetSizeLimit/numZones, in this mode. Passing in numEndpoints as 0 to avoid unnecessary + // calculation. + // If number of endpoints matter in the calculation, this can be changed to: + // perZoneCount := l.getPerZoneSubsetCount(numZones, utils.NumEndpoints(ep)) + perZoneCount := l.getPerZoneSubsetCount(numZones, 0) + // Compute the networkEndpoints, with endpointSet size in each zone being atmost `perZoneCount` in size + // TODO fix this logic to pick upto a total of l.SubsetSizeLimit if there are more than perZoneCount nodes in one + // zone and fewer in another. + subsetMap, err := getSubsetPerZone(nodeZoneMap, perZoneCount, l.svcId, currentMap) + return subsetMap, nil, err +} + +// getPerZoneSubsetCount returns the max size limit of each zonal NEG, given the number of zones and service endpoints. +func (l *ClusterL4ILBEndpointsCalculator) getPerZoneSubsetCount(numZones, numEndpoints int) int { + if numZones == 0 { + return 0 + } + // Use the static limit instead of making it proportional to service size. + // This will help minimize changes to the NEGs. Since NEG endpoints are picked at random in this mode, + // irrespective of service endpoints, using the static limit is ok. + return l.subsetSizeLimit / numZones +} + +// L7EndpointsCalculator implements methods to calculate Network endpoints for VM_IP_PORT NEGs +type L7EndpointsCalculator struct { + zoneGetter types.ZoneGetter + servicePortName string + podLister cache.Indexer + subsetLabels string + networkEndpointType types.NetworkEndpointType +} + +func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName, subsetLabels string, endpointType types.NetworkEndpointType) *L7EndpointsCalculator { + return &L7EndpointsCalculator{ + zoneGetter: zoneGetter, + servicePortName: svcPortName, + podLister: podLister, + subsetLabels: subsetLabels, + networkEndpointType: endpointType, + } +} + +// Mode indicates the mode that the EndpointsCalculator is operating in. 
+func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode { + return types.L7Mode +} + +// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. +func (l *L7EndpointsCalculator) CalculateEndpoints(ep *v1.Endpoints, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) { + return toZoneNetworkEndpointMap(ep, l.zoneGetter, l.servicePortName, l.podLister, l.subsetLabels, "") +} diff --git a/pkg/neg/syncers/endpoints_calculator_test.go b/pkg/neg/syncers/endpoints_calculator_test.go new file mode 100644 index 0000000000..195a457155 --- /dev/null +++ b/pkg/neg/syncers/endpoints_calculator_test.go @@ -0,0 +1,209 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package syncers + +import ( + "fmt" + "reflect" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + listers "k8s.io/client-go/listers/core/v1" + negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/legacy-cloud-providers/gce" +) + +// TestLocalGetEndpointSet verifies the GetEndpointSet method implemented by the LocalL4ILBEndpointsCalculator. +// The L7 implementation is tested in TestToZoneNetworkEndpointMapUtil. +func TestLocalGetEndpointSet(t *testing.T) { + t.Parallel() + _, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), false) + nodeNames := []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6} + for i := 0; i < len(nodeNames); i++ { + err := transactionSyncer.nodeLister.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + //Namespace: testServiceNamespace, + Name: nodeNames[i], + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: fmt.Sprintf("1.2.3.%d", i+1), + }, + }, + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }) + if err != nil { + t.Errorf("Failed to add node %s to syncer's nodeLister, err %v", nodeNames[i], err) + } + } + zoneGetter := negtypes.NewFakeZoneGetter() + nodeLister := listers.NewNodeLister(transactionSyncer.nodeLister) + + testCases := []struct { + desc string + endpoints *v1.Endpoints + endpointSets map[string]negtypes.NetworkEndpointSet + networkEndpointType negtypes.NetworkEndpointType + }{ + { + desc: "default endpoints", + endpoints: getDefaultEndpoint(), + // only 4 out of 6 nodes are picked since there are > 4 endpoints, but they are found only on 4 nodes. 
+ endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}), + }, + networkEndpointType: negtypes.VmPrimaryIpEndpointType, + }, + { + desc: "no endpoints", + endpoints: &v1.Endpoints{}, + // No nodes are picked as there are no service endpoints. + endpointSets: nil, + networkEndpointType: negtypes.VmPrimaryIpEndpointType, + }, + } + svcKey := fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace) + ec := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey) + for _, tc := range testCases { + retSet, _, err := ec.CalculateEndpoints(tc.endpoints, nil) + if err != nil { + t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) + } + if !reflect.DeepEqual(retSet, tc.endpointSets) { + t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, retSet) + } + } +} + +// TestClusterGetEndpointSet verifies the GetEndpointSet method implemented by the ClusterL4ILBEndpointsCalculator. +func TestClusterGetEndpointSet(t *testing.T) { + t.Parallel() + _, transactionSyncer := newL4ILBTestTransactionSyncer(negtypes.NewAdapter(gce.NewFakeGCECloud(gce.DefaultTestClusterValues())), true) + nodeNames := []string{testInstance1, testInstance2, testInstance3, testInstance4, testInstance5, testInstance6} + for i := 0; i < len(nodeNames); i++ { + err := transactionSyncer.nodeLister.Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeNames[i], + }, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + { + Type: v1.NodeInternalIP, + Address: fmt.Sprintf("1.2.3.%d", i+1), + }, + }, + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }) + if err != nil { + t.Errorf("Failed to add node %s to syncer's nodeLister, err %v", nodeNames[i], err) + } + } + zoneGetter := negtypes.NewFakeZoneGetter() + nodeLister := listers.NewNodeLister(transactionSyncer.nodeLister) + testCases := []struct { + desc string + endpoints *v1.Endpoints + endpointSets map[string]negtypes.NetworkEndpointSet + networkEndpointType negtypes.NetworkEndpointType + }{ + { + desc: "default endpoints", + endpoints: getDefaultEndpoint(), + // all nodes are picked since, in this mode, endpoints running do not need to run on the selected node. + endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}, + negtypes.NetworkEndpoint{IP: "1.2.3.5", Node: testInstance5}, negtypes.NetworkEndpoint{IP: "1.2.3.6", Node: testInstance6}), + }, + networkEndpointType: negtypes.VmPrimaryIpEndpointType, + }, + { + desc: "no endpoints", + // all nodes are picked since, in this mode, endpoints running do not need to run on the selected node. + // Even when there are no service endpoints, nodes are selected at random. 
+ endpoints: &v1.Endpoints{}, + endpointSets: map[string]negtypes.NetworkEndpointSet{ + negtypes.TestZone1: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.1", Node: testInstance1}, negtypes.NetworkEndpoint{IP: "1.2.3.2", Node: testInstance2}), + negtypes.TestZone2: negtypes.NewNetworkEndpointSet(negtypes.NetworkEndpoint{IP: "1.2.3.3", Node: testInstance3}, negtypes.NetworkEndpoint{IP: "1.2.3.4", Node: testInstance4}, + negtypes.NetworkEndpoint{IP: "1.2.3.5", Node: testInstance5}, negtypes.NetworkEndpoint{IP: "1.2.3.6", Node: testInstance6}), + }, + networkEndpointType: negtypes.VmPrimaryIpEndpointType, + }, + } + svcKey := fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace) + ec := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey) + for _, tc := range testCases { + retSet, _, err := ec.CalculateEndpoints(tc.endpoints, nil) + if err != nil { + t.Errorf("For case %q, expect nil error, but got %v.", tc.desc, err) + } + if !reflect.DeepEqual(retSet, tc.endpointSets) { + t.Errorf("For case %q, expecting endpoint set %v, but got %v.", tc.desc, tc.endpointSets, retSet) + } + } +} + +// TestGetPerZoneSubsetCount verifies the perZoneSubsetCount method. +func TestGetPerZoneSubsetCount(t *testing.T) { + t.Parallel() + zoneCount := 3 + result := 0 + tcs := []struct { + desc string + randomize bool + startCount int + endCount int + expectedCount int + }{ + {desc: "start with endpoints, drop to none, ExternalTrafficPolicy:Local", startCount: 5, endCount: 0, expectedCount: 0}, + {desc: "no endpoints, ExternalTrafficPolicy:Local", startCount: 0, endCount: 0, expectedCount: 0}, + {desc: "valid endpoints increase, ExternalTrafficPolicy:Local", startCount: 5, endCount: 10, expectedCount: 10 / zoneCount}, + // If total number of nodes is less than the number of zones, per zone count will be 1. + {desc: "valid endpoints decrease, ExternalTrafficPolicy:Local", startCount: 5, endCount: 2, expectedCount: 1}, + {desc: "valid endpoints > limit, ExternalTrafficPolicy:Local", startCount: 5, endCount: 258, expectedCount: maxSubsetSizeLocal / zoneCount}, + {desc: "start with endpoints, drop to none, ExternalTrafficPolicy:Cluster", randomize: true, startCount: 5, endCount: 0, expectedCount: maxSubsetSizeDefault / zoneCount}, + {desc: "no endpoints, random true, ExternalTrafficPolicy:Cluster", randomize: true, startCount: 0, endCount: 0, expectedCount: maxSubsetSizeDefault / zoneCount}, + {desc: "valid endpoints increase, ExternalTrafficPolicy:Cluster", randomize: true, startCount: 5, endCount: 10, expectedCount: maxSubsetSizeDefault / zoneCount}, + {desc: "valid endpoints decrease, ExternalTrafficPolicy:Cluster", randomize: true, startCount: 5, endCount: 2, expectedCount: maxSubsetSizeDefault / zoneCount}, + } + for _, tc := range tcs { + if tc.randomize { + result = NewClusterL4ILBEndpointsCalculator(nil, nil, "test").getPerZoneSubsetCount(zoneCount, tc.endCount) + } else { + result = NewLocalL4ILBEndpointsCalculator(nil, nil, "test").getPerZoneSubsetCount(zoneCount, tc.endCount) + } + if result != tc.expectedCount { + t.Errorf("For test case '%s', expected subsetCount of %d, but got %d", tc.desc, tc.expectedCount, result) + } + } +} diff --git a/pkg/neg/syncers/subsets.go b/pkg/neg/syncers/subsets.go new file mode 100644 index 0000000000..831d17cda8 --- /dev/null +++ b/pkg/neg/syncers/subsets.go @@ -0,0 +1,139 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package syncers
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"sort"
+
+	"k8s.io/api/core/v1"
+	negtypes "k8s.io/ingress-gce/pkg/neg/types"
+	"k8s.io/ingress-gce/pkg/utils"
+)
+
+const (
+	// maxSubsetSizeLocal is the max size of the subset in ExternalTrafficPolicy:Local mode.
+	maxSubsetSizeLocal = 250
+	// maxSubsetSizeDefault is the max size of the subset in ExternalTrafficPolicy:Cluster, which is the default mode.
+	maxSubsetSizeDefault = 25
+)
+
+// NodeInfo stores node metadata used to sort nodes and pick a subset.
+type NodeInfo struct {
+	// index stores the index of the given node in the input node list. This is useful to
+	// identify the node in the list after sorting.
+	index int
+	// hashedName is the sha256 hash of the given node name along with a salt.
+	hashedName string
+	// skip indicates if this node has already been selected in the subset and hence needs
+	// to be skipped.
+	skip bool
+}
+
+func getHashedName(nodeName, salt string) string {
+	hashSum := sha256.Sum256([]byte(nodeName + ":" + salt))
+	return hex.EncodeToString(hashSum[:])
+}
+
+// pickSubsetsMinRemovals ensures that there are no node removals from the current subset unless the node no longer
+// exists or the subset size has reduced. The subset size can reduce if a new zone got added in the cluster and the
+// per-zone limit now reduces.
+// This function takes a list of nodes, a hash salt, a count and the current subset, and returns a subset of size 'count'.
+// If the input list is smaller than the desired subset count, the entire list is returned. The hash salt
+// is used so that a different subset is returned even when the same node list is passed in, for a different salt value.
+// It also keeps the subset relatively stable for the same service.
+// Example 1 - Recalculate subset, subset size increase.
+// nodes = [node1 node2 node3 node4 node5], Current subset - [node3, node2, node5], count 4
+// sorted list is [node3 node2 node5 node4 node1]
+// Output [node3, node2, node5, node4] - No removals in existing subset.
+// ---------------------------------------------------------------------------------------------------------
+// Example 2 - Recalculate subset, new node got added.
+// nodes = [node1 node2 node3 node4 node5, node6], Current subset - [node3, node2, node5, node4], count 4
+// sorted list is [node3 node6 node2 node5 node4 node1]
+// Output [node3, node2, node5, node4] - No removals in existing subset even though node6 shows up at a lower index
+// in the sorted list.
+// ---------------------------------------------------------------------------------------------------------
+// Example 3 - Recalculate subset, node3 got removed.
+// nodes = [node1 node2 node4 node5, node6], Current subset - [node3, node2, node5, node4], count 4 +// sorted list is [node6 node2 node5 node4 node1] +// Output [node2, node5, node4 node6] +func pickSubsetsMinRemovals(nodes []*v1.Node, salt string, count int, current []negtypes.NetworkEndpoint) []*v1.Node { + if len(nodes) < count { + return nodes + } + subset := make([]*v1.Node, 0, count) + info := make([]*NodeInfo, len(nodes)) + // Generate hashed names for all cluster nodes and sort them alphabetically, based on the hashed string. + for i, node := range nodes { + info[i] = &NodeInfo{i, getHashedName(node.Name, salt), false} + } + sort.Slice(info, func(i, j int) bool { + return info[i].hashedName < info[j].hashedName + }) + // Pick all nodes from existing subset if still available. + for _, ep := range current { + curHashName := getHashedName(ep.Node, salt) + for _, nodeInfo := range info { + if nodeInfo.hashedName == curHashName { + subset = append(subset, nodes[nodeInfo.index]) + nodeInfo.skip = true + } else if nodeInfo.hashedName > curHashName { + break + } + } + } + if len(subset) >= count { + // trim the subset to the given subset size, remove extra nodes. + subset = subset[:count] + return subset + } + for _, val := range info { + if val.skip { + // This node was already picked as it is part of the current subset. + continue + } + subset = append(subset, nodes[val.index]) + if len(subset) == count { + break + } + } + return subset +} + +// getSubsetPerZone creates a subset of nodes from the given list of nodes, for each zone provided. +// The output is a map of zone string to NEG subset. +func getSubsetPerZone(nodesPerZone map[string][]*v1.Node, perZoneCount int, svcID string, currentMap map[string]negtypes.NetworkEndpointSet) (map[string]negtypes.NetworkEndpointSet, error) { + result := make(map[string]negtypes.NetworkEndpointSet) + var currentList []negtypes.NetworkEndpoint + + for zone, nodes := range nodesPerZone { + result[zone] = negtypes.NewNetworkEndpointSet() + if currentMap != nil { + if zset, ok := currentMap[zone]; ok && zset != nil { + currentList = zset.List() + } else { + currentList = nil + } + } + subset := pickSubsetsMinRemovals(nodes, svcID, perZoneCount, currentList) + for _, node := range subset { + result[zone].Insert(negtypes.NetworkEndpoint{Node: node.Name, IP: utils.GetNodePrimaryIP(node)}) + } + } + return result, nil +} diff --git a/pkg/neg/syncers/subsets_test.go b/pkg/neg/syncers/subsets_test.go new file mode 100644 index 0000000000..3b1f252e4b --- /dev/null +++ b/pkg/neg/syncers/subsets_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package syncers + +import ( + "k8s.io/ingress-gce/pkg/neg/types" + "strings" + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestBasicSubset(t *testing.T) { + t.Parallel() + nodes := []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node986"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node25"}}, + } + count := 3 + subset1 := pickSubsetsMinRemovals(nodes, "svc123", count, nil) + if len(subset1) < 3 { + t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset1), subset1) + } + if !validateSubset(subset1, nodes) { + t.Errorf("Invalid subset list %v from %v", subset1, nodes) + } + subset2 := pickSubsetsMinRemovals(nodes, "svc345", count, nil) + subset3 := pickSubsetsMinRemovals(nodes, "svc56", count, nil) + t.Logf("Subset2 is %s", nodeNames(subset2)) + t.Logf("Subset3 is %s", nodeNames(subset3)) + if isIdentical(subset1, subset2) || isIdentical(subset3, subset2) || isIdentical(subset1, subset3) { + t.Errorf("2 out of 3 subsets are identical") + } +} + +func TestEmptyNodes(t *testing.T) { + t.Parallel() + count := 3 + subset1 := pickSubsetsMinRemovals(nil, "svc123", count, nil) + if len(subset1) != 0 { + t.Errorf("Expected empty subset, got - %s", nodeNames(subset1)) + } +} + +// Tests the case where there are fewer nodes than subsets +func TestFewerNodes(t *testing.T) { + t.Parallel() + nodes := []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node986"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node25"}}, + } + count := 10 + subset1 := pickSubsetsMinRemovals(nodes, "svc123", count, nil) + if len(subset1) != len(nodes) { + t.Errorf("Expected subset of length %d, got %d, subsets - %s", len(nodes), len(subset1), nodeNames(subset1)) + } + if !isIdentical(nodes, subset1) { + t.Errorf("Subset list is different from list of nodes, subsets - %s", nodeNames(subset1)) + } +} + +func TestNoRemovals(t *testing.T) { + t.Parallel() + nodes := []*v1.Node{ + {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node986"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node25"}}, + } + count := 5 + subset1 := pickSubsetsMinRemovals(nodes, "svc123", count, nil) + if len(subset1) < 5 { + t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset1), subset1) + } + // nodeName abcd shows up 2nd in the sorted list for the given salt. So picking a subset of 5 will remove one of the + // existing nodes. 
+ nodes = append(nodes, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node:abcd"}}) + subset2 := pickSubsetsMinRemovals(nodes, "svc123", count, nil) + if len(subset2) < 5 { + t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset2), subset2) + } + if isIdentical(subset1, subset2) { + t.Errorf("Got identical subsets %+v", subset1) + } + existingEp := []types.NetworkEndpoint{} + for _, node := range subset1 { + existingEp = append(existingEp, types.NetworkEndpoint{Node: node.Name}) + } + subset3 := pickSubsetsMinRemovals(nodes, "svc123", count, existingEp) + if len(subset3) < 5 { + t.Errorf("Expected %d subsets, got only %d - %v", count, len(subset3), subset3) + } + if !isIdentical(subset1, subset3) { + t.Errorf("Got subsets %+v and %+v, expected identical subsets %+v", subset1, subset3, subset1) + } +} + +func validateSubset(subset []*v1.Node, nodes []*v1.Node) bool { + for _, val := range subset { + found := false + for _, node := range nodes { + if val == node { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func nodeNames(subset []*v1.Node) string { + names := []string{} + for _, n := range subset { + names = append(names, n.Name) + } + return strings.Join(names, " ") +} + +func isIdentical(subset1, subset2 []*v1.Node) bool { + foundCount := 0 + if len(subset1) != len(subset2) { + return false + } + for _, node1 := range subset1 { + found := false + for _, node2 := range subset2 { + if node1 == node2 { + found = true + break + } + } + if found { + foundCount = foundCount + 1 + } + } + return foundCount == len(subset1) +} diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 32f70acf6c..f0a1fd504d 100644 --- a/pkg/neg/syncers/transaction.go +++ b/pkg/neg/syncers/transaction.go @@ -24,12 +24,14 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" + listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/ingress-gce/pkg/composite" "k8s.io/ingress-gce/pkg/neg/readiness" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/klog" + "strings" ) type transactionSyncer struct { @@ -51,12 +53,14 @@ type transactionSyncer struct { // transactions stores each transaction transactions networkEndpointTransactionTable - podLister cache.Indexer - serviceLister cache.Indexer - endpointLister cache.Indexer - recorder record.EventRecorder - cloud negtypes.NetworkEndpointGroupCloud - zoneGetter negtypes.ZoneGetter + podLister cache.Indexer + serviceLister cache.Indexer + endpointLister cache.Indexer + nodeLister cache.Indexer + recorder record.EventRecorder + cloud negtypes.NetworkEndpointGroupCloud + zoneGetter negtypes.ZoneGetter + endpointsCalculator negtypes.NetworkEndpointsCalculator // retry handles back off retry for NEG API operations retry retryHandler @@ -65,20 +69,22 @@ type transactionSyncer struct { reflector readiness.Reflector } -func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, reflector readiness.Reflector) negtypes.NegSyncer { +func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister 
cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, nodeLister cache.Indexer, reflector readiness.Reflector, epc negtypes.NetworkEndpointsCalculator) negtypes.NegSyncer { // TransactionSyncer implements the syncer core ts := &transactionSyncer{ - NegSyncerKey: negSyncerKey, - negName: networkEndpointGroupName, - needInit: true, - transactions: NewTransactionTable(), - podLister: podLister, - serviceLister: serviceLister, - endpointLister: endpointLister, - recorder: recorder, - cloud: cloud, - zoneGetter: zoneGetter, - reflector: reflector, + NegSyncerKey: negSyncerKey, + negName: networkEndpointGroupName, + needInit: true, + transactions: NewTransactionTable(), + nodeLister: nodeLister, + podLister: podLister, + serviceLister: serviceLister, + endpointLister: endpointLister, + recorder: recorder, + cloud: cloud, + zoneGetter: zoneGetter, + endpointsCalculator: epc, + reflector: reflector, } // Syncer implements life cycle logic syncer := newSyncer(negSyncerKey, networkEndpointGroupName, serviceLister, recorder, ts) @@ -88,6 +94,19 @@ func NewTransactionSyncer(negSyncerKey negtypes.NegSyncerKey, networkEndpointGro return syncer } +func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, syncerKey negtypes.NegSyncerKey, randomizeEndpoints bool) negtypes.NetworkEndpointsCalculator { + serviceKey := strings.Join([]string{syncerKey.Name, syncerKey.Namespace}, "/") + if syncerKey.NegType == negtypes.VmPrimaryIpEndpointType { + nodeLister := listers.NewNodeLister(nodeLister) + if randomizeEndpoints { + return NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, serviceKey) + } + return NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, serviceKey) + } + return NewL7EndpointsCalculator(zoneGetter, podLister, syncerKey.PortTuple.Name, + syncerKey.SubsetLabels, syncerKey.NegType) +} + func (s *transactionSyncer) sync() error { err := s.syncInternal() if err != nil { @@ -111,7 +130,8 @@ func (s *transactionSyncer) syncInternal() error { klog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.NegSyncerKey.String()) return nil } - klog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.NegSyncerKey.String()) + klog.V(2).Infof("Sync NEG %q for %s, Endpoints Calculator mode %s", s.negName, + s.NegSyncerKey.String(), s.endpointsCalculator.Mode()) ep, exists, err := s.endpointLister.Get( &apiv1.Endpoints{ @@ -130,21 +150,22 @@ func (s *transactionSyncer) syncInternal() error { return nil } - targetMap, endpointPodMap, err := toZoneNetworkEndpointMap(ep.(*apiv1.Endpoints), s.zoneGetter, s.PortTuple.Name, s.podLister, s.NegSyncerKey.SubsetLabels, s.NegSyncerKey.NegType) - if err != nil { - return err - } - currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.negName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion()) if err != nil { return err } - // Merge the current state from cloud with the transaction table together // The combined state represents the eventual result when all transactions completed mergeTransactionIntoZoneEndpointMap(currentMap, s.transactions) + + targetMap, endpointPodMap, err := s.endpointsCalculator.CalculateEndpoints(ep.(*apiv1.Endpoints), currentMap) + // Calculate the endpoints to add and delete to transform the current state to desire state addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap) + if s.NegType == negtypes.VmPrimaryIpEndpointType && len(removeEndpoints) > 0 { + // Make removals minimum since the traffic will be abruptly stopped. 
Log removals + klog.V(3).Infof("Removing endpoints %+v from GCE_VM_PRIMARY_IP NEG %s", removeEndpoints, s.negName) + } // Calculate Pods that are already in the NEG _, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap) // Filter out the endpoints with existing transaction @@ -156,6 +177,7 @@ func (s *transactionSyncer) syncInternal() error { // filter out the endpoints that are in transaction filterEndpointByTransaction(committedEndpoints, s.transactions) + // no-op in case of VmPrimaryIp NEGs. s.commitPods(committedEndpoints, endpointPodMap) if len(addEndpoints) == 0 && len(removeEndpoints) == 0 { diff --git a/pkg/neg/syncers/transaction_test.go b/pkg/neg/syncers/transaction_test.go index 649199e1a0..d771e1f631 100644 --- a/pkg/neg/syncers/transaction_test.go +++ b/pkg/neg/syncers/transaction_test.go @@ -47,6 +47,8 @@ const ( testZone2 = "zone2" testInstance3 = "instance3" testInstance4 = "instance4" + testInstance5 = "instance5" + testInstance6 = "instance6" testNamespace = "ns" testService = "svc" ) @@ -826,6 +828,12 @@ func TestCommitPods(t *testing.T) { } } +func newL4ILBTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, randomize bool) (negtypes.NegSyncer, *transactionSyncer) { + negsyncer, ts := newTestTransactionSyncer(fakeGCE, negtypes.VmPrimaryIpEndpointType) + ts.endpointsCalculator = GetEndpointsCalculator(ts.nodeLister, ts.podLister, ts.zoneGetter, ts.NegSyncerKey, randomize) + return negsyncer, ts +} + func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negType negtypes.NetworkEndpointType) (negtypes.NegSyncer, *transactionSyncer) { kubeClient := fake.NewSimpleClientset() backendConfigClient := backendconfigclient.NewSimpleClientset() @@ -848,6 +856,7 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp if negType == negtypes.VmPrimaryIpEndpointType { svcPort.PortTuple.Port = 0 svcPort.PortTuple.TargetPort = "" + svcPort.PortTuple.Name = string(negtypes.VmPrimaryIpEndpointType) } // TODO(freehan): use real readiness reflector @@ -861,7 +870,10 @@ func newTestTransactionSyncer(fakeGCE negtypes.NetworkEndpointGroupCloud, negTyp context.PodInformer.GetIndexer(), context.ServiceInformer.GetIndexer(), context.EndpointInformer.GetIndexer(), - reflector) + context.NodeInformer.GetIndexer(), + reflector, + GetEndpointsCalculator(context.NodeInformer.GetIndexer(), context.PodInformer.GetIndexer(), negtypes.NewFakeZoneGetter(), + svcPort, false)) transactionSyncer := negsyncer.(*syncer).core.(*transactionSyncer) return negsyncer, transactionSyncer } diff --git a/pkg/neg/types/fakes.go b/pkg/neg/types/fakes.go index e6b22ebe58..21afc728b5 100644 --- a/pkg/neg/types/fakes.go +++ b/pkg/neg/types/fakes.go @@ -34,6 +34,8 @@ const ( TestInstance2 = "instance2" TestInstance3 = "instance3" TestInstance4 = "instance4" + TestInstance5 = "instance5" + TestInstance6 = "instance6" ) type fakeZoneGetter struct { @@ -44,7 +46,7 @@ func NewFakeZoneGetter() *fakeZoneGetter { return &fakeZoneGetter{ zoneInstanceMap: map[string]sets.String{ TestZone1: sets.NewString(TestInstance1, TestInstance2), - TestZone2: sets.NewString(TestInstance3, TestInstance4), + TestZone2: sets.NewString(TestInstance3, TestInstance4, TestInstance5, TestInstance6), }, } } diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index c092c205df..80810c5b14 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -18,6 +18,7 @@ package types import ( 
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "k8s.io/api/core/v1" "k8s.io/ingress-gce/pkg/composite" ) @@ -44,6 +45,7 @@ type NetworkEndpointGroupCloud interface { // NetworkEndpointGroupNamer is an interface for generating network endpoint group name. type NetworkEndpointGroupNamer interface { NEG(namespace, name string, port int32) string + PrimaryIPNEG(namespace, name string) string NEGWithSubset(namespace, name, subset string, port int32) string IsNEG(name string) bool } @@ -71,8 +73,18 @@ type NegSyncerManager interface { StopSyncer(namespace, name string) // Sync signals all syncers related to the service to sync. This call is asynchronous. Sync(namespace, name string) + // SyncNodes signals all syncers watching nodes to sync. This call is asynchronous. + SyncNodes() // GC garbage collects network endpoint group and syncers GC() error // ShutDown shuts down the manager ShutDown() } + +type NetworkEndpointsCalculator interface { + // CalculateEndpoints computes the NEG endpoints based on service endpoints and the current NEG state and returns a + // map of zone name to network endpoint set + CalculateEndpoints(ep *v1.Endpoints, currentMap map[string]NetworkEndpointSet) (map[string]NetworkEndpointSet, EndpointPodMap, error) + // Mode indicates the mode that the EndpointsCalculator is operating in. + Mode() EndpointsCalculatorMode +} diff --git a/pkg/neg/types/network_endpoint.go b/pkg/neg/types/network_endpoint.go index c66723ed2d..49595c7958 100644 --- a/pkg/neg/types/network_endpoint.go +++ b/pkg/neg/types/network_endpoint.go @@ -96,6 +96,9 @@ func (s NetworkEndpointSet) HasAny(items ...NetworkEndpoint) bool { // s1.Difference(s2) = {a3} // s2.Difference(s1) = {a4, a5} func (s NetworkEndpointSet) Difference(s2 NetworkEndpointSet) NetworkEndpointSet { + if s2 == nil { + return s + } result := NewNetworkEndpointSet() for key := range s { if !s2.Has(key) { @@ -130,6 +133,9 @@ func (s1 NetworkEndpointSet) Union(s2 NetworkEndpointSet) NetworkEndpointSet { func (s1 NetworkEndpointSet) Intersection(s2 NetworkEndpointSet) NetworkEndpointSet { var walk, other NetworkEndpointSet result := NewNetworkEndpointSet() + if s2 == nil { + return result + } if s1.Len() < s2.Len() { walk = s1 other = s2 diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index f4d6349232..d058122bb5 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -32,11 +32,15 @@ import ( ) type NetworkEndpointType string +type EndpointsCalculatorMode string const ( VmIpPortEndpointType = NetworkEndpointType("GCE_VM_IP_PORT") VmPrimaryIpEndpointType = NetworkEndpointType("GCE_VM_PRIMARY_IP") NonGCPPrivateEndpointType = NetworkEndpointType("NON_GCP_PRIVATE_IP_PORT") + L7Mode = EndpointsCalculatorMode("L7") + L4LocalMode = EndpointsCalculatorMode("L4, ExternalTrafficPolicy:Local") + L4ClusterMode = EndpointsCalculatorMode("L4, ExternalTrafficPolicy:Cluster") ) // SvcPortTuple is the tuple representing one service port @@ -50,6 +54,10 @@ type SvcPortTuple struct { TargetPort string } +func (t SvcPortTuple) Empty() bool { + return t.Port == 0 && t.Name == "" && t.TargetPort == "" +} + // String returns the string representation of SvcPortTuple func (t SvcPortTuple) String() string { return fmt.Sprintf("%s/%v-%s", t.Name, t.Port, t.TargetPort) @@ -100,6 +108,10 @@ type PortInfo struct { // This is enabled with service port is reference by ingress. // If the service port is only exposed as stand alone NEG, it should not be enbled. 
ReadinessGate bool
+ // RandomizeEndpoints indicates if the endpoints for the NEG associated with this port need to
+ // be selected at random, rather than selecting the endpoints of this service. This is applicable
+ // in GCE_VM_PRIMARY_IP where the endpoints are the nodes instead of pods.
+ RandomizeEndpoints bool
 }

 // PortInfoMapKey is the Key of PortInfoMap
@@ -126,6 +138,25 @@ func NewPortInfoMap(namespace, name string, svcPortTupleSet SvcPortTupleSet, nam
 	return ret
 }

+// NewPortInfoMapForPrimaryIPNEG creates a PortInfoMap with an empty port tuple. Since GCE_VM_PRIMARY_IP NEGs target
+// the node instead of the pod, there is no port info to be stored.
+func NewPortInfoMapForPrimaryIPNEG(namespace, name string, namer NetworkEndpointGroupNamer, randomize bool) PortInfoMap {
+ ret := PortInfoMap{}
+ svcPortSet := make(SvcPortTupleSet)
+ svcPortSet.Insert(
+ // Insert Empty PortTuple for VmPrimaryIp NEGs.
+ SvcPortTuple{},
+ )
+ for svcPortTuple := range svcPortSet {
+ ret[PortInfoMapKey{svcPortTuple.Port, ""}] = PortInfo{
+ PortTuple: svcPortTuple,
+ NegName: namer.PrimaryIPNEG(namespace, name),
+ RandomizeEndpoints: randomize,
+ }
+ }
+ return ret
+}
+
 // NewPortInfoMapWithDestinationRule create PortInfoMap based on a gaven DesinationRule.
 // Return error message if the DestinationRule contains duplicated subsets.
 func NewPortInfoMapWithDestinationRule(namespace, name string, svcPortTupleSet SvcPortTupleSet, namer NetworkEndpointGroupNamer, readinessGate bool,
@@ -157,6 +188,8 @@ func NewPortInfoMapWithDestinationRule(namespace, name string, svcPortTupleSet S
 // It assumes the same key (service port) will have the same target port and negName
 // If not, it will throw error
 // If a key in p1 or p2 has readiness gate enabled, the merged port info will also has readiness gate enabled
+// If a key in p1 or p2 has randomize endpoints enabled, the merged port info will also have randomize endpoints enabled.
+// This field is only applicable for GCE_VM_PRIMARY_IP NEGs.
func (p1 PortInfoMap) Merge(p2 PortInfoMap) error { var err error for mapKey, portInfo := range p2 { @@ -171,12 +204,16 @@ func (p1 PortInfoMap) Merge(p2 PortInfoMap) error { if existingPortInfo.Subset != portInfo.Subset { return fmt.Errorf("for service port %v, Subset name in existing map is %q, but the merge map has %q", mapKey, existingPortInfo.Subset, portInfo.Subset) } + if existingPortInfo.RandomizeEndpoints != portInfo.RandomizeEndpoints { + return fmt.Errorf("For service port %v, Existing map has RandomizeEndpoints %v, but the merge map has %v", mapKey, existingPortInfo.RandomizeEndpoints, portInfo.RandomizeEndpoints) + } mergedInfo.ReadinessGate = existingPortInfo.ReadinessGate } mergedInfo.PortTuple = portInfo.PortTuple mergedInfo.NegName = portInfo.NegName // Turn on the readiness gate if one of them is on mergedInfo.ReadinessGate = mergedInfo.ReadinessGate || portInfo.ReadinessGate + mergedInfo.RandomizeEndpoints = portInfo.RandomizeEndpoints mergedInfo.Subset = portInfo.Subset mergedInfo.SubsetLabels = portInfo.SubsetLabels @@ -252,7 +289,7 @@ type NegSyncerKey struct { } func (key NegSyncerKey) String() string { - return fmt.Sprintf("%s/%s-%s-%s", key.Namespace, key.Name, key.Subset, key.PortTuple.String()) + return fmt.Sprintf("%s/%s-%s-%s-%s", key.Namespace, key.Name, key.Subset, key.PortTuple.String(), string(key.NegType)) } // GetAPIVersion returns the compute API version to be used in order diff --git a/pkg/neg/types/types_test.go b/pkg/neg/types/types_test.go index beed3b389a..ea350895bd 100644 --- a/pkg/neg/types/types_test.go +++ b/pkg/neg/types/types_test.go @@ -32,6 +32,10 @@ func (*negNamer) NEG(namespace, name string, svcPort int32) string { return fmt.Sprintf("%v-%v-%v", namespace, name, svcPort) } +func (*negNamer) PrimaryIPNEG(namespace, name string) string { + return fmt.Sprintf("%v-%v", namespace, name) +} + func (*negNamer) NEGWithSubset(namespace, name, subset string, svcPort int32) string { return fmt.Sprintf("%v-%v-%v-%v", namespace, name, subset, svcPort) } diff --git a/pkg/utils/common/finalizer.go b/pkg/utils/common/finalizer.go index 78984c7708..4c23506baf 100644 --- a/pkg/utils/common/finalizer.go +++ b/pkg/utils/common/finalizer.go @@ -29,6 +29,11 @@ const ( // FinalizerKeyV2 is the string representing the Ingress finalizer version. // Ingress with V2 finalizer uses V2 frontend naming scheme. FinalizerKeyV2 = "networking.gke.io/ingress-finalizer-V2" + // FinalizerKeyL4 is the string representing the L4 ILB controller finalizer in this repo. + FinalizerKeyL4 = "networking.gke.io/l4-ilb-v2" + // FinalizerKeyL4V1 is the string representing the service controller finalizer. A service with this finalizer + // is managed by k/k service controller. + FinalizerKeyL4V1 = "networking.gke.io/l4-ilb-v1" ) // IsDeletionCandidate is true if the passed in meta contains an ingress finalizer. diff --git a/pkg/utils/namer/namer.go b/pkg/utils/namer/namer.go index 6c3b689dfe..baa6221676 100644 --- a/pkg/utils/namer/namer.go +++ b/pkg/utils/namer/namer.go @@ -463,6 +463,25 @@ func (n *Namer) NEGWithSubset(namespace, name, subset string, port int32) string return fmt.Sprintf("%s-%s-%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, truncPort, truncSubset, negSuffix(n.shortUID(), namespace, name, portStr, subset)) } +// PrimaryIPNEG returns the gce neg name based on the service namespace and name +// NEG naming convention: +// +// {prefix}{version}-{clusterid}-{namespace}-{name}-{hash} +// +// Output name is at most 63 characters. 
NEG tries to keep as much +// information as possible. +// +// WARNING: Controllers depend on the naming pattern to get the list +// of all NEGs associated with the current cluster. Any modifications +// must be backward compatible. +func (n *Namer) PrimaryIPNEG(namespace, name string) string { + truncFields := TrimFieldsEvenly(maxNEGDescriptiveLabel, namespace, name) + truncNamespace := truncFields[0] + truncName := truncFields[1] + // Use the full cluster UID in the suffix to reduce chance of collision. + return fmt.Sprintf("%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, negSuffix(n.UID(), namespace, name, "", "")) +} + // IsNEG returns true if the name is a NEG owned by this cluster. // It checks that the UID is present and a substring of the // cluster uid, since the NEG naming schema truncates it to 8 characters. diff --git a/pkg/utils/serviceport.go b/pkg/utils/serviceport.go index 710e08f1b0..842538dd15 100644 --- a/pkg/utils/serviceport.go +++ b/pkg/utils/serviceport.go @@ -59,7 +59,7 @@ type ServicePort struct { // for creating NEGs associated with the given ServicePort. func GetAPIVersionFromServicePort(sp *ServicePort) meta.Version { if sp == nil { - // this uses VM_PRIMARY_IP_NEGS which requires alpha API + // this uses GCE_VM_PRIMARY_IP NEGS which requires alpha API return meta.VersionAlpha } return meta.VersionGA diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 3e73a064e1..62f2bf28ac 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -36,6 +36,7 @@ import ( "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils/common" "k8s.io/klog" + "k8s.io/kubernetes/pkg/util/node" ) const ( @@ -350,6 +351,15 @@ func GetNodeConditionPredicate() listers.NodeConditionPredicate { } } +// GetNodePrimaryIP returns a primary internal IP address of the node. +func GetNodePrimaryIP(inputNode *api_v1.Node) string { + ip, err := node.GetPreferredNodeAddress(inputNode, []api_v1.NodeAddressType{api_v1.NodeInternalIP}) + if err != nil { + klog.Errorf("Failed to get IP address for node %s", inputNode.Name) + } + return ip +} + // NewNamespaceIndexer returns a new Indexer for use by SharedIndexInformers func NewNamespaceIndexer() cache.Indexers { return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} @@ -410,3 +420,22 @@ func HasVIP(ing *v1beta1.Ingress) bool { } return true } + +// NumEndpoints returns the count of endpoints in the given endpoints object. +func NumEndpoints(ep *api_v1.Endpoints) (result int) { + for _, subset := range ep.Subsets { + result = result + len(subset.Addresses)*len(subset.Ports) + } + return result +} + +// IsLegacyL4ILBService returns true if the given LoadBalancer service is managed by service controller. +func IsLegacyL4ILBService(svc *api_v1.Service) bool { + for _, key := range svc.ObjectMeta.Finalizers { + if key == common.FinalizerKeyL4V1 { + // service has v1 finalizer, this is handled by service controller code. 
+ return true + } + } + return false +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index a979ac5f32..6541e6fca7 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils/common" @@ -30,6 +30,7 @@ import ( "k8s.io/api/networking/v1beta1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/legacy-cloud-providers/gce" ) func TestResourcePath(t *testing.T) { @@ -719,3 +720,65 @@ func TestHasVIP(t *testing.T) { }) } } + +func TestGetNodePrimaryIP(t *testing.T) { + t.Parallel() + internalIP := "1.2.3.4" + node := &api_v1.Node{ + Status: api_v1.NodeStatus{ + Addresses: []api_v1.NodeAddress{ + { + Type: api_v1.NodeInternalIP, + Address: internalIP, + }, + }, + }, + } + out := GetNodePrimaryIP(node) + if out != internalIP { + t.Errorf("Expected Primary IP %s, got %s", internalIP, out) + } + + node = &api_v1.Node{ + Status: api_v1.NodeStatus{ + Addresses: []api_v1.NodeAddress{ + { + Type: api_v1.NodeExternalIP, + Address: "11.12.13.14", + }, + }, + }, + } + out = GetNodePrimaryIP(node) + if out != "" { + t.Errorf("Expected Primary IP '', got %s", out) + } +} + +func TestIsLegacyL4ILBService(t *testing.T) { + t.Parallel() + svc := &api_v1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: "testsvc", + Namespace: "default", + Annotations: map[string]string{gce.ServiceAnnotationLoadBalancerType: string(gce.LBTypeInternal)}, + Finalizers: []string{common.FinalizerKeyL4V1}, + }, + Spec: api_v1.ServiceSpec{ + Type: api_v1.ServiceTypeLoadBalancer, + Ports: []api_v1.ServicePort{ + {Name: "testport", Port: int32(80)}, + }, + }, + } + if !IsLegacyL4ILBService(svc) { + t.Errorf("Expected True for Legacy service %s, got False", svc.Name) + } + + // Remove the finalizer and ensure the check returns False. 
+ svc.ObjectMeta.Finalizers = nil + if IsLegacyL4ILBService(svc) { + t.Errorf("Expected False for Legacy service %s, got True", svc.Name) + } + +} diff --git a/vendor/k8s.io/kubernetes/pkg/util/node/BUILD b/vendor/k8s.io/kubernetes/pkg/util/node/BUILD new file mode 100644 index 0000000000..08692c148a --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/util/node/BUILD @@ -0,0 +1,45 @@ +package(default_visibility = ["//visibility:public"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_library( + name = "go_default_library", + srcs = ["node.go"], + importpath = "k8s.io/kubernetes/pkg/util/node", + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["node_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/vendor/k8s.io/kubernetes/pkg/util/node/node.go b/vendor/k8s.io/kubernetes/pkg/util/node/node.go new file mode 100644 index 0000000000..087a0bc82d --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/util/node/node.go @@ -0,0 +1,213 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package node + +import ( + "encoding/json" + "fmt" + "net" + "os" + "strings" + "time" + + "k8s.io/klog" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/strategicpatch" + clientset "k8s.io/client-go/kubernetes" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" +) + +const ( + // NodeUnreachablePodReason is the reason on a pod when its state cannot be confirmed as kubelet is unresponsive + // on the node it is (was) running. + NodeUnreachablePodReason = "NodeLost" + // NodeUnreachablePodMessage is the message on a pod when its state cannot be confirmed as kubelet is unresponsive + // on the node it is (was) running. + NodeUnreachablePodMessage = "Node %v which was running pod %v is unresponsive" +) + +// GetHostname returns OS's hostname if 'hostnameOverride' is empty; otherwise, return 'hostnameOverride'. 
+func GetHostname(hostnameOverride string) (string, error) { + hostName := hostnameOverride + if len(hostName) == 0 { + nodeName, err := os.Hostname() + if err != nil { + return "", fmt.Errorf("couldn't determine hostname: %v", err) + } + hostName = nodeName + } + + // Trim whitespaces first to avoid getting an empty hostname + // For linux, the hostname is read from file /proc/sys/kernel/hostname directly + hostName = strings.TrimSpace(hostName) + if len(hostName) == 0 { + return "", fmt.Errorf("empty hostname is invalid") + } + return strings.ToLower(hostName), nil +} + +// NoMatchError is a typed implementation of the error interface. It indicates a failure to get a matching Node. +type NoMatchError struct { + addresses []v1.NodeAddress +} + +// Error is the implementation of the conventional interface for +// representing an error condition, with the nil value representing no error. +func (e *NoMatchError) Error() string { + return fmt.Sprintf("no preferred addresses found; known addresses: %v", e.addresses) +} + +// GetPreferredNodeAddress returns the address of the provided node, using the provided preference order. +// If none of the preferred address types are found, an error is returned. +func GetPreferredNodeAddress(node *v1.Node, preferredAddressTypes []v1.NodeAddressType) (string, error) { + for _, addressType := range preferredAddressTypes { + for _, address := range node.Status.Addresses { + if address.Type == addressType { + return address.Address, nil + } + } + } + return "", &NoMatchError{addresses: node.Status.Addresses} +} + +// GetNodeHostIP returns the provided node's IP, based on the priority: +// 1. NodeInternalIP +// 2. NodeExternalIP +func GetNodeHostIP(node *v1.Node) (net.IP, error) { + addresses := node.Status.Addresses + addressMap := make(map[v1.NodeAddressType][]v1.NodeAddress) + for i := range addresses { + addressMap[addresses[i].Type] = append(addressMap[addresses[i].Type], addresses[i]) + } + if addresses, ok := addressMap[v1.NodeInternalIP]; ok { + return net.ParseIP(addresses[0].Address), nil + } + if addresses, ok := addressMap[v1.NodeExternalIP]; ok { + return net.ParseIP(addresses[0].Address), nil + } + return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses) +} + +// GetNodeIP returns the ip of node with the provided hostname +func GetNodeIP(client clientset.Interface, hostname string) net.IP { + var nodeIP net.IP + node, err := client.CoreV1().Nodes().Get(hostname, metav1.GetOptions{}) + if err != nil { + klog.Warningf("Failed to retrieve node info: %v", err) + return nil + } + nodeIP, err = GetNodeHostIP(node) + if err != nil { + klog.Warningf("Failed to retrieve node IP: %v", err) + return nil + } + return nodeIP +} + +// GetZoneKey is a helper function that builds a string identifier that is unique per failure-zone; +// it returns empty-string for no zone. +func GetZoneKey(node *v1.Node) string { + labels := node.Labels + if labels == nil { + return "" + } + + region, _ := labels[v1.LabelZoneRegion] + failureDomain, _ := labels[v1.LabelZoneFailureDomain] + + if region == "" && failureDomain == "" { + return "" + } + + // We include the null character just in case region or failureDomain has a colon + // (We do assume there's no null characters in a region or failureDomain) + // As a nice side-benefit, the null character is not printed by fmt.Print or glog + return region + ":\x00:" + failureDomain +} + +// SetNodeCondition updates specific node condition with patch operation. 
+func SetNodeCondition(c clientset.Interface, node types.NodeName, condition v1.NodeCondition) error { + generatePatch := func(condition v1.NodeCondition) ([]byte, error) { + raw, err := json.Marshal(&[]v1.NodeCondition{condition}) + if err != nil { + return nil, err + } + return []byte(fmt.Sprintf(`{"status":{"conditions":%s}}`, raw)), nil + } + condition.LastHeartbeatTime = metav1.NewTime(time.Now()) + patch, err := generatePatch(condition) + if err != nil { + return nil + } + _, err = c.CoreV1().Nodes().PatchStatus(string(node), patch) + return err +} + +// PatchNodeCIDR patches the specified node's CIDR to the given value. +func PatchNodeCIDR(c clientset.Interface, node types.NodeName, cidr string) error { + raw, err := json.Marshal(cidr) + if err != nil { + return fmt.Errorf("failed to json.Marshal CIDR: %v", err) + } + + patchBytes := []byte(fmt.Sprintf(`{"spec":{"podCIDR":%s}}`, raw)) + + if _, err := c.CoreV1().Nodes().Patch(string(node), types.StrategicMergePatchType, patchBytes); err != nil { + return fmt.Errorf("failed to patch node CIDR: %v", err) + } + return nil +} + +// PatchNodeStatus patches node status. +func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) { + patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode) + if err != nil { + return nil, nil, err + } + + updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status") + if err != nil { + return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err) + } + return updatedNode, patchBytes, nil +} + +func preparePatchBytesforNodeStatus(nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) ([]byte, error) { + oldData, err := json.Marshal(oldNode) + if err != nil { + return nil, fmt.Errorf("failed to Marshal oldData for node %q: %v", nodeName, err) + } + + // Reset spec to make sure only patch for Status or ObjectMeta is generated. + // Note that we don't reset ObjectMeta here, because: + // 1. This aligns with Nodes().UpdateStatus(). + // 2. Some component does use this to update node annotations. + newNode.Spec = oldNode.Spec + newData, err := json.Marshal(newNode) + if err != nil { + return nil, fmt.Errorf("failed to Marshal newData for node %q: %v", nodeName, err) + } + + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + if err != nil { + return nil, fmt.Errorf("failed to CreateTwoWayMergePatch for node %q: %v", nodeName, err) + } + return patchBytes, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a396e454eb..0b6799bf4d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -519,6 +519,7 @@ k8s.io/kube-openapi/pkg/common k8s.io/kube-openapi/pkg/util/proto # k8s.io/kubernetes v1.15.0 => k8s.io/kubernetes v1.15.0 k8s.io/kubernetes/pkg/client/leaderelectionconfig +k8s.io/kubernetes/pkg/util/node k8s.io/kubernetes/pkg/util/slice # k8s.io/legacy-cloud-providers v0.0.0 => k8s.io/legacy-cloud-providers v0.0.0-20191210234026-b5fed2ccee23 k8s.io/legacy-cloud-providers/gce
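Endpoints calculator selection. The dispatch performed by GetEndpointsCalculator in pkg/neg/syncers/transaction.go reduces to: GCE_VM_PRIMARY_IP NEGs get an L4 ILB calculator (Cluster mode when endpoints are randomized across nodes, Local mode otherwise), and all other NEG types keep the existing L7 pod-endpoint calculator. The following is a minimal standalone sketch of that decision table, not part of the patch; the mode strings are copied from the EndpointsCalculatorMode constants in pkg/neg/types, while the helper name calculatorMode is hypothetical.

package main

import "fmt"

// Mode strings matching the EndpointsCalculatorMode constants added in pkg/neg/types/types.go.
const (
	l7Mode        = "L7"
	l4LocalMode   = "L4, ExternalTrafficPolicy:Local"
	l4ClusterMode = "L4, ExternalTrafficPolicy:Cluster"
)

// calculatorMode mirrors the dispatch in GetEndpointsCalculator: GCE_VM_PRIMARY_IP NEGs use an
// L4 ILB calculator (Cluster when endpoints are randomized, Local otherwise); everything else
// falls back to the L7 pod-endpoint calculator.
func calculatorMode(negType string, randomizeEndpoints bool) string {
	if negType == "GCE_VM_PRIMARY_IP" {
		if randomizeEndpoints {
			return l4ClusterMode
		}
		return l4LocalMode
	}
	return l7Mode
}

func main() {
	fmt.Println(calculatorMode("GCE_VM_PRIMARY_IP", true)) // L4, ExternalTrafficPolicy:Cluster
	fmt.Println(calculatorMode("GCE_VM_IP_PORT", false))   // L7
}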
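Minimal-removal subsetting. The test around pickSubsetsMinRemovals asserts that recomputing a node subset with the currently programmed endpoints passed in keeps those endpoints, so traffic is not abruptly cut. Below is a hypothetical, self-contained illustration of that property only; it is not the repository's implementation, and the function name, signature, and node ordering are all assumptions for the example.

package main

import "fmt"

// pickSubsetKeepingExisting sketches the behaviour the test expects: nodes already programmed
// as endpoints are retained first, and the remaining slots are filled from the other candidates,
// so removals from the NEG are minimized.
func pickSubsetKeepingExisting(nodes []string, count int, existing map[string]bool) []string {
	subset := make([]string, 0, count)
	// Keep every existing endpoint whose node is still a valid candidate.
	for _, n := range nodes {
		if len(subset) == count {
			return subset
		}
		if existing[n] {
			subset = append(subset, n)
		}
	}
	// Fill the remaining slots from nodes that were not previously selected.
	for _, n := range nodes {
		if len(subset) == count {
			break
		}
		if !existing[n] {
			subset = append(subset, n)
		}
	}
	return subset
}

func main() {
	nodes := []string{"n1", "n2", "n3", "n4", "n5", "n6"}
	existing := map[string]bool{"n2": true, "n4": true}
	fmt.Println(pickSubsetKeepingExisting(nodes, 4, existing)) // [n2 n4 n1 n3]
}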
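Endpoint counting. utils.NumEndpoints sums len(Addresses) * len(Ports) over every subset of an Endpoints object. A small self-contained example of that arithmetic, assuming only the upstream k8s.io/api/core/v1 types:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// numEndpoints mirrors the counting rule in utils.NumEndpoints: each subset contributes
// len(Addresses) * len(Ports) endpoints, summed across subsets.
func numEndpoints(ep *v1.Endpoints) int {
	total := 0
	for _, subset := range ep.Subsets {
		total += len(subset.Addresses) * len(subset.Ports)
	}
	return total
}

func main() {
	ep := &v1.Endpoints{
		Subsets: []v1.EndpointSubset{
			{
				Addresses: []v1.EndpointAddress{{IP: "10.0.0.1"}, {IP: "10.0.0.2"}},
				Ports:     []v1.EndpointPort{{Port: 80}, {Port: 443}},
			},
		},
	}
	fmt.Println(numEndpoints(ep)) // 2 addresses * 2 ports = 4
}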
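Node primary IP lookup. utils.GetNodePrimaryIP delegates to the vendored node.GetPreferredNodeAddress with a preference list containing only NodeInternalIP, which is why the test expects an empty string when the node exposes only an external address. A standalone sketch of the same lookup, with the helper name preferredNodeAddress chosen for illustration:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// preferredNodeAddress returns the first node address whose type matches the preference order,
// or an error if none match, following the vendored GetPreferredNodeAddress behaviour.
func preferredNodeAddress(node *v1.Node, order []v1.NodeAddressType) (string, error) {
	for _, t := range order {
		for _, addr := range node.Status.Addresses {
			if addr.Type == t {
				return addr.Address, nil
			}
		}
	}
	return "", fmt.Errorf("no preferred addresses found; known addresses: %v", node.Status.Addresses)
}

func main() {
	n := &v1.Node{Status: v1.NodeStatus{Addresses: []v1.NodeAddress{
		{Type: v1.NodeExternalIP, Address: "203.0.113.7"},
		{Type: v1.NodeInternalIP, Address: "10.128.0.2"},
	}}}
	// Asking only for NodeInternalIP, as GetNodePrimaryIP does, returns the internal address.
	ip, _ := preferredNodeAddress(n, []v1.NodeAddressType{v1.NodeInternalIP})
	fmt.Println(ip) // 10.128.0.2
}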
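Legacy service gating. A service that still carries the v1 finalizer (common.FinalizerKeyL4V1) is owned by the in-tree service controller, and utils.IsLegacyL4ILBService lets the new controller skip it. A hedged sketch of how a reconcile loop might apply that check; the local constant and function simply mirror the values added in the patch, and the control flow around them is hypothetical.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// finalizerKeyL4V1 mirrors common.FinalizerKeyL4V1 from pkg/utils/common/finalizer.go.
const finalizerKeyL4V1 = "networking.gke.io/l4-ilb-v1"

// isLegacyL4ILBService mirrors utils.IsLegacyL4ILBService: a service carrying the v1 finalizer
// is still managed by the k/k service controller and should not be processed here.
func isLegacyL4ILBService(svc *v1.Service) bool {
	for _, f := range svc.ObjectMeta.Finalizers {
		if f == finalizerKeyL4V1 {
			return true
		}
	}
	return false
}

func main() {
	svc := &v1.Service{ObjectMeta: metav1.ObjectMeta{
		Name:       "legacy-ilb",
		Finalizers: []string{finalizerKeyL4V1},
	}}
	if isLegacyL4ILBService(svc) {
		// A hypothetical reconcile loop would leave this service to the legacy controller.
		fmt.Println("skipping", svc.Name, "- still managed by the v1 service controller")
	}
}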