Skip to content

Commit

Permalink
Add events for major lifecycle events
Browse files Browse the repository at this point in the history
{Add,Remove}Nodes from InstanceGroup
SyncIngress, TranslateIngress, IPChanged, GarbageCollection
  • Loading branch information
bowei committed Jun 18, 2020
1 parent 15f450f commit 3bfc101
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 34 deletions.
30 changes: 16 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/ingress-gce/pkg/common/operator"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecks"
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewLoadBalancerController(
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer, ctx)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down Expand Up @@ -139,8 +140,8 @@ func NewLoadBalancerController(
return
}

klog.V(3).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", common.NamespacedName(addIng))
klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
Expand Down Expand Up @@ -176,11 +177,11 @@ func NewLoadBalancerController(
return
}
if reflect.DeepEqual(old, cur) {
klog.V(3).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
klog.V(2).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
} else {
klog.V(3).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
klog.V(2).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
}

lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(cur)
},
})
Expand Down Expand Up @@ -558,7 +559,8 @@ func (lbc *LoadBalancerController) sync(key string) error {
err := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm)
// Skip emitting an event if ingress does not exist as we cannot retrieve ingress namespace.
if err != nil && ingExists {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", err))
klog.Errorf("Error in GC for %s/%s: %v", ing.Namespace, ing.Name, err)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error: %v", err)
}
// Delete the ingress state for metrics after GC is successful.
if err == nil && ingExists {
Expand All @@ -578,16 +580,16 @@ func (lbc *LoadBalancerController) sync(key string) error {
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPort.ID, lbc.ctx.ClusterNamer)

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
msg := fmt.Errorf("invalid ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.TranslateIngress, "Translation failed: %v", msg)
return msg
}

// Sync GCP resources.
syncState := &syncState{urlMap, ing, nil}
syncErr := lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error syncing to GCP: %v", syncErr.Error())
} else {
// Insert/update the ingress state for metrics after successful sync.
lbc.metrics.SetIngress(key, metrics.NewIngressState(ing, urlMap.AllServicePorts()))
Expand All @@ -598,7 +600,7 @@ func (lbc *LoadBalancerController) sync(key string) error {
// free up enough quota for the next sync to pass.
frontendGCAlgorithm := frontendGCAlgorithm(ingExists, ing)
if gcErr := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm); gcErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", gcErr))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error during garbage collection: %v", gcErr)
return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
}

Expand Down Expand Up @@ -633,7 +635,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
klog.Errorf("PatchIngressStatus(%s/%s) failed: %v", currIng.Namespace, currIng.Name, err)
return err
}
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, events.IPChanged, "IP is now %v", ip)
}
}
annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.backendSyncer)
Expand All @@ -655,7 +657,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if apierrors.IsNotFound(err) {
// TODO: this path should be removed when external certificate managers migrate to a better solution.
const msg = "Could not find TLS certificates. Continuing setup for the load balancer to serve HTTP. Note: this behavior is deprecated and will be removed in a future version of ingress-gce"
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", msg)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, msg)
} else {
klog.Errorf("Could not get certificates for ingress %s/%s: %v", ing.Namespace, ing.Name, err)
return nil, err
Expand All @@ -666,7 +668,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if lbc.ctx.FrontendConfigEnabled {
feConfig, err = frontendconfig.FrontendConfigForIngress(lbc.ctx.FrontendConfigs().List(), ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("%v", err))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error: %v", err)
}
// Object in cache could be changed in-flight. Deepcopy to
// reduce race conditions.
Expand Down
51 changes: 50 additions & 1 deletion pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,24 @@ limitations under the License.
package events

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)

const (
// "IngressGCE_" prefix is needed to distinguish these events,
// as they are not attached to a resource.
AddNodes = "IngressGCE_AddNodes"
RemoveNodes = "IngressGCE_RemoveNodes"

SyncIngress = "Sync"
TranslateIngress = "Translate"
IPChanged = "IPChanged"
GarbageCollection = "GarbageCollection"

SyncService = "Sync"
)

