From 154c88613e0cfb7c675aa52781b3eef36dacc5a8 Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Thu, 19 Apr 2018 08:01:25 -0700 Subject: [PATCH] Implement target ingress watchers and push target ingress in controller --- cmd/glbc/main.go | 5 +- pkg/annotations/ingress.go | 7 + pkg/backends/backends.go | 9 +- pkg/context/context.go | 25 +++ pkg/controller/cluster_manager.go | 10 +- pkg/controller/controller.go | 158 +++++++++++++------ pkg/controller/translator/translator.go | 18 +-- pkg/controller/translator/translator_test.go | 3 - pkg/controller/utils.go | 106 +++---------- pkg/controller/utils_test.go | 27 ++++ pkg/firewalls/firewalls.go | 24 +-- pkg/firewalls/firewalls_test.go | 28 ++-- pkg/firewalls/interfaces.go | 2 +- pkg/informer/informer.go | 2 +- pkg/mci/controller.go | 83 ++++++---- pkg/mci/utils.go | 52 ++++++ pkg/target/interfaces.go | 2 +- pkg/target/target.go | 33 ++-- pkg/target/utils.go | 32 +++- pkg/utils/utils.go | 93 +++++++++++ pkg/utils/utils_test.go | 23 +++ 21 files changed, 498 insertions(+), 244 deletions(-) create mode 100644 pkg/mci/utils.go diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index b2a792a908..037d34d640 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -106,6 +106,7 @@ func main() { } enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) + mciEnabled := flags.F.MultiCluster stopCh := make(chan struct{}) ctx := context.NewControllerContext(kubeClient, registryClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG) lbc, err := controller.NewLoadBalancerController(ctx, clusterManager, enableNEG, stopCh) @@ -116,7 +117,7 @@ func main() { if clusterManager.ClusterNamer.UID() != "" { glog.V(0).Infof("Cluster name is %+v", clusterManager.ClusterNamer.UID()) } - clusterManager.Init(lbc.Translator, lbc.Translator) + clusterManager.Init(lbc.Translator, lbc.Translator, mciEnabled) glog.V(0).Infof("clusterManager initialized") if enableNEG { @@ -125,7 +126,7 @@ func main() { glog.V(0).Infof("negController started") } - if flags.F.MultiCluster { + if mciEnabled { mciController, _ := mci.NewController(ctx, flags.F.ResyncPeriod, lbc) go mciController.Run(stopCh) glog.V(0).Infof("Multi-Cluster Ingress Controller started") diff --git a/pkg/annotations/ingress.go b/pkg/annotations/ingress.go index 484373d97f..db853d7d6f 100644 --- a/pkg/annotations/ingress.go +++ b/pkg/annotations/ingress.go @@ -114,3 +114,10 @@ func (ing *Ingress) IngressClass() string { } return val } + +func AddAnnotation(ing *extensions.Ingress, key, val string) { + if ing.Annotations == nil { + ing.Annotations = map[string]string{} + } + ing.Annotations[key] = val +} diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index f5ec83929a..388c651e28 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -73,7 +73,7 @@ const ( // received by GCLB above this RPS are NOT dropped, GCLB continues to distribute // them across IGs. // TODO: Should this be math.MaxInt64? -const maxRPS = 1 +const maxRPS = 1e14 // Backends implements BackendPool. type Backends struct { @@ -433,6 +433,13 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr // edgeHop checks the links of the given backend by executing an edge hop. // It fixes broken links and updates the Backend accordingly. func (b *Backends) edgeHop(be *BackendService, igs []*compute.InstanceGroup) error { + // Note: Clearing the backends in the BackendService is an ugly hack to make mci work. 
+ if be.Alpha != nil { + be.Alpha.Backends = nil + } else { + be.Ga.Backends = nil + } + addIGs := getInstanceGroupsToAdd(be, igs) if len(addIGs) == 0 { return nil diff --git a/pkg/context/context.go b/pkg/context/context.go index 956cccc085..392d47c0ad 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -33,6 +33,8 @@ import ( crinformerv1alpha1 "k8s.io/cluster-registry/pkg/client/informers_generated/externalversions/clusterregistry/v1alpha1" "k8s.io/ingress-gce/pkg/informer" "k8s.io/ingress-gce/pkg/mapper" + "k8s.io/ingress-gce/pkg/target" + "k8s.io/ingress-gce/pkg/utils" ) // ControllerContext holds resources necessary for the general @@ -60,6 +62,7 @@ type MultiClusterContext struct { MCIEnabled bool ClusterClients map[string]kubernetes.Interface ClusterInformerManagers map[string]informer.ClusterInformerManager + ClusterResourceManagers map[string]target.TargetResourceManager ClusterServiceMappers map[string]mapper.ClusterServiceMapper } @@ -87,6 +90,7 @@ func NewControllerContext(kubeClient kubernetes.Interface, registryClient crclie context.MC.ClusterClients = make(map[string]kubernetes.Interface) context.MC.ClusterInformerManagers = make(map[string]informer.ClusterInformerManager) context.MC.ClusterServiceMappers = make(map[string]mapper.ClusterServiceMapper) + context.MC.ClusterResourceManagers = make(map[string]target.TargetResourceManager) } return context @@ -145,3 +149,24 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) { go ctx.MC.ClusterInformer.Run(stopCh) } } + +func (ctx *ControllerContext) ResourceManagers() (resourceManagers map[string]target.TargetResourceManager) { + if !ctx.MC.MCIEnabled { + return nil + } + return ctx.MC.ClusterResourceManagers +} + +// If in MCI mode, ServiceMappers() gets mappers for all clusters in the +// cluster registry. This is because for now, we assume that all ingresses live +// in all "target" clusters. This will change once we start taking the +// MultiClusterConfig into account. If we are not in MCI mode, +// then ServiceMappers() consists of the mapper for the local cluster +func (ctx *ControllerContext) ServiceMappers() (svcMappers map[string]mapper.ClusterServiceMapper) { + if ctx.MC.MCIEnabled { + return ctx.MC.ClusterServiceMappers + } + // Create a ClusterServiceMapper for the local cluster. + svcGetter := utils.SvcGetter{Store: ctx.ServiceInformer.GetStore()} + return map[string]mapper.ClusterServiceMapper{"local": mapper.NewClusterServiceMapper(svcGetter.Get, nil)} +} diff --git a/pkg/controller/cluster_manager.go b/pkg/controller/cluster_manager.go index 70b11370c9..0a131c23dc 100644 --- a/pkg/controller/cluster_manager.go +++ b/pkg/controller/cluster_manager.go @@ -57,9 +57,11 @@ type ClusterManager struct { } // Init initializes the cluster manager. -func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider) { +func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider, mciEnabled bool) { c.instancePool.Init(zl) - c.backendPool.Init(pp) + if !mciEnabled { + c.backendPool.Init(pp) + } // TODO: Initialize other members as needed. } @@ -135,8 +137,8 @@ func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servic return igs, err } -func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error { - return c.firewallPool.Sync(nodeNames, endpointPorts...) 
+func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string, mciEnabled bool) error {
+	return c.firewallPool.Sync(nodeNames, mciEnabled, endpointPorts...)
 }
 // GC garbage collects unused resources.
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 4cda944c48..72c09cfc25 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/golang/glog"
+	compute "google.golang.org/api/compute/v1"
 	apiv1 "k8s.io/api/core/v1"
 	extensions "k8s.io/api/extensions/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -47,7 +48,8 @@ import (
 )
 const (
-	// DefaultFirewallName is the default firewall name.
+	// DefaultFirewallName is the name to use for firewall rules created
+	// by an L7 controller when the --firewall-rule flag is not used.
 	DefaultFirewallName = ""
 	// Frequency to poll on local stores to sync.
 	storeSyncPollPeriod = 5 * time.Second
@@ -55,8 +57,6 @@ const (
 var (
 	keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
-	// DefaultFirewallName is the name to user for firewall rules created
-	// by an L7 controller when the --fireall-rule is not used.
 )
 // LoadBalancerController watches the kubernetes api and adds/removes services
@@ -64,7 +64,7 @@ var (
 type LoadBalancerController struct {
 	ctx *context.ControllerContext
-	ingLister  StoreToIngressLister
+	ingLister  utils.StoreToIngressLister
 	nodeLister cache.Indexer
 	nodes      *NodeController
 	// endpoint lister is needed when translating service target port to real endpoint target ports.
@@ -101,7 +101,7 @@ func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *C
 	})
 	lbc := LoadBalancerController{
 		ctx:                 ctx,
-		ingLister:           StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
+		ingLister:           utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
 		nodeLister:          ctx.NodeInformer.GetIndexer(),
 		nodes:               NewNodeController(ctx, clusterManager),
 		CloudClusterManager: clusterManager,
@@ -119,7 +119,7 @@ func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *C
 	ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			addIng := obj.(*extensions.Ingress)
-			if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) {
+			if !utils.IsGCEIngress(addIng) && !utils.IsGCEMultiClusterIngress(addIng) {
 				glog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey)
 				return
 			}
@@ -130,7 +130,7 @@ func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *C
 		},
 		DeleteFunc: func(obj interface{}) {
 			delIng := obj.(*extensions.Ingress)
-			if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) {
+			if !utils.IsGCEIngress(delIng) && !utils.IsGCEMultiClusterIngress(delIng) {
 				glog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, annotations.IngressClassKey)
 				return
 			}
@@ -140,7 +140,7 @@ func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *C
 		},
 		UpdateFunc: func(old, cur interface{}) {
 			curIng := cur.(*extensions.Ingress)
-			if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) {
+			if !utils.IsGCEIngress(curIng) && !utils.IsGCEMultiClusterIngress(curIng) {
 				return
 			}
 			if reflect.DeepEqual(old, cur) {
@@ -168,13 +168,11 @@ func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *C
 	if ctx.EndpointInformer != nil {
 		endpointIndexer = ctx.EndpointInformer.GetIndexer()
 	}
-	svcGetter := utils.SvcGetter{Store:
ctx.ServiceInformer.GetStore()} lbc.Translator = translator.New(lbc.ctx, lbc.CloudClusterManager.ClusterNamer, ctx.ServiceInformer.GetIndexer(), ctx.NodeInformer.GetIndexer(), ctx.PodInformer.GetIndexer(), endpointIndexer, - mapper.NewClusterServiceMapper(svcGetter.Get, nil), negEnabled) lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.ctx.KubeClient} @@ -196,6 +194,11 @@ func (lbc *LoadBalancerController) EnqueueAllIngresses() error { return nil } +// Implements MCIEnqueue +func (lbc *LoadBalancerController) EnqueueIngress(ing *extensions.Ingress) { + lbc.ingQueue.Enqueue(ing) +} + // enqueueIngressForService enqueues all the Ingress' for a Service. func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) { svc := obj.(*apiv1.Service) @@ -205,7 +208,7 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) { return } for _, ing := range ings { - if !isGCEIngress(&ing) { + if !utils.IsGCEIngress(&ing) { continue } lbc.ingQueue.Enqueue(&ing) @@ -254,29 +257,22 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { } glog.V(3).Infof("Syncing %v", key) - if lbc.ctx.MC.MCIEnabled { - // This is a temporary short-circuit to just verify that - // the MCI controller properly queues ingresses. - return nil - } - gceIngresses, err := lbc.ingLister.ListGCEIngresses() if err != nil { return err } - // gceNodePorts contains the ServicePorts used by only single-cluster ingress. + svcMappers := lbc.ctx.ServiceMappers() + // gceNodePorts contains ServicePort's for all Ingresses. var gceNodePorts []backends.ServicePort for _, gceIngress := range gceIngresses.Items { - svcPortMapping, err := lbc.Translator.ServicePortMapping(&gceIngress) - if err != nil { - glog.Infof("%v", err.Error()) + for cluster, svcMapper := range svcMappers { + svcPorts, _, err := servicePorts(&gceIngress, svcMapper) + if err != nil { + glog.Infof("Error getting NodePort's for cluster %v: %v", cluster, err.Error()) + } + gceNodePorts = append(gceNodePorts, svcPorts...) } - gceNodePorts = append(gceNodePorts, extractSvcPorts(svcPortMapping)...) - } - nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) - if err != nil { - return err } lbNames := lbc.ingLister.Store.ListKeys() @@ -284,8 +280,20 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { if err != nil { return err } + if !ingExists { glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key) + resourceManagers := lbc.ctx.ResourceManagers() + for cluster, resourceManager := range resourceManagers { + ingNamespace, ingName, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("Error extracting (namespace,name) pair from key %v: %v", key, err) + } + err = resourceManager.DeleteTargetIngress(ingName, ingNamespace) + if err != nil { + return fmt.Errorf("Error deleting target ingress %v/%v in cluster %v: %v", ingNamespace, ingName, cluster, err) + } + } // GC will find GCE resources that were used for this ingress and delete them. 
return lbc.CloudClusterManager.GC(lbNames, gceNodePorts) } @@ -297,7 +305,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { } ing = ing.DeepCopy() - ensureErr := lbc.ensureIngress(key, ing, nodeNames, gceNodePorts) + ensureErr := lbc.ensureIngress(key, ing, svcMappers, gceNodePorts) if ensureErr != nil { lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", ensureErr.Error())) } @@ -312,31 +320,73 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { return ensureErr } -func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ingress, nodeNames []string, gceNodePorts []backends.ServicePort) error { - // Given an ingress, returns a mapping of IngressBackend -> ServicePort - svcPortMapping, err := lbc.Translator.ServicePortMapping(ing) - if err != nil { - // TODO(rramkumar): Clean this up, it's very ugly. - switch err.(type) { - case *multierror.Error: - // Emit an event for each error in the multierror. - merr := err.(*multierror.Error) - for _, e := range merr.Errors { - msg := fmt.Sprintf("%v", e) +func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ingress, svcMappers map[string]mapper.ClusterServiceMapper, gceNodePorts []backends.ServicePort) error { + var ingNodePorts []backends.ServicePort + var backendToServicePorts map[extensions.IngressBackend]backends.ServicePort + for cluster, svcMapper := range svcMappers { + svcPorts, m, err := servicePorts(ing, svcMapper) + ingNodePorts = svcPorts + backendToServicePorts = m + if err != nil { + // TODO(rramkumar): Clean this up, it's very ugly. + switch err.(type) { + case *multierror.Error: + // Emit an event for each error in the multierror. + merr := err.(*multierror.Error) + for _, e := range merr.Errors { + msg := fmt.Sprintf("Error getting NodePort's for cluster %v: %v", cluster, e) + lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Service", msg) + } + default: + msg := fmt.Sprintf("%v", err) lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Service", msg) } - default: - msg := fmt.Sprintf("%v", err) - lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Service", msg) } + // TODO(rramkumar): In each iteration, pass svcPorts to a validator which progressively + // validates that each list of ServicePort's it gets is consistent with the previous. + // For now, we are going to do no validation. 
 	}
-	ingNodePorts := extractSvcPorts(svcPortMapping)
-	igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingNodePorts)
+
+	nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
 	if err != nil {
 		return err
 	}
-	if isGCEMultiClusterIngress(ing) {
+	var igs []*compute.InstanceGroup
+	if lbc.ctx.MC.MCIEnabled {
+		resourceManagers := lbc.ctx.ResourceManagers()
+		for cluster, resourceManager := range resourceManagers {
+			targetIng, ensureErr := resourceManager.EnsureTargetIngress(ing)
+			if ensureErr != nil {
+				return fmt.Errorf("Error ensuring target ingress %v/%v in cluster %v: %v", ing.Namespace, ing.Name, cluster, ensureErr)
+			}
+			annotationVals, annotationErr := instanceGroupsAnnotation(targetIng)
+			if annotationErr != nil {
+				return fmt.Errorf("Error getting instance group annotations from target ingress %v/%v in cluster %v: %v", ing.Namespace, ing.Name, cluster, annotationErr)
+			}
+			if len(annotationVals) == 0 {
+				// If a target ingress does not have the annotation yet,
+				// then just return and wait for the ingress to be requeued.
+				glog.V(3).Infof("Could not find instance group annotation for target ingress %v/%v in cluster %v. Requeueing ingress...", ing.Namespace, ing.Name, cluster)
+				return nil
+			}
+			for _, val := range annotationVals {
+				ig, err := lbc.CloudClusterManager.instancePool.Get(val.Name, utils.TrimZoneLink(val.Zone))
+				if err != nil {
+					return fmt.Errorf("Error getting instance groups for target ingress %v/%v in cluster %v: %v", ing.Namespace, ing.Name, cluster, err)
+				}
+				glog.V(3).Infof("Found instance group %v for target ingress %v/%v in cluster %v", ig.Name, ing.Namespace, ing.Name, cluster)
+				igs = append(igs, ig)
+			}
+		}
+	} else {
+		igs, err = lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, ingNodePorts)
+		if err != nil {
+			return err
+		}
+	}
+
+	if utils.IsGCEMultiClusterIngress(ing) {
 		// Add instance group names as annotation on the ingress and return.
 		if ing.Annotations == nil {
 			ing.Annotations = map[string]string{}
@@ -363,7 +413,7 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
 	negEndpointPorts := lbc.Translator.GatherEndpointPorts(gceNodePorts)
 	// Ensure firewall rule for the cluster and pass any NEG endpoint ports.
-	if err = lbc.CloudClusterManager.EnsureFirewall(nodeNames, negEndpointPorts); err != nil {
+	if err = lbc.CloudClusterManager.EnsureFirewall(nodeNames, negEndpointPorts, lbc.ctx.MC.MCIEnabled); err != nil {
 		if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
 			// XPN: Raise an event and ignore the error.
 			lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "XPN", fwErr.Message)
@@ -393,7 +443,7 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
 		return fmt.Errorf("unable to get loadbalancer: %v", err)
 	}
-	urlMap, err := lbc.Translator.ToURLMap(ing, svcPortMapping)
+	urlMap, err := lbc.Translator.ToURLMap(ing, backendToServicePorts)
 	if err != nil {
 		return fmt.Errorf("error converting to URLMap: %v", err)
 	}
@@ -490,9 +540,19 @@ func updateAnnotations(client kubernetes.Interface, name, namespace string, anno
 	return nil
 }
-func extractSvcPorts(svcPortMapping map[extensions.IngressBackend]backends.ServicePort) (svcPorts []backends.ServicePort) {
-	for _, svcPort := range svcPortMapping {
+// servicePorts converts an Ingress to its ServicePorts using a specific ClusterServiceMapper.
+func servicePorts(ing *extensions.Ingress, svcMapper mapper.ClusterServiceMapper) ([]backends.ServicePort, map[extensions.IngressBackend]backends.ServicePort, error) { + var svcPorts []backends.ServicePort + backendToServiceMap, err := svcMapper.Services(ing) + if err != nil { + return nil, nil, err + } + backendToServicePortsMap, err := backends.ServicePorts(backendToServiceMap) + if err != nil { + return nil, nil, err + } + for _, svcPort := range backendToServicePortsMap { svcPorts = append(svcPorts, svcPort) } - return svcPorts + return svcPorts, backendToServicePortsMap, nil } diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index 266575b8fd..a7cf8d2b09 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -37,7 +37,6 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/loadbalancers" - "k8s.io/ingress-gce/pkg/mapper" "k8s.io/ingress-gce/pkg/utils" ) @@ -46,7 +45,7 @@ type recorderSource interface { } // New returns a new ControllerContext. -func New(recorders recorderSource, namer *utils.Namer, svcLister cache.Indexer, nodeLister cache.Indexer, podLister cache.Indexer, endpointLister cache.Indexer, svcMapper mapper.ClusterServiceMapper, negEnabled bool) *GCE { +func New(recorders recorderSource, namer *utils.Namer, svcLister cache.Indexer, nodeLister cache.Indexer, podLister cache.Indexer, endpointLister cache.Indexer, negEnabled bool) *GCE { return &GCE{ recorders, namer, @@ -54,7 +53,6 @@ func New(recorders recorderSource, namer *utils.Namer, svcLister cache.Indexer, nodeLister, podLister, endpointLister, - svcMapper, negEnabled, } } @@ -68,7 +66,6 @@ type GCE struct { nodeLister cache.Indexer podLister cache.Indexer endpointLister cache.Indexer - svcMapper mapper.ClusterServiceMapper negEnabled bool } @@ -116,19 +113,6 @@ func (t *GCE) ToURLMap(ing *extensions.Ingress, svcPorts map[extensions.IngressB return urlMap, nil } -// ServicePortMapping converts an Ingress to a IngressBackend -> ServicePort mapping. 
-func (t *GCE) ServicePortMapping(ing *extensions.Ingress) (map[extensions.IngressBackend]backends.ServicePort, error) { - backendToServiceMap, err := t.svcMapper.Services(ing) - if err != nil { - return nil, err - } - backendToServicePortsMap, err := backends.ServicePorts(backendToServiceMap) - if err != nil { - return nil, err - } - return backendToServicePortsMap, nil -} - func getZone(n *api_v1.Node) string { zone, ok := n.Labels[annotations.ZoneKey] if !ok { diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go index 599213eb01..1174b7d54d 100644 --- a/pkg/controller/translator/translator_test.go +++ b/pkg/controller/translator/translator_test.go @@ -37,7 +37,6 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/context" - "k8s.io/ingress-gce/pkg/mapper" "k8s.io/ingress-gce/pkg/test" "k8s.io/ingress-gce/pkg/utils" ) @@ -57,14 +56,12 @@ func gceForTest(negEnabled bool) *GCE { namer := utils.NewNamer("uid1", "fw1") ctx := context.NewControllerContext(client, nil, apiv1.NamespaceAll, 1*time.Second, negEnabled) - svcGetter := utils.SvcGetter{Store: ctx.ServiceInformer.GetStore()} gce := &GCE{ recorders: ctx, namer: namer, svcLister: ctx.ServiceInformer.GetIndexer(), nodeLister: ctx.NodeInformer.GetIndexer(), podLister: ctx.PodInformer.GetIndexer(), - svcMapper: mapper.NewClusterServiceMapper(svcGetter.Get, nil), negEnabled: negEnabled, } if ctx.EndpointInformer != nil { diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index ee4353379a..da148bf9b7 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -33,91 +33,9 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" - "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils" ) -// isGCEIngress returns true if the Ingress matches the class managed by this -// controller. -func isGCEIngress(ing *extensions.Ingress) bool { - class := annotations.FromIngress(ing).IngressClass() - if flags.F.IngressClass == "" { - return class == "" || class == annotations.GceIngressClass - } - return class == flags.F.IngressClass -} - -// isGCEMultiClusterIngress returns true if the given Ingress has -// ingress.class annotation set to "gce-multi-cluster". -func isGCEMultiClusterIngress(ing *extensions.Ingress) bool { - class := annotations.FromIngress(ing).IngressClass() - return class == annotations.GceMultiIngressClass -} - -// StoreToIngressLister makes a Store that lists Ingress. -// TODO: Move this to cache/listers post 1.1. -type StoreToIngressLister struct { - cache.Store -} - -// List lists all Ingress' in the store (both single and multi cluster ingresses). -func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { - for _, m := range s.Store.List() { - newIng := m.(*extensions.Ingress) - if isGCEIngress(newIng) || isGCEMultiClusterIngress(newIng) { - ing.Items = append(ing.Items, *newIng) - } - } - return ing, nil -} - -// ListGCEIngresses lists all GCE Ingress' in the store. -func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) { - for _, m := range s.Store.List() { - newIng := m.(*extensions.Ingress) - if isGCEIngress(newIng) { - ing.Items = append(ing.Items, *newIng) - } - } - return ing, nil -} - -// GetServiceIngress gets all the Ingress' that have rules pointing to a service. -// Note that this ignores services without the right nodePorts. 
-func (s *StoreToIngressLister) GetServiceIngress(svc *api_v1.Service) (ings []extensions.Ingress, err error) { -IngressLoop: - for _, m := range s.Store.List() { - ing := *m.(*extensions.Ingress) - if ing.Namespace != svc.Namespace { - continue - } - - // Check service of default backend - if ing.Spec.Backend != nil && ing.Spec.Backend.ServiceName == svc.Name { - ings = append(ings, ing) - continue - } - - // Check the target service for each path rule - for _, rule := range ing.Spec.Rules { - if rule.IngressRuleValue.HTTP == nil { - continue - } - for _, p := range rule.IngressRuleValue.HTTP.Paths { - if p.Backend.ServiceName == svc.Name { - ings = append(ings, ing) - // Skip the rest of the rules to avoid duplicate ingresses in list - continue IngressLoop - } - } - } - } - if len(ings) == 0 { - err = fmt.Errorf("no ingress for service %v", svc.Name) - } - return -} - // StoreToEndpointLister makes a Store that lists Endpoints. type StoreToEndpointLister struct { cache.Indexer @@ -158,15 +76,16 @@ func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetP return ret } +type InstanceGroupsAnnotationValue struct { + Name string + Zone string +} + // setInstanceGroupsAnnotation sets the instance-groups annotation with names of the given instance groups. func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) error { - type Value struct { - Name string - Zone string - } - var instanceGroups []Value + var instanceGroups []InstanceGroupsAnnotationValue for _, ig := range igs { - instanceGroups = append(instanceGroups, Value{Name: ig.Name, Zone: ig.Zone}) + instanceGroups = append(instanceGroups, InstanceGroupsAnnotationValue{Name: ig.Name, Zone: ig.Zone}) } jsonValue, err := json.Marshal(instanceGroups) if err != nil { @@ -176,6 +95,17 @@ func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.Inst return nil } +// instanceGroupsAnnotation returns the values for the instance-groups annotation on an ingress. +func instanceGroupsAnnotation(ing *extensions.Ingress) ([]InstanceGroupsAnnotationValue, error) { + var igs []InstanceGroupsAnnotationValue + if val, ok := ing.Annotations[annotations.InstanceGroupsAnnotationKey]; ok { + if err := json.Unmarshal([]byte(val), &igs); err != nil { + return nil, fmt.Errorf("error in parsing instance group annotation: %v", err) + } + } + return igs, nil +} + // uniq returns an array of unique service ports from the given array. func uniq(nodePorts []backends.ServicePort) []backends.ServicePort { portMap := map[int64]backends.ServicePort{} diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index db054e04b2..5c708c67cc 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -17,12 +17,14 @@ limitations under the License. 
package controller import ( + "reflect" "testing" "time" compute "google.golang.org/api/compute/v1" api_v1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -154,6 +156,31 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) { } } +func TestInstanceGroupsAnnotation(t *testing.T) { + testCases := []struct { + ing *extensions.Ingress + expected []InstanceGroupsAnnotationValue + }{ + { + &extensions.Ingress{ + ObjectMeta: meta_v1.ObjectMeta{ + Annotations: map[string]string{ + annotations.InstanceGroupsAnnotationKey: `[{"Name":"k8s-ig--1","Zone":"https://www.googleapis.com/compute/v1/projects/rramkumar-gke-dev/zones/us-central1-c"}]`, + }, + }, + }, + []InstanceGroupsAnnotationValue{InstanceGroupsAnnotationValue{Name: "k8s-ig--1", Zone: "https://www.googleapis.com/compute/v1/projects/rramkumar-gke-dev/zones/us-central1-c"}}, + }, + } + + for _, testCase := range testCases { + res, _ := instanceGroupsAnnotation(testCase.ing) + if !reflect.DeepEqual(testCase.expected, res) { + t.Errorf("Expected instance group annotation values %+v not equal to result %+v", testCase.expected, res) + } + } +} + func TestNodeStatusChanged(t *testing.T) { testCases := []struct { desc string diff --git a/pkg/firewalls/firewalls.go b/pkg/firewalls/firewalls.go index a0abdc59c9..3ffec31c03 100644 --- a/pkg/firewalls/firewalls.go +++ b/pkg/firewalls/firewalls.go @@ -56,19 +56,11 @@ func NewFirewallPool(cloud Firewall, namer *utils.Namer, l7SrcRanges []string, n } // Sync sync firewall rules with the cloud. -func (fr *FirewallRules) Sync(nodeNames []string, additionalPorts ...string) error { +func (fr *FirewallRules) Sync(nodeNames []string, mciEnabled bool, additionalPorts ...string) error { glog.V(4).Infof("Sync(%v)", nodeNames) name := fr.namer.FirewallRule() existingFirewall, _ := fr.cloud.GetFirewall(name) - // Retrieve list of target tags from node names. This may be configured in - // gce.conf or computed by the GCE cloudprovider package. - targetTags, err := fr.cloud.GetNodeTags(nodeNames) - if err != nil { - return err - } - sort.Strings(targetTags) - ports := sets.NewString(additionalPorts...) ports.Insert(fr.portRanges...) expectedFirewall := &compute.Firewall{ @@ -82,7 +74,19 @@ func (fr *FirewallRules) Sync(nodeNames []string, additionalPorts ...string) err Ports: ports.List(), }, }, - TargetTags: targetTags, + } + + // If MCI is enabled, then for simplicity, we apply the firewall rule across all targets. + // Otherwise, we specifically get the network tags for each node. + if !mciEnabled { + // Retrieve list of target tags from node names. This may be configured in + // gce.conf or computed by the GCE cloudprovider package. 
+ targetTags, err := fr.cloud.GetNodeTags(nodeNames) + if err != nil { + return err + } + sort.Strings(targetTags) + expectedFirewall.TargetTags = targetTags } if existingFirewall == nil { diff --git a/pkg/firewalls/firewalls_test.go b/pkg/firewalls/firewalls_test.go index 5a794ea152..74d63bf23b 100644 --- a/pkg/firewalls/firewalls_test.go +++ b/pkg/firewalls/firewalls_test.go @@ -38,7 +38,7 @@ func TestFirewallPoolSync(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Fatal(err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -49,21 +49,21 @@ func TestFirewallPoolSyncNodes(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Fatal(err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) // Add nodes nodes = append(nodes, "node-d", "node-e") - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) // Remove nodes nodes = []string{"node-a", "node-c"} - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -74,7 +74,7 @@ func TestFirewallPoolSyncSrcRanges(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Fatal(err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -86,7 +86,7 @@ func TestFirewallPoolSyncSrcRanges(t *testing.T) { t.Fatal(err) } - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -97,7 +97,7 @@ func TestFirewallPoolSyncPorts(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Fatal(err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -110,14 +110,14 @@ func TestFirewallPoolSyncPorts(t *testing.T) { } // Expect firewall to be synced back to normal - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) // Verify additional ports are included negTargetports := []string{"80", "443", "8080"} - if err := fp.Sync(nodes, negTargetports...); err != nil { + if err := fp.Sync(nodes, false, negTargetports...); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, append(portRanges(), negTargetports...), t) @@ -128,7 +128,7 @@ func TestFirewallPoolShutdown(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - if err := fp.Sync(nodes); err != nil { + 
if err := fp.Sync(nodes, false); err != nil { t.Fatal(err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -150,7 +150,7 @@ func TestSyncOnXPNWithPermission(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - if err := fp.Sync(nodes); err != nil { + if err := fp.Sync(nodes, false); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) @@ -164,7 +164,7 @@ func TestSyncXPNReadOnly(t *testing.T) { fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} - err := fp.Sync(nodes) + err := fp.Sync(nodes, false) if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "create") { t.Errorf("Expected firewall sync error with a user message. Received err: %v", err) } @@ -187,12 +187,12 @@ func TestSyncXPNReadOnly(t *testing.T) { } // Run sync again with same state - expect no event - if err = fp.Sync(nodes); err != nil { + if err = fp.Sync(nodes, false); err != nil { t.Errorf("unexpected err when syncing firewall, err: %v", err) } nodes = append(nodes, "node-d") - err = fp.Sync(nodes) + err = fp.Sync(nodes, false) if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "update") { t.Errorf("Expected firewall sync error with a user message. Received err: %v", err) } diff --git a/pkg/firewalls/interfaces.go b/pkg/firewalls/interfaces.go index 99398829a0..92c9c16606 100644 --- a/pkg/firewalls/interfaces.go +++ b/pkg/firewalls/interfaces.go @@ -22,7 +22,7 @@ import ( // SingleFirewallPool syncs the firewall rule for L7 traffic. type SingleFirewallPool interface { - Sync(nodeNames []string, additionalPorts ...string) error + Sync(nodeNames []string, mciEnabled bool, additionalPorts ...string) error Shutdown() error } diff --git a/pkg/informer/informer.go b/pkg/informer/informer.go index f3fb1062f7..7157e4ffed 100644 --- a/pkg/informer/informer.go +++ b/pkg/informer/informer.go @@ -61,7 +61,7 @@ func (m *clusterInformerManager) CreateInformers() { } func (m *clusterInformerManager) DeleteInformers() { - <-m.stopChan + close(m.stopChan) } func (m *clusterInformerManager) HasSynced() bool { diff --git a/pkg/mci/controller.go b/pkg/mci/controller.go index 785f62957e..ba6bdd8d4a 100644 --- a/pkg/mci/controller.go +++ b/pkg/mci/controller.go @@ -21,15 +21,14 @@ import ( "time" "github.com/golang/glog" + extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - clientcmd "k8s.io/client-go/tools/clientcmd/api" crv1alpha1 "k8s.io/cluster-registry/pkg/apis/clusterregistry/v1alpha1" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/informer" "k8s.io/ingress-gce/pkg/mapper" + "k8s.io/ingress-gce/pkg/target" "k8s.io/ingress-gce/pkg/utils" ) @@ -42,29 +41,35 @@ type Controller struct { clusterSynced cache.InformerSynced clusterLister cache.Indexer + queueHandle MCIEnqueue } // MCIEnqueue is a interface to allow the MCI controller to enqueue ingresses // based on events it receives. 
 type MCIEnqueue interface {
 	EnqueueAllIngresses() error
+	EnqueueIngress(ing *extensions.Ingress)
 }
-func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, enqueue MCIEnqueue) (*Controller, error) {
+func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, queueHandle MCIEnqueue) (*Controller, error) {
 	mciController := &Controller{
 		ctx:           ctx,
 		resyncPeriod:  resyncPeriod,
 		clusterSynced: ctx.MC.ClusterInformer.HasSynced,
 		clusterLister: ctx.MC.ClusterInformer.GetIndexer(),
+		queueHandle:   queueHandle,
 	}
 	ctx.MC.ClusterInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			c := obj.(*crv1alpha1.Cluster)
 			glog.V(3).Infof("Cluster %v added", c.Name)
-			mciController.handleClusterAdd(c)
+			err := mciController.handleClusterAdd(c)
+			if err != nil {
+				glog.V(3).Infof("Error bootstrapping resources for cluster %v: %v", c.Name, err)
+			}
 			// For now, queue up all ingresses
-			err := enqueue.EnqueueAllIngresses()
+			err = queueHandle.EnqueueAllIngresses()
 			if err != nil {
 				glog.V(3).Infof("Error enqueuing ingresses on add of cluster %v: %v", c.Name, err)
 			}
@@ -73,9 +78,10 @@ func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, e
 			c := obj.(*crv1alpha1.Cluster)
 			glog.V(3).Infof("Cluster %v deleted", c.Name)
 			mciController.handleClusterDelete(c)
-			err := enqueue.EnqueueAllIngresses()
+			// For now, queue up all ingresses
+			err := queueHandle.EnqueueAllIngresses()
 			if err != nil {
-				glog.V(3).Infof("Error enqueuing ingress on delete of cluster %v: %v", c.Name, err)
+				glog.V(3).Infof("Error enqueuing ingresses on delete of cluster %v: %v", c.Name, err)
 			}
 		},
 		UpdateFunc: func(obj, cur interface{}) {
@@ -86,10 +92,10 @@ func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, e
 	return mciController, nil
 }
-func (controller *Controller) handleClusterAdd(c *crv1alpha1.Cluster) {
+func (controller *Controller) handleClusterAdd(c *crv1alpha1.Cluster) error {
 	client, err := buildClusterClient(c)
 	if err != nil {
-		glog.V(3).Infof("Error building client for cluster %v: %v", c.Name, err)
+		return fmt.Errorf("Error building client for cluster %v: %v", c.Name, err)
 	}
 	// Keep track of the client
 	controller.ctx.MC.ClusterClients[c.Name] = client
@@ -97,7 +103,8 @@
 	informerManager := informer.NewClusterInformerManager(client, controller.resyncPeriod)
 	informerManager.CreateInformers()
-	// TODO(rramkumar): For now, just add event handlers for Ingress.
+	// For now, just add event handlers for Ingress.
+	controller.addIngressEventHandlers(informerManager, c.Name)
 	// Keep track of the informer manager.
 	controller.ctx.MC.ClusterInformerManagers[c.Name] = informerManager
@@ -107,43 +114,55 @@
 	svcMapper := mapper.NewClusterServiceMapper(svcGetter.Get, nil)
 	// Keep track of the service mapper.
 	controller.ctx.MC.ClusterServiceMappers[c.Name] = svcMapper
+	// Create a target resource manager for this cluster
+	targetResourceManager := target.NewTargetResourceManager(client)
+	controller.ctx.MC.ClusterResourceManagers[c.Name] = targetResourceManager
 	glog.V(3).Infof("Built client and informers for cluster %v", c.Name)
+	return nil
 }
 func (controller *Controller) handleClusterDelete(c *crv1alpha1.Cluster) {
+	// Remove all ingresses in the cluster
+	controller.deleteIngressesFromCluster(c)
 	// Remove client for this cluster
 	delete(controller.ctx.MC.ClusterClients, c.Name)
 	// Stop informers.
 	informerManager := controller.ctx.MC.ClusterInformerManagers[c.Name]
 	informerManager.DeleteInformers()
 	delete(controller.ctx.MC.ClusterInformerManagers, c.Name)
-	// Remove cluster service mappers
+	// Remove cluster service mapper.
 	delete(controller.ctx.MC.ClusterServiceMappers, c.Name)
+	// Remove target resource manager
+	delete(controller.ctx.MC.ClusterResourceManagers, c.Name)
 	glog.V(3).Infof("Removed client and informers for cluster %v", c.Name)
 }
-// buildClusterClient builds a k8s client given a cluster from the ClusterRegistry.
-func buildClusterClient(c *crv1alpha1.Cluster) (kubernetes.Interface, error) {
-	// Config used to instantiate a client
-	restConfig := &rest.Config{}
-	// Get endpoint for the master. For now, we only consider the first endpoint given.
-	masterEndpoints := c.Spec.KubernetesAPIEndpoints.ServerEndpoints
-	if len(masterEndpoints) == 0 {
-		return nil, fmt.Errorf("No master endpoints provided")
+func (controller *Controller) addIngressEventHandlers(informerManager informer.ClusterInformerManager, clusterName string) {
+	informerManager.AddHandlersForInformer(informer.IngressInformer, cache.ResourceEventHandlerFuncs{
+		// Note: For now, we don't care about ingresses being added or deleted in "target" clusters.
+		UpdateFunc: func(obj, cur interface{}) {
+			ing := obj.(*extensions.Ingress)
+			controller.queueHandle.EnqueueIngress(ing)
+			glog.V(3).Infof("Target ingress %v/%v updated in cluster %v. Requeueing ingress...", ing.Namespace, ing.Name, clusterName)
+		},
+	})
+}
+
+func (controller *Controller) deleteIngressesFromCluster(c *crv1alpha1.Cluster) {
+	ingLister := utils.StoreToIngressLister{Store: controller.ctx.IngressInformer.GetStore()}
+	resourceManager := controller.ctx.MC.ClusterResourceManagers[c.Name]
+	ings, err := ingLister.ListGCEIngresses()
+	if err != nil {
+		glog.V(3).Infof("Error listing ingresses before deletion of target ingresses from cluster %v: %v", c.Name, err)
 	}
-	// Populate config with master endpoint.
-	restConfig.Host = masterEndpoints[0].ServerAddress
-	// Don't verify TLS for now
-	restConfig.TLSClientConfig = rest.TLSClientConfig{Insecure: true}
-	// Get auth for the master. We assume auth mechanism is using a client cert.
-	authProviders := c.Spec.AuthInfo.Providers
-	if len(authProviders) == 0 {
-		return nil, fmt.Errorf("No auth providers provided")
+	for _, ing := range ings.Items {
+		err = resourceManager.DeleteTargetIngress(ing.Name, ing.Namespace)
+		if err != nil {
+			glog.V(3).Infof("Error deleting target ingress %v/%v in cluster %v: %v", ing.Namespace, ing.Name, c.Name, err)
+			return
+		}
 	}
-	providerConfig := c.Spec.AuthInfo.Providers[0]
-	// Populate config with client auth.
- restConfig.AuthProvider = &clientcmd.AuthProviderConfig{Name: providerConfig.Name, Config: providerConfig.Config} - return kubernetes.NewForConfig(restConfig) + glog.V(3).Infof("Deleted all target ingresses in cluster %v", c.Name) } func (c *Controller) Run(stopCh <-chan struct{}) { diff --git a/pkg/mci/utils.go b/pkg/mci/utils.go new file mode 100644 index 0000000000..e29e1848f3 --- /dev/null +++ b/pkg/mci/utils.go @@ -0,0 +1,52 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mci + +import ( + "fmt" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + crv1alpha1 "k8s.io/cluster-registry/pkg/apis/clusterregistry/v1alpha1" +) + +// buildClusterClient builds a k8s client given a cluster from the ClusterRegistry. +func buildClusterClient(c *crv1alpha1.Cluster) (kubernetes.Interface, error) { + // Config used to instantiate a client + restConfig := &rest.Config{} + // Get endpoint for the master. For now, we only consider the first endpoint given. + masterEndpoints := c.Spec.KubernetesAPIEndpoints.ServerEndpoints + if len(masterEndpoints) == 0 { + return nil, fmt.Errorf("No master endpoints provided") + } + // Populate config with master endpoint. + restConfig.Host = masterEndpoints[0].ServerAddress + // Ignore TLS for now. + restConfig.TLSClientConfig = rest.TLSClientConfig{Insecure: true} + // Specify auth for master. We assume that basic auth is used (username + password) + authProviders := c.Spec.AuthInfo.Providers + if len(authProviders) == 0 { + return nil, fmt.Errorf("No auth providers provided") + } + providerConfig := c.Spec.AuthInfo.Providers[0] + if providerConfig.Config == nil { + return nil, fmt.Errorf("No auth config found in %+v", providerConfig) + } + restConfig.Username = providerConfig.Config["username"] + restConfig.Password = providerConfig.Config["password"] + return kubernetes.NewForConfig(restConfig) +} diff --git a/pkg/target/interfaces.go b/pkg/target/interfaces.go index 8231846bdd..5dc0e72d6c 100644 --- a/pkg/target/interfaces.go +++ b/pkg/target/interfaces.go @@ -21,5 +21,5 @@ import ( // TargetResourceManager is an interface to manage the k8s resources in "target" clusters. 
type TargetResourceManager interface { EnsureTargetIngress(ing *extensions.Ingress) (*extensions.Ingress, error) - DeleteTargetIngress(ing *extensions.Ingress) error + DeleteTargetIngress(name, namespace string) error } diff --git a/pkg/target/target.go b/pkg/target/target.go index c6662245bf..f70257fe06 100644 --- a/pkg/target/target.go +++ b/pkg/target/target.go @@ -17,12 +17,11 @@ package target import ( "fmt" - "github.com/golang/glog" extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/ingress-gce/pkg/annotations" - "k8s.io/ingress-gce/pkg/utils" ) type targetResourceManager struct { @@ -36,36 +35,36 @@ func NewTargetResourceManager(kubeClient kubernetes.Interface) TargetResourceMan var _ TargetResourceManager = &targetResourceManager{} func (m *targetResourceManager) EnsureTargetIngress(ing *extensions.Ingress) (*extensions.Ingress, error) { - existingIng, getErr := m.kubeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Get(ing.Name, meta_v1.GetOptions{}) + existingTargetIng, getErr := m.kubeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Get(ing.Name, meta_v1.GetOptions{}) if getErr != nil { - if utils.IsNotFoundError(getErr) { - // Add MCI annotation to "target" ingress. - ing.Annotations[annotations.IngressClassKey] = annotations.GceMultiIngressClass - actualIng, createErr := m.kubeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Create(ing) + if errors.IsNotFound(getErr) { + targetIng := createTargetIngress(ing) + createdIng, createErr := m.kubeClient.ExtensionsV1beta1().Ingresses(targetIng.Namespace).Create(targetIng) if createErr != nil { return nil, fmt.Errorf("Error creating target ingress %v/%v: %v", ing.Namespace, ing.Name, createErr) } - return actualIng, nil + return createdIng, nil } + return nil, getErr } - // Ignore instance group annotation while comparing ingresses. - ignoreAnnotations := map[string]string{instanceGroupAnnotationKey: ""} - if !objectMetaAndSpecEquivalent(ing, existingIng, ignoreAnnotations) { - glog.V(3).Infof("Target ingress %v in cluster %v differs. Overwriting... \n") - updatedIng, updateErr := m.kubeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Update(ing) + // Ignore all GCP resource annotations and MCI annotation when comparing ingresses. 
+ ignoreAnnotations := map[string]string{annotations.InstanceGroupsAnnotationKey: "", annotations.IngressClassKey: ""} + if !objectMetaAndSpecEquivalent(ing, existingTargetIng, ignoreAnnotations) { + updateTargetIng(ing, existingTargetIng) + updatedIng, updateErr := m.kubeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Update(existingTargetIng) if updateErr != nil { return nil, fmt.Errorf("Error updating target ingress %v/%v: %v", ing.Namespace, ing.Name, updateErr) } return updatedIng, nil } - return existingIng, nil + return existingTargetIng, nil } -func (m *targetResourceManager) DeleteTargetIngress(ing *extensions.Ingress) error { - deleteErr := m.kubeClient.ExtensionsV1beta1().Ingresses(ing.Namespace).Delete(ing.Name, &meta_v1.DeleteOptions{}) +func (m *targetResourceManager) DeleteTargetIngress(name, namespace string) error { + deleteErr := m.kubeClient.ExtensionsV1beta1().Ingresses(namespace).Delete(name, &meta_v1.DeleteOptions{}) if deleteErr != nil { - return fmt.Errorf("Error deleting ingress %v/%v: %v", ing.Namespace, ing.Name, deleteErr) + return fmt.Errorf("Error deleting ingress %v/%v: %v", namespace, name, deleteErr) } return nil } diff --git a/pkg/target/utils.go b/pkg/target/utils.go index 5bdff26515..6b778b7cd7 100644 --- a/pkg/target/utils.go +++ b/pkg/target/utils.go @@ -17,15 +17,13 @@ package target import ( "reflect" + "k8s.io/api/extensions/v1beta1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/utils" ) -var ( - instanceGroupAnnotationKey = "ingress.gcp.kubernetes.io/instance-groups" -) - // objectMetaEquivalent checks if cluster-independent, user provided data in two given ObjectMeta are equal. // If in the future the ObjectMeta structure is expanded then any field that is not populated // by the api server should be included here. @@ -61,3 +59,29 @@ func objectMetaAndSpecEquivalent(a, b runtime.Object, ignoreAnnotationKeys map[s specB := reflect.ValueOf(b).Elem().FieldByName("Spec").Interface() return objectMetaEquivalent(objectMetaA, objectMetaB, ignoreAnnotationKeys) && reflect.DeepEqual(specA, specB) } + +// createTargetIngress creates a copy of the passed in ingress but with certain +// modifications needed to make the target ingress viable in the target cluster. +func createTargetIngress(ing *v1beta1.Ingress) *v1beta1.Ingress { + // We have to do a manual copy since doing a deep copy will + // populate certain ObjectMeta fields we don't want. + targetIng := &v1beta1.Ingress{} + // Copy over name, namespace, annotations. + targetIng.Name = ing.Name + targetIng.Namespace = ing.Namespace + targetIng.Annotations = ing.Annotations + // Copy over spec. + targetIng.Spec = ing.Spec + // Add MCI annotation to "target" ingress. + annotations.AddAnnotation(targetIng, annotations.IngressClassKey, annotations.GceMultiIngressClass) + return targetIng +} + +// updateTargetIng makes sure that targetIng is updated to reflect any changes in ing. +// Note that we only care about namespace, labels, and the spec. We do not care about any +// changes to feature annotations because the target ingress does not process them. 
+func updateTargetIng(ing *v1beta1.Ingress, targetIng *v1beta1.Ingress) { + targetIng.Namespace = ing.Namespace + targetIng.Labels = ing.Labels + targetIng.Spec = ing.Spec +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index d971a5b498..0728c67d42 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -23,8 +23,11 @@ import ( "google.golang.org/api/googleapi" api_v1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/flags" ) const ( @@ -164,6 +167,87 @@ func (s *SvcGetter) Get(svcName, namespace string) (*api_v1.Service, error) { return svc, nil } +// IsGCEIngress returns true if the Ingress matches the class managed by this +// controller. +func IsGCEIngress(ing *extensions.Ingress) bool { + class := annotations.FromIngress(ing).IngressClass() + if flags.F.IngressClass == "" { + return class == "" || class == annotations.GceIngressClass + } + return class == flags.F.IngressClass +} + +// IsGCEMultiClusterIngress returns true if the given Ingress has +// ingress.class annotation set to "gce-multi-cluster". +func IsGCEMultiClusterIngress(ing *extensions.Ingress) bool { + class := annotations.FromIngress(ing).IngressClass() + return class == annotations.GceMultiIngressClass +} + +// StoreToIngressLister makes a Store that lists Ingress. +// TODO: Move this to cache/listers post 1.1. +type StoreToIngressLister struct { + cache.Store +} + +// List lists all Ingress' in the store (both single and multi cluster ingresses). +func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { + for _, m := range s.Store.List() { + newIng := m.(*extensions.Ingress) + if IsGCEIngress(newIng) || IsGCEMultiClusterIngress(newIng) { + ing.Items = append(ing.Items, *newIng) + } + } + return ing, nil +} + +// ListGCEIngresses lists all GCE Ingress' in the store. +func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) { + for _, m := range s.Store.List() { + newIng := m.(*extensions.Ingress) + if IsGCEIngress(newIng) { + ing.Items = append(ing.Items, *newIng) + } + } + return ing, nil +} + +// GetServiceIngress gets all the Ingress' that have rules pointing to a service. +// Note that this ignores services without the right nodePorts. +func (s *StoreToIngressLister) GetServiceIngress(svc *api_v1.Service) (ings []extensions.Ingress, err error) { +IngressLoop: + for _, m := range s.Store.List() { + ing := *m.(*extensions.Ingress) + if ing.Namespace != svc.Namespace { + continue + } + + // Check service of default backend + if ing.Spec.Backend != nil && ing.Spec.Backend.ServiceName == svc.Name { + ings = append(ings, ing) + continue + } + + // Check the target service for each path rule + for _, rule := range ing.Spec.Rules { + if rule.IngressRuleValue.HTTP == nil { + continue + } + for _, p := range rule.IngressRuleValue.HTTP.Paths { + if p.Backend.ServiceName == svc.Name { + ings = append(ings, ing) + // Skip the rest of the rules to avoid duplicate ingresses in list + continue IngressLoop + } + } + } + } + if len(ings) == 0 { + err = fmt.Errorf("no ingress for service %v", svc.Name) + } + return +} + // BackendServiceRelativeResourcePath returns a relative path of the link for a // BackendService given its name. 
 func BackendServiceRelativeResourcePath(name string) string {
@@ -181,6 +265,15 @@ func BackendServiceComparablePath(url string) string {
 	return fmt.Sprintf("global/%s", path_parts[1])
 }
+// TrimZoneLink takes in a fully qualified zone link and returns just the zone.
+func TrimZoneLink(url string) string {
+	path_parts := strings.Split(url, "zones/")
+	if len(path_parts) != 2 {
+		return ""
+	}
+	return path_parts[1]
+}
+
 // StringsToKeyMap returns the map representation of a list of strings.
 func StringsToKeyMap(strings []string) map[string]bool {
 	m := make(map[string]bool)
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
index c9a14dcaef..66effca7a7 100644
--- a/pkg/utils/utils_test.go
+++ b/pkg/utils/utils_test.go
@@ -118,3 +118,26 @@ func TestBackendServiceComparablePath(t *testing.T) {
 		}
 	}
 }
+
+func TestTrimZoneLink(t *testing.T) {
+	testCases := []struct {
+		url      string
+		expected string
+	}{
+		{
+			"https://www.googleapis.com/compute/v1/projects/foo/zones/us-central1-c",
+			"us-central1-c",
+		},
+		{
+			"https://www.googleapis.com/compute/v1/projects/foo/zones/",
+			"",
+		},
+	}
+
+	for _, tc := range testCases {
+		res := TrimZoneLink(tc.url)
+		if res != tc.expected {
+			t.Errorf("Expected result after zone link trim to be %v, but got %v", tc.expected, res)
+		}
+	}
+}