Skip to content

Commit

Permalink
Add code for building cluster client and other resources.
Browse files Browse the repository at this point in the history
  • Loading branch information
rramkumar1 committed Apr 20, 2018
1 parent f7ffbc6 commit 4b2517d
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 3 deletions.
11 changes: 10 additions & 1 deletion pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"k8s.io/client-go/tools/record"
crclient "k8s.io/cluster-registry/pkg/client/clientset_generated/clientset"
crinformerv1alpha1 "k8s.io/cluster-registry/pkg/client/informers_generated/externalversions/clusterregistry/v1alpha1"
"k8s.io/ingress-gce/pkg/informer"
"k8s.io/ingress-gce/pkg/mapper"
)

// ControllerContext holds resources necessary for the general
Expand All @@ -54,7 +56,11 @@ type ControllerContext struct {
type MultiClusterContext struct {
RegistryClient crclient.Interface
ClusterInformer cache.SharedIndexInformer
MCIEnabled bool

MCIEnabled bool
ClusterClients map[string]kubernetes.Interface
ClusterInformerManagers map[string]informer.ClusterInformerManager
ClusterServiceMappers map[string]mapper.ClusterServiceMapper
}

// NewControllerContext returns a new shared set of informers.
Expand All @@ -78,6 +84,9 @@ func NewControllerContext(kubeClient kubernetes.Interface, registryClient crclie
if context.MC.RegistryClient != nil {
context.MC.ClusterInformer = crinformerv1alpha1.NewClusterInformer(registryClient, resyncPeriod, newIndexer())
context.MC.MCIEnabled = true
context.MC.ClusterClients = make(map[string]kubernetes.Interface)
context.MC.ClusterInformerManagers = make(map[string]informer.ClusterInformerManager)
context.MC.ClusterServiceMappers = make(map[string]mapper.ClusterServiceMapper)
}

return context
Expand Down
76 changes: 74 additions & 2 deletions pkg/mci/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,27 @@ limitations under the License.
package mci

import (
"fmt"
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
clientcmd "k8s.io/client-go/tools/clientcmd/api"
crv1alpha1 "k8s.io/cluster-registry/pkg/apis/clusterregistry/v1alpha1"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/informer"
"k8s.io/ingress-gce/pkg/mapper"
"k8s.io/ingress-gce/pkg/utils"
)

// Controller is a barebones multi-cluster ingress controller.
// For now, this controller only logs messages when CRUD operations are
// made on a crv1alpha1.Cluster.
type Controller struct {
ctx *context.ControllerContext
resyncPeriod time.Duration

clusterSynced cache.InformerSynced
Expand All @@ -44,6 +52,7 @@ type MCIEnqueue interface {

func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, enqueue MCIEnqueue) (*Controller, error) {
mciController := &Controller{
ctx: ctx,
resyncPeriod: resyncPeriod,
clusterSynced: ctx.MC.ClusterInformer.HasSynced,
clusterLister: ctx.MC.ClusterInformer.GetIndexer(),
Expand All @@ -53,17 +62,20 @@ func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, e
AddFunc: func(obj interface{}) {
c := obj.(*crv1alpha1.Cluster)
glog.V(3).Infof("Cluster %v added", c.Name)
mciController.handleClusterAdd(c)
// For now, queue up all ingresses
err := enqueue.EnqueueAllIngresses()
if err != nil {
glog.V(3).Infof("Error enqueuing ingress on cluster add: %v", err)
glog.V(3).Infof("Error enqueuing ingresses on add of cluster %v: %v", c.Name, err)
}
},
DeleteFunc: func(obj interface{}) {
c := obj.(*crv1alpha1.Cluster)
glog.V(3).Infof("Cluster %v deleted", c.Name)
mciController.handleClusterDelete(c)
err := enqueue.EnqueueAllIngresses()
if err != nil {
glog.V(3).Infof("Error enqueuing ingress on cluster delete: %v", err)
glog.V(3).Infof("Error enqueuing ingress on delete of cluster %v: %v", c.Name, err)
}
},
UpdateFunc: func(obj, cur interface{}) {
Expand All @@ -74,6 +86,66 @@ func NewController(ctx *context.ControllerContext, resyncPeriod time.Duration, e
return mciController, nil
}

func (controller *Controller) handleClusterAdd(c *crv1alpha1.Cluster) {
client, err := buildClusterClient(c)
if err != nil {
glog.V(3).Infof("Error building client for cluster %v: %v", c.Name, err)
}
// Keep track of the client
controller.ctx.MC.ClusterClients[c.Name] = client
// Create informers for this cluster.
informerManager := informer.NewClusterInformerManager(client, controller.resyncPeriod)
informerManager.CreateInformers()

// TODO(rramkumar): For now, just add event handlers for Ingress.

// Keep track of the informer manager.
controller.ctx.MC.ClusterInformerManagers[c.Name] = informerManager
// Create a service mapper for this cluster
serviceInformer := informerManager.Informers().ServiceInformer
svcGetter := utils.SvcGetter{Store: serviceInformer.GetStore()}
svcMapper := mapper.NewClusterServiceMapper(svcGetter.Get, nil)
// Keep track of the service mapper.
controller.ctx.MC.ClusterServiceMappers[c.Name] = svcMapper
glog.V(3).Infof("Built client and informers for cluster %v", c.Name)
}

func (controller *Controller) handleClusterDelete(c *crv1alpha1.Cluster) {
// Remove client for this cluster
delete(controller.ctx.MC.ClusterClients, c.Name)
// Stop informers.
informerManager := controller.ctx.MC.ClusterInformerManagers[c.Name]
informerManager.DeleteInformers()
delete(controller.ctx.MC.ClusterInformerManagers, c.Name)
// Remove cluster service mappers
delete(controller.ctx.MC.ClusterServiceMappers, c.Name)
glog.V(3).Infof("Removed client and informers for cluster %v", c.Name)
}

// buildClusterClient builds a k8s client given a cluster from the ClusterRegistry.
func buildClusterClient(c *crv1alpha1.Cluster) (kubernetes.Interface, error) {
// Config used to instantiate a client
restConfig := &rest.Config{}
// Get endpoint for the master. For now, we only consider the first endpoint given.
masterEndpoints := c.Spec.KubernetesAPIEndpoints.ServerEndpoints
if len(masterEndpoints) == 0 {
return nil, fmt.Errorf("No master endpoints provided")
}
// Populate config with master endpoint.
restConfig.Host = masterEndpoints[0].ServerAddress
// Don't verify TLS for now
restConfig.TLSClientConfig = rest.TLSClientConfig{Insecure: true}
// Get auth for the master. We assume auth mechanism is using a client cert.
authProviders := c.Spec.AuthInfo.Providers
if len(authProviders) == 0 {
return nil, fmt.Errorf("No auth providers provided")
}
providerConfig := c.Spec.AuthInfo.Providers[0]
// Populate config with client auth.
restConfig.AuthProvider = &clientcmd.AuthProviderConfig{Name: providerConfig.Name, Config: providerConfig.Config}
return kubernetes.NewForConfig(restConfig)
}

func (c *Controller) Run(stopCh <-chan struct{}) {
wait.PollUntil(5*time.Second, func() (bool, error) {
glog.V(2).Infof("Waiting for initial sync")
Expand Down

0 comments on commit 4b2517d

Please sign in to comment.