Skip to content

Commit

Permalink
Merge pull request #349 from freehan/neg-liveness
Browse files Browse the repository at this point in the history
Add Liveness Probe for NEG controller
  • Loading branch information
freehan authored Jun 20, 2018
2 parents 3722e74 + 753ae9c commit 0c87755
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 6 deletions.
36 changes: 30 additions & 6 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"
)

const (
Expand All @@ -54,6 +55,8 @@ type Controller struct {
manager negSyncerManager
resyncPeriod time.Duration
recorder record.EventRecorder
namer networkEndpointGroupNamer
zoneGetter zoneGetter

ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
Expand All @@ -63,8 +66,9 @@ type Controller struct {

// serviceQueue takes service key as work item. Service key with format "namespace/name".
serviceQueue workqueue.RateLimitingInterface
zoneGetter zoneGetter
namer networkEndpointGroupNamer

// syncTracker tracks the latest time that service and endpoint changes are processed
syncTracker utils.TimeTracker
}

// NewController returns a network endpoint group controller.
Expand Down Expand Up @@ -96,14 +100,15 @@ func NewController(
manager: manager,
resyncPeriod: resyncPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
ingressSynced: ctx.IngressInformer.HasSynced,
serviceSynced: ctx.ServiceInformer.HasSynced,
endpointSynced: ctx.EndpointInformer.HasSynced,
ingressLister: ctx.IngressInformer.GetIndexer(),
serviceLister: ctx.ServiceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
zoneGetter: zoneGetter,
namer: namer,
syncTracker: utils.NewTimeTracker(),
}

ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -133,6 +138,7 @@ func NewController(
negController.processEndpoint(cur)
},
})
ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
return negController, nil
}

Expand All @@ -154,6 +160,16 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
<-stopCh
}

func (c *Controller) IsHealthy() error {
// check if last seen service and endpoint processing is more than an hour ago
if c.syncTracker.Get().Before(time.Now().Add(-time.Hour)) {
msg := fmt.Sprintf("NEG controller has not proccessed any service and endpoint updates for more than an hour. Something went wrong. Last sync was on %v", c.syncTracker.Get())
glog.Error(msg)
return fmt.Errorf(msg)
}
return nil
}

func (c *Controller) stop() {
glog.V(2).Infof("Shutting down network endpoint group controller")
c.serviceQueue.ShutDown()
Expand All @@ -162,7 +178,11 @@ func (c *Controller) stop() {

// processEndpoint finds the related syncers and signal it to sync
func (c *Controller) processEndpoint(obj interface{}) {
defer lastSyncTimestamp.WithLabelValues().Set(float64(time.Now().UTC().UnixNano()))
defer func() {
now := c.syncTracker.Track()
lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
}()

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
glog.Errorf("Failed to generate endpoint key: %v", err)
Expand Down Expand Up @@ -191,7 +211,11 @@ func (c *Controller) serviceWorker() {

// processService takes a service and determines whether it needs NEGs or not.
func (c *Controller) processService(key string) error {
defer lastSyncTimestamp.WithLabelValues().Set(float64(time.Now().UTC().UnixNano()))
defer func() {
now := c.syncTracker.Track()
lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
Expand Down
23 changes: 23 additions & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
return controller
}

func TestIsHealthy(t *testing.T) {
controller := newTestController(fake.NewSimpleClientset())
defer controller.stop()

err := controller.IsHealthy()
if err != nil {
t.Errorf("Expect controller to be healthy initially: %v", err)
}

timestamp := time.Now().Add(-61 * time.Minute)
controller.syncTracker.Set(timestamp)
err = controller.IsHealthy()
if err == nil {
t.Errorf("Expect controller to NOT be healthy")
}

controller.syncTracker.Track()
err = controller.IsHealthy()
if err != nil {
t.Errorf("Expect controller to be healthy: %v", err)
}
}

func TestNewNonNEGService(t *testing.T) {
t.Parallel()

Expand Down
56 changes: 56 additions & 0 deletions pkg/utils/timetracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"sync"
"time"
)

type TimeTracker struct {
lock sync.Mutex
timestamp time.Time
}

// Track records the current time and returns it
func (t *TimeTracker) Track() time.Time {
t.lock.Lock()
defer t.lock.Unlock()
t.timestamp = time.Now()
return t.timestamp
}

// Get returns previous recorded time
func (t *TimeTracker) Get() time.Time {
t.lock.Lock()
defer t.lock.Unlock()
return t.timestamp
}

// Set records input timestamp
func (t *TimeTracker) Set(timestamp time.Time) {
t.lock.Lock()
defer t.lock.Unlock()
t.timestamp = timestamp
return
}

func NewTimeTracker() TimeTracker {
return TimeTracker{
timestamp: time.Now(),
}
}
41 changes: 41 additions & 0 deletions pkg/utils/timetracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"
"time"
)

func TestTimeTracker(t *testing.T) {
tt := NewTimeTracker()
trials := 3
for i := 0; i < trials; i++ {
timestamp := tt.Track()
result := tt.Get()
if timestamp != result {
t.Errorf("In trial %d, expect %v == %v", i, timestamp, result)
}
now := time.Now()
tt.Set(now)
result = tt.Get()
if now != result {
t.Errorf("In trial %d, expect %v == %v", i, now, result)
}
}

}

0 comments on commit 0c87755

Please sign in to comment.