Skip to content

Commit

Permalink
add liveness probe for NEG controller
Browse files Browse the repository at this point in the history
  • Loading branch information
freehan committed Jun 20, 2018
1 parent 6fb6717 commit 753ae9c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
36 changes: 30 additions & 6 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"
)

const (
Expand All @@ -54,6 +55,8 @@ type Controller struct {
manager negSyncerManager
resyncPeriod time.Duration
recorder record.EventRecorder
namer networkEndpointGroupNamer
zoneGetter zoneGetter

ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
Expand All @@ -63,8 +66,9 @@ type Controller struct {

// serviceQueue takes service key as work item. Service key with format "namespace/name".
serviceQueue workqueue.RateLimitingInterface
zoneGetter zoneGetter
namer networkEndpointGroupNamer

// syncTracker tracks the latest time that service and endpoint changes are processed
syncTracker utils.TimeTracker
}

// NewController returns a network endpoint group controller.
Expand Down Expand Up @@ -96,14 +100,15 @@ func NewController(
manager: manager,
resyncPeriod: resyncPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
ingressSynced: ctx.IngressInformer.HasSynced,
serviceSynced: ctx.ServiceInformer.HasSynced,
endpointSynced: ctx.EndpointInformer.HasSynced,
ingressLister: ctx.IngressInformer.GetIndexer(),
serviceLister: ctx.ServiceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
zoneGetter: zoneGetter,
namer: namer,
syncTracker: utils.NewTimeTracker(),
}

ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -133,6 +138,7 @@ func NewController(
negController.processEndpoint(cur)
},
})
ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
return negController, nil
}

Expand All @@ -154,6 +160,16 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
<-stopCh
}

// IsHealthy reports whether the controller has recently processed a service
// or endpoint update. It returns nil when the time recorded by syncTracker is
// within the last hour, and a descriptive error (which is also logged)
// otherwise.
func (c *Controller) IsHealthy() error {
	// Read the tracker once so the value that is compared is the same value
	// that is reported in the error message.
	lastSync := c.syncTracker.Get()
	if time.Since(lastSync) <= time.Hour {
		return nil
	}

	// Pass the timestamp as a format argument rather than using a pre-built
	// message as the format string, so a stray '%' can never be misinterpreted.
	err := fmt.Errorf("NEG controller has not proccessed any service and endpoint updates for more than an hour. Something went wrong. Last sync was on %v", lastSync)
	glog.Error(err)
	return err
}

func (c *Controller) stop() {
glog.V(2).Infof("Shutting down network endpoint group controller")
c.serviceQueue.ShutDown()
Expand All @@ -162,7 +178,11 @@ func (c *Controller) stop() {

// processEndpoint finds the related syncers and signals them to sync
func (c *Controller) processEndpoint(obj interface{}) {
defer lastSyncTimestamp.WithLabelValues().Set(float64(time.Now().UTC().UnixNano()))
defer func() {
now := c.syncTracker.Track()
lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
}()

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
glog.Errorf("Failed to generate endpoint key: %v", err)
Expand Down Expand Up @@ -191,7 +211,11 @@ func (c *Controller) serviceWorker() {

// processService takes a service and determines whether it needs NEGs or not.
func (c *Controller) processService(key string) error {
defer lastSyncTimestamp.WithLabelValues().Set(float64(time.Now().UTC().UnixNano()))
defer func() {
now := c.syncTracker.Track()
lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
return controller
}

// TestIsHealthy checks the liveness logic of the NEG controller in three
// stages: a freshly created controller is healthy, a controller whose last
// tracked sync is older than an hour is unhealthy, and tracking a new sync
// makes it healthy again.
func TestIsHealthy(t *testing.T) {
	negController := newTestController(fake.NewSimpleClientset())
	defer negController.stop()

	// Stage 1: a new controller must report healthy.
	if err := negController.IsHealthy(); err != nil {
		t.Errorf("Expect controller to be healthy initially: %v", err)
	}

	// Stage 2: move the last sync just past the one hour threshold.
	staleSync := time.Now().Add(-61 * time.Minute)
	negController.syncTracker.Set(staleSync)
	if err := negController.IsHealthy(); err == nil {
		t.Errorf("Expect controller to NOT be healthy")
	}

	// Stage 3: recording a sync now must restore the healthy state.
	negController.syncTracker.Track()
	if err := negController.IsHealthy(); err != nil {
		t.Errorf("Expect controller to be healthy: %v", err)
	}
}

func TestNewNonNEGService(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 753ae9c

Please sign in to comment.