Skip to content

Commit

Permalink
Make synced callback and named wait group private 🕵️
Browse files Browse the repository at this point in the history
These utility objects don't really make sense to expose as part of the
informed watcher package and are only used by the informed watcher.
Writing tests for unexported code makes me a bit :( but hopefully these
will get moved to some other package one day. And there's really no
reason to expose these to users of knative/pkg at the moment.
  • Loading branch information
bobcatfish committed Mar 2, 2021
1 parent a70d600 commit 572b926
Show file tree
Hide file tree
Showing 4 changed files with 264 additions and 223 deletions.
94 changes: 1 addition & 93 deletions configmap/informer/informed_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@ package informer
import (
"errors"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/util/wait"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers/internalinterfaces"
Expand Down Expand Up @@ -156,7 +152,7 @@ func (i *InformedWatcher) getConfigMapNames() []string {
func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
// using the synced callback wrapper around the add event handler will allow the caller
// to wait for the add event to be processed for all configmaps
s := NewSyncedCallback(i.getConfigMapNames(), i.addConfigMapEvent)
s := newSyncedCallback(i.getConfigMapNames(), i.addConfigMapEvent)
addConfigMapEvent := func(obj interface{}) {
configMap := obj.(*corev1.ConfigMap)
s.Call(obj, configMap.Name)
Expand Down Expand Up @@ -241,91 +237,3 @@ func (i *InformedWatcher) deleteConfigMapEvent(obj interface{}) {
}
// If there is no default value, then don't do anything.
}

// NamedWaitGroup is used to increment and decrement a WaitGroup by name
type NamedWaitGroup struct {
waitGroup sync.WaitGroup
keys sets.String
mu sync.Mutex
}

// NewNamedWaitGroup returns an instantiated NamedWaitGroup.
func NewNamedWaitGroup() *NamedWaitGroup {
return &NamedWaitGroup{
keys: sets.NewString(),
}
}

// Add will add the key to the list of keys being tracked and increment the wait group.
// If the key has already been added, the wait group will not be incremented again.
func (n *NamedWaitGroup) Add(key string) {
n.mu.Lock()
defer n.mu.Unlock()

if !n.keys.Has(key) {
n.keys.Insert(key)
n.waitGroup.Add(1)
}
}

// Done will decrement the counter if the key is present in the tracked keys. If it is not present
// it will be ignored.
func (n *NamedWaitGroup) Done(key string) {
n.mu.Lock()
defer n.mu.Unlock()

if n.keys.Has(key) {
n.keys.Delete(key)
n.waitGroup.Done()
}
}

// Wait will wait for the underlying waitGroup to complete.
func (n *NamedWaitGroup) Wait() {
n.waitGroup.Wait()
}

// SyncedCallback can be used to wait for a callback to be called at least once for a list of keys.
type SyncedCallback struct {
// namedWaitGroup will block until the callback has been called for all tracked entities
namedWaitGroup *NamedWaitGroup

// callback is the callback that is intended to be called at least once for each key
// being tracked via WaitGroup
callback func(obj interface{})
}

// NewSyncedCallback will return a SyncedCallback that will track the provided keys.
func NewSyncedCallback(keys []string, callback func(obj interface{})) *SyncedCallback {
s := &SyncedCallback{
callback: callback,
namedWaitGroup: NewNamedWaitGroup(),
}
for _, key := range keys {
s.namedWaitGroup.Add(key)
}
return s
}

// Event is intended to be a wrapper for the actual event handler; this wrapper will signal via
// the wait group that the event handler has been called at least once for the key.
func (s *SyncedCallback) Call(obj interface{}, key string) {
s.callback(obj)
s.namedWaitGroup.Done(key)
}

// WaitForAllKeys will block until s.Call has been called for all the keys we are tracking or the stop signal is
// received.
func (s *SyncedCallback) WaitForAllKeys(stopCh <-chan struct{}) error {
c := make(chan struct{})
go func() {
defer close(c)
s.namedWaitGroup.Wait()
}()
select {
case <-c:
return nil
case <-stopCh:
return wait.ErrWaitTimeout
}
}
130 changes: 0 additions & 130 deletions configmap/informer/informed_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"sync"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -488,132 +487,3 @@ func TestWatchWithDefaultAfterStart(t *testing.T) {
t.Fatalf("foo1.count = %v, want %d", got, want)
}
}

func TestNamedWaitGroup(t *testing.T) {
nwg := NewNamedWaitGroup()

// nothing has been added so wait returns immediately
initiallyDone := make(chan struct{})
go func() {
defer close(initiallyDone)
nwg.Wait()
}()
select {
case <-time.After(1 * time.Second):
t.Fatalf("Wait should have returned immediately but still hadn't after timeout elapsed")
case <-initiallyDone:
// the Wait returned as expected since nothing was tracked
}

// Add some keys to track
nwg.Add("foo")
nwg.Add("bar")
// Adding keys multiple times shouldn't increment the counter again
nwg.Add("bar")

// Now that we've added keys, when we Wait, it should block
done := make(chan struct{})
go func() {
defer close(done)
nwg.Wait()
}()

// Indicate that this key is done
nwg.Done("foo")
// Indicating done on a key that doesn't exist should do nothing
nwg.Done("doesnt exist")

// Only one of the tracked keys has completed, so the channel should not yet have closed
select {
case <-done:
t.Fatalf("Wait returned before all keys were done")
default:
// as expected, the channel is still open (waiting for the final key to be done)
}

// Indicate the final key is done
nwg.Done("bar")

// Now that all keys are done, the Wait should return
select {
case <-time.After(1 * time.Second):
t.Fatalf("Wait should have returned immediately but still hadn't after timeout elapsed")
case <-done:
// completed successfully
}
}

func TestSyncedCallback(t *testing.T) {
keys := []string{"foo", "bar"}
objs := []interface{}{"fooobj", "barobj"}
var seen []interface{}
callback := func(obj interface{}) {
seen = append(seen, obj)
}
sc := NewSyncedCallback(keys, callback)

// Wait for the callback to be called for all of the keys
stopCh := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
sc.WaitForAllKeys(stopCh)
}()

// Call the callback for one of the keys
sc.Call(objs[0], "foo")

// Only one of the tracked keys has been synced so we should still be waiting
select {
case <-done:
t.Fatalf("Wait returned before all keys were done")
default:
// as expected, the channel is still open (waiting for the final key to be done)
}

// Call the callback for the other key
sc.Call(objs[1], "bar")

// Now that all keys are done, the Wait should return
select {
case <-time.After(1 * time.Second):
t.Fatalf("WaitForAllKeys should have returned but still hadn't after timeout elapsed")
case <-done:
// completed successfully
}

if len(seen) != 2 || seen[0] != objs[0] || seen[1] != objs[1] {
t.Errorf("callback wasn't called as expected, expected to see %v but saw %v", objs, seen)
}
}

func TestSyncedCallbackStops(t *testing.T) {
sc := NewSyncedCallback([]string{"somekey"}, func(obj interface{}) {})

// Wait for the callback to be called - which it won't be!
stopCh := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(done)
sc.WaitForAllKeys(stopCh)
}()

// Nothing has been synced so we should still be waiting
select {
case <-done:
t.Fatalf("Wait returned before all keys were done")
default:
// as expected, the channel is still open
}

// signal to stop via the stop channel
close(stopCh)

// Even though the callback wasn't called, the Wait should return b/c of the stop channel
select {
case <-time.After(1 * time.Second):
t.Fatalf("WaitForAllKeys should have returned because of the stop channel but still hadn't after timeout elapsed")
case <-done:
// stopped successfully
}
}
112 changes: 112 additions & 0 deletions configmap/informer/synced_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package informer

