Skip to content

Commit

Permalink
Fix race: Make informed watcher start wait for Add event 🏎️
Browse files Browse the repository at this point in the history
When using the informed watcher to watch a config map, add events were
previously processed in a goroutine with no synchronization, so code
could try to access the values backed by the configmaps before they
were initialized.

This commit makes it so that the Start method of the informer will wait
for the add event to occur at least once for all config maps it is
watching.

This commit also undoes the workaround added in knative#1929 which was working
around the race condition identified in knative#1907 (and in
tektoncd/pipeline#3720). This means that if
the synchronization was removed, the impacted test would start flaking
again. If we wanted it to reliably fail in that case, we could introduce
a sleep in the callback but that doesn't seem worth it.

I also tested this change by manually patching the changes into my
clone of tektoncd/pipeline and following the repro steps at
tektoncd/pipeline#2815 (comment)
Before the change I can reproduce the issue, and after the change, I
can't! :D

Fixes knative#1960
  • Loading branch information
bobcatfish committed Feb 23, 2021
1 parent 86a8236 commit 24377c9
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 39 deletions.
145 changes: 134 additions & 11 deletions configmap/informer/informed_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package informer
import (
"errors"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/informers/internalinterfaces"
Expand Down Expand Up @@ -95,6 +98,10 @@ type InformedWatcher struct {
// defaults are the default ConfigMaps to use if the real ones do not exist or are deleted.
defaults map[string]*corev1.ConfigMap

// syncedCallback is used to ensure that the add callback is called at least once
// per tracked configMap on Start
syncedCallback *SyncedCallback

// Embedding this struct allows us to reuse the logic
// of registering and notifying observers. This simplifies the
// InformedWatcher to just setting up the Kubernetes informer.
Expand Down Expand Up @@ -126,31 +133,56 @@ func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...configmap.O
i.Watch(cm.Name, o...)
}

// Start implements Watcher.
func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
// Pretend that all the defaulted ConfigMaps were just created. This is done before we start
// the informer to ensure that if a defaulted ConfigMap does exist, then the real value is
// processed after the default one.
func (i *InformedWatcher) triggerAddEventForDefaultedConfigMaps(addConfigMapEvent func(obj interface{})) {
// For each key that ForEach yields, forward the registered default (if any) to the
// supplied add handler. Keys without an entry in i.defaults are skipped.
i.ForEach(func(k string, _ []configmap.Observer) error {
if def, ok := i.defaults[k]; ok {
// NOTE(review): this listing looks like a rendered diff; the method call on the next
// line appears to be the pre-change form that the handler-argument call after it
// replaces — confirm against the real file.
i.addConfigMapEvent(def)
addConfigMapEvent(def)
}
return nil
})
}

// getConfigMapNames collects every key that ForEach reports for this watcher and
// returns them as a slice. The result is nil when ForEach yields nothing.
func (i *InformedWatcher) getConfigMapNames() []string {
	var names []string
	collect := func(name string, _ []configmap.Observer) error {
		names = append(names, name)
		return nil
	}
	i.ForEach(collect)
	return names
}

// Start implements Watcher.
func (i *InformedWatcher) Start(stopCh <-chan struct{}) error {
// Using the synced callback wrapper around the add event handler will allow the caller
// to wait for the add event to be processed for all configmaps.
s := NewSyncedCallback(i.getConfigMapNames(), i.addConfigMapEvent)
addConfigMapEvent := func(obj interface{}) {
configMap := obj.(*corev1.ConfigMap)
// The ConfigMap name is the key that the synced callback marks as processed.
s.Call(obj, configMap.Name)
}
// Pretend that all the defaulted ConfigMaps were just created. This is done before we start
// the informer to ensure that if a defaulted ConfigMap does exist, then the real value is
// processed after the default one.
i.triggerAddEventForDefaultedConfigMaps(addConfigMapEvent)

if err := i.registerCallbackAndStartInformer(stopCh); err != nil {
if err := i.registerCallbackAndStartInformer(addConfigMapEvent, stopCh); err != nil {
return err
}

// Wait until it has been synced (WITHOUT holding the mutex, so callbacks happen)
// Wait until the shared informer has been synced (WITHOUT holding the mutex, so callbacks happen)
if ok := cache.WaitForCacheSync(stopCh, i.informer.Informer().HasSynced); !ok {
return errors.New("error waiting for ConfigMap informer to sync")
}

return i.checkObservedResourcesExist()
if err := i.checkObservedResourcesExist(); err != nil {
return err
}

// Wait until all config maps have been at least initially processed
return s.WaitForAllKeys(stopCh)
}

// registerCallbackAndStartInformer registers the add, update and delete handlers on the
// ConfigMap informer and then starts the shared informer factory. The watcher's lock is
// held for the whole call, and i.started is set before the handlers are registered.
// NOTE(review): the body of the i.started check is elided in this view — confirm what it
// returns when the watcher was already started.
func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{}) error {
func (i *InformedWatcher) registerCallbackAndStartInformer(addConfigMapEvent func(obj interface{}), stopCh <-chan struct{}) error {
i.Lock()
defer i.Unlock()
if i.started {
Expand All @@ -159,13 +191,14 @@ func (i *InformedWatcher) registerCallbackAndStartInformer(stopCh <-chan struct{
i.started = true

i.informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: i.addConfigMapEvent,
AddFunc: addConfigMapEvent,
UpdateFunc: i.updateConfigMapEvent,
DeleteFunc: i.deleteConfigMapEvent,
})

// Start the shared informer factory (non-blocking).
i.sif.Start(stopCh)

return nil
}

Expand All @@ -187,6 +220,8 @@ func (i *InformedWatcher) checkObservedResourcesExist() error {

// addConfigMapEvent is the add handler for ConfigMaps: it passes the added object to
// OnChange. obj must be a *corev1.ConfigMap; any other dynamic type makes the type
// assertion panic.
func (i *InformedWatcher) addConfigMapEvent(obj interface{}) {
	i.OnChange(obj.(*corev1.ConfigMap))
}

Expand All @@ -207,3 +242,91 @@ func (i *InformedWatcher) deleteConfigMapEvent(obj interface{}) {
}
// If there is no default value, then don't do anything.
}

// NamedWaitGroup wraps a sync.WaitGroup so that it is incremented and decremented by
// key rather than by count: each distinct key contributes at most one unit.
type NamedWaitGroup struct {
	// mu serializes the key-set checks and the paired waitGroup updates in Add and Done.
	mu sync.Mutex
	// keys holds the names that have been added and not yet marked done.
	keys sets.String
	// waitGroup is incremented once per key inserted and decremented once per key removed.
	waitGroup sync.WaitGroup
}

// NewNamedWaitGroup returns a ready-to-use NamedWaitGroup that tracks no keys yet.
func NewNamedWaitGroup() *NamedWaitGroup {
	n := new(NamedWaitGroup)
	n.keys = sets.NewString()
	return n
}

// Add starts tracking key and increments the wait group by one. Adding a key that is
// already tracked is a no-op, so the wait group is never incremented twice for it.
func (n *NamedWaitGroup) Add(key string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if n.keys.Has(key) {
		// Already tracked: nothing to do.
		return
	}
	n.keys.Insert(key)
	n.waitGroup.Add(1)
}

// Done stops tracking key and decrements the wait group by one. A key that is not
// currently tracked (never added, or already done) is ignored.
func (n *NamedWaitGroup) Done(key string) {
	n.mu.Lock()
	defer n.mu.Unlock()

	if !n.keys.Has(key) {
		// Unknown or already completed key: leave the counter untouched.
		return
	}
	n.keys.Delete(key)
	n.waitGroup.Done()
}

// Wait blocks until the underlying wait group's counter reaches zero, i.e. until Done
// has been called for every key that was added via Add.
func (n *NamedWaitGroup) Wait() {
n.waitGroup.Wait()
}

// SyncedCallback pairs a callback with a set of keys so that a caller can wait until
// the callback has been invoked at least once for every key.
type SyncedCallback struct {
	// callback is the wrapped handler; it is expected to run at least once for each
	// key tracked by namedWaitGroup.
	callback func(obj interface{})

	// namedWaitGroup blocks waiters until the callback has been called for all
	// tracked keys.
	namedWaitGroup *NamedWaitGroup
}

// NewSyncedCallback builds a SyncedCallback around callback and registers each of the
// given keys with a fresh NamedWaitGroup.
func NewSyncedCallback(keys []string, callback func(obj interface{})) *SyncedCallback {
	tracker := NewNamedWaitGroup()
	for idx := range keys {
		tracker.Add(keys[idx])
	}
	return &SyncedCallback{
		namedWaitGroup: tracker,
		callback:       callback,
	}
}

// Call wraps the actual event handler: it runs the callback with obj and then marks
// key as done on the wait group, signalling that the handler has been called at least
// once for that key.
func (s *SyncedCallback) Call(obj interface{}, key string) {
	handler, tracker := s.callback, s.namedWaitGroup
	// The handler runs to completion before the key is released.
	handler(obj)
	tracker.Done(key)
}

// WaitForAllKeys blocks until s.Call has been invoked for every tracked key, returning
// nil, or until stopCh delivers the stop signal, returning wait.ErrWaitTimeout.
//
// The wait on the underlying group runs in a helper goroutine. If the stop signal
// arrives first, that goroutine stays blocked until the remaining keys are marked done.
func (s *SyncedCallback) WaitForAllKeys(stopCh <-chan struct{}) error {
	allDone := make(chan struct{})
	go func() {
		defer close(allDone)
		s.namedWaitGroup.Wait()
	}()

	select {
	case <-stopCh:
		return wait.ErrWaitTimeout
	case <-allDone:
		return nil
	}
}
Loading

0 comments on commit 24377c9

Please sign in to comment.