Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Final changes to make MCI controller work #238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func main() {
}

enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup)
mciEnabled := flags.F.MultiCluster
stopCh := make(chan struct{})
ctx := context.NewControllerContext(kubeClient, registryClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG)
lbc, err := controller.NewLoadBalancerController(ctx, clusterManager, enableNEG, stopCh)
Expand All @@ -116,7 +117,7 @@ func main() {
if clusterManager.ClusterNamer.UID() != "" {
glog.V(0).Infof("Cluster name is %+v", clusterManager.ClusterNamer.UID())
}
clusterManager.Init(lbc.Translator, lbc.Translator)
clusterManager.Init(lbc.Translator, lbc.Translator, mciEnabled)
glog.V(0).Infof("clusterManager initialized")

if enableNEG {
Expand All @@ -125,7 +126,7 @@ func main() {
glog.V(0).Infof("negController started")
}

if flags.F.MultiCluster {
if mciEnabled {
mciController, _ := mci.NewController(ctx, flags.F.ResyncPeriod, lbc)
go mciController.Run(stopCh)
glog.V(0).Infof("Multi-Cluster Ingress Controller started")
Expand Down
7 changes: 7 additions & 0 deletions pkg/annotations/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,10 @@ func (ing *Ingress) IngressClass() string {
}
return val
}

func AddAnnotation(ing *extensions.Ingress, key, val string) {
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
}
ing.Annotations[key] = val
}
9 changes: 8 additions & 1 deletion pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const (
// received by GCLB above this RPS are NOT dropped, GCLB continues to distribute
// them across IGs.
// TODO: Should this be math.MaxInt64?
const maxRPS = 1
const maxRPS = 1e14

// Backends implements BackendPool.
type Backends struct {
Expand Down Expand Up @@ -433,6 +433,13 @@ func (b *Backends) ensureBackendService(p ServicePort, igs []*compute.InstanceGr
// edgeHop checks the links of the given backend by executing an edge hop.
// It fixes broken links and updates the Backend accordingly.
func (b *Backends) edgeHop(be *BackendService, igs []*compute.InstanceGroup) error {
// Note: Clearing the backends in the BackendService is an ugly hack to make mci work.
if be.Alpha != nil {
be.Alpha.Backends = nil
} else {
be.Ga.Backends = nil
}

addIGs := getInstanceGroupsToAdd(be, igs)
if len(addIGs) == 0 {
return nil
Expand Down
25 changes: 25 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
crinformerv1alpha1 "k8s.io/cluster-registry/pkg/client/informers_generated/externalversions/clusterregistry/v1alpha1"
"k8s.io/ingress-gce/pkg/informer"
"k8s.io/ingress-gce/pkg/mapper"
"k8s.io/ingress-gce/pkg/target"
"k8s.io/ingress-gce/pkg/utils"
)

// ControllerContext holds resources necessary for the general
Expand Down Expand Up @@ -60,6 +62,7 @@ type MultiClusterContext struct {
MCIEnabled bool
ClusterClients map[string]kubernetes.Interface
ClusterInformerManagers map[string]informer.ClusterInformerManager
ClusterResourceManagers map[string]target.TargetResourceManager
ClusterServiceMappers map[string]mapper.ClusterServiceMapper
}

Expand Down Expand Up @@ -87,6 +90,7 @@ func NewControllerContext(kubeClient kubernetes.Interface, registryClient crclie
context.MC.ClusterClients = make(map[string]kubernetes.Interface)
context.MC.ClusterInformerManagers = make(map[string]informer.ClusterInformerManager)
context.MC.ClusterServiceMappers = make(map[string]mapper.ClusterServiceMapper)
context.MC.ClusterResourceManagers = make(map[string]target.TargetResourceManager)
}

return context
Expand Down Expand Up @@ -145,3 +149,24 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
go ctx.MC.ClusterInformer.Run(stopCh)
}
}

func (ctx *ControllerContext) ResourceManagers() (resourceManagers map[string]target.TargetResourceManager) {
if !ctx.MC.MCIEnabled {
return nil
}
return ctx.MC.ClusterResourceManagers
}

// If in MCI mode, ServiceMappers() gets mappers for all clusters in the
// cluster registry. This is because for now, we assume that all ingresses live
// in all "target" clusters. This will change once we start taking the
// MultiClusterConfig into account. If we are not in MCI mode,
// then ServiceMappers() consists of the mapper for the local cluster
func (ctx *ControllerContext) ServiceMappers() (svcMappers map[string]mapper.ClusterServiceMapper) {
if ctx.MC.MCIEnabled {
return ctx.MC.ClusterServiceMappers
}
// Create a ClusterServiceMapper for the local cluster.
svcGetter := utils.SvcGetter{Store: ctx.ServiceInformer.GetStore()}
return map[string]mapper.ClusterServiceMapper{"local": mapper.NewClusterServiceMapper(svcGetter.Get, nil)}
}
10 changes: 6 additions & 4 deletions pkg/controller/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ type ClusterManager struct {
}

// Init initializes the cluster manager.
func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider) {
func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider, mciEnabled bool) {
c.instancePool.Init(zl)
c.backendPool.Init(pp)
if !mciEnabled {
c.backendPool.Init(pp)
}
// TODO: Initialize other members as needed.
}

Expand Down Expand Up @@ -135,8 +137,8 @@ func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servic
return igs, err
}

func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error {
return c.firewallPool.Sync(nodeNames, endpointPorts...)
func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string, mciEnabled bool) error {
return c.firewallPool.Sync(nodeNames, mciEnabled, endpointPorts...)
}

// GC garbage collects unused resources.
Expand Down
Loading