import (
"sync"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)

// namedWaitGroup is used to increment and decrement a WaitGroup by name
type namedWaitGroup struct {
waitGroup sync.WaitGroup
keys sets.String
mu sync.Mutex
}

// newNamedWaitGroup returns an instantiated namedWaitGroup.
func newNamedWaitGroup() *namedWaitGroup {
return &namedWaitGroup{
keys: sets.NewString(),
}
}

// Add will add the key to the list of keys being tracked and increment the wait group.
// If the key has already been added, the wait group will not be incremented again.
func (n *namedWaitGroup) Add(key string) {
n.mu.Lock()
defer n.mu.Unlock()

if !n.keys.Has(key) {
n.keys.Insert(key)
n.waitGroup.Add(1)
}
}

// Done will decrement the counter if the key is present in the tracked keys. If it is not present
// it will be ignored.
func (n *namedWaitGroup) Done(key string) {
n.mu.Lock()
defer n.mu.Unlock()

if n.keys.Has(key) {
n.keys.Delete(key)
n.waitGroup.Done()
}
}

// Wait will wait for the underlying waitGroup to complete.
func (n *namedWaitGroup) Wait() {
n.waitGroup.Wait()
}

// syncedCallback can be used to wait for a callback to be called at least once for a list of keys.
type syncedCallback struct {
// namedWaitGroup will block until the callback has been called for all tracked entities
namedWaitGroup *namedWaitGroup

// callback is the callback that is intended to be called at least once for each key
// being tracked via WaitGroup
callback func(obj interface{})
}

// newSyncedCallback will return a syncedCallback that will track the provided keys.
func newSyncedCallback(keys []string, callback func(obj interface{})) *syncedCallback {
s := &syncedCallback{
callback: callback,
namedWaitGroup: newNamedWaitGroup(),
}
for _, key := range keys {
s.namedWaitGroup.Add(key)
}
return s
}

// Event is intended to be a wrapper for the actual event handler; this wrapper will signal via
// the wait group that the event handler has been called at least once for the key.
func (s *syncedCallback) Call(obj interface{}, key string) {
s.callback(obj)
s.namedWaitGroup.Done(key)
}

// WaitForAllKeys will block until s.Call has been called for all the keys we are tracking or the stop signal is
// received.
func (s *syncedCallback) WaitForAllKeys(stopCh <-chan struct{}) error {
c := make(chan struct{})
go func() {
defer close(c)
s.namedWaitGroup.Wait()
}()
select {
case <-c:
return nil
case <-stopCh:
return wait.ErrWaitTimeout
}
}
Loading

0 comments on commit 572b926

Please sign in to comment.