Skip to content

Commit

Permalink
Add events for major lifecycle events
Browse files Browse the repository at this point in the history
{Add,Remove}Nodes from InstanceGroup
SyncIngress, TranslateIngress, IPChanged, GarbageCollection
  • Loading branch information
bowei committed Jun 6, 2020
1 parent 67471cd commit ea114ca
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 33 deletions.
30 changes: 16 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/ingress-gce/pkg/common/operator"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/controller/translator"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecks"
Expand Down Expand Up @@ -108,7 +109,7 @@ func NewLoadBalancerController(
})

healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendSvcPort.ID.Service)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer, ctx)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)

lbc := LoadBalancerController{
Expand Down Expand Up @@ -139,8 +140,8 @@ func NewLoadBalancerController(
return
}

klog.V(3).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", common.NamespacedName(addIng))
klog.V(2).Infof("Ingress %v added, enqueuing", common.NamespacedName(addIng))
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
Expand Down Expand Up @@ -176,11 +177,11 @@ func NewLoadBalancerController(
return
}
if reflect.DeepEqual(old, cur) {
klog.V(3).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
klog.V(2).Infof("Periodic enqueueing of %v", common.NamespacedName(curIng))
} else {
klog.V(3).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
klog.V(2).Infof("Ingress %v changed, enqueuing", common.NamespacedName(curIng))
}

lbc.ctx.Recorder(curIng.Namespace).Eventf(curIng, apiv1.EventTypeNormal, events.SyncIngress, "Scheduled for sync")
lbc.ingQueue.Enqueue(cur)
},
})
Expand Down Expand Up @@ -558,7 +559,8 @@ func (lbc *LoadBalancerController) sync(key string) error {
err := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm)
// Skip emitting an event if ingress does not exist as we cannot retrieve ingress namespace.
if err != nil && ingExists {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", err))
klog.Errorf("Error in GC for %s/%s: %v", ing.Namespace, ing.Name, err)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error: %v", err)
}
// Delete the ingress state for metrics after GC is successful.
if err == nil && ingExists {
Expand All @@ -578,16 +580,16 @@ func (lbc *LoadBalancerController) sync(key string) error {
urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPort.ID, lbc.ctx.ClusterNamer)

if errs != nil {
msg := fmt.Errorf("error while evaluating the ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Translate", msg.Error())
msg := fmt.Errorf("invalid ingress spec: %v", utils.JoinErrs(errs))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.TranslateIngress, "Translation failed: %v", msg)
return msg
}

// Sync GCP resources.
syncState := &syncState{urlMap, ing, nil}
syncErr := lbc.ingSyncer.Sync(syncState)
if syncErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", syncErr.Error()))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error syncing to GCP: %v", syncErr.Error())
} else {
// Insert/update the ingress state for metrics after successful sync.
lbc.metrics.SetIngress(key, metrics.NewIngressState(ing, urlMap.AllServicePorts()))
Expand All @@ -598,7 +600,7 @@ func (lbc *LoadBalancerController) sync(key string) error {
// free up enough quota for the next sync to pass.
frontendGCAlgorithm := frontendGCAlgorithm(ingExists, ing)
if gcErr := lbc.ingSyncer.GC(allIngresses, ing, frontendGCAlgorithm); gcErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "GC", fmt.Sprintf("Error during GC: %v", gcErr))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.GarbageCollection, "Error during garbage collection: %v", gcErr)
return fmt.Errorf("error during sync %v, error during GC %v", syncErr, gcErr)
}

Expand Down Expand Up @@ -633,7 +635,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
klog.Errorf("PatchIngressStatus(%s/%s) failed: %v", currIng.Namespace, currIng.Name, err)
return err
}
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip)
lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, events.IPChanged, "IP is now %v", ip)
}
}
annotations, err := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.backendSyncer)
Expand All @@ -655,7 +657,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if apierrors.IsNotFound(err) {
// TODO: this path should be removed when external certificate managers migrate to a better solution.
const msg = "Could not find TLS certificates. Continuing setup for the load balancer to serve HTTP. Note: this behavior is deprecated and will be removed in a future version of ingress-gce"
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", msg)
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, msg)
} else {
klog.Errorf("Could not get certificates for ingress %s/%s: %v", ing.Namespace, ing.Name, err)
return nil, err
Expand All @@ -666,7 +668,7 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ing *v1beta1.Ingress, urlMap *u
if lbc.ctx.FrontendConfigEnabled {
feConfig, err = frontendconfig.FrontendConfigForIngress(lbc.ctx.FrontendConfigs().List(), ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("%v", err))
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, events.SyncIngress, "Error: %v", err)
}
// Object in cache could be changed in-flight. Deepcopy to
// reduce race conditions.
Expand Down
46 changes: 46 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,22 @@ limitations under the License.
package events

import (
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
)

const (
AddNodes = "IngressGCE_AddNodes"
RemoveNodes = "IngressGCE_RemoveNodes"

SyncIngress = "Sync"
TranslateIngress = "Translate"
IPChanged = "IPChanged"
GarbageCollection = "GarbageCollection"

SyncService = "Sync"
)

