Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Syncer Skeleton #509

Merged
merged 1 commit into from
Nov 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions pkg/neg/syncers/syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
)

// syncer is a NEG syncer skeleton.
// It handles state transitions and backoff retry operations.
type syncer struct {
// metadata
NegSyncerKey
negName string

// NEG sync function
syncFunc func() error

// event recording
serviceLister cache.Indexer
recorder record.EventRecorder

// syncer states
stateLock sync.Mutex
stopped bool
shuttingDown bool

// sync signal and retry handling
syncCh chan interface{}
clock clock.Clock
backoff backoffHandler
}

func newSyncer(negSyncerKey NegSyncerKey, networkEndpointGroupName string, serviceLister cache.Indexer, recorder record.EventRecorder) *syncer {
return &syncer{
NegSyncerKey: negSyncerKey,
negName: networkEndpointGroupName,
syncFunc: func() error { return nil },
serviceLister: serviceLister,
recorder: recorder,
stopped: true,
shuttingDown: false,
clock: clock.RealClock{},
backoff: NewExponentialBackendOffHandler(maxRetries, minRetryDelay, maxRetryDelay),
}
}

func (s *syncer) Start() error {
if !s.IsStopped() {
return fmt.Errorf("NEG syncer for %s is already running.", s.NegSyncerKey.String())
}
if s.IsShuttingDown() {
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.NegSyncerKey.String())
}

glog.V(2).Infof("Starting NEG syncer for service port %s", s.NegSyncerKey.String())
s.init()
go func() {
for {
// equivalent to never retry
retryCh := make(<-chan time.Time)
err := s.syncFunc()
if err != nil {
delay, retryErr := s.backoff.NextRetryDelay()
retryMesg := ""
if retryErr == ErrRetriesExceeded {
retryMesg = "(will not retry)"
} else {
retryCh = s.clock.After(delay)
retryMesg = "(will retry)"
}

if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeWarning, "SyncNetworkEndpointGroupFailed", "Failed to sync NEG %q %s: %v", s.negName, retryMesg, err)
}
} else {
s.backoff.ResetRetryDelay()
}

select {
case _, open := <-s.syncCh:
if !open {
s.stateLock.Lock()
s.shuttingDown = false
s.stateLock.Unlock()
glog.V(2).Infof("Stopping NEG syncer for %s", s.NegSyncerKey.String())
return
}
case <-retryCh:
// continue to sync
}
}
}()
return nil
}

func (s *syncer) init() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
s.stopped = false
s.syncCh = make(chan interface{}, 1)
}

func (s *syncer) Stop() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
if !s.stopped {
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.NegSyncerKey.String())
s.stopped = true
s.shuttingDown = true
close(s.syncCh)
}
}

func (s *syncer) Sync() bool {
if s.IsStopped() {
glog.Warningf("NEG syncer for %s is already stopped.", s.NegSyncerKey.String())
return false
}
select {
case s.syncCh <- struct{}{}:
return true
default:
return false
}
}

func (s *syncer) IsStopped() bool {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.stopped
}

func (s *syncer) IsShuttingDown() bool {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.shuttingDown
}

func (s *syncer) SetSyncFunc(syncFunc func() error) {
s.syncFunc = syncFunc
}
166 changes: 166 additions & 0 deletions pkg/neg/syncers/syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package syncers

import (
"testing"
"time"

"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/utils"
)

type syncerTester struct {
syncer
// keep track of the number of syncs
syncCount int
// syncError is true, then sync function return error
syncError bool
// blockSync is true, then sync function is blocked on channel
blockSync bool
ch chan interface{}
}

// sync sleeps for 3 seconds
func (t *syncerTester) sync() error {
t.syncCount += 1
if t.syncError {
return fmt.Errorf("sync error")
}
if t.blockSync {
<-t.ch
}
return nil
}

func newTestNegSyncer() *syncerTester {
testNegName := "test-neg-name"
kubeClient := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
namer := utils.NewNamer(clusterID, "")
ctxConfig := context.ControllerContextConfig{
NEGEnabled: true,
BackendConfigEnabled: false,
Namespace: apiv1.NamespaceAll,
ResyncPeriod: 1 * time.Second,
DefaultBackendSvcPortID: defaultBackend,
}
context := context.NewControllerContext(kubeClient, backendConfigClient, nil, nil, namer, ctxConfig)
negSyncerKey := NegSyncerKey{
Namespace: testServiceNamespace,
Name: testServiceName,
Port: 80,
TargetPort: "80",
}

s := &syncerTester{
syncer: *newSyncer(
negSyncerKey,
testNegName,
context.ServiceInformer.GetIndexer(),
record.NewFakeRecorder(100),
),
syncCount: 0,
blockSync: false,
syncError: false,
ch: make(chan interface{}),
}
s.SetSyncFunc(s.sync)
return s
}

func TestStartAndStopNoopSyncer(t *testing.T) {
syncer := newTestNegSyncer()
if !syncer.IsStopped() {
t.Fatalf("Syncer is not stopped after creation.")
}
if syncer.IsShuttingDown() {
t.Fatalf("Syncer is shutting down after creation.")
}

if err := syncer.Start(); err != nil {
t.Fatalf("Failed to start syncer: %v", err)
}
if syncer.IsStopped() {
t.Fatalf("Syncer is stopped after Start.")
}
if syncer.IsShuttingDown() {
t.Fatalf("Syncer is shutting down after Start.")
}

// blocks sync function
syncer.blockSync = true
syncer.Stop()
if !syncer.IsShuttingDown() {
// assume syncer needs 5 second for sync
t.Fatalf("Syncer is not shutting down after Start.")
}

if !syncer.IsStopped() {
t.Fatalf("Syncer is not stopped after Stop.")
}

// unblock sync function
syncer.ch <- struct{}{}
if err := wait.PollImmediate(time.Second, 3*time.Second, func() (bool, error) {
return !syncer.IsShuttingDown() && syncer.IsStopped(), nil
}); err != nil {
t.Fatalf("Syncer failed to shutdown: %v", err)
}

if err := syncer.Start(); err != nil {
t.Fatalf("Failed to restart syncer: %v", err)
}
if syncer.IsStopped() {
t.Fatalf("Syncer is stopped after restart.")
}
if syncer.IsShuttingDown() {
t.Fatalf("Syncer is shutting down after restart.")
}

syncer.Stop()
if !syncer.IsStopped() {
t.Fatalf("Syncer is not stopped after Stop.")
}
}

func TestRetryOnSyncError(t *testing.T) {
maxRetry := 3
syncer := newTestNegSyncer()
syncer.syncError = true
if err := syncer.Start(); err != nil {
t.Fatalf("Failed to start syncer: %v", err)
}
syncer.backoff = NewExponentialBackendOffHandler(maxRetry, 0, 0)

if err := wait.PollImmediate(time.Second, 5*time.Second, func() (bool, error) {
// In 5 seconds, syncer should be able to retry 3 times.
return syncer.syncCount == maxRetry+1, nil
}); err != nil {
t.Errorf("Syncer failed to retry and record error: %v", err)
}

if syncer.syncCount != maxRetry+1 {
t.Errorf("Expect sync count to be %v, but got %v", maxRetry+1, syncer.syncCount)
}
}