Skip to content

Commit

Permalink
Implement target ingress watchers and push target ingress in controller
Browse files Browse the repository at this point in the history
  • Loading branch information
rramkumar1 committed Apr 23, 2018
1 parent eadf3c4 commit 154c886
Show file tree
Hide file tree
Showing 21 changed files with 498 additions and 244 deletions.
5 changes: 3 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func main() {
}

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
mciEnabled := flags.F.MultiCluster
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, registryClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(ctx, clusterManager, enableNEG, stopCh)
Expand All @@ -116,7 +117,7 @@ func main() {
if clusterManager.ClusterNamer.UID() != "" {
glog.V(0).Infof("Cluster name is %+v", clusterManager.ClusterNamer.UID())
}
clusterManager.Init(lbc.Translator, lbc.Translator)
clusterManager.Init(lbc.Translator, lbc.Translator, mciEnabled)
glog.V(0).Infof("clusterManager initialized")

if enableNEG {
Expand All @@ -125,7 +126,7 @@ func main() {
glog.V(0).Infof("negController started")
}

if flags.F.MultiCluster {
if mciEnabled {
mciController, _ := mci.NewController(ctx, flags.F.ResyncPeriod, lbc)
go mciController.Run(stopCh)
glog.V(0).Infof("Multi-Cluster Ingress Controller started")
Expand Down
7 changes: 7 additions & 0 deletions pkg/annotations/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,10 @@ func (ing *Ingress) IngressClass() string {
}
return val
}

func AddAnnotation(ing *extensions.Ingress, key, val string) {
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
}
ing.Annotations[key] = val
}
9 changes: 8 additions & 1 deletion pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const (
// received by GCLB above this RPS are NOT dropped, GCLB continues to distribute
// them across IGs.
// TODO: Should this be math.MaxInt64?
const maxRPS = 1
const maxRPS = 1e14

// Backends implements BackendPool.
type Backends struct {
Expand Down Expand Up @@ -433,6 +433,13 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links and updates the Backend accordingly.
func (b *Backends) edgeHop(be *BackendService, igs []*compute.InstanceGroup) error {
// Note: Clearing the backends in the BackendService is an ugly hack to make mci work.
if be.Alpha != nil {
be.Alpha.Backends = nil
} else {
be.Ga.Backends = nil
}

addIGs := getInstanceGroupsToAdd(be, igs)
if len(addIGs) == 0 {
return nil
Expand Down
25 changes: 25 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
crinformerv1alpha1 "k8s.io/cluster-registry/pkg/client/informers_generated/externalversions/clusterregistry/v1alpha1"
"k8s.io/ingress-gce/pkg/informer"
"k8s.io/ingress-gce/pkg/mapper"
"k8s.io/ingress-gce/pkg/target"
"k8s.io/ingress-gce/pkg/utils"
)

// ControllerContext holds resources necessary for the general
Expand Down Expand Up @@ -60,6 +62,7 @@ type MultiClusterContext struct {
MCIEnabled bool
ClusterClients map[string]kubernetes.Interface
ClusterInformerManagers map[string]informer.ClusterInformerManager
ClusterResourceManagers map[string]target.TargetResourceManager
ClusterServiceMappers map[string]mapper.ClusterServiceMapper
}

Expand Down Expand Up @@ -87,6 +90,7 @@ func NewControllerContext(kubeClient kubernetes.Interface, registryClient crclie
context.MC.ClusterClients = make(map[string]kubernetes.Interface)
context.MC.ClusterInformerManagers = make(map[string]informer.ClusterInformerManager)
context.MC.ClusterServiceMappers = make(map[string]mapper.ClusterServiceMapper)
context.MC.ClusterResourceManagers = make(map[string]target.TargetResourceManager)
}

return context
Expand Down Expand Up @@ -145,3 +149,24 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
go ctx.MC.ClusterInformer.Run(stopCh)
}
}

func (ctx *ControllerContext) ResourceManagers() (resourceManagers map[string]target.TargetResourceManager) {
if !ctx.MC.MCIEnabled {
return nil
}
return ctx.MC.ClusterResourceManagers
}

// If in MCI mode, ServiceMappers() gets mappers for all clusters in the
// cluster registry. This is because for now, we assume that all ingresses live
// in all "target" clusters. This will change once we start taking the
// MultiClusterConfig into account. If we are not in MCI mode,
// then ServiceMappers() consists of the mapper for the local cluster
func (ctx *ControllerContext) ServiceMappers() (svcMappers map[string]mapper.ClusterServiceMapper) {
if ctx.MC.MCIEnabled {
return ctx.MC.ClusterServiceMappers
}
// Create a ClusterServiceMapper for the local cluster.
svcGetter := utils.SvcGetter{Store: ctx.ServiceInformer.GetStore()}
return map[string]mapper.ClusterServiceMapper{"local": mapper.NewClusterServiceMapper(svcGetter.Get, nil)}
}
10 changes: 6 additions & 4 deletions pkg/controller/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ type ClusterManager struct {
}

// Init initializes the cluster manager.
func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider) {
func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider, mciEnabled bool) {
c.instancePool.Init(zl)
c.backendPool.Init(pp)
if !mciEnabled {
c.backendPool.Init(pp)
}
// TODO: Initialize other members as needed.
}

Expand Down Expand Up @@ -135,8 +137,8 @@ func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servic
return igs, err
}

func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error {
return c.firewallPool.Sync(nodeNames, endpointPorts...)
func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string, mciEnabled bool) error {
return c.firewallPool.Sync(nodeNames, mciEnabled, endpointPorts...)
}

// GC garbage collects unused resources.
Expand Down
Loading

0 comments on commit 154c886

Please sign in to comment.