type RecorderProducer interface {
Recorder(ns string) record.EventRecorder
}
Expand All @@ -30,3 +43,36 @@ type RecorderProducerMock struct {
func (r RecorderProducerMock) Recorder(ns string) record.EventRecorder {
return &record.FakeRecorder{}
}

// GloablEventf records a Cluster level event not attached to a given object.
func GlobalEventf(r record.EventRecorder, eventtype, reason, messageFmt string, args ...interface{}) {
// Using an empty ObjectReference to indicate no associated
// resource. This apparently works, see the package
// k8s.io/client-go/tools/record.
r.Eventf(&v1.ObjectReference{}, eventtype, reason, messageFmt, args...)
}

// TruncateStringList will render the list of items as a string,
// eliding elements with elipsis at the end if there are more than a
// reasonable number of characters in the resulting string. This is
// used to prevent accidentally dumping enormous strings into the
// Event description.
func TruncatedStringList(items []string) string {
const max = 1000

var (
ret = "["
first = true
)
for _, s := range items {
if !first {
ret += ", "
}
ret += s
if len(ret) > max {
ret += ", ..."
break
}
}
return ret + "]"
}
9 changes: 9 additions & 0 deletions pkg/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package events

import (
"testing"
)

func TestTruncatedStringList(t *testing.T) {
// TODO
}
35 changes: 26 additions & 9 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (
"net/http"
"time"

"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

"google.golang.org/api/compute/v1"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"k8s.io/ingress-gce/pkg/utils"
Expand All @@ -39,16 +42,22 @@ const (
type Instances struct {
cloud InstanceGroups
ZoneLister
namer namer.BackendNamer
namer namer.BackendNamer
recorder record.EventRecorder
}

type recorderSource interface {
Recorder(ns string) record.EventRecorder
}

// NewNodePool creates a new node pool.
// - cloud: implements InstanceGroups, used to sync Kubernetes nodes with
// members of the cloud InstanceGroup.
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer) NodePool {
func NewNodePool(cloud InstanceGroups, namer namer.BackendNamer, recorders recorderSource) NodePool {
return &Instances{
cloud: cloud,
namer: namer,
cloud: cloud,
namer: namer,
recorder: recorders.Recorder(""), // No namespace
}
}

Expand Down Expand Up @@ -250,7 +259,8 @@ func (i *Instances) splitNodesByZone(names []string) map[string][]string {

// Add adds the given instances to the appropriately zoned Instance Group.
func (i *Instances) Add(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.AddNodes, "Adding %s to InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Adding nodes %v to %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.AddInstancesToInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -260,12 +270,16 @@ func (i *Instances) Add(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("AddInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.AddNodes, "Error adding %s to InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Remove removes the given instances from the appropriately zoned Instance Group.
func (i *Instances) Remove(groupName string, names []string) error {
errs := []error{}
events.GlobalEventf(i.recorder, core.EventTypeNormal, events.RemoveNodes, "Removing %s from InstanceGroup %q", events.TruncatedStringList(names), groupName)
var errs []error
for zone, nodeNames := range i.splitNodesByZone(names) {
klog.V(1).Infof("Removing nodes %v from %v in zone %v", nodeNames, groupName, zone)
if err := i.cloud.RemoveInstancesFromInstanceGroup(groupName, zone, i.cloud.ToInstanceReferences(zone, nodeNames)); err != nil {
Expand All @@ -275,7 +289,10 @@ func (i *Instances) Remove(groupName string, names []string) error {
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)

err := fmt.Errorf("RemoveInstances: %v", errs)
events.GlobalEventf(i.recorder, core.EventTypeWarning, events.RemoveNodes, "Error removing nodes %s from InstanceGroup %q: %v", events.TruncatedStringList(names), groupName, err)
return err
}

// Sync nodes with the instances in the instance group.
Expand Down
7 changes: 7 additions & 0 deletions pkg/loadbalancers/forwarding_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
Expand Down Expand Up @@ -85,6 +87,7 @@ func (l *L7) checkForwardingRule(name, proxyLink, ip, portRange string) (fw *com
if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, version)); err != nil {
return nil, err
}
l.recorder.Eventf(l.runtimeInfo.Ingress, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name)
fw = nil
}
if fw == nil {
Expand Down Expand Up @@ -129,6 +132,8 @@ func (l *L7) checkForwardingRule(name, proxyLink, ip, portRange string) (fw *com
if err = composite.CreateForwardingRule(l.cloud, key, rule); err != nil {
return nil, err
}
l.recorder.Eventf(l.runtimeInfo.Ingress, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q created", key.Name)

key, err = l.CreateKey(name)
if err != nil {
return nil, err
Expand Down Expand Up @@ -305,12 +310,14 @@ func (l *L4) ensureForwardingRule(loadBalancerName, bsLink string, options gce.I
if err = utils.IgnoreHTTPNotFound(composite.DeleteForwardingRule(l.cloud, key, existingVersion)); err != nil {
return nil, err
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q deleted", key.Name)
}
}
klog.V(2).Infof("ensureForwardingRule: Recreating forwarding rule - %s", fr.Name)
if err = composite.CreateForwardingRule(l.cloud, key, fr); err != nil {
return nil, err
}
l.recorder.Eventf(l.Service, corev1.EventTypeNormal, events.SyncIngress, "ForwardingRule %q created", key.Name)

return composite.GetForwardingRule(l.cloud, key, fr.Version)
}
Expand Down
Loading

0 comments on commit ea114ca

Please sign in to comment.