Skip to content

Commit

Permalink
Merge pull request #509 from freehan/noop
Browse files Browse the repository at this point in the history
Add Syncer Skeleton
  • Loading branch information
k8s-ci-robot authored Nov 14, 2018
2 parents a101b02 + 46e0e08 commit c17650e
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 0 deletions.
164 changes: 164 additions & 0 deletions pkg/neg/syncers/syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)

// syncer is a NEG syncer skeleton.
// It handles state transitions and backoff retry operations.
type syncer struct {
// metadata
NegSyncerKey
negName string

// NEG sync function
syncFunc func() error

// event recording
serviceLister cache.Indexer
recorder record.EventRecorder

// syncer states
stateLock sync.Mutex
stopped bool
shuttingDown bool

// sync signal and retry handling
syncCh chan interface{}
clock clock.Clock
backoff backoffHandler
}

func newSyncer(negSyncerKey NegSyncerKey, networkEndpointGroupName string, serviceLister cache.Indexer, recorder record.EventRecorder) *syncer {
return &syncer{
NegSyncerKey: negSyncerKey,
negName: networkEndpointGroupName,
syncFunc: func() error { return nil },
serviceLister: serviceLister,
recorder: recorder,
stopped: true,
shuttingDown: false,
clock: clock.RealClock{},
backoff: NewExponentialBackendOffHandler(maxRetries, minRetryDelay, maxRetryDelay),
}
}

func (s *syncer) Start() error {
if !s.IsStopped() {
return fmt.Errorf("NEG syncer for %s is already running.", s.NegSyncerKey.String())
}
if s.IsShuttingDown() {
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.NegSyncerKey.String())
}

glog.V(2).Infof("Starting NEG syncer for service port %s", s.NegSyncerKey.String())
s.init()
go func() {
for {
// equivalent to never retry
retryCh := make(<-chan time.Time)
err := s.syncFunc()
if err != nil {
delay, retryErr := s.backoff.NextRetryDelay()
retryMesg := ""
if retryErr == ErrRetriesExceeded {
retryMesg = "(will not retry)"
} else {
retryCh = s.clock.After(delay)
retryMesg = "(will retry)"
}

if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeWarning, "SyncNetworkEndpointGroupFailed", "Failed to sync NEG %q %s: %v", s.negName, retryMesg, err)
}
} else {
s.backoff.ResetRetryDelay()
}

select {
case _, open := <-s.syncCh:
if !open {
s.stateLock.Lock()
s.shuttingDown = false
s.stateLock.Unlock()
glog.V(2).Infof("Stopping NEG syncer for %s", s.NegSyncerKey.String())
return
}
case <-retryCh:
// continue to sync
}
}
}()
return nil
}

func (s *syncer) init() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
s.stopped = false
s.syncCh = make(chan interface{}, 1)
}

func (s *syncer) Stop() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
if !s.stopped {
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.NegSyncerKey.String())
s.stopped = true
s.shuttingDown = true
close(s.syncCh)
}
}

func (s *syncer) Sync() bool {
if s.IsStopped() {
glog.Warningf("NEG syncer for %s is already stopped.", s.NegSyncerKey.String())
return false
}
select {
case s.syncCh <- struct{}{}:
return true
default:
return false
}
}

func (s *syncer) IsStopped() bool {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.stopped
}

func (s *syncer) IsShuttingDown() bool {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.shuttingDown
}

func (s *syncer) SetSyncFunc(syncFunc func() error) {
s.syncFunc = syncFunc
}
166 changes: 166 additions & 0 deletions pkg/neg/syncers/syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers

import (
"testing"
"time"

"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"
)

type syncerTester struct {
syncer
// keep track of the number of syncs
syncCount int
// syncError is true, then sync function return error
syncError bool
// blockSync is true, then sync function is blocked on channel
blockSync bool
ch chan interface{}
}

// sync sleeps for 3 seconds
func (t *syncerTester) sync() error {
t.syncCount += 1
if t.syncError {
return fmt.Errorf("sync error")
}
if t.blockSync {
<-t.ch
}
return nil
}

func newTestNegSyncer() *syncerTester {
testNegName := "test-neg-name"
kubeClient := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
namer := utils.NewNamer(clusterID, "")
ctxConfig := context.ControllerContextConfig{
NEGEnabled: true,
BackendConfigEnabled: false,
Namespace: apiv1.NamespaceAll,
ResyncPeriod: 1 * time.Second,
DefaultBackendSvcPortID: defaultBackend,
}
context := context.NewControllerContext(kubeClient, backendConfigClient, nil, nil, namer, ctxConfig)
negSyncerKey := NegSyncerKey{
Namespace: testServiceNamespace,
Name: testServiceName,
Port: 80,
TargetPort: "80",
}

s := &syncerTester{
syncer: *newSyncer(
negSyncerKey,
testNegName,
context.ServiceInformer.GetIndexer(),
record.NewFakeRecorder(100),
),
syncCount: 0,
blockSync: false,
syncError: false,
ch: make(chan interface{}),
}
s.SetSyncFunc(s.sync)
return s
}

func TestStartAndStopNoopSyncer(t *testing.T) {
syncer := newTestNegSyncer()
if !syncer.IsStopped() {
t.Fatalf("Syncer is not stopped after creation.")
}
if syncer.IsShuttingDown() {
t.Fatalf("Syncer is shutting down after creation.")
}

if err := syncer.Start(); err != nil {
t.Fatalf("Failed to start syncer: %v", err)
}
if syncer.IsStopped() {
t.Fatalf("Syncer is stopped after Start.")
}
if syncer.IsShuttingDown() {
t.Fatalf("Syncer is shutting down after Start.")
}

// blocks sync function
syncer.blockSync = true
syncer.Stop()
if !syncer.IsShuttingDown() {
// assume syncer needs 5 second for sync
t.Fatalf("Syncer is not shutting down after Start.")
}

if !syncer.IsStopped() {
t.Fatalf("Syncer is not stopped after Stop.")
}

// unblock sync function
syncer.ch <- struct{}{}
if err := wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) {
return !syncer.IsShuttingDown() && syncer.IsStopped(), nil
}); err != nil {
t.Fatalf("Syncer failed to shutdown: %v", err)
}

if err := syncer.Start(); err != nil {
t.Fatalf("Failed to restart syncer: %v", err)
}
if syncer.IsStopped() {
t.Fatalf("Syncer is stopped after restart.")
}
if syncer.IsShuttingDown() {
t.Fatalf("Syncer is shutting down after restart.")
}

syncer.Stop()
if !syncer.IsStopped() {
t.Fatalf("Syncer is not stopped after Stop.")
}
}

func TestRetryOnSyncError(t *testing.T) {
maxRetry := 3
syncer := newTestNegSyncer()
syncer.syncError = true
if err := syncer.Start(); err != nil {
t.Fatalf("Failed to start syncer: %v", err)
}
syncer.backoff = NewExponentialBackendOffHandler(maxRetry, 0, 0)

if err := wait.PollImmediate(time.Second, 5*time.Second, func() (bool, error) {
// In 5 seconds, syncer should be able to retry 3 times.
return syncer.syncCount == maxRetry+1, nil
}); err != nil {
t.Errorf("Syncer failed to retry and record error: %v", err)
}

if syncer.syncCount != maxRetry+1 {
t.Errorf("Expect sync count to be %v, but got %v", maxRetry+1, syncer.syncCount)
}
}

0 comments on commit c17650e

Please sign in to comment.