type RecorderProducer interface {
Recorder(ns string) record.EventRecorder
}
Expand All @@ -30,3 +45,37 @@ type RecorderProducerMock struct {
func (r RecorderProducerMock) Recorder(ns string) record.EventRecorder {
return &record.FakeRecorder{}
}

// GloablEventf records a Cluster level event not attached to a given object.
func GlobalEventf(r record.EventRecorder, eventtype, reason, messageFmt string, args ...interface{}) {
// Using an empty ObjectReference to indicate no associated
// resource. This apparently works, see the package
// k8s.io/client-go/tools/record.
r.Eventf(&v1.ObjectReference{}, eventtype, reason, messageFmt, args...)
}

const truncatedStringListMax = 1000

// TruncateStringList will render the list of items as a string,
// eliding elements with elipsis at the end if there are more than a
// reasonable number of characters in the resulting string. This is
// used to prevent accidentally dumping enormous strings into the
// Event description.
func TruncatedStringList(items []string) string {
var (
ret = "["
first = true
)
for _, s := range items {
if !first {
ret += ", "
}
ret += s
if len(ret) > truncatedStringListMax {
ret += ", ..."
break
}
first = false
}
return ret + "]"
}
48 changes: 48 additions & 0 deletions pkg/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"fmt"
"testing"
)

func TestTruncatedStringList(t *testing.T) {
for _, tc := range []struct {
count int
want string
}{
{count: 0, want: "[]"},
{count: 1, want: "[item_0]"},
{count: 2, want: "[item_0, item_1]"},
{count: 1000},
{count: 2000},
} {
var items []string
for i := 0; i < tc.count; i++ {
items = append(items, fmt.Sprintf("item_%d", i))
}
got := TruncatedStringList(items)
const limit = truncatedStringListMax + 200
if len(got) > limit {
t.Errorf("len(TruncatedStringList(%d items)) > %d; got = %q", tc.count, limit, got)
}
if tc.want != "" && tc.want != got {
t.Errorf("TruncatedStringList(%d items) = %q, want %q", tc.count, got, tc.want)
}
}
}
35 changes: 26 additions & 9 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"net/http"
"time"

"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

"google.golang.org/api/compute/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -39,16 +42,22 @@ const (
type Instances struct {
cloud InstanceGroups
ZoneLister
namer namer.BackendNamer
namer namer.BackendNamer
recorder record.EventRecorder
}

type recorderSource interface {
Recorder(ns string) record.EventRecorder
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer) NodePool {
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource) NodePool {
return &Instances{
cloud: cloud,
namer: namer,
cloud: cloud,
namer: namer,
recorder: recorders.Recorder(""), // No namespace
}
}

Expand Down Expand Up @@ -250,7 +259,8 @@ func (i *Instances) splitNodesByZone(names []string) map[string][]string {

// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -260,12 +270,16 @@ func (i *Instances) Add(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("AddInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -275,7 +289,10 @@ func (i *Instances) Remove(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("RemoveInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Sync nodes with the instances in the instance group.
Expand Down
8 changes: 8 additions & 0 deletions pkg/loadbalancers/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -92,6 +94,8 @@ func (l *L7) checkForwardingRule(protocol namer.NamerProtocol, name, proxyLink,
return nil, err
}
existing = nil
l.recorder.Eventf(l.runtimeInfo.Ingress, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name)
fw = nil
}
if existing == nil {
// This is a special case where exactly one of http or https forwarding rule
Expand All @@ -115,6 +119,8 @@ func (l *L7) checkForwardingRule(protocol namer.NamerProtocol, name, proxyLink,
if err = composite.CreateForwardingRule(l.cloud, key, fr); err != nil {
return nil, err
}
l.recorder.Eventf(l.runtimeInfo.Ingress, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q created", key.Name)

key, err = l.CreateKey(name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -291,12 +297,14 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I
if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, existingVersion)); err != nil {
return nil, err
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name)
}
}
klog.V(2).Infof("ensureForwardingRule: Recreating forwarding rule - %s", fr.Name)
if err = composite.CreateForwardingRule(l.cloud, key, fr); err != nil {
return nil, err
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q created", key.Name)

return composite.GetForwardingRule(l.cloud, key, fr.Version)
}
Expand Down
Loading

0 comments on commit 3bfc101

Please sign in to comment.