Skip to content

Commit

Permalink
Merge pull request #217 from nicksardo/mci-watcher
Browse files Browse the repository at this point in the history
Bootstrap multi-cluster controller
  • Loading branch information
nicksardo authored Apr 13, 2018
2 parents 9c78af6 + 41460b2 commit 4795047
Show file tree
Hide file tree
Showing 19 changed files with 447 additions and 32 deletions.
7 changes: 5 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (

crdclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
crclient "k8s.io/cluster-registry/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"

"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller"
"k8s.io/ingress-gce/pkg/mci"
neg "k8s.io/ingress-gce/pkg/neg"

"k8s.io/ingress-gce/cmd/glbc/app"
Expand Down Expand Up @@ -82,6 +84,14 @@ func main() {
}
}

var registryClient crclient.Interface
if flags.F.MultiCluster {
registryClient, err = crclient.NewForConfig(kubeConfig)
if err != nil {
glog.Fatalf("Failed to create Cluster Registry client: %v", err)
}
}

namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, controller.DefaultFirewallName)
if err != nil {
glog.Fatalf("%v", err)
Expand All @@ -96,8 +106,8 @@ func main() {

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG)
ctx := context.NewControllerContext(kubeClient, registryClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(ctx, clusterManager, enableNEG, stopCh)
if err != nil {
glog.Fatalf("Error creating load balancer controller: %v", err)
}
Expand All @@ -109,11 +119,17 @@ func main() {
glog.V(0).Infof("clusterManager initialized")

if enableNEG {
negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, flags.F.ResyncPeriod)
negController, _ := neg.NewController(cloud, ctx, lbc.Translator, namer, flags.F.ResyncPeriod)
go negController.Run(stopCh)
glog.V(0).Infof("negController started")
}

if flags.F.MultiCluster {
mciController, _ := mci.NewController(ctx, flags.F.ResyncPeriod)
go mciController.Run(stopCh)
glog.V(0).Infof("Multi-Cluster Ingress Controller started")
}

go app.RunHTTPServer(lbc)
go app.RunSIGTERMHandler(lbc, flags.F.DeleteAllOnQuit)

Expand Down
26 changes: 20 additions & 6 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
crclient "k8s.io/cluster-registry/pkg/client/clientset_generated/clientset"
crinformerv1alpha1 "k8s.io/cluster-registry/pkg/client/informers_generated/externalversions/clusterregistry/v1alpha1"
)

// ControllerContext holds resources necessary for the general
// workflow of the controller.
type ControllerContext struct {
kubeClient kubernetes.Interface
KubeClient kubernetes.Interface

IngressInformer cache.SharedIndexInformer
ServiceInformer cache.SharedIndexInformer
Expand All @@ -46,31 +47,36 @@ type ControllerContext struct {
// Map of namespace => record.EventRecorder.
recorders map[string]record.EventRecorder

MCContext MultiClusterContext
MC MultiClusterContext
}

// MultiClusterContext holds resource necessary for MCI mode.
type MultiClusterContext struct {
crClient crclient.Interface
RegistryClient crclient.Interface
ClusterInformer cache.SharedIndexInformer
}

// NewControllerContext returns a new shared set of informers.
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
func NewControllerContext(kubeClient kubernetes.Interface, registryClient crclient.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext {
newIndexer := func() cache.Indexers {
return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
}

context := &ControllerContext{
kubeClient: kubeClient,
KubeClient: kubeClient,
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, newIndexer()),
NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, newIndexer()),
MC: MultiClusterContext{RegistryClient: registryClient},
recorders: map[string]record.EventRecorder{},
}
if enableEndpointsInformer {
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer())
}
if context.MC.RegistryClient != nil {
context.MC.ClusterInformer = crinformerv1alpha1.NewClusterInformer(registryClient, resyncPeriod, newIndexer())
}

return context
}
Expand All @@ -86,6 +92,9 @@ func (ctx *ControllerContext) HasSynced() bool {
if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
}
if ctx.MC.ClusterInformer != nil {
funcs = append(funcs, ctx.MC.ClusterInformer.HasSynced)
}
for _, f := range funcs {
if !f() {
return false
Expand All @@ -94,6 +103,7 @@ func (ctx *ControllerContext) HasSynced() bool {
return true
}

// Recorder creates an event recorder for the specified namespace.
func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder {
if rec, ok := ctx.recorders[ns]; ok {
return rec
Expand All @@ -102,7 +112,7 @@ func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{
Interface: ctx.kubeClient.Core().Events(ns),
Interface: ctx.KubeClient.Core().Events(ns),
})
rec := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"})
ctx.recorders[ns] = rec
Expand All @@ -119,4 +129,8 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
if ctx.EndpointInformer != nil {
go ctx.EndpointInformer.Run(stopCh)
}

if ctx.MC.ClusterInformer != nil {
go ctx.MC.ClusterInformer.Run(stopCh)
}
}
17 changes: 7 additions & 10 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ var (
// LoadBalancerController watches the kubernetes api and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
type LoadBalancerController struct {
client kubernetes.Interface
ctx *context.ControllerContext
ctx *context.ControllerContext

ingLister StoreToIngressLister
nodeLister cache.Indexer
Expand Down Expand Up @@ -88,18 +87,16 @@ type LoadBalancerController struct {
}

// NewLoadBalancerController creates a controller for gce loadbalancers.
// - kubeClient: A kubernetes REST client.
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) {
func NewLoadBalancerController(ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool, stopCh chan struct{}) (*LoadBalancerController, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(glog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: kubeClient.Core().Events(""),
Interface: ctx.KubeClient.Core().Events(""),
})
lbc := LoadBalancerController{
client: kubeClient,
ctx: ctx,
ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
nodeLister: ctx.NodeInformer.GetIndexer(),
Expand Down Expand Up @@ -174,7 +171,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
ctx.PodInformer.GetIndexer(),
endpointIndexer,
negEnabled)
lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.client}
lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.ctx.KubeClient}

glog.V(3).Infof("Created new loadbalancer controller")

Expand Down Expand Up @@ -299,7 +296,7 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
return err
}
if err = updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations); err != nil {
if err = updateAnnotations(lbc.ctx.KubeClient, ing.Name, ing.Namespace, ing.Annotations); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -367,7 +364,7 @@ func (lbc *LoadBalancerController) ensureIngress(key string, ing *extensions.Ing
// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingresses(ing.Namespace)
ingClient := lbc.ctx.KubeClient.Extensions().Ingresses(ing.Namespace)

// Update IP through update/status endpoint
ip := l7.GetIP()
Expand Down Expand Up @@ -395,7 +392,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
}
}
annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
if err := updateAnnotations(lbc.client, ing.Name, ing.Namespace, annotations); err != nil {
if err := updateAnnotations(lbc.ctx.KubeClient, ing.Name, ing.Namespace, annotations); err != nil {
return err
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func defaultBackendName(clusterName string) string {
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController {
kubeClient := fake.NewSimpleClientset()
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(kubeClient, stopCh, ctx, cm.ClusterManager, true)
ctx := context.NewControllerContext(kubeClient, nil, api_v1.NamespaceAll, 1*time.Second, true)
lb, err := NewLoadBalancerController(ctx, cm.ClusterManager, true, stopCh)
if err != nil {
t.Fatalf("%v", err)
}
Expand Down Expand Up @@ -202,7 +202,7 @@ func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePo
lbc.ctx.ServiceInformer.GetIndexer().Add(svc)
}
}
lbc.client.Extensions().Ingresses(ing.Namespace).Create(ing)
lbc.ctx.KubeClient.Extensions().Ingresses(ing.Namespace).Create(ing)
lbc.ctx.IngressInformer.GetIndexer().Add(ing)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func gceForTest(negEnabled bool) *GCE {

namer := utils.NewNamer("uid1", "fw1")

ctx := context.NewControllerContext(client, apiv1.NamespaceAll, 1*time.Second, negEnabled)
ctx := context.NewControllerContext(client, nil, apiv1.NamespaceAll, 1*time.Second, negEnabled)
gce := &GCE{
recorders: ctx,
namer: namer,
Expand Down
76 changes: 76 additions & 0 deletions pkg/mci/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mci

import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
crv1alpha1 "k8s.io/cluster-registry/pkg/apis/clusterregistry/v1alpha1"
"k8s.io/ingress-gce/pkg/context"
)

// Controller is a barebones multi-cluster ingress controller.
// For now, this controller only logs messages when CRUD operations are
// made on a crv1alpha1.Cluster.
type Controller struct {
resyncPeriod time.Duration

clusterSynced cache.InformerSynced
clusterLister cache.Indexer

// TODO(rramkumar): Add lister for service extension CRD.
}

func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration) (*Controller, error) {
mciController := &Controller{
resyncPeriod: resyncPeriod,
clusterSynced: ctx.MC.ClusterInformer.HasSynced,
clusterLister: ctx.MC.ClusterInformer.GetIndexer(),
}

ctx.MC.ClusterInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c := obj.(*crv1alpha1.Cluster)
glog.V(3).Infof("Cluster %v added", c.Name)
},
DeleteFunc: func(obj interface{}) {
c := obj.(*crv1alpha1.Cluster)
glog.V(3).Infof("Cluster %v deleted", c.Name)
},
UpdateFunc: func(obj, cur interface{}) {
c := obj.(*crv1alpha1.Cluster)
glog.V(3).Infof("Cluster %v updated", c.Name)
},
})
return mciController, nil
}

