Replace all uses of Snapshotter with CloudLister #590

Merged · 2 commits · Jan 11, 2019

This PR removes the storage.Snapshotter caching layer from the backend pool: rather than maintaining a local snapshot of BackendServices, the pool now lists them directly from GCE on demand and filters the results to the names owned by the cluster's namer.
62 changes: 14 additions & 48 deletions pkg/backends/backends.go
@@ -14,25 +14,21 @@ limitations under the License.
 package backends
 
 import (
-	"fmt"
 	"net/http"
-	"time"
 
 	"github.com/golang/glog"
-	compute "google.golang.org/api/compute/v1"
 	"k8s.io/ingress-gce/pkg/backends/features"
 	"k8s.io/ingress-gce/pkg/composite"
-	"k8s.io/ingress-gce/pkg/storage"
 	"k8s.io/ingress-gce/pkg/utils"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
 )
 
 // Backends handles CRUD operations for backends.
 type Backends struct {
-	cloud       *gce.GCECloud
-	snapshotter storage.Snapshotter
-	namer       *utils.Namer
+	cloud *gce.GCECloud
+	namer *utils.Namer
 }

 // Backends is a Pool.
@@ -41,29 +37,11 @@ var _ Pool = (*Backends)(nil)
 // NewPool returns a new backend pool.
 // - cloud: implements BackendServices
 // - namer: produces names for backends.
-// - resyncWithCloud: if true, periodically syncs with cloud resources.
-func NewPool(
-	cloud *gce.GCECloud,
-	namer *utils.Namer,
-	resyncWithCloud bool) *Backends {
-
-	backendPool := &Backends{
+func NewPool(cloud *gce.GCECloud, namer *utils.Namer) *Backends {
+	return &Backends{
 		cloud: cloud,
 		namer: namer,
 	}
-	if !resyncWithCloud {
-		backendPool.snapshotter = storage.NewInMemoryPool()
-		return backendPool
-	}
-	keyFunc := func(i interface{}) (string, error) {
-		bs := i.(*compute.BackendService)
-		if !namer.NameBelongsToCluster(bs.Name) {
-			return "", fmt.Errorf("unrecognized name %v", bs.Name)
-		}
-		return bs.Name, nil
-	}
-	backendPool.snapshotter = storage.NewCloudListingPool("backends", keyFunc, backendPool, 30*time.Second)
-	return backendPool
 }

 // ensureDescription updates the BackendService Description with the expected value
@@ -99,7 +77,6 @@ func (b *Backends) Create(sp utils.ServicePort, hcLink string) (*composite.BackendService, error) {
 	if err := composite.CreateBackendService(be, b.cloud); err != nil {
 		return nil, err
 	}
-	b.snapshotter.Add(name, be)
 	// Note: We need to perform a GCE call to re-fetch the object we just created
 	// so that the "Fingerprint" field is filled in. This is needed to update the
 	// object without error.
@@ -113,7 +90,6 @@ func (b *Backends) Update(be *composite.BackendService) error {
 	if err := composite.UpdateBackendService(be, b.cloud); err != nil {
 		return err
 	}
-	b.snapshotter.Add(be.Name, be)
 	return nil
 }

@@ -133,7 +109,6 @@ func (b *Backends) Get(name string, version meta.Version) (*composite.BackendService, error) {
 			return nil, err
 		}
 	}
-	b.snapshotter.Add(name, be)
 	return be, nil
 }

@@ -143,9 +118,6 @@ func (b *Backends) Delete(name string) (err error) {
 		if utils.IsHTTPErrorCode(err, http.StatusNotFound) {
 			err = nil
 		}
-		if err == nil {
-			b.snapshotter.Delete(name)
-		}
 	}()
 
 	glog.V(2).Infof("Deleting backend service %v", name)
@@ -174,27 +146,21 @@ func (b *Backends) Health(name string) string {
 	return hs.HealthStatus[0].HealthState
 }
 
-// GetLocalSnapshot implements Pool.
-func (b *Backends) GetLocalSnapshot() []string {
-	pool := b.snapshotter.Snapshot()
-	var keys []string
-	for name := range pool {
-		keys = append(keys, name)
-	}
-	return keys
-}
-
-// List lists all backends.
-func (b *Backends) List() ([]interface{}, error) {
+// List lists all backends managed by this controller.
+func (b *Backends) List() ([]string, error) {
	// TODO: for consistency with the rest of this sub-package this method
	// should return a list of backend ports.
 	backends, err := b.cloud.ListGlobalBackendServices()
 	if err != nil {
 		return nil, err
 	}
-	var ret []interface{}
-	for _, x := range backends {
-		ret = append(ret, x)
+
+	var names []string
+
+	for _, bs := range backends {
+		if b.namer.NameBelongsToCluster(bs.Name) {
+			names = append(names, bs.Name)
+		}
 	}
-	return ret, nil
+	return names, nil
 }
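Note: for orientation, here is a minimal sketch of how a caller interacts with the pool after this change: construction no longer takes a resync flag, and List() performs a live, cluster-filtered read instead of consulting a local snapshot. The package name, function name, and logging below are illustrative, not part of the PR.

package example

import (
	"github.com/golang/glog"

	"k8s.io/ingress-gce/pkg/backends"
	"k8s.io/ingress-gce/pkg/utils"
	"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)

// listClusterBackends is a hypothetical caller of the new Pool surface.
func listClusterBackends(cloud *gce.GCECloud, namer *utils.Namer) ([]string, error) {
	pool := backends.NewPool(cloud, namer) // the resyncWithCloud flag is gone
	names, err := pool.List()              // replaces pool.GetLocalSnapshot()
	if err != nil {
		return nil, err // callers now handle a possible cloud error on every read
	}
	glog.V(2).Infof("cluster owns %d backend services", len(names))
	return names, nil
}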
2 changes: 1 addition & 1 deletion pkg/backends/ig_linker_test.go
@@ -34,7 +34,7 @@ const defaultZone = "zone-a"
 
 func newTestIGLinker(fakeGCE *gce.GCECloud, fakeInstancePool instances.NodePool) *instanceGroupLinker {
 	fakeInstancePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
-	fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)
+	fakeBackendPool := NewPool(fakeGCE, defaultNamer)
 
 	// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
 	(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook
10 changes: 5 additions & 5 deletions pkg/backends/integration_test.go
@@ -40,10 +40,10 @@ type Jig struct {
 	pool Pool
 }
 
-func newTestJig(fakeGCE *gce.GCECloud, resyncWithCloud bool) *Jig {
+func newTestJig(fakeGCE *gce.GCECloud) *Jig {
 	fakeHealthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
 	fakeHealthChecks := healthchecks.NewHealthChecker(fakeHealthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc)
-	fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)
+	fakeBackendPool := NewPool(fakeGCE, defaultNamer)
 
 	fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
 	fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer)
@@ -58,13 +58,13 @@ func newTestJig(fakeGCE *gce.GCECloud, resyncWithCloud bool) *Jig {
 		fakeInstancePool: fakeInstancePool,
 		linker:           NewInstanceGroupLinker(fakeInstancePool, fakeBackendPool, defaultNamer),
 		syncer:           NewBackendSyncer(fakeBackendPool, fakeHealthChecks, defaultNamer, false),
-		pool:             NewPool(fakeGCE, defaultNamer, resyncWithCloud),
+		pool:             fakeBackendPool,
 	}
 }
 
 func TestBackendInstanceGroupClobbering(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	jig := newTestJig(fakeGCE, false)
+	jig := newTestJig(fakeGCE)
 
 	sp := utils.ServicePort{NodePort: 80}
 	_, err := jig.fakeInstancePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
@@ -136,7 +136,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {
 
 func TestSyncChaosMonkey(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	jig := newTestJig(fakeGCE, false)
+	jig := newTestJig(fakeGCE)
 
 	sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP}
 
4 changes: 2 additions & 2 deletions pkg/backends/interfaces.go
@@ -44,8 +44,8 @@ type Pool interface {
 	Delete(name string) error
 	// Get the health of a BackendService given its name.
 	Health(name string) string
-	// Get a list of BackendService names currently in this pool.
-	GetLocalSnapshot() []string
+	// Get a list of BackendService names that are managed by this pool.
+	List() ([]string, error)
 }
 
 // Syncer is an interface to sync Kubernetes services to GCE BackendServices.
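Note: with GetLocalSnapshot() gone, the only read path the Pool contract offers is the fallible List(), so an in-memory fake now has to return an error value too. A hypothetical fake (not part of this PR) might look like the sketch below; only the two methods relevant to GC are shown.

// fakePool sketches a test double for the narrowed Pool contract; the
// remaining methods (Create, Update, Get, Health, ...) are omitted here.
type fakePool struct {
	backends map[string]bool
}

func (f *fakePool) List() ([]string, error) {
	var names []string
	for name := range f.backends {
		names = append(names, name)
	}
	// A fake could also return a synthetic error to exercise GC failure paths.
	return names, nil
}

func (f *fakePool) Delete(name string) error {
	delete(f.backends, name)
	return nil
}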
2 changes: 1 addition & 1 deletion pkg/backends/neg_linker_test.go
@@ -28,7 +28,7 @@ import (
 )
 
 func newTestNEGLinker(fakeNEG negtypes.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker {
-	fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)
+	fakeBackendPool := NewPool(fakeGCE, defaultNamer)
 
 	// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
 	(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook
8 changes: 7 additions & 1 deletion pkg/backends/syncer.go
@@ -14,6 +14,7 @@ limitations under the License.
 package backends
 
 import (
+	"fmt"
 	"net/http"
 	"strings"
 
@@ -140,7 +141,12 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
 		name := sp.BackendName(s.namer)
 		knownPorts.Insert(name)
 	}
-	backendNames := s.backendPool.GetLocalSnapshot()
+
+	backendNames, err := s.backendPool.List()
+	if err != nil {
+		return fmt.Errorf("error getting the names of controller-managed backends: %v", err)
+	}
+
 	for _, name := range backendNames {
 		if knownPorts.Has(name) {
 			continue
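Note: putting the two syncer.go hunks together, the GC pass now reads: build the set of expected backend names, fetch the cluster-owned names live from GCE, and delete the difference. A condensed sketch follows; the method name is invented, and the deletion body is paraphrased from surrounding code that this hunk elides.

func (s *backendSyncer) gcSketch(svcPorts []utils.ServicePort) error {
	knownPorts := sets.NewString()
	for _, sp := range svcPorts {
		knownPorts.Insert(sp.BackendName(s.namer))
	}

	// One live GCE list call; the pool has already filtered out names that
	// do not belong to this cluster, so foreign backends are never GC'ed.
	backendNames, err := s.backendPool.List()
	if err != nil {
		return fmt.Errorf("error getting the names of controller-managed backends: %v", err)
	}

	for _, name := range backendNames {
		if knownPorts.Has(name) {
			continue // still wanted by some service port
		}
		// Assumed to mirror the real GC body, which deletes the orphan.
		if err := s.backendPool.Delete(name); err != nil {
			return err
		}
	}
	return nil
}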
24 changes: 12 additions & 12 deletions pkg/backends/syncer_test.go
@@ -56,11 +56,11 @@ var (
 	}
 )
 
-func newTestSyncer(fakeGCE *gce.GCECloud, poolSyncWithCloud bool) *backendSyncer {
+func newTestSyncer(fakeGCE *gce.GCECloud) *backendSyncer {
 	fakeHealthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
 	fakeHealthChecks := healthchecks.NewHealthChecker(fakeHealthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc)
 
-	fakeBackendPool := NewPool(fakeGCE, defaultNamer, poolSyncWithCloud)
+	fakeBackendPool := NewPool(fakeGCE, defaultNamer)
 
 	syncer := &backendSyncer{
 		backendPool: fakeBackendPool,
@@ -82,7 +82,7 @@ func newTestSyncer(fakeGCE *gce.GCECloud, poolSyncWithCloud bool) *backendSyncer {
 
 func TestSync(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	testCases := []utils.ServicePort{
 		{NodePort: 80, Protocol: annotations.ProtocolHTTP},
@@ -125,7 +125,7 @@ func TestSync(t *testing.T) {
 
 func TestSyncUpdateHTTPS(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	p := utils.ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP}
 	syncer.Sync([]utils.ServicePort{p})
@@ -169,7 +169,7 @@ func TestSyncUpdateHTTPS(t *testing.T) {
 
 func TestSyncUpdateHTTP2(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	p := utils.ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP}
 	syncer.Sync([]utils.ServicePort{p})
@@ -213,7 +213,7 @@ func TestSyncUpdateHTTP2(t *testing.T) {
 
 func TestGC(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	svcNodePorts := []utils.ServicePort{
 		{NodePort: 81, Protocol: annotations.ProtocolHTTP},
@@ -339,7 +339,7 @@ func TestSyncQuota(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
 			fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-			syncer := newTestSyncer(fakeGCE, false)
+			syncer := newTestSyncer(fakeGCE)
 
 			bsCreated := 0
 			quota := len(tc.newPorts)
@@ -390,7 +390,7 @@ func TestSyncNEG(t *testing.T) {
 	// Convert a BackendPool from non-NEG to NEG.
 	// Expect the old BackendServices to be GC'ed
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	svcPort := utils.ServicePort{NodePort: 81, Protocol: annotations.ProtocolHTTP}
 	if err := syncer.Sync([]utils.ServicePort{svcPort}); err != nil {
@@ -438,7 +438,7 @@
 
 func TestShutdown(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	// Sync a backend and verify that it doesn't exist after Shutdown()
 	syncer.Sync([]utils.ServicePort{{NodePort: 80}})
@@ -476,7 +476,7 @@ func TestApplyProbeSettingsToHC(t *testing.T) {
 
 func TestEnsureBackendServiceProtocol(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	svcPorts := []utils.ServicePort{
 		{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}},
@@ -520,7 +520,7 @@ func TestEnsureBackendServiceProtocol(t *testing.T) {
 
 func TestEnsureBackendServiceDescription(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	svcPorts := []utils.ServicePort{
 		{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}},
@@ -558,7 +558,7 @@ func TestEnsureBackendServiceDescription(t *testing.T) {
 
 func TestEnsureBackendServiceHealthCheckLink(t *testing.T) {
 	fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
-	syncer := newTestSyncer(fakeGCE, false)
+	syncer := newTestSyncer(fakeGCE)
 
 	p := utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}}
 	syncer.Sync([]utils.ServicePort{p})
24 changes: 8 additions & 16 deletions pkg/controller/controller.go
@@ -102,8 +102,7 @@ func NewLoadBalancerController(
 
 	healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendHealthCheckPath, ctx.ClusterNamer, ctx.DefaultBackendSvcPortID.Service)
 	instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
-	backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer, true)
-
+	backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)
 	var mcrtLister mcrtv1alpha1.ManagedCertificateLister
 	if ctx.ManagedCertificateEnabled {
 		mcrtLister = mcrtv1alpha1.NewManagedCertificateLister(ctx.ManagedCertificateInformer.GetIndexer())
@@ -386,9 +385,12 @@ func (lbc *LoadBalancerController) SyncLoadBalancer(state interface{}) error {
 	}
 
 	// Create higher-level LB resources.
-	if err := lbc.l7Pool.Sync(lb); err != nil {
+	l7, err := lbc.l7Pool.Ensure(lb)
+	if err != nil {
 		return err
 	}
+
+	syncState.l7 = l7
 	return nil
 }

@@ -415,18 +417,8 @@ func (lbc *LoadBalancerController) PostProcess(state interface{}) error {
 		return fmt.Errorf("expected state type to be syncState, type was %T", state)
 	}
 
-	// Get the loadbalancer and update the ingress status.
-	ing := syncState.ing
-	k, err := utils.KeyFunc(ing)
-	if err != nil {
-		return fmt.Errorf("cannot get key for Ingress %s/%s: %v", ing.Namespace, ing.Name, err)
-	}
-
-	l7, err := lbc.l7Pool.Get(k)
-	if err != nil {
-		return fmt.Errorf("unable to get loadbalancer: %v", err)
-	}
-	if err := lbc.updateIngressStatus(l7, ing); err != nil {
+	// Update the ingress status.
+	if err := lbc.updateIngressStatus(syncState.l7, syncState.ing); err != nil {
 		return fmt.Errorf("update ingress status error: %v", err)
 	}
 	return nil
@@ -473,14 +465,14 @@ func (lbc *LoadBalancerController) sync(key string) error {
 
 	// Bootstrap state for GCP sync.
 	urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID)
-	syncState := &syncState{urlMap, ing}
 	if errs != nil {
 		msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
 		lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
 		return msg
 	}
 
 	// Sync GCP resources.
+	syncState := &syncState{urlMap, ing, nil}
 	syncErr := lbc.ingSyncer.Sync(syncState)
 	if syncErr != nil {
 		lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
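Note: the extra nil in the syncState literal corresponds to a new L7 field that SyncLoadBalancer fills in (syncState.l7 = l7 above) and PostProcess later reads, removing the need to re-fetch the load balancer from l7Pool by key. The struct definition is not part of this diff; a plausible shape, inferred from the two call sites, would be:

// Inferred sketch only; the real syncState lives elsewhere in pkg/controller
// and its field names and types may differ.
type syncState struct {
	urlMap *utils.GCEURLMap    // assumed result type of TranslateIngress
	ing    *extensions.Ingress // the Ingress being synced
	l7     *loadbalancers.L7   // set by SyncLoadBalancer, consumed by PostProcess
}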
5 changes: 0 additions & 5 deletions pkg/controller/controller_test.go
@@ -222,11 +222,6 @@ func TestIngressClassChange(t *testing.T) {
 	if len(updatedIng.Status.LoadBalancer.Ingress) != 0 {
 		t.Error("Ingress status wasn't updated after class changed")
 	}
-
-	// Check LB for ingress is deleted after class changed
-	if pool, _ := lbc.l7Pool.Get(ingStoreKey); pool != nil {
-		t.Errorf("LB(%v) wasn't deleted after class changed", ingStoreKey)
-	}
 }
 
 // TestEnsureMCIngress asserts a multi-cluster ingress will result with correct status annotations.