diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 1d1c9990b4..72161eb7ec 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/utils" ) const ( @@ -54,6 +55,8 @@ type Controller struct { manager negSyncerManager resyncPeriod time.Duration recorder record.EventRecorder + namer networkEndpointGroupNamer + zoneGetter zoneGetter ingressSynced cache.InformerSynced serviceSynced cache.InformerSynced @@ -63,8 +66,9 @@ type Controller struct { // serviceQueue takes service key as work item. Service key with format "namespace/name". serviceQueue workqueue.RateLimitingInterface - zoneGetter zoneGetter - namer networkEndpointGroupNamer + + // syncTracker tracks the latest time that service and endpoint changes are processed + syncTracker utils.TimeTracker } // NewController returns a network endpoint group controller. @@ -96,14 +100,15 @@ func NewController( manager: manager, resyncPeriod: resyncPeriod, recorder: recorder, + zoneGetter: zoneGetter, + namer: namer, ingressSynced: ctx.IngressInformer.HasSynced, serviceSynced: ctx.ServiceInformer.HasSynced, endpointSynced: ctx.EndpointInformer.HasSynced, ingressLister: ctx.IngressInformer.GetIndexer(), serviceLister: ctx.ServiceInformer.GetIndexer(), serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), - zoneGetter: zoneGetter, - namer: namer, + syncTracker: utils.NewTimeTracker(), } ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -133,6 +138,7 @@ func NewController( negController.processEndpoint(cur) }, }) + ctx.AddHealthCheck("neg-controller", negController.IsHealthy) return negController, nil } @@ -154,6 +160,16 @@ func (c *Controller) Run(stopCh <-chan struct{}) { <-stopCh } +func (c *Controller) IsHealthy() error { + // check if last seen service and endpoint processing is more than an hour ago + if c.syncTracker.Get().Before(time.Now().Add(-time.Hour)) { + msg := fmt.Sprintf("NEG controller has not proccessed any service and endpoint updates for more than an hour. Something went wrong. Last sync was on %v", c.syncTracker.Get()) + glog.Error(msg) + return fmt.Errorf(msg) + } + return nil +} + func (c *Controller) stop() { glog.V(2).Infof("Shutting down network endpoint group controller") c.serviceQueue.ShutDown() @@ -162,7 +178,11 @@ func (c *Controller) stop() { // processEndpoint finds the related syncers and signal it to sync func (c *Controller) processEndpoint(obj interface{}) { - defer lastSyncTimestamp.WithLabelValues().Set(float64(time.Now().UTC().UnixNano())) + defer func() { + now := c.syncTracker.Track() + lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) + }() + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { glog.Errorf("Failed to generate endpoint key: %v", err) @@ -191,7 +211,11 @@ func (c *Controller) serviceWorker() { // processService takes a service and determines whether it needs NEGs or not. func (c *Controller) processService(key string) error { - defer lastSyncTimestamp.WithLabelValues().Set(float64(time.Now().UTC().UnixNano())) + defer func() { + now := c.syncTracker.Track() + lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) + }() + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 5d3fe990da..f5e0af4fd9 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -50,6 +50,29 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { return controller } +func TestIsHealthy(t *testing.T) { + controller := newTestController(fake.NewSimpleClientset()) + defer controller.stop() + + err := controller.IsHealthy() + if err != nil { + t.Errorf("Expect controller to be healthy initially: %v", err) + } + + timestamp := time.Now().Add(-61 * time.Minute) + controller.syncTracker.Set(timestamp) + err = controller.IsHealthy() + if err == nil { + t.Errorf("Expect controller to NOT be healthy") + } + + controller.syncTracker.Track() + err = controller.IsHealthy() + if err != nil { + t.Errorf("Expect controller to be healthy: %v", err) + } +} + func TestNewNonNEGService(t *testing.T) { t.Parallel() diff --git a/pkg/utils/timetracker.go b/pkg/utils/timetracker.go new file mode 100644 index 0000000000..c13b42ba45 --- /dev/null +++ b/pkg/utils/timetracker.go @@ -0,0 +1,56 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "sync" + "time" +) + +type TimeTracker struct { + lock sync.Mutex + timestamp time.Time +} + +// Track records the current time and returns it +func (t *TimeTracker) Track() time.Time { + t.lock.Lock() + defer t.lock.Unlock() + t.timestamp = time.Now() + return t.timestamp +} + +// Get returns previous recorded time +func (t *TimeTracker) Get() time.Time { + t.lock.Lock() + defer t.lock.Unlock() + return t.timestamp +} + +// Set records input timestamp +func (t *TimeTracker) Set(timestamp time.Time) { + t.lock.Lock() + defer t.lock.Unlock() + t.timestamp = timestamp + return +} + +func NewTimeTracker() TimeTracker { + return TimeTracker{ + timestamp: time.Now(), + } +} diff --git a/pkg/utils/timetracker_test.go b/pkg/utils/timetracker_test.go new file mode 100644 index 0000000000..9a18003daa --- /dev/null +++ b/pkg/utils/timetracker_test.go @@ -0,0 +1,41 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "testing" + "time" +) + +func TestTimeTracker(t *testing.T) { + tt := NewTimeTracker() + trials := 3 + for i := 0; i < trials; i++ { + timestamp := tt.Track() + result := tt.Get() + if timestamp != result { + t.Errorf("In trial %d, expect %v == %v", i, timestamp, result) + } + now := time.Now() + tt.Set(now) + result = tt.Get() + if now != result { + t.Errorf("In trial %d, expect %v == %v", i, now, result) + } + } + +}