func (c *Controller) Run(stopCh <-chan struct{}) {
wait.PollUntil(5*time.Second, func() (bool, error) {
glog.V(2).Infof("Waiting for initial sync")
return c.synced(), nil
}, stopCh)

glog.V(2).Infof("Starting Multi-Cluster Ingress controller")
}

func (c *Controller) synced() bool {
return c.clusterSynced()
}
4 changes: 1 addition & 3 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -62,7 +61,6 @@ type Controller struct {

// NewController returns a network endpoint group controller.
func NewController(
kubeClient kubernetes.Interface,
cloud networkEndpointGroupCloud,
ctx *context.ControllerContext,
zoneGetter zoneGetter,
Expand All @@ -74,7 +72,7 @@ func NewController(
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
Interface: kubeClient.Core().Events(""),
Interface: ctx.KubeClient.Core().Events(""),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "neg-controller"})
Expand Down
4 changes: 2 additions & 2 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
)

func newTestController(kubeClient kubernetes.Interface) *Controller {
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
controller, _ := NewController(kubeClient,
context := context.NewControllerContext(kubeClient, nil, apiv1.NamespaceAll, 1*time.Second, true)
controller, _ := NewController(
NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"),
context,
NewFakeZoneGetter(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager {
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
context := context.NewControllerContext(kubeClient, nil, apiv1.NamespaceAll, 1*time.Second, true)
manager := newSyncerManager(
utils.NewNamer(CluseterID, ""),
record.NewFakeRecorder(100),
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (

func NewTestSyncer() *syncer {
kubeClient := fake.NewSimpleClientset()
context := context.NewControllerContext(kubeClient, apiv1.NamespaceAll, 1*time.Second, true)
context := context.NewControllerContext(kubeClient, nil, apiv1.NamespaceAll, 1*time.Second, true)
svcPort := servicePort{
namespace: testServiceNamespace,
name: testServiceName,
Expand Down
Loading

0 comments on commit 4795047

Please sign in to comment.