From 6b1a37a2ab56c7f755748b3b547987e7822a203d Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 16:21:04 -0800 Subject: [PATCH 01/11] Move creation of GCE client into app package so it can be tested --- cmd/glbc/app/clients.go | 26 +++++++++++++++++--------- cmd/glbc/main.go | 27 +++------------------------ 2 files changed, 20 insertions(+), 33 deletions(-) diff --git a/cmd/glbc/app/clients.go b/cmd/glbc/app/clients.go index 429773a78c..bae30be3bd 100644 --- a/cmd/glbc/app/clients.go +++ b/cmd/glbc/app/clients.go @@ -21,6 +21,7 @@ import ( "io" "io/ioutil" "net/http" + "os" "time" "github.com/golang/glog" @@ -60,22 +61,30 @@ func NewKubeClient() (kubernetes.Interface, error) { return kubernetes.NewForConfig(config) } -// NewGCEClient returns a client to the GCE environment. -func NewGCEClient(config io.Reader) *gce.GCECloud { - getConfigReader := func() io.Reader { return nil } +// NewGCEClient returns a client to the GCE environment. This will block until +// a valid configuration file can be read. +func NewGCEClient() *gce.GCECloud { + var configReader func() io.Reader + if Flags.ConfigFilePath != "" { + glog.Infof("Reading config from path %q", Flags.ConfigFilePath) + config, err := os.Open(Flags.ConfigFilePath) + if err != nil { + glog.Fatalf("%v", err) + } + defer config.Close() - if config != nil { allConfig, err := ioutil.ReadAll(config) if err != nil { - glog.Fatalf("Error while reading entire config: %v", err) + glog.Fatalf("Error while reading config (%q): %v", Flags.ConfigFilePath, err) } - glog.V(4).Infof("Using cloudprovider config file: %q", string(allConfig)) + glog.V(4).Infof("Cloudprovider config file contains: %q", string(allConfig)) - getConfigReader = func() io.Reader { + configReader = func() io.Reader { return bytes.NewReader(allConfig) } } else { glog.V(2).Infof("No cloudprovider config file provided, using default values.") + configReader = func() io.Reader { return nil } } // Creating the cloud interface involves resolving the metadata server to get @@ -83,10 +92,9 @@ func NewGCEClient(config io.Reader) *gce.GCECloud { // No errors are thrown. So we need to keep retrying till it works because // we know we're on GCE. for { - provider, err := cloudprovider.GetCloudProvider("gce", getConfigReader()) + provider, err := cloudprovider.GetCloudProvider("gce", configReader()) if err == nil { cloud := provider.(*gce.GCECloud) - // If this controller is scheduled on a node without compute/rw // it won't be allowed to list backends. We can assume that the // user has no need for Ingress in this case. If they grant diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 5e8e4c8caf..9ee004452a 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -57,6 +57,8 @@ func main() { glog.V(0).Infof("argv[%d]: %q", i, a) } + glog.V(2).Infof("Flags = %+v", app.Flags) + kubeClient, err := app.NewKubeClient() if err != nil { glog.Fatalf("Failed to create kubernetes client: %v", err) @@ -67,29 +69,7 @@ func main() { glog.Fatalf("%v", err) } - var cloud *gce.GCECloud - // TODO: Make this more resilient. Currently we create the cloud client - // and pass it through to all the pools. This makes unit testing easier. - // However if the cloud client suddenly fails, we should try to re-create it - // and continue. 
- if app.Flags.ConfigFilePath != "" { - glog.Infof("Reading config from path %q", app.Flags.ConfigFilePath) - config, err := os.Open(app.Flags.ConfigFilePath) - if err != nil { - glog.Fatalf("%v", err) - } - defer config.Close() - cloud = app.NewGCEClient(config) - glog.Infof("Successfully loaded cloudprovider using config %q", app.Flags.ConfigFilePath) - } else { - // TODO: refactor so this comment does not need to be here. - // While you might be tempted to refactor so we simply assing nil to the - // config and only invoke getGCEClient once, that will not do the right - // thing because a nil check against an interface isn't true in golang. - cloud = app.NewGCEClient(nil) - glog.Infof("Created GCE client without a config file") - } - + cloud := app.NewGCEClient() defaultBackendServicePort := app.DefaultBackendServicePort(kubeClient) clusterManager, err := controller.NewClusterManager(cloud, namer, *defaultBackendServicePort, app.Flags.HealthCheckPath) if err != nil { @@ -99,7 +79,6 @@ func main() { enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) stopCh := make(chan struct{}) ctx := context.NewControllerContext(kubeClient, app.Flags.WatchNamespace, app.Flags.ResyncPeriod, enableNEG) - // Start loadbalancer controller lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG) if err != nil { glog.Fatalf("Error creating load balancer controller: %v", err) From 90b95abd193427b717647938beba5d4af4d2dc40 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 17:06:09 -0800 Subject: [PATCH 02/11] Some minor cleanups in namer.go --- cmd/glbc/app/namer.go | 80 +++++++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 37 deletions(-) diff --git a/cmd/glbc/app/namer.go b/cmd/glbc/app/namer.go index be5ab9c748..54346b254f 100644 --- a/cmd/glbc/app/namer.go +++ b/cmd/glbc/app/namer.go @@ -17,8 +17,8 @@ limitations under the License. package app import ( + "crypto/rand" "fmt" - "os" "time" "github.com/golang/glog" @@ -36,8 +36,11 @@ import ( const ( // Key used to persist UIDs to configmaps. uidConfigMapName = "ingress-uid" + // uidByteLength is the length in bytes for the random UID. + uidByteLength = 8 ) +// NewNamer returns a new naming policy given the state of the cluster. func NewNamer(kubeClient kubernetes.Interface, clusterName string, fwName string) (*utils.Namer, error) { name, err := getClusterUID(kubeClient, clusterName) if err != nil { @@ -51,18 +54,18 @@ func NewNamer(kubeClient kubernetes.Interface, clusterName string, fwName string namer := utils.NewNamer(name, fw_name) uidVault := storage.NewConfigMapVault(kubeClient, metav1.NamespaceSystem, uidConfigMapName) - // Start a goroutine to poll the cluster UID config map - // We don't watch because we know exactly which configmap we want and this - // controller already watches 5 other resources, so it isn't worth the cost - // of another connection and complexity. + // Start a goroutine to poll the cluster UID config map. We don't + // watch because we know exactly which configmap we want and this + // controller already watches 5 other resources, so it isn't worth the + // cost of another connection and complexity. 
go wait.Forever(func() { - for _, key := range [...]string{storage.UidDataKey, storage.ProviderDataKey} { + for _, key := range [...]string{storage.UIDDataKey, storage.ProviderDataKey} { val, found, err := uidVault.Get(key) if err != nil { glog.Errorf("Can't read uidConfigMap %v", uidConfigMapName) } else if !found { errmsg := fmt.Sprintf("Can't read %v from uidConfigMap %v", key, uidConfigMapName) - if key == storage.UidDataKey { + if key == storage.UIDDataKey { glog.Errorf(errmsg) } else { glog.V(4).Infof(errmsg) @@ -70,7 +73,7 @@ func NewNamer(kubeClient kubernetes.Interface, clusterName string, fwName string } else { switch key { - case storage.UidDataKey: + case storage.UIDDataKey: if uid := namer.UID(); uid != val { glog.Infof("Cluster uid changed from %v -> %v", uid, val) namer.SetUID(val) @@ -87,32 +90,33 @@ func NewNamer(kubeClient kubernetes.Interface, clusterName string, fwName string return namer, nil } -// useDefaultOrLookupVault returns either a 'default_name' or if unset, obtains a name from a ConfigMap. -// The returned value follows this priority: -// If the provided 'default_name' is not empty, that name is used. -// This is effectively a client override via a command line flag. -// else, check cfgVault with 'cm_key' as a key and if found, use the associated value +// useDefaultOrLookupVault returns either a 'defaultName' or if unset, obtains +// a name from a ConfigMap. The returned value follows this priority: +// +// If the provided 'defaultName' is not empty, that name is used. +// This is effectively a client override via a command line flag. +// else, check cfgVault with 'configMapKey' as a key and if found, use the associated value // else, return an empty 'name' and pass along an error iff the configmap lookup is erroneous. -func useDefaultOrLookupVault(cfgVault *storage.ConfigMapVault, cm_key, default_name string) (string, error) { - if default_name != "" { - glog.Infof("Using user provided %v %v", cm_key, default_name) - // Don't save the uid in the vault, so users can rollback through - // setting the accompany flag to "" - return default_name, nil +func useDefaultOrLookupVault(cfgVault *storage.ConfigMapVault, configMapKey, defaultName string) (string, error) { + if defaultName != "" { + glog.Infof("Using user provided %v %v", configMapKey, defaultName) + // Don't save the uid in the vault, so users can rollback + // through setting the accompany flag to "" + return defaultName, nil } - val, found, err := cfgVault.Get(cm_key) + val, found, err := cfgVault.Get(configMapKey) if err != nil { // This can fail because of: // 1. No such config map - found=false, err=nil // 2. No such key in config map - found=false, err=nil // 3. Apiserver flake - found=false, err!=nil // It is not safe to proceed in 3. - return "", fmt.Errorf("failed to retrieve %v: %v, returning empty name", cm_key, err) + return "", fmt.Errorf("failed to retrieve %v: %v, returning empty name", configMapKey, err) } else if !found { // Not found but safe to proceed. return "", nil } - glog.Infof("Using %v = %q saved in ConfigMap", cm_key, val) + glog.Infof("Using %v = %q saved in ConfigMap", configMapKey, val) return val, nil } @@ -120,15 +124,15 @@ func useDefaultOrLookupVault(cfgVault *storage.ConfigMapVault, cm_key, default_n // backwards compatibility, the firewall name will default to the cluster UID. // Use getFlagOrLookupVault to obtain a stored or overridden value for the firewall name. // else, use the cluster UID as a backup (this retains backwards compatibility). 
-func getFirewallName(kubeClient kubernetes.Interface, name, cluster_uid string) (string, error) { +func getFirewallName(kubeClient kubernetes.Interface, name, clusterUID string) (string, error) { cfgVault := storage.NewConfigMapVault(kubeClient, metav1.NamespaceSystem, uidConfigMapName) - if fw_name, err := useDefaultOrLookupVault(cfgVault, storage.ProviderDataKey, name); err != nil { + if firewallName, err := useDefaultOrLookupVault(cfgVault, storage.ProviderDataKey, name); err != nil { return "", err - } else if fw_name != "" { - return fw_name, cfgVault.Put(storage.ProviderDataKey, fw_name) + } else if firewallName != "" { + return firewallName, cfgVault.Put(storage.ProviderDataKey, firewallName) } else { - glog.Infof("Using cluster UID %v as firewall name", cluster_uid) - return cluster_uid, cfgVault.Put(storage.ProviderDataKey, cluster_uid) + glog.Infof("Using cluster UID %v as firewall name", clusterUID) + return clusterUID, cfgVault.Put(storage.ProviderDataKey, clusterUID) } } @@ -140,7 +144,7 @@ func getFirewallName(kubeClient kubernetes.Interface, name, cluster_uid string) // else, allocate a new uid func getClusterUID(kubeClient kubernetes.Interface, name string) (string, error) { cfgVault := storage.NewConfigMapVault(kubeClient, metav1.NamespaceSystem, uidConfigMapName) - if name, err := useDefaultOrLookupVault(cfgVault, storage.UidDataKey, name); err != nil { + if name, err := useDefaultOrLookupVault(cfgVault, storage.UIDDataKey, name); err != nil { return "", err } else if name != "" { return name, nil @@ -158,23 +162,25 @@ func getClusterUID(kubeClient kubernetes.Interface, name string) (string, error) if len(ing.Status.LoadBalancer.Ingress) != 0 { c := namer.ParseName(loadbalancers.GCEResourceName(ing.Annotations, "forwarding-rule")) if c.ClusterName != "" { - return c.ClusterName, cfgVault.Put(storage.UidDataKey, c.ClusterName) + return c.ClusterName, cfgVault.Put(storage.UIDDataKey, c.ClusterName) } glog.Infof("Found a working Ingress, assuming uid is empty string") - return "", cfgVault.Put(storage.UidDataKey, "") + return "", cfgVault.Put(storage.UIDDataKey, "") } } - // Allocate new uid - f, err := os.Open("/dev/urandom") + uid, err := randomUID() if err != nil { return "", err } - defer f.Close() - b := make([]byte, 8) - if _, err := f.Read(b); err != nil { + return uid, cfgVault.Put(storage.UIDDataKey, uid) +} + +func randomUID() (string, error) { + b := make([]byte, uidByteLength) + if _, err := rand.Read(b); err != nil { return "", err } uid := fmt.Sprintf("%x", b) - return uid, cfgVault.Put(storage.UidDataKey, uid) + return uid, nil } From 1115250ce232649f93f7c39cfc92618e94a64378 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 17:06:32 -0800 Subject: [PATCH 03/11] Privatize ConfigMapStore so it can't leak out of the package --- pkg/storage/configmaps.go | 55 ++++++++++++++++++---------------- pkg/storage/configmaps_test.go | 18 +++++------ 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/pkg/storage/configmaps.go b/pkg/storage/configmaps.go index 1c3089c150..e65bbdf0a0 100644 --- a/pkg/storage/configmaps.go +++ b/pkg/storage/configmaps.go @@ -31,19 +31,19 @@ import ( ) const ( - // UidDataKey is the key used in config maps to store the UID. - UidDataKey = "uid" + // UIDDataKey is the key used in config maps to store the UID. + UIDDataKey = "uid" // ProviderDataKey is the key used in config maps to store the Provider // UID which we use to ensure unique firewalls. 
ProviderDataKey = "provider-uid" ) // ConfigMapVault stores cluster UIDs in config maps. -// It's a layer on top of ConfigMapStore that just implements the utils.uidVault +// It's a layer on top of configMapStore that just implements the utils.uidVault // interface. type ConfigMapVault struct { storeLock sync.Mutex - ConfigMapStore cache.Store + configMapStore cache.Store namespace string name string } @@ -54,7 +54,7 @@ type ConfigMapVault struct { // returns and error of nil instead. func (c *ConfigMapVault) Get(key string) (string, bool, error) { keyStore := fmt.Sprintf("%v/%v", c.namespace, c.name) - item, found, err := c.ConfigMapStore.GetByKey(keyStore) + item, found, err := c.configMapStore.GetByKey(keyStore) if err != nil || !found { return "", false, err } @@ -81,7 +81,7 @@ func (c *ConfigMapVault) Put(key, val string) error { } cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name) - item, exists, err := c.ConfigMapStore.GetByKey(cfgMapKey) + item, exists, err := c.configMapStore.GetByKey(cfgMapKey) if err == nil && exists { data := item.(*api_v1.ConfigMap).Data existingVal, ok := data[key] @@ -96,12 +96,12 @@ func (c *ConfigMapVault) Put(key, val string) error { } else { glog.Infof("Configmap %v will be updated with %v = %v", cfgMapKey, key, val) } - if err := c.ConfigMapStore.Update(apiObj); err != nil { + if err := c.configMapStore.Update(apiObj); err != nil { return fmt.Errorf("failed to update %v: %v", cfgMapKey, err) } } else { apiObj.Data = map[string]string{key: val} - if err := c.ConfigMapStore.Add(apiObj); err != nil { + if err := c.configMapStore.Add(apiObj); err != nil { return fmt.Errorf("failed to add %v: %v", cfgMapKey, err) } } @@ -109,12 +109,12 @@ func (c *ConfigMapVault) Put(key, val string) error { return nil } -// Delete deletes the ConfigMapStore. +// Delete deletes the configMapStore. func (c *ConfigMapVault) Delete() error { cfgMapKey := fmt.Sprintf("%v/%v", c.namespace, c.name) - item, _, err := c.ConfigMapStore.GetByKey(cfgMapKey) + item, _, err := c.configMapStore.GetByKey(cfgMapKey) if err == nil { - return c.ConfigMapStore.Delete(item) + return c.configMapStore.Delete(item) } glog.Warningf("Couldn't find item %v in vault, unable to delete", cfgMapKey) return nil @@ -125,57 +125,57 @@ func (c *ConfigMapVault) Delete() error { // configmaps and the API, and just store/retrieve a single value, the cluster uid. func NewConfigMapVault(c kubernetes.Interface, uidNs, uidConfigMapName string) *ConfigMapVault { return &ConfigMapVault{ - ConfigMapStore: NewConfigMapStore(c), + configMapStore: newConfigMapStore(c), namespace: uidNs, name: uidConfigMapName} } -// NewFakeConfigMapVault is an implementation of the ConfigMapStore that doesn't +// NewFakeConfigMapVault is an implementation of the configMapStore that doesn't // persist configmaps. Only used in testing. func NewFakeConfigMapVault(ns, name string) *ConfigMapVault { return &ConfigMapVault{ - ConfigMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + configMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc), namespace: ns, name: name} } -// ConfigMapStore wraps the store interface. Implementations usually persist +// configMapStore wraps the store interface. Implementations usually persist // contents of the store transparently. -type ConfigMapStore interface { +type configMapStore interface { cache.Store } -// APIServerConfigMapStore only services Add and GetByKey from apiserver. +// APIServerconfigMapStore only services Add and GetByKey from apiserver. 
// TODO: Implement all the other store methods and make this a write // through cache. -type APIServerConfigMapStore struct { - ConfigMapStore +type APIServerconfigMapStore struct { + configMapStore client kubernetes.Interface } // Add adds the given config map to the apiserver's store. -func (a *APIServerConfigMapStore) Add(obj interface{}) error { +func (a *APIServerconfigMapStore) Add(obj interface{}) error { cfg := obj.(*api_v1.ConfigMap) _, err := a.client.Core().ConfigMaps(cfg.Namespace).Create(cfg) return err } // Update updates the existing config map object. -func (a *APIServerConfigMapStore) Update(obj interface{}) error { +func (a *APIServerconfigMapStore) Update(obj interface{}) error { cfg := obj.(*api_v1.ConfigMap) _, err := a.client.Core().ConfigMaps(cfg.Namespace).Update(cfg) return err } // Delete deletes the existing config map object. -func (a *APIServerConfigMapStore) Delete(obj interface{}) error { +func (a *APIServerconfigMapStore) Delete(obj interface{}) error { cfg := obj.(*api_v1.ConfigMap) return a.client.Core().ConfigMaps(cfg.Namespace).Delete(cfg.Name, &metav1.DeleteOptions{}) } // GetByKey returns the config map for a given key. // The key must take the form namespace/name. -func (a *APIServerConfigMapStore) GetByKey(key string) (item interface{}, exists bool, err error) { +func (a *APIServerconfigMapStore) GetByKey(key string) (item interface{}, exists bool, err error) { nsName := strings.Split(key, "/") if len(nsName) != 2 { return nil, false, fmt.Errorf("failed to get key %v, unexpecte format, expecting ns/name", key) @@ -192,8 +192,11 @@ func (a *APIServerConfigMapStore) GetByKey(key string) (item interface{}, exists return cfg, true, nil } -// NewConfigMapStore returns a config map store capable of persisting updates +// newConfigMapStore returns a config map store capable of persisting updates // to apiserver. -func NewConfigMapStore(c kubernetes.Interface) ConfigMapStore { - return &APIServerConfigMapStore{ConfigMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc), client: c} +func newConfigMapStore(c kubernetes.Interface) configMapStore { + return &APIServerconfigMapStore{ + configMapStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + client: c, + } } diff --git a/pkg/storage/configmaps_test.go b/pkg/storage/configmaps_test.go index 2c40fdeae1..bec2256cdf 100644 --- a/pkg/storage/configmaps_test.go +++ b/pkg/storage/configmaps_test.go @@ -22,18 +22,18 @@ import ( api "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestConfigMapUID(t *testing.T) { +func TestFakeConfigMapVaule(t *testing.T) { vault := NewFakeConfigMapVault(api.NamespaceSystem, "ingress-uid") // Get value from an empty vault. - val, exists, err := vault.Get(UidDataKey) + val, exists, err := vault.Get(UIDDataKey) if exists { t.Errorf("Got value from an empty vault") } - // Store empty value for UidDataKey. + // Store empty value for UIDDataKey. uid := "" - vault.Put(UidDataKey, uid) - val, exists, err = vault.Get(UidDataKey) + vault.Put(UIDDataKey, uid) + val, exists, err = vault.Get(UIDDataKey) if !exists || err != nil { t.Errorf("Failed to retrieve value from vault: %v", err) } @@ -43,8 +43,8 @@ func TestConfigMapUID(t *testing.T) { // Store actual value in key. 
storedVal := "newuid" - vault.Put(UidDataKey, storedVal) - val, exists, err = vault.Get(UidDataKey) + vault.Put(UIDDataKey, storedVal) + val, exists, err = vault.Get(UIDDataKey) if !exists || err != nil { t.Errorf("Failed to retrieve value from vault") } else if val != storedVal { @@ -59,7 +59,7 @@ func TestConfigMapUID(t *testing.T) { if !exists || err != nil || val != secondVal { t.Errorf("Failed to retrieve second value from vault") } - val, exists, err = vault.Get(UidDataKey) + val, exists, err = vault.Get(UIDDataKey) if !exists || err != nil || val != storedVal { t.Errorf("Failed to retrieve first value from vault") } @@ -68,7 +68,7 @@ func TestConfigMapUID(t *testing.T) { if err := vault.Delete(); err != nil { t.Errorf("Failed to delete uid %v", err) } - if _, exists, _ := vault.Get(UidDataKey); exists { + if _, exists, _ := vault.Get(UIDDataKey); exists { t.Errorf("Found uid but expected none after deletion") } } From 01e4673a2558706a88a7e4dc7c3c3c4068f9f347 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 17:36:24 -0800 Subject: [PATCH 04/11] Adding a proper version variable --- build/build.sh | 2 +- cmd/glbc/app/flags.go | 3 +++ cmd/glbc/main.go | 25 ++++++++----------------- pkg/version/version.go | 21 +++++++++++++++++++++ 4 files changed, 33 insertions(+), 18 deletions(-) create mode 100644 pkg/version/version.go diff --git a/build/build.sh b/build/build.sh index 99f6cc5aa1..90b9c472de 100755 --- a/build/build.sh +++ b/build/build.sh @@ -39,5 +39,5 @@ fi go install \ -installsuffix "static" \ - -ldflags "-X ${PKG}/pkg/version.VERSION=${VERSION}" \ + -ldflags "-X ${PKG}/pkg/version.Version=${VERSION}" \ ./... diff --git a/cmd/glbc/app/flags.go b/cmd/glbc/app/flags.go index 2d0d1a15b6..a32777faae 100644 --- a/cmd/glbc/app/flags.go +++ b/cmd/glbc/app/flags.go @@ -39,6 +39,7 @@ var ( ResyncPeriod time.Duration Verbose bool WatchNamespace string + Version bool }{} ) @@ -80,6 +81,8 @@ the pod secrets for creating a Kubernetes client.`) `Relist and confirm cloud resources this often.`) flag.StringVar(&Flags.WatchNamespace, "watch-namespace", v1.NamespaceAll, `Namespace to watch for Ingress/Services/Endpoints.`) + flag.BoolVar(&Flags.Version, "version", false, + `Print the version of the controller and exit`) // Deprecated flags. flag.BoolVar(&Flags.Verbose, "verbose", false, diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 9ee004452a..fd1c4f14e6 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "fmt" "os" "time" @@ -30,29 +31,21 @@ import ( neg "k8s.io/ingress-gce/pkg/networkendpointgroup" "k8s.io/ingress-gce/cmd/glbc/app" + "k8s.io/ingress-gce/pkg/version" ) -// Entrypoint of GLBC. Example invocation: -// 1. In a pod: -// glbc --delete-all-on-quit -// 2. Dry run (on localhost): -// $ kubectl proxy --api-prefix="/" -// $ glbc --proxy="http://localhost:proxyport" - -const ( - // Current docker image version. Only used in debug logging. - // TODO: this should be populated from the build. - imageVersion = "glbc:0.9.7" -) - -// main function for GLBC. 
func main() { flag.Parse() if app.Flags.Verbose { flag.Set("v", "4") } - glog.V(0).Infof("Starting GLBC image: %q, cluster name %q", imageVersion, app.Flags.ClusterName) + if app.Flags.Version { + fmt.Printf("Controller version: %s\n", version.Version) + os.Exit(0) + } + + glog.V(0).Infof("Starting GLBC image: %q, cluster name %q", version.Version, app.Flags.ClusterName) for i, a := range os.Args { glog.V(0).Infof("argv[%d]: %q", i, a) } @@ -100,8 +93,6 @@ func main() { go app.RunSIGTERMHandler(lbc, app.Flags.DeleteAllOnQuit) ctx.Start(stopCh) - - glog.V(0).Infof("Starting load balancer controller") lbc.Run() for { diff --git a/pkg/version/version.go b/pkg/version/version.go new file mode 100644 index 0000000000..4093bc0d77 --- /dev/null +++ b/pkg/version/version.go @@ -0,0 +1,21 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package version + +// Version is a version string populated by the build using -ldflags "-X +// ${PKG}/pkg/version.Version=${VERSION}". +var Version = "UNKNOWN" From 51440937aa5709ab1bf01c9484e098083bcc99bf Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 17:55:14 -0800 Subject: [PATCH 05/11] Remove unused function --- pkg/controller/utils.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index 8059dcde32..74aea71fde 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -68,12 +68,6 @@ func (e ErrSvcAppProtosParsing) Error() string { return fmt.Sprintf("could not parse %v annotation on Service %v/%v, err: %v", annotations.ServiceApplicationProtocolKey, e.svc.Namespace, e.svc.Name, e.origErr) } -// compareLinks returns true if the 2 self links are equal. -func compareLinks(l1, l2 string) bool { - // TODO: These can be partial links - return l1 == l2 && l1 != "" -} - // StoreToIngressLister makes a Store that lists Ingress. // TODO: Move this to cache/listers post 1.1. type StoreToIngressLister struct { From 57ab564754f2212a2ec66c3c6ef3b2feccfaecf0 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 18:02:56 -0800 Subject: [PATCH 06/11] Remove unecessary indirection in the Indexers --- pkg/controller/controller.go | 20 ++++++++------------ pkg/controller/controller_test.go | 2 +- pkg/controller/utils.go | 25 +++++-------------------- pkg/controller/utils_test.go | 14 +++++++------- 4 files changed, 21 insertions(+), 40 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index da7799c203..4b75f77546 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -68,10 +68,9 @@ type LoadBalancerController struct { nodeSynced cache.InformerSynced endpointSynced cache.InformerSynced ingLister StoreToIngressLister - nodeLister StoreToNodeLister - svcLister StoreToServiceLister - // Health checks are the readiness probes of containers on pods. 
- podLister StoreToPodLister + nodeLister cache.Indexer + svcLister cache.Indexer + podLister cache.Indexer // endpoint lister is needed when translating service target port to real endpoint target ports. endpointLister StoreToEndpointLister // TODO: Watch secrets @@ -125,9 +124,9 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru lbc.endpointSynced = func() bool { return true } lbc.ingLister.Store = ctx.IngressInformer.GetStore() - lbc.svcLister.Indexer = ctx.ServiceInformer.GetIndexer() - lbc.podLister.Indexer = ctx.PodInformer.GetIndexer() - lbc.nodeLister.Indexer = ctx.NodeInformer.GetIndexer() + lbc.svcLister = ctx.ServiceInformer.GetIndexer() + lbc.podLister = ctx.PodInformer.GetIndexer() + lbc.nodeLister = ctx.NodeInformer.GetIndexer() if negEnabled { lbc.endpointSynced = ctx.EndpointInformer.HasSynced lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer() @@ -477,16 +476,13 @@ func (lbc *LoadBalancerController) syncNodes(key string) error { if err != nil { return err } - if err := lbc.CloudClusterManager.instancePool.Sync(nodeNames); err != nil { - return err - } - return nil + return lbc.CloudClusterManager.instancePool.Sync(nodeNames) } // getReadyNodeNames returns names of schedulable, ready nodes from the node lister. func (lbc *LoadBalancerController) getReadyNodeNames() ([]string, error) { nodeNames := []string{} - nodes, err := listers.NewNodeLister(lbc.nodeLister.Indexer).ListWithPredicate(utils.NodeIsReady) + nodes, err := listers.NewNodeLister(lbc.nodeLister).ListWithPredicate(utils.NodeIsReady) if err != nil { return nodeNames, err } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index fbce28a50a..3c2583baee 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -196,7 +196,7 @@ func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePo } svcPort.NodePort = int32(pm.getNodePort(path.Backend.ServiceName)) svc.Spec.Ports = []api_v1.ServicePort{svcPort} - lbc.svcLister.Indexer.Add(svc) + lbc.svcLister.Add(svc) } } } diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index 74aea71fde..96a409ff64 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -74,26 +74,6 @@ type StoreToIngressLister struct { cache.Store } -// StoreToNodeLister makes a Store that lists Node. -type StoreToNodeLister struct { - cache.Indexer -} - -// StoreToServiceLister makes a Store that lists Service. -type StoreToServiceLister struct { - cache.Indexer -} - -// StoreToPodLister makes a Store that lists Pods. -type StoreToPodLister struct { - cache.Indexer -} - -// StoreToPodLister makes a Store that lists Endpoints. -type StoreToEndpointLister struct { - cache.Indexer -} - // List lists all Ingress' in the store (both single and multi cluster ingresses). func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { for _, m := range s.Store.List() { @@ -152,6 +132,11 @@ IngressLoop: return } +// StoreToEndpointLister makes a Store that lists Endpoints. 
+type StoreToEndpointLister struct { + cache.Indexer +} + func (s *StoreToEndpointLister) ListEndpointTargetPorts(namespace, name, targetPort string) []int { // if targetPort is integer, no need to translate to endpoint ports if i, err := strconv.Atoi(targetPort); err == nil { diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index ad1de3eaa4..744d3c2b05 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -117,7 +117,7 @@ func TestProbeGetterNamedPort(t *testing.T) { {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", } addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) - for _, p := range lbc.podLister.Indexer.List() { + for _, p := range lbc.podLister.List() { pod := p.(*api_v1.Pod) pod.Spec.Containers[0].Ports[0].Name = "test" pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} @@ -167,7 +167,7 @@ func TestProbeGetterCrossNamespace(t *testing.T) { }, }, } - lbc.podLister.Indexer.Add(firstPod) + lbc.podLister.Add(firstPod) nodePortToHealthCheck := map[backends.ServicePort]string{ {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", } @@ -203,7 +203,7 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[backends.Ser } svc.Name = fmt.Sprintf("%d", np.Port) svc.Namespace = ns - lbc.svcLister.Indexer.Add(svc) + lbc.svcLister.Add(svc) pod := &api_v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ @@ -232,7 +232,7 @@ func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[backends.Ser }, }, } - lbc.podLister.Indexer.Add(pod) + lbc.podLister.Add(pod) delay = 2 * delay } } @@ -253,7 +253,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { }, }, } - lbc.nodeLister.Indexer.Add(n) + lbc.nodeLister.Add(n) } } lbc.CloudClusterManager.instancePool.Init(lbc.Translator) @@ -334,8 +334,8 @@ func TestGatherFirewallPorts(t *testing.T) { }, } - lbc.endpointLister.Indexer.Add(newDefaultEndpoint(ep1)) - lbc.endpointLister.Indexer.Add(newDefaultEndpoint(ep2)) + lbc.endpointLister.Add(newDefaultEndpoint(ep1)) + lbc.endpointLister.Add(newDefaultEndpoint(ep2)) res := lbc.Translator.GatherFirewallPorts(svcPorts, true) expect := map[int64]bool{ From 308afdc4d71ee6256043184782d8a5b734a0de56 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 5 Jan 2018 20:35:59 -0800 Subject: [PATCH 07/11] Add resource to the task queue log message --- pkg/controller/controller.go | 4 ++-- pkg/utils/taskqueue.go | 11 +++++++---- pkg/utils/taskqueue_test.go | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 4b75f77546..145803b453 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -113,8 +113,8 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru apiv1.EventSource{Component: "loadbalancer-controller"}), negEnabled: negEnabled, } - lbc.nodeQueue = utils.NewPeriodicTaskQueue(lbc.syncNodes) - lbc.ingQueue = utils.NewPeriodicTaskQueue(lbc.sync) + lbc.nodeQueue = utils.NewPeriodicTaskQueue("nodes", lbc.syncNodes) + lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) lbc.hasSynced = lbc.storesSynced lbc.ingressSynced = ctx.IngressInformer.HasSynced diff --git a/pkg/utils/taskqueue.go b/pkg/utils/taskqueue.go index 4cacf7c6ab..ef4f4118f4 100644 --- a/pkg/utils/taskqueue.go +++ b/pkg/utils/taskqueue.go @@ -36,6 +36,8 @@ type TaskQueue interface { // inserted. 
If the sync() function results in an error, the item is put on // the work queue after a rate-limit. type PeriodicTaskQueue struct { + // resource is used for logging to distinguish the queue being used. + resource string // keyFunc translates an object to a string-based key. keyFunc func(obj interface{}) (string, error) // queue is the work queue the worker polls. @@ -59,7 +61,7 @@ func (t *PeriodicTaskQueue) Enqueue(obj interface{}) { glog.Errorf("Couldn't get key for object %+v (type %T): %v", obj, obj, err) return } - glog.V(4).Infof("Enqueue key=%q", key) + glog.V(4).Infof("Enqueue key=%q (%v)", key, t.resource) t.queue.Add(key) } @@ -78,9 +80,9 @@ func (t *PeriodicTaskQueue) worker() { close(t.workerDone) return } - glog.V(4).Infof("Syncing %v", key) + glog.V(4).Infof("Syncing %v (%v)", key, t.resource) if err := t.sync(key.(string)); err != nil { - glog.Errorf("Requeuing %q due to error: %v", key, err) + glog.Errorf("Requeuing %q due to error: %v (%v)", key, err, t.resource) t.queue.AddRateLimited(key) } else { t.queue.Forget(key) @@ -91,8 +93,9 @@ func (t *PeriodicTaskQueue) worker() { // NewPeriodicTaskQueue creates a new task queue with the given sync function. // The sync function is called for every element inserted into the queue. -func NewPeriodicTaskQueue(syncFn func(string) error) *PeriodicTaskQueue { +func NewPeriodicTaskQueue(resource string, syncFn func(string) error) *PeriodicTaskQueue { return &PeriodicTaskQueue{ + resource: resource, keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, diff --git a/pkg/utils/taskqueue_test.go b/pkg/utils/taskqueue_test.go index ff6381e9cf..3d746b5a6e 100644 --- a/pkg/utils/taskqueue_test.go +++ b/pkg/utils/taskqueue_test.go @@ -41,7 +41,7 @@ func TestPeriodicTaskQueue(t *testing.T) { } return nil } - tq = NewPeriodicTaskQueue(sync) + tq = NewPeriodicTaskQueue("test", sync) go tq.Run(time.Microsecond, stopCh) tq.Enqueue(cache.ExplicitKey("a")) From 669ae6444b1971c531e7b62a5ed05b9c947fb22f Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sat, 6 Jan 2018 12:21:34 -0800 Subject: [PATCH 08/11] Reduce surface area of lbc --- pkg/controller/controller.go | 105 ++++++++++++++++------------------- 1 file changed, 48 insertions(+), 57 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 145803b453..f055170dcd 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -62,15 +62,10 @@ var ( type LoadBalancerController struct { client kubernetes.Interface - ingressSynced cache.InformerSynced - serviceSynced cache.InformerSynced - podSynced cache.InformerSynced - nodeSynced cache.InformerSynced - endpointSynced cache.InformerSynced - ingLister StoreToIngressLister - nodeLister cache.Indexer - svcLister cache.Indexer - podLister cache.Indexer + ingLister StoreToIngressLister + nodeLister cache.Indexer + svcLister cache.Indexer + podLister cache.Indexer // endpoint lister is needed when translating service target port to real endpoint target ports. 
endpointLister StoreToEndpointLister // TODO: Watch secrets @@ -115,20 +110,12 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru } lbc.nodeQueue = utils.NewPeriodicTaskQueue("nodes", lbc.syncNodes) lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) - lbc.hasSynced = lbc.storesSynced - - lbc.ingressSynced = ctx.IngressInformer.HasSynced - lbc.serviceSynced = ctx.ServiceInformer.HasSynced - lbc.podSynced = ctx.PodInformer.HasSynced - lbc.nodeSynced = ctx.NodeInformer.HasSynced - lbc.endpointSynced = func() bool { return true } - + lbc.hasSynced = hasSyncedFromContext(ctx, negEnabled) lbc.ingLister.Store = ctx.IngressInformer.GetStore() lbc.svcLister = ctx.ServiceInformer.GetIndexer() lbc.podLister = ctx.PodInformer.GetIndexer() lbc.nodeLister = ctx.NodeInformer.GetIndexer() if negEnabled { - lbc.endpointSynced = ctx.EndpointInformer.HasSynced lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer() } @@ -238,23 +225,6 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error { return nil } -// storesSynced returns true if all the sub-controllers have finished their -// first sync with apiserver. -func (lbc *LoadBalancerController) storesSynced() bool { - return ( - // wait for pods to sync so we don't allocate a default health check when - // an endpoint has a readiness probe. - lbc.podSynced() && - // wait for services so we don't thrash on backend creation. - lbc.serviceSynced() && - // wait for nodes so we don't disconnect a backend from an instance - // group just because we don't realize there are nodes in that zone. - lbc.nodeSynced() && - // Wait for ingresses as a safety measure. We don't really need this. - lbc.ingressSynced() && - lbc.endpointSynced()) -} - // sync manages Ingress create/updates/deletes. func (lbc *LoadBalancerController) sync(key string) (err error) { if !lbc.hasSynced() { @@ -307,13 +277,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { glog.V(3).Infof("Finished syncing %v", key) }() - // TODO: Implement proper backoff for the queue. - eventMsg := "GCE" - // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, lbc.Translator.GatherFirewallPorts(gceNodePorts, len(lbs) > 0)) if err != nil { + // TODO: Implement proper backoff for the queue. 
+ const eventMsg = "GCE" if fwErr, ok := err.(*firewalls.FirewallSyncError); ok { if ingExists { lbc.recorder.Eventf(obj.(*extensions.Ingress), apiv1.EventTypeNormal, eventMsg, fwErr.Message) @@ -342,7 +311,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil { return err } - if err = lbc.updateAnnotations(ing.Name, ing.Namespace, ing.Annotations); err != nil { + if err = updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations); err != nil { return err } return nil @@ -414,26 +383,9 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing } } annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) - if err := lbc.updateAnnotations(ing.Name, ing.Namespace, annotations); err != nil { - return err - } - return nil -} - -func (lbc *LoadBalancerController) updateAnnotations(name, namespace string, annotations map[string]string) error { - // Update annotations through /update endpoint - ingClient := lbc.client.Extensions().Ingresses(namespace) - currIng, err := ingClient.Get(name, metav1.GetOptions{}) - if err != nil { + if err := updateAnnotations(lbc.client, ing.Name, ing.Namespace, annotations); err != nil { return err } - if !reflect.DeepEqual(currIng.Annotations, annotations) { - glog.V(3).Infof("Updating annotations of %v/%v", namespace, name) - currIng.Annotations = annotations - if _, err := ingClient.Update(currIng); err != nil { - return err - } - } return nil } @@ -494,3 +446,42 @@ func (lbc *LoadBalancerController) getReadyNodeNames() ([]string, error) { } return nodeNames, nil } + +func hasSyncedFromContext(ctx *context.ControllerContext, negEnabled bool) func() bool { + // Wait for all resources to be sync'd to avoid performing actions while + // the controller is still initializing state. + var funcs []func() bool + funcs = append(funcs, []func() bool{ + ctx.IngressInformer.HasSynced, + ctx.ServiceInformer.HasSynced, + ctx.PodInformer.HasSynced, + ctx.NodeInformer.HasSynced, + }...) 
+ if negEnabled { + funcs = append(funcs, ctx.EndpointInformer.HasSynced) + } + return func() bool { + for _, f := range funcs { + if !f() { + return false + } + } + return true + } +} + +func updateAnnotations(client kubernetes.Interface, name, namespace string, annotations map[string]string) error { + ingClient := client.Extensions().Ingresses(namespace) + currIng, err := ingClient.Get(name, metav1.GetOptions{}) + if err != nil { + return err + } + if !reflect.DeepEqual(currIng.Annotations, annotations) { + glog.V(3).Infof("Updating annotations of %v/%v", namespace, name) + currIng.Annotations = annotations + if _, err := ingClient.Update(currIng); err != nil { + return err + } + } + return nil +} From 89960c2ccfc369fe12570c97f041bd755bc591ef Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sat, 6 Jan 2018 12:28:09 -0800 Subject: [PATCH 09/11] Remove getReadyNodes out of lbc to reduce surface area --- pkg/controller/controller.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f055170dcd..482711cd6d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -249,7 +249,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if err != nil { return err } - nodeNames, err := lbc.getReadyNodeNames() + nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) if err != nil { return err } @@ -424,7 +424,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ingList extensions.IngressList) // syncNodes manages the syncing of kubernetes nodes to gce instance groups. // The instancegroups are referenced by loadbalancer backends. func (lbc *LoadBalancerController) syncNodes(key string) error { - nodeNames, err := lbc.getReadyNodeNames() + nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) if err != nil { return err } @@ -432,9 +432,9 @@ func (lbc *LoadBalancerController) syncNodes(key string) error { } // getReadyNodeNames returns names of schedulable, ready nodes from the node lister. -func (lbc *LoadBalancerController) getReadyNodeNames() ([]string, error) { +func getReadyNodeNames(lister listers.NodeLister) ([]string, error) { nodeNames := []string{} - nodes, err := listers.NewNodeLister(lbc.nodeLister).ListWithPredicate(utils.NodeIsReady) + nodes, err := lister.ListWithPredicate(utils.NodeIsReady) if err != nil { return nodeNames, err } From 101a2595d1be2f9fe3980d18030946cfe51a26e6 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sat, 6 Jan 2018 12:40:02 -0800 Subject: [PATCH 10/11] Move flags to pkg/ so the settings can be used as globals This remove a great deal of threading for options that don't change during the execution of the controller. 
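For reference, the flags-as-globals pattern this change introduces looks roughly
like the following sketch, distilled from the main.go and flags.go hunks below
(illustrative only, not an exact excerpt of either file):

    package main

    import (
        "flag"

        "k8s.io/ingress-gce/pkg/flags"
    )

    func main() {
        // Register the controller flags on the default FlagSet, then parse.
        flags.Register()
        flag.Parse()

        // Options are read straight from the package-level struct rather
        // than being threaded through every constructor.
        if flags.F.Verbose {
            flag.Set("v", "4")
        }
    }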
--- cmd/glbc/app/clients.go | 18 +++++----- cmd/glbc/app/handlers.go | 6 ++-- cmd/glbc/app/init.go | 10 +++--- cmd/glbc/main.go | 20 +++++++----- pkg/controller/controller.go | 15 ++++----- pkg/controller/controller_test.go | 11 ++++--- pkg/controller/utils.go | 10 ++++-- pkg/controller/utils_test.go | 13 ++++---- {cmd/glbc/app => pkg/flags}/flags.go | 49 ++++++++++++++++------------ 9 files changed, 86 insertions(+), 66 deletions(-) rename {cmd/glbc/app => pkg/flags}/flags.go (68%) diff --git a/cmd/glbc/app/clients.go b/cmd/glbc/app/clients.go index bae30be3bd..5cd432ffa9 100644 --- a/cmd/glbc/app/clients.go +++ b/cmd/glbc/app/clients.go @@ -29,12 +29,14 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/ingress-gce/pkg/utils" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" // Register the GCP authorization provider. _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "k8s.io/ingress-gce/pkg/flags" + "k8s.io/ingress-gce/pkg/utils" ) const ( @@ -44,7 +46,7 @@ const ( // NewKubeClient returns a Kubernetes client given the command line settings. func NewKubeClient() (kubernetes.Interface, error) { - if Flags.InCluster { + if flags.F.InCluster { glog.V(0).Infof("Using in cluster configuration") config, err := rest.InClusterConfig() if err != nil { @@ -53,8 +55,8 @@ func NewKubeClient() (kubernetes.Interface, error) { return kubernetes.NewForConfig(config) } - glog.V(0).Infof("Using APIServerHost=%q, KubeConfig=%q", Flags.APIServerHost, Flags.KubeConfigFile) - config, err := clientcmd.BuildConfigFromFlags(Flags.APIServerHost, Flags.KubeConfigFile) + glog.V(0).Infof("Using APIServerHost=%q, KubeConfig=%q", flags.F.APIServerHost, flags.F.KubeConfigFile) + config, err := clientcmd.BuildConfigFromFlags(flags.F.APIServerHost, flags.F.KubeConfigFile) if err != nil { return nil, err } @@ -65,9 +67,9 @@ func NewKubeClient() (kubernetes.Interface, error) { // a valid configuration file can be read. 
func NewGCEClient() *gce.GCECloud { var configReader func() io.Reader - if Flags.ConfigFilePath != "" { - glog.Infof("Reading config from path %q", Flags.ConfigFilePath) - config, err := os.Open(Flags.ConfigFilePath) + if flags.F.ConfigFilePath != "" { + glog.Infof("Reading config from path %q", flags.F.ConfigFilePath) + config, err := os.Open(flags.F.ConfigFilePath) if err != nil { glog.Fatalf("%v", err) } @@ -75,7 +77,7 @@ func NewGCEClient() *gce.GCECloud { allConfig, err := ioutil.ReadAll(config) if err != nil { - glog.Fatalf("Error while reading config (%q): %v", Flags.ConfigFilePath, err) + glog.Fatalf("Error while reading config (%q): %v", flags.F.ConfigFilePath, err) } glog.V(4).Infof("Cloudprovider config file contains: %q", string(allConfig)) diff --git a/cmd/glbc/app/handlers.go b/cmd/glbc/app/handlers.go index 157e79e34f..3972c4db07 100644 --- a/cmd/glbc/app/handlers.go +++ b/cmd/glbc/app/handlers.go @@ -25,7 +25,9 @@ import ( "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus/promhttp" + "k8s.io/ingress-gce/pkg/controller" + "k8s.io/ingress-gce/pkg/flags" ) func RunHTTPServer(lbc *controller.LoadBalancerController) { @@ -44,8 +46,8 @@ func RunHTTPServer(lbc *controller.LoadBalancerController) { lbc.Stop(true) }) - glog.V(0).Infof("Running http server on :%v", Flags.HealthzPort) - glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", Flags.HealthzPort), nil)) + glog.V(0).Infof("Running http server on :%v", flags.F.HealthzPort) + glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", flags.F.HealthzPort), nil)) } func RunSIGTERMHandler(lbc *controller.LoadBalancerController, deleteAll bool) { diff --git a/cmd/glbc/app/init.go b/cmd/glbc/app/init.go index abf9d3fedb..1fb7e389e3 100644 --- a/cmd/glbc/app/init.go +++ b/cmd/glbc/app/init.go @@ -28,26 +28,28 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/flags" ) func DefaultBackendServicePort(kubeClient kubernetes.Interface) *backends.ServicePort { // TODO: make this not fatal - if Flags.DefaultSvc == "" { + if flags.F.DefaultSvc == "" { glog.Fatalf("Please specify --default-backend") } // Wait for the default backend Service. There's no pretty way to do this. 
- parts := strings.Split(Flags.DefaultSvc, "/") + parts := strings.Split(flags.F.DefaultSvc, "/") if len(parts) != 2 { glog.Fatalf("Default backend should take the form namespace/name: %v", - Flags.DefaultSvc) + flags.F.DefaultSvc) } port, nodePort, err := getNodePort(kubeClient, parts[0], parts[1]) if err != nil { glog.Fatalf("Could not configure default backend %v: %v", - Flags.DefaultSvc, err) + flags.F.DefaultSvc, err) } return &backends.ServicePort{ diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index fd1c4f14e6..d6ecf675c8 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -31,47 +31,49 @@ import ( neg "k8s.io/ingress-gce/pkg/networkendpointgroup" "k8s.io/ingress-gce/cmd/glbc/app" + "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/version" ) func main() { + flags.Register() flag.Parse() - if app.Flags.Verbose { + if flags.F.Verbose { flag.Set("v", "4") } - if app.Flags.Version { + if flags.F.Version { fmt.Printf("Controller version: %s\n", version.Version) os.Exit(0) } - glog.V(0).Infof("Starting GLBC image: %q, cluster name %q", version.Version, app.Flags.ClusterName) + glog.V(0).Infof("Starting GLBC image: %q, cluster name %q", version.Version, flags.F.ClusterName) for i, a := range os.Args { glog.V(0).Infof("argv[%d]: %q", i, a) } - glog.V(2).Infof("Flags = %+v", app.Flags) + glog.V(2).Infof("Flags = %+v", flags.F) kubeClient, err := app.NewKubeClient() if err != nil { glog.Fatalf("Failed to create kubernetes client: %v", err) } - namer, err := app.NewNamer(kubeClient, app.Flags.ClusterName, controller.DefaultFirewallName) + namer, err := app.NewNamer(kubeClient, flags.F.ClusterName, controller.DefaultFirewallName) if err != nil { glog.Fatalf("%v", err) } cloud := app.NewGCEClient() defaultBackendServicePort := app.DefaultBackendServicePort(kubeClient) - clusterManager, err := controller.NewClusterManager(cloud, namer, *defaultBackendServicePort, app.Flags.HealthCheckPath) + clusterManager, err := controller.NewClusterManager(cloud, namer, *defaultBackendServicePort, flags.F.HealthCheckPath) if err != nil { glog.Fatalf("Error creating cluster manager: %v", err) } enableNEG := cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureNetworkEndpointGroup) stopCh := make(chan struct{}) - ctx := context.NewControllerContext(kubeClient, app.Flags.WatchNamespace, app.Flags.ResyncPeriod, enableNEG) + ctx := context.NewControllerContext(kubeClient, flags.F.WatchNamespace, flags.F.ResyncPeriod, enableNEG) lbc, err := controller.NewLoadBalancerController(kubeClient, stopCh, ctx, clusterManager, enableNEG) if err != nil { glog.Fatalf("Error creating load balancer controller: %v", err) @@ -84,13 +86,13 @@ func main() { glog.V(0).Infof("clusterManager initialized") if enableNEG { - negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, app.Flags.ResyncPeriod) + negController, _ := neg.NewController(kubeClient, cloud, ctx, lbc.Translator, namer, flags.F.ResyncPeriod) go negController.Run(stopCh) glog.V(0).Infof("negController started") } go app.RunHTTPServer(lbc) - go app.RunSIGTERMHandler(lbc, app.Flags.DeleteAllOnQuit) + go app.RunSIGTERMHandler(lbc, flags.F.DeleteAllOnQuit) ctx.Start(stopCh) lbc.Run() diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 482711cd6d..59d3c19bca 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -42,19 +42,16 @@ import ( "k8s.io/ingress-gce/pkg/utils" ) +const ( + DefaultFirewallName = "" + // Frequency to poll on local stores to sync. 
+ storeSyncPollPeriod = 5 * time.Second +) + var ( keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc - - // DefaultClusterUID is the uid to use for clusters resources created by an - // L7 controller created without specifying the --cluster-uid flag. - DefaultClusterUID = "" - // DefaultFirewallName is the name to user for firewall rules created // by an L7 controller when the --fireall-rule is not used. - DefaultFirewallName = "" - - // Frequency to poll on local stores to sync. - storeSyncPollPeriod = 5 * time.Second ) // LoadBalancerController watches the kubernetes api and adds/removes services diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 3c2583baee..c3cc11cc4b 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/firewalls" + "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/loadbalancers" "k8s.io/ingress-gce/pkg/tls" "k8s.io/ingress-gce/pkg/utils" @@ -203,7 +204,7 @@ func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePo func TestLbCreateDelete(t *testing.T) { testFirewallName := "quux" - cm := NewFakeClusterManager(DefaultClusterUID, testFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, testFirewallName) lbc := newLoadBalancerController(t, cm) inputMap1 := map[string]utils.FakeIngressRuleValueMap{ "foo.example.com": { @@ -297,7 +298,7 @@ func TestLbCreateDelete(t *testing.T) { } func TestLbFaultyUpdate(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) inputMap := map[string]utils.FakeIngressRuleValueMap{ "foo.example.com": { @@ -338,7 +339,7 @@ func TestLbFaultyUpdate(t *testing.T) { } func TestLbDefaulting(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) // Make sure the controller plugs in the default values accepted by GCE. ing := newIngress(map[string]utils.FakeIngressRuleValueMap{"": {"": "foo1svc"}}) @@ -358,7 +359,7 @@ func TestLbDefaulting(t *testing.T) { } func TestLbNoService(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) inputMap := map[string]utils.FakeIngressRuleValueMap{ "foo.example.com": { @@ -404,7 +405,7 @@ func TestLbNoService(t *testing.T) { } func TestLbChangeStaticIP(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) inputMap := map[string]utils.FakeIngressRuleValueMap{ "foo.example.com": { diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index 96a409ff64..018bbe3f5f 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -32,13 +32,17 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/flags" ) -// isGCEIngress returns true if the given Ingress either doesn't specify the -// ingress.class annotation, or it's set to "gce". +// isGCEIngress returns true if the Ingress matches the class managed by this +// controller. 
func isGCEIngress(ing *extensions.Ingress) bool { class := annotations.FromIngress(ing).IngressClass() - return class == "" || class == annotations.GceIngressClass + if flags.F.IngressClass == "" { + return class == "" || class == annotations.GceIngressClass + } + return class == flags.F.IngressClass } // isGCEMultiClusterIngress returns true if the given Ingress has diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index 744d3c2b05..b918c092bf 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/flags" ) // Pods created in loops start from this time, for routines that @@ -38,7 +39,7 @@ import ( var firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC) func TestZoneListing(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) zoneToNode := map[string][]string{ "zone-1": {"n1"}, @@ -63,7 +64,7 @@ func TestZoneListing(t *testing.T) { } func TestInstancesAddedToZones(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) zoneToNode := map[string][]string{ "zone-1": {"n1", "n2"}, @@ -92,7 +93,7 @@ func TestInstancesAddedToZones(t *testing.T) { } func TestProbeGetter(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) nodePortToHealthCheck := map[backends.ServicePort]string{ @@ -111,7 +112,7 @@ func TestProbeGetter(t *testing.T) { } func TestProbeGetterNamedPort(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) nodePortToHealthCheck := map[backends.ServicePort]string{ {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", @@ -134,7 +135,7 @@ func TestProbeGetterNamedPort(t *testing.T) { } func TestProbeGetterCrossNamespace(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) firstPod := &api_v1.Pod{ @@ -304,7 +305,7 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) { } func TestGatherFirewallPorts(t *testing.T) { - cm := NewFakeClusterManager(DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) lbc := newLoadBalancerController(t, cm) lbc.CloudClusterManager.defaultBackendNodePort.Port = int64(30000) diff --git a/cmd/glbc/app/flags.go b/pkg/flags/flags.go similarity index 68% rename from cmd/glbc/app/flags.go rename to pkg/flags/flags.go index a32777faae..49ee089993 100644 --- a/cmd/glbc/app/flags.go +++ b/pkg/flags/flags.go @@ -14,19 +14,24 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package app +package flags import ( "flag" "time" "k8s.io/api/core/v1" - "k8s.io/ingress-gce/pkg/controller" +) + +const ( + // DefaultClusterUID is the uid to use for clusters resources created by an + // L7 controller created without specifying the --cluster-uid flag. + DefaultClusterUID = "" ) var ( - // Flags are the command line flags. - Flags = struct { + // F are global flags for the controller. + F = struct { APIServerHost string ClusterName string ConfigFilePath string @@ -35,57 +40,61 @@ var ( HealthCheckPath string HealthzPort int InCluster bool + IngressClass string KubeConfigFile string ResyncPeriod time.Duration Verbose bool - WatchNamespace string Version bool + WatchNamespace string }{} ) -func init() { - flag.StringVar(&Flags.APIServerHost, "apiserver-host", "", +// Register flags with the command line parser. +func Register() { + flag.StringVar(&F.APIServerHost, "apiserver-host", "", `The address of the Kubernetes Apiserver to connect to in the format of protocol://address:port, e.g., http://localhost:8080. If not specified, the assumption is that the binary runs inside a Kubernetes cluster and local discovery is attempted.`) - flag.StringVar(&Flags.ClusterName, "cluster-uid", controller.DefaultClusterUID, + flag.StringVar(&F.ClusterName, "cluster-uid", DefaultClusterUID, `Optional, used to tag cluster wide, shared loadbalancer resources such as instance groups. Use this flag if you'd like to continue using the same resources across a pod restart. Note that this does not need to match the name of you Kubernetes cluster, it's just an arbitrary name used to tag/lookup cloud resources.`) - flag.StringVar(&Flags.ConfigFilePath, "config-file-path", "", + flag.StringVar(&F.ConfigFilePath, "config-file-path", "", `Path to a file containing the gce config. If left unspecified this controller only works with default zones.`) - flag.StringVar(&Flags.DefaultSvc, "default-backend-service", "kube-system/default-http-backend", + flag.StringVar(&F.DefaultSvc, "default-backend-service", "kube-system/default-http-backend", `Service used to serve a 404 page for the default backend. Takes the form namespace/name. The controller uses the first node port of this Service for the default backend.`) - flag.BoolVar(&Flags.DeleteAllOnQuit, "delete-all-on-quit", false, + flag.BoolVar(&F.DeleteAllOnQuit, "delete-all-on-quit", false, `If true, the controller will delete all Ingress and the associated external cloud resources as it's shutting down. Mostly used for testing. In normal environments the controller should only delete a loadbalancer if the associated Ingress is deleted.`) - flag.StringVar(&Flags.HealthCheckPath, "health-check-path", "/", + flag.StringVar(&F.HealthCheckPath, "health-check-path", "/", `Path used to health-check a backend service. All Services must serve a 200 page on this path. Currently this is only configurable globally.`) - flag.IntVar(&Flags.HealthzPort, "healthz-port", 8081, + flag.IntVar(&F.HealthzPort, "healthz-port", 8081, `Port to run healthz server. 
Must match the health check port in yaml.`) - flag.BoolVar(&Flags.InCluster, "running-in-cluster", true, + flag.BoolVar(&F.InCluster, "running-in-cluster", true, `Optional, if this controller is running in a kubernetes cluster, use the pod secrets for creating a Kubernetes client.`) - flag.StringVar(&Flags.KubeConfigFile, "kubeconfig", "", + flag.StringVar(&F.KubeConfigFile, "kubeconfig", "", `Path to kubeconfig file with authorization and master location information.`) - flag.DurationVar(&Flags.ResyncPeriod, "sync-period", 30*time.Second, + flag.DurationVar(&F.ResyncPeriod, "sync-period", 30*time.Second, `Relist and confirm cloud resources this often.`) - flag.StringVar(&Flags.WatchNamespace, "watch-namespace", v1.NamespaceAll, + flag.StringVar(&F.WatchNamespace, "watch-namespace", v1.NamespaceAll, `Namespace to watch for Ingress/Services/Endpoints.`) - flag.BoolVar(&Flags.Version, "version", false, + flag.BoolVar(&F.Version, "version", false, `Print the version of the controller and exit`) + flag.StringVar(&F.IngressClass, "ingress-class", "", + `If set, overrides what ingress classes are managed by the controller.`) - // Deprecated flags. - flag.BoolVar(&Flags.Verbose, "verbose", false, + // Deprecated F. + flag.BoolVar(&F.Verbose, "verbose", false, `This flag is deprecated. Use -v to control verbosity.`) flag.Bool("use-real-cloud", false, `This flag has been deprecated and no longer has any effect.`) From 0302d6a18ec6b5cb2ada5a589b474d720c760c20 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sat, 6 Jan 2018 13:11:41 -0800 Subject: [PATCH 11/11] Add name to the logs of CloudListingPool This makes the activities of the Pool easier to follow. --- pkg/backends/backends.go | 27 ++++++++++++--------------- pkg/storage/pools.go | 19 ++++++++++++++----- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index 89b2220b2e..19972e2a13 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -146,21 +146,18 @@ func NewBackendPool( backendPool.snapshotter = storage.NewInMemoryPool() return backendPool } - backendPool.snapshotter = storage.NewCloudListingPool( - func(i interface{}) (string, error) { - bs := i.(*compute.BackendService) - if !namer.NameBelongsToCluster(bs.Name) { - return "", fmt.Errorf("unrecognized name %v", bs.Name) - } - port, err := namer.BackendPort(bs.Name) - if err != nil { - return "", err - } - return port, nil - }, - backendPool, - 30*time.Second, - ) + keyFunc := func(i interface{}) (string, error) { + bs := i.(*compute.BackendService) + if !namer.NameBelongsToCluster(bs.Name) { + return "", fmt.Errorf("unrecognized name %v", bs.Name) + } + port, err := namer.BackendPort(bs.Name) + if err != nil { + return "", err + } + return port, nil + } + backendPool.snapshotter = storage.NewCloudListingPool("backends", keyFunc, backendPool, 30*time.Second) return backendPool } diff --git a/pkg/storage/pools.go b/pkg/storage/pools.go index 2ced272dec..ed41e2ac2e 100644 --- a/pkg/storage/pools.go +++ b/pkg/storage/pools.go @@ -66,6 +66,8 @@ type cloudLister interface { // CloudListingPool wraps InMemoryPool but relists from the cloud periodically. type CloudListingPool struct { + // name is used to distinguish different pools in the logs. 
+ name string // A lock to protect against concurrent mutation of the pool lock sync.Mutex // The pool that is re-populated via re-list from cloud, and written to @@ -88,7 +90,7 @@ type CloudListingPool struct { func (c *CloudListingPool) ReplenishPool() { c.lock.Lock() defer c.lock.Unlock() - glog.V(4).Infof("Replenishing pool") + glog.V(4).Infof("Replenishing pool %q", c.name) // We must list with the lock, because the controller also lists through // Snapshot(). It's ok if the controller takes a snpshot, we list, we @@ -97,14 +99,14 @@ func (c *CloudListingPool) ReplenishPool() { // creates a backend, and we delete that backend based on stale state. items, err := c.lister.List() if err != nil { - glog.Warningf("Failed to list: %v", err) + glog.Warningf("Failed to list %q: %v", c.name, err) return } for i := range items { key, err := c.keyGetter(items[i]) if err != nil { - glog.V(5).Infof("CloudListingPool: %v", err) + glog.V(5).Infof("CloudListingPool %q: %v", c.name, err) continue } c.InMemoryPool.Add(key, items[i]) @@ -115,6 +117,8 @@ func (c *CloudListingPool) ReplenishPool() { func (c *CloudListingPool) Snapshot() map[string]interface{} { c.lock.Lock() defer c.lock.Unlock() + + glog.V(4).Infof("Snapshot %q", c.name) return c.InMemoryPool.Snapshot() } @@ -122,6 +126,8 @@ func (c *CloudListingPool) Snapshot() map[string]interface{} { func (c *CloudListingPool) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() + + glog.V(4).Infof("Add %q: %q, %+v", c.name, key, obj) c.InMemoryPool.Add(key, obj) } @@ -129,18 +135,21 @@ func (c *CloudListingPool) Add(key string, obj interface{}) { func (c *CloudListingPool) Delete(key string) { c.lock.Lock() defer c.lock.Unlock() + + glog.V(4).Infof("Delete %q: %q", c.name, key) c.InMemoryPool.Delete(key) } // NewCloudListingPool replenishes the InMemoryPool through a background // goroutine that lists from the given cloudLister. -func NewCloudListingPool(k keyFunc, lister cloudLister, relistPeriod time.Duration) *CloudListingPool { +func NewCloudListingPool(name string, k keyFunc, lister cloudLister, relistPeriod time.Duration) *CloudListingPool { cl := &CloudListingPool{ + name: name, InMemoryPool: NewInMemoryPool(), lister: lister, keyGetter: k, } - glog.V(4).Infof("Starting pool replenish goroutine") + glog.V(4).Infof("Starting pool %q", cl.name) go wait.Until(cl.ReplenishPool, relistPeriod, make(chan struct{})) return cl }
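
The value of the new name field is easiest to see when two pools write to one log stream. The following self-contained sketch mirrors the pattern this commit introduces (a pool label threaded through every log line) using a trimmed-down stand-in type and the standard log package; none of the identifiers below, including the hypothetical "instance-groups" label, come from the real storage package.

package main

import "log"

// namedPool is a stand-in for CloudListingPool: it only demonstrates how
// tagging every log line with the pool's name keeps interleaved output from
// multiple pools readable.
type namedPool struct {
	name  string
	items map[string]interface{}
}

func newNamedPool(name string) *namedPool {
	log.Printf("Starting pool %q", name)
	return &namedPool{name: name, items: map[string]interface{}{}}
}

func (p *namedPool) Add(key string, obj interface{}) {
	log.Printf("Add %q: %q, %+v", p.name, key, obj)
	p.items[key] = obj
}

func (p *namedPool) Delete(key string) {
	log.Printf("Delete %q: %q", p.name, key)
	delete(p.items, key)
}

func main() {
	// Two pools sharing the same log stream stay distinguishable by name.
	backends := newNamedPool("backends")
	instanceGroups := newNamedPool("instance-groups")

	backends.Add("31000", "backend-service-31000")
	instanceGroups.Add("ig-zone-a", "instance-group-a")
	backends.Delete("31000")
}
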
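
Stepping back to the pkg/flags move earlier in this series: a consumer now registers and reads the flags roughly as below. This is a sketch, not the actual cmd/glbc wiring, and the fields printed are chosen arbitrarily for illustration.

package main

import (
	"flag"
	"fmt"

	"k8s.io/ingress-gce/pkg/flags"
)

func main() {
	// Register the controller's flags on the standard flag set, then parse.
	flags.Register()
	flag.Parse()

	// Values are read back through the package-level flags.F struct.
	fmt.Printf("ingress-class=%q cluster-uid=%q sync-period=%v\n",
		flags.F.IngressClass, flags.F.ClusterName, flags.F.ResyncPeriod)
}
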