Add events for major lifecycle events
{Add,Remove}Nodes from InstanceGroup
SyncIngress, TranslateIngress, IPChanged, GarbageCollection
bowei committed Jul 15, 2020
1 parent 74680cd commit 66367b4
Showing 13 changed files with 173 additions and 43 deletions.
5 changes: 3 additions & 2 deletions pkg/backends/ig_linker_test.go
@@ -27,6 +27,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/legacy-cloud-providers/gce"
)
@@ -47,7 +48,7 @@ func newTestIGLinker(fakeGCE *gce.Cloud, fakeInstancePool instances.NodePool) *i

func TestLink(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
- fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer)
+ fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{})
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
linker := newTestIGLinker(fakeGCE, fakeNodePool)

@@ -77,7 +78,7 @@ func TestLink(t *testing.T) {

func TestLinkWithCreationModeError(t *testing.T) {
fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
- fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer)
+ fakeNodePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{})
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
linker := newTestIGLinker(fakeGCE, fakeNodePool)

3 changes: 2 additions & 1 deletion pkg/backends/integration_test.go
@@ -29,6 +29,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/instances"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/legacy-cloud-providers/gce"
)
@@ -45,7 +46,7 @@ func newTestJig(fakeGCE *gce.Cloud) *Jig {
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
- fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer)
+ fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer, &test.FakeRecorderSource{})
fakeInstancePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})

// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
30 changes: 16 additions & 14 deletions pkg/controller/controller.go
@@ -41,6 +41,7 @@ import (
"k8s.io/ingress-gce/pkg/common/operator"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecks"
@@ -108,7 +109,7 @@ func NewLoadBalancerController(
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
- instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
+ instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer, ctx)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
@@ -139,8 +140,8 @@ func NewLoadBalancerController(
return
}

klog.V(3).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", common.NamespacedName(addIng))
klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
@@ -176,11 +177,11 @@ func NewLoadBalancerController(
return
}
if reflect.DeepEqual(old, cur) {
klog.V(3).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
klog.V(2).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
} else {
klog.V(3).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
klog.V(2).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
}

+ lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(cur)
},
})
@@ -558,7 +559,8 @@ func (lbc *LoadBalancerController) sync(key string) error {
err := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm)
// Skip emitting an event if ingress does not exist as we cannot retrieve ingress namespace.
if err != nil && ingExists {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", err))
klog.Errorf("Error in GC for %s/%s: %v", ing.Namespace, ing.Name, err)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error: %v", err)
}
// Delete the ingress state for metrics after GC is successful.
if err == nil && ingExists {
@@ -578,16 +580,16 @@ func (lbc *LoadBalancerController) sync(key string) error {
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPort.ID, lbc.ctx.ClusterNamer)

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
msg := fmt.Errorf("invalid ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.TranslateIngress, "Translation failed: %v", msg)
return msg
}

// Sync GCP resources.
syncState := &syncState{urlMap, ing, nil}
syncErr := lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error syncing to GCP: %v", syncErr.Error())
} else {
// Insert/update the ingress state for metrics after successful sync.
lbc.metrics.SetIngress(key, metrics.NewIngressState(ing, urlMap.AllServicePorts()))
@@ -598,7 +600,7 @@ func (lbc *LoadBalancerController) sync(key string) error {
// free up enough quota for the next sync to pass.
frontendGCAlgorithm := frontendGCAlgorithm(ingExists, ing)
if gcErr := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm); gcErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", gcErr))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error during garbage collection: %v", gcErr)
return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
}

@@ -633,7 +635,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
klog.Errorf("PatchIngressStatus(%s/%s) failed: %v", currIng.Namespace, currIng.Name, err)
return err
}
- lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
+ lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, events.IPChanged, "IP is now %v", ip)
}
}
annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.backendSyncer)
@@ -655,7 +657,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if apierrors.IsNotFound(err) {
// TODO: this path should be removed when external certificate managers migrate to a better solution.
const msg = "Could not find TLS certificates. Continuing setup for the load balancer to serve HTTP. Note: this behavior is deprecated and will be removed in a future version of ingress-gce"
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", msg)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, msg)
} else {
klog.Errorf("Could not get certificates for ingress %s/%s: %v", ing.Namespace, ing.Name, err)
return nil, err
@@ -666,7 +668,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if lbc.ctx.FrontendConfigEnabled {
feConfig, err = frontendconfig.FrontendConfigForIngress(lbc.ctx.FrontendConfigs().List(), ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("%v", err))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error: %v", err)
}
// Object in cache could be changed in-flight. Deepcopy to
// reduce race conditions.
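A note on the NewNodePool call above: the controller passes its ctx directly as the new third argument. Go satisfies interfaces structurally, so any type with a Recorder(ns string) method matches the unexported recorderSource interface that instances.NewNodePool now accepts (see pkg/instances/instances.go below). A minimal sketch of why this compiles, using simplified stand-in types (fakeContext and newNodePool are illustrative, not the real pkg/context or pkg/instances code):

package main

import "k8s.io/client-go/tools/record"

// recorderSource mirrors the unexported interface added to pkg/instances below.
type recorderSource interface {
	Recorder(ns string) record.EventRecorder
}

// fakeContext stands in for the controller's ControllerContext, which already
// exposes a per-namespace Recorder method.
type fakeContext struct{}

func (c *fakeContext) Recorder(ns string) record.EventRecorder {
	return &record.FakeRecorder{}
}

// newNodePool mimics the shape of the updated instances.NewNodePool: all it
// requires of its caller is something that can hand out an EventRecorder.
func newNodePool(recorders recorderSource) record.EventRecorder {
	// Empty namespace: the node-pool events are cluster-scoped.
	return recorders.Recorder("")
}

func main() {
	_ = newNodePool(&fakeContext{}) // compiles: the interface is satisfied structurally
}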
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
@@ -77,7 +77,7 @@ func newLoadBalancerController() *LoadBalancerController {
ctx := context.NewControllerContext(nil, kubeClient, backendConfigClient, nil, fakeGCE, namer, "" /*kubeSystemUID*/, ctxConfig)
lbc := NewLoadBalancerController(ctx, stopCh)
// TODO(rramkumar): Fix this so we don't have to override with our fake
- lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer)
+ lbc.instancePool = instances.NewNodePool(instances.NewFakeInstanceGroups(sets.NewString(), namer), namer, &test.FakeRecorderSource{})
lbc.l7Pool = loadbalancers.NewLoadBalancerPool(fakeGCE, namer, events.RecorderProducerMock{}, namer_util.NewFrontendNamerFactory(namer, ""))
lbc.instancePool.Init(&instances.FakeZoneLister{Zones: []string{fakeZone}})

49 changes: 49 additions & 0 deletions pkg/events/events.go
@@ -17,9 +17,22 @@ limitations under the License.
package events

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)

+ const (
+ 	AddNodes    = "IngressGCE_AddNodes"
+ 	RemoveNodes = "IngressGCE_RemoveNodes"
+
+ 	SyncIngress       = "Sync"
+ 	TranslateIngress  = "Translate"
+ 	IPChanged         = "IPChanged"
+ 	GarbageCollection = "GarbageCollection"
+
+ 	SyncService = "Sync"
+ )

type RecorderProducer interface {
Recorder(ns string) record.EventRecorder
}
@@ -30,3 +43,39 @@ type RecorderProducerMock struct {
func (r RecorderProducerMock) Recorder(ns string) record.EventRecorder {
return &record.FakeRecorder{}
}

+ // GlobalEventf records a cluster-level event that is not attached to a given object.
+ func GlobalEventf(r record.EventRecorder, eventtype, reason, messageFmt string, args ...interface{}) {
+ 	// Use an empty ObjectReference to indicate that there is no associated
+ 	// resource. This works in practice; see the package
+ 	// k8s.io/client-go/tools/record.
+ 	r.Eventf(&v1.ObjectReference{}, eventtype, reason, messageFmt, args...)
+ }
+
+ // truncatedStringListMax is a variable rather than a constant to make
+ // testing easier. Its value should not be modified.
+ var truncatedStringListMax = 2000
+
+ // TruncatedStringList renders items as a string, eliding elements with an
+ // ellipsis at the end if the result would exceed a reasonable number of
+ // characters. This prevents accidentally dumping an enormous string into an
+ // Event description.
+ func TruncatedStringList(items []string) string {
+ 	var (
+ 		ret   = "["
+ 		first = true
+ 	)
+ 	for _, s := range items {
+ 		if len(ret)+len(s)+1 > truncatedStringListMax {
+ 			ret += ", ..."
+ 			break
+ 		}
+ 		if !first {
+ 			ret += ", "
+ 		}
+ 		first = false
+ 		ret += s
+ 	}
+ 	return ret + "]"
+ }
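To make the new helpers concrete, here is a usage sketch (not part of this commit): it wires GlobalEventf and TruncatedStringList to a record.FakeRecorder, which buffers each emitted event as a string on a channel. The node and instance-group names are made up for illustration.

package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/ingress-gce/pkg/events"
)

func main() {
	// FakeRecorder buffers emitted events on its Events channel.
	recorder := record.NewFakeRecorder(10)

	nodes := []string{"node-a", "node-b", "node-c"}
	events.GlobalEventf(recorder, core.EventTypeNormal, events.AddNodes,
		"Adding %s to InstanceGroup %q", events.TruncatedStringList(nodes), "k8s-ig--uid1")

	// Prints: Normal IngressGCE_AddNodes Adding [node-a, node-b, node-c] to InstanceGroup "k8s-ig--uid1"
	fmt.Println(<-recorder.Events)
}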
34 changes: 34 additions & 0 deletions pkg/events/events_test.go
@@ -0,0 +1,34 @@
+ package events
+
+ import (
+ 	"fmt"
+ 	"testing"
+ )
+
+ func TestTruncatedStringList(t *testing.T) {
+ 	// Lower the truncation limit for the test; restore the original on exit.
+ 	var saved int
+ 	truncatedStringListMax, saved = 30, truncatedStringListMax
+ 	defer func() { truncatedStringListMax = saved }()
+
+ 	for _, tc := range []struct {
+ 		desc  string
+ 		count int
+ 		want  string
+ 	}{
+ 		{"zero", 0, "[]"},
+ 		{"one", 1, "[elt-0]"},
+ 		{"not truncated", 4, "[elt-0, elt-1, elt-2, elt-3]"},
+ 		{"truncated", 20, "[elt-0, elt-1, elt-2, elt-3, ...]"},
+ 	} {
+ 		t.Run(tc.desc, func(t *testing.T) {
+ 			var l []string
+ 			for i := 0; i < tc.count; i++ {
+ 				l = append(l, fmt.Sprintf("elt-%d", i))
+ 			}
+ 			got := TruncatedStringList(l)
+ 			if got != tc.want {
+ 				t.Errorf("TruncatedStringList(%v) = %q; want %q", l, got, tc.want)
+ 			}
+ 		})
+ 	}
+ }
35 changes: 26 additions & 9 deletions pkg/instances/instances.go
@@ -21,10 +21,13 @@ import (
"net/http"
"time"

"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

"google.golang.org/api/compute/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"k8s.io/ingress-gce/pkg/utils"
@@ -39,16 +42,22 @@ const (
type Instances struct {
cloud InstanceGroups
ZoneLister
- namer namer.BackendNamer
+ namer    namer.BackendNamer
+ recorder record.EventRecorder
}

+ type recorderSource interface {
+ 	Recorder(ns string) record.EventRecorder
+ }

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
- func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer) NodePool {
+ func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource) NodePool {
return &Instances{
- cloud: cloud,
- namer: namer,
+ cloud:    cloud,
+ namer:    namer,
+ recorder: recorders.Recorder(""), // No namespace
}
}

@@ -250,7 +259,8 @@ func (i *Instances) splitNodesByZone(names []string) map[string][]string {

// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
- errs := []error{}
+ events.GlobalEventf(i.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
+ var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
@@ -260,12 +270,16 @@ func (i *Instances) Add(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("AddInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
- errs := []error{}
+ events.GlobalEventf(i.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
+ var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
@@ -275,7 +289,10 @@ func (i *Instances) Remove(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("RemoveInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Sync nodes with the instances in the instance group.
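Several of the updated tests construct pools with &test.FakeRecorderSource{}, whose definition lives in pkg/test and is not part of this diff. A minimal implementation satisfying the recorderSource interface above could look like the following sketch (an assumption about, not a copy of, the actual pkg/test code):

package test

import "k8s.io/client-go/tools/record"

// FakeRecorderSource hands out no-op recorders for unit tests.
type FakeRecorderSource struct{}

// Recorder satisfies the recorderSource interface in pkg/instances; the
// namespace argument is ignored because the fake does not scope events.
func (f *FakeRecorderSource) Recorder(ns string) record.EventRecorder {
	return &record.FakeRecorder{}
}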
3 changes: 2 additions & 1 deletion pkg/instances/instances_test.go
@@ -20,6 +20,7 @@ import (
"testing"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/namer"
)

@@ -28,7 +29,7 @@ const defaultZone = "default-zone"
var defaultNamer = namer.NewNamer("uid1", "fw1")

func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
- pool := NewNodePool(f, defaultNamer)
+ pool := NewNodePool(f, defaultNamer, &test.FakeRecorderSource{})
pool.Init(&FakeZoneLister{[]string{zone}})
return pool
}
(The remaining 5 changed files are not shown.)
