Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add lbName annotation #565

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/annotations/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (
// responsibility to create/delete it.
StaticIPNameKey = "kubernetes.io/ingress.global-static-ip-name"

// LoadBalancerNameKey tells the Ingress controller to use a specific LoadBalancer
LoadBalancerNameKey = "kubernetes.io/ingress.loadbalancer-name"

// PreSharedCertKey represents the specific pre-shared SSL
// certicate for the Ingress controller to use. The controller *does not*
// manage this certificate, it is the users responsibility to create/delete it.
Expand Down Expand Up @@ -117,6 +120,14 @@ func (ing *Ingress) StaticIPName() string {
return val
}

func (ing *Ingress) LoadBalancerName() string {
val, ok := ing.v[LoadBalancerNameKey]
if !ok {
return ""
}
return val
}

func (ing *Ingress) IngressClass() string {
val, ok := ing.v[IngressClassKey]
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type ControllerContext struct {
ControllerContextConfig

IngressInformer cache.SharedIndexInformer
SecretInformer cache.SharedIndexInformer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to watch secrets?

ServiceInformer cache.SharedIndexInformer
BackendConfigInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
Expand Down Expand Up @@ -96,6 +97,7 @@ func NewControllerContext(
ControllerContextConfig: config,
IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
ServiceInformer: informerv1.NewServiceInformer(kubeClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
SecretInformer: informerv1.NewSecretInformer(kubeClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
PodInformer: informerv1.NewPodInformer(kubeClient, config.Namespace, config.ResyncPeriod, utils.NewNamespaceIndexer()),
NodeInformer: informerv1.NewNodeInformer(kubeClient, config.ResyncPeriod, utils.NewNamespaceIndexer()),
recorders: map[string]record.EventRecorder{},
Expand All @@ -121,6 +123,7 @@ func (ctx *ControllerContext) HasSynced() bool {
ctx.ServiceInformer.HasSynced,
ctx.PodInformer.HasSynced,
ctx.NodeInformer.HasSynced,
ctx.SecretInformer.HasSynced,
}
if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
Expand Down Expand Up @@ -184,6 +187,7 @@ func (ctx *ControllerContext) HealthCheck() HealthCheckResults {
func (ctx *ControllerContext) Start(stopCh chan struct{}) {
go ctx.IngressInformer.Run(stopCh)
go ctx.ServiceInformer.Run(stopCh)
go ctx.SecretInformer.Run(stopCh)
go ctx.PodInformer.Run(stopCh)
go ctx.NodeInformer.Run(stopCh)
if ctx.EndpointInformer != nil {
Expand Down
161 changes: 145 additions & 16 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net/http"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -42,6 +43,7 @@ import (
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
Expand Down Expand Up @@ -302,9 +304,9 @@ func (lbc *LoadBalancerController) SyncBackends(state interface{}) error {
return err
}

// TODO: Remove this after deprecation
ing := syncState.ing
if utils.IsGCEMultiClusterIngress(syncState.ing) {
// TODO: Remove this after deprecation
if utils.IsGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress and return.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
Expand Down Expand Up @@ -372,19 +374,88 @@ func (lbc *LoadBalancerController) GCBackends(state interface{}) error {
return nil
}

func (lbc *LoadBalancerController) MergeLB(origin *loadbalancers.L7RuntimeInfo,
lbList []*loadbalancers.L7RuntimeInfo) (*loadbalancers.L7RuntimeInfo, error) {
var tls []*loadbalancers.TLSCerts
tlsNameSet := sets.NewString()
allowHTTP := false
managedCertificatesSet := sets.NewString()
staticIPNameSet := sets.NewString()
ipSet := sets.NewString()

for _, lb := range lbList {
// switch to true if there is lb with allowed HTTP
allowHTTP = allowHTTP || lb.AllowHTTP

// split comma-separated values to set of strings to merge values without duplicates
tlsNameSet.Insert(strings.Split(lb.TLSName, ",")...)
ipSet.Insert(strings.Split(lb.IP, ",")...)
managedCertificatesSet.Insert(strings.Split(lb.ManagedCertificates, ",")...)
staticIPNameSet.Insert(lb.StaticIPName)
tls = append(tls, lb.TLS...)
}

if staticIPNameSet.Len() > 1 {
return nil, fmt.Errorf("more than one unique staticIPName %+v", staticIPNameSet.List())
}
if len(ipSet.List()) > 1 {
return nil, fmt.Errorf("more than one unique IP %+v", ipSet.List())
}
staticIPName, _ := staticIPNameSet.PopAny()
ip, _ := ipSet.PopAny()

return &loadbalancers.L7RuntimeInfo{
Name: origin.Name,
IP: ip,
TLS: tls,
TLSName: strings.Join(tlsNameSet.List(), ","),
Ingress: origin.Ingress,
ManagedCertificates: strings.Join(managedCertificatesSet.List(), ","),
AllowHTTP: allowHTTP,
StaticIPName: staticIPName,
UrlMap: origin.UrlMap,
}, nil
}

// SyncLoadBalancer implements Controller.
func (lbc *LoadBalancerController) SyncLoadBalancer(state interface{}) error {
// We expect state to be a syncState
syncState, ok := state.(*syncState)
var lb *loadbalancers.L7RuntimeInfo
if !ok {
return fmt.Errorf("expected state type to be syncState, type was %T", state)
}

lb, err := lbc.toRuntimeInfo(syncState.ing, syncState.urlMap)
lbName, changed, err := lbc.GetLoadBalancerName(syncState.ing)
if err != nil {
return err
}

lb, err = lbc.toRuntimeInfo(lbName, syncState.ing, syncState.urlMap)
if err != nil {
return err
}

if changed {
var lbList []*loadbalancers.L7RuntimeInfo
lbIngresses := operator.Ingresses(lbc.ctx.Ingresses().List()).Filter(func(ing *extensions.Ingress) bool {
annotations := annotations.FromIngress(ing)
return annotations.LoadBalancerName() == lbName && utils.IsGCEIngress(ing)
}).AsList()

for _, ing := range lbIngresses {
if lb, err = lbc.toRuntimeInfo(lbName, ing, syncState.urlMap); err == nil {
lbList = append(lbList, lb)
} else {
return fmt.Errorf("failed to convert ingress %v to runtimeInfo, err %v", ing.Name, err.Error())
}
}
lb, err = lbc.MergeLB(lb, lbList)
if err != nil {
return fmt.Errorf("failed to merge LBs, err %+v", err.Error())
}
}

// Create higher-level LB resources.
if err := lbc.l7Pool.Sync(lb); err != nil {
return err
Expand Down Expand Up @@ -417,9 +488,9 @@ func (lbc *LoadBalancerController) PostProcess(state interface{}) error {

// Get the loadbalancer and update the ingress status.
ing := syncState.ing
k, err := utils.KeyFunc(ing)
k, _, err := lbc.GetLoadBalancerName(ing)
if err != nil {
return fmt.Errorf("cannot get key for Ingress %s/%s: %v", ing.Namespace, ing.Name, err)
return err
}

l7, err := lbc.l7Pool.Get(k)
Expand All @@ -429,9 +500,35 @@ func (lbc *LoadBalancerController) PostProcess(state interface{}) error {
if err := lbc.updateIngressStatus(l7, ing); err != nil {
return fmt.Errorf("update ingress status error: %v", err)
}

return nil
}

func (lbc *LoadBalancerController) listLBs() (lbNames []string, err error) {
for _, k := range lbc.ctx.Ingresses().ListKeys() {
ing, ok, err := lbc.ctx.Ingresses().GetByKey(k)
if !ok {
continue
}
if err != nil {
return nil, err
}
if utils.IsGCEIngress(ing) {
ann := annotations.FromIngress(ing)
lbName := ann.LoadBalancerName()
if lbName != "" {
if slice.ContainsString(lbNames, lbName, nil) {
continue
}
lbNames = append(lbNames, lbName)
continue
}
}
lbNames = append(lbNames, k)
}
return lbNames, nil
}

// sync manages Ingress create/updates/deletes events from queue.
func (lbc *LoadBalancerController) sync(key string) error {
if !lbc.hasSynced() {
Expand All @@ -447,7 +544,11 @@ func (lbc *LoadBalancerController) sync(key string) error {

// gceSvcPorts contains the ServicePorts used by only single-cluster ingress.
gceSvcPorts := lbc.ToSvcPorts(gceIngresses)
lbNames := lbc.ctx.Ingresses().ListKeys()
lbNames, err := lbc.listLBs()

if err != nil {
return err
}
gcState := &gcState{lbNames, gceSvcPorts}

ing, ingExists, err := lbc.ctx.Ingresses().GetByKey(key)
Expand All @@ -471,15 +572,33 @@ func (lbc *LoadBalancerController) sync(key string) error {
return lbc.ingSyncer.GC(gcState)
}

// Bootstrap state for GCP sync.
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
syncState := &syncState{urlMap, ing}
lbName, changed, err := lbc.GetLoadBalancerName(ing)
if err != nil {
return err
}

var urlMap *utils.GCEURLMap
var errs []error

// lbName is changed with annotation
if changed {
lbIngresses := operator.Ingresses(lbc.ctx.Ingresses().List()).Filter(func(ing *extensions.Ingress) bool {
annotations := annotations.FromIngress(ing)
return annotations.LoadBalancerName() == lbName && utils.IsGCEIngress(ing)
}).AsList()
urlMap, errs = lbc.Translator.TranslateIngressList(lbIngresses, lbc.ctx.DefaultBackendSvcPortID)
} else {
urlMap, errs = lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
}

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
return msg
}

syncState := &syncState{urlMap, ing}

// Sync GCP resources.
syncErr := lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
Expand Down Expand Up @@ -538,15 +657,11 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
}

// toRuntimeInfo returns L7RuntimeInfo for the given ingress.
func (lbc *LoadBalancerController) toRuntimeInfo(ing *extensions.Ingress, urlMap *utils.GCEURLMap) (*loadbalancers.L7RuntimeInfo, error) {
k, err := utils.KeyFunc(ing)
if err != nil {
return nil, fmt.Errorf("cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}

func (lbc *LoadBalancerController) toRuntimeInfo(k string, ing *extensions.Ingress, urlMap *utils.GCEURLMap) (*loadbalancers.L7RuntimeInfo, error) {
var tls []*loadbalancers.TLSCerts

var err error
annotations := annotations.FromIngress(ing)

// Load the TLS cert from the API Spec if it is not specified in the annotation.
// TODO: enforce this with validation.
if annotations.UseNamedTLS() == "" {
Expand Down Expand Up @@ -601,3 +716,17 @@ func (lbc *LoadBalancerController) ToSvcPorts(ings []*extensions.Ingress) []util
}
return knownPorts
}

func (lbc *LoadBalancerController) GetLoadBalancerName(ing *extensions.Ingress) (string, bool, error) {
annotations := annotations.FromIngress(ing)
lbName := annotations.LoadBalancerName()

if lbName == "" {
k, err := utils.KeyFunc(ing)
if err != nil {
return "", false, fmt.Errorf("cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}
return k, false, nil
}
return lbName, true, nil
}
Loading