Skip to content

Commit

Permalink
informers: don't treat relist same as sync
Browse files Browse the repository at this point in the history
Background:

Before this change, DeltaFIFO emits the Sync DeltaType on Resync() and
Replace(). Seperately, the SharedInformer will only pass that event
on to handlers that have a ResyncInterval and are due for Resync. This
can cause updates to be lost if an object changes as part of the Replace(),
as it may be incorrectly discarded if the handler does not want a Resync.

What this change does:

Creates a new DeltaType, Replaced, which is emitted by DeltaFIFO on
Replace(). For backwards compatability concerns, the old behavior of
always emitting Sync is preserved unless explicity overridden.

As a result, if an object changes (or is added) on Replace(), now all
SharedInformer handlers will get a correct Add() or Update()
notification.

One additional side-effect is that handlers which do not ever want
Resyncs will now see them for all objects that have not changed during
the Replace.

Kubernetes-commit: ca1eeb99b530a6d76b464dad545abc18d4508c49
  • Loading branch information
squeed authored and k8s-publishing-bot committed Dec 16, 2019
1 parent 802190f commit b775e00
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 16 deletions.
7 changes: 5 additions & 2 deletions tools/cache/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,10 @@ func newInformer(
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})

cfg := &Config{
Queue: fifo,
Expand All @@ -377,7 +380,7 @@ func newInformer(
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
Expand Down
83 changes: 74 additions & 9 deletions tools/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (
// NewDeltaFIFO returns a Queue which can be used to process changes to items.
//
// keyFunc is used to figure out what key an object should have. (It is
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
// around deleted objects and queue state).
//
// 'knownObjects' may be supplied to modify the behavior of Delete,
// Replace, and Resync. It may be nil if you do not need those
Expand All @@ -56,12 +57,62 @@ import (
// and internal tests.
//
// Also see the comment on DeltaFIFO.
//
// Warning: This constructs a DeltaFIFO that does not differentiate between
// events caused by a call to Replace (e.g., from a relist, which may
// contain object updates), and synthetic events caused by a periodic resync
// (which just emit the existing object). See https://issue.k8s.io/86015 for details.
//
// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
// instead to receive a `Replaced` event depending on the type.
//
// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: keyFunc,
KnownObjects: knownObjects,
})
}

// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
// optional.
type DeltaFIFOOptions struct {

// KeyFunction is used to figure out what key an object should have. (It's
// exposed in the returned DeltaFIFO's KeyOf() method, with additional
// handling around deleted objects and queue state).
// Optional, the default is MetaNamespaceKeyFunc.
KeyFunction KeyFunc

// KnownObjects is expected to return a list of keys that the consumer of
// this queue "knows about". It is used to decide which items are missing
// when Replace() is called; 'Deleted' deltas are produced for the missing items.
// KnownObjects may be nil if you can tolerate missing deletions on Replace().
KnownObjects KeyListerGetter

// EmitDeltaTypeReplaced indicates that the queue consumer
// understands the Replaced DeltaType. Before the `Replaced` event type was
// added, calls to Replace() were handled the same as Sync(). For
// backwards-compatibility purposes, this is false by default.
// When true, `Replaced` events will be sent for items passed to a Replace() call.
// When false, `Sync` events will be sent instead.
EmitDeltaTypeReplaced bool
}

// NewDeltaFIFOWithOptions returns a Store which can be used process changes to
// items. See also the comment on DeltaFIFO.
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.KeyFunction == nil {
opts.KeyFunction = MetaNamespaceKeyFunc
}

f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,

emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
}
f.cond.L = &f.lock
return f
Expand Down Expand Up @@ -134,6 +185,10 @@ type DeltaFIFO struct {
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex

// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
}

var (
Expand Down Expand Up @@ -446,7 +501,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
}

// Replace atomically does two things: (1) it adds the given objects
// using the Sync type of Delta and then (2) it does some deletions.
// using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
Expand All @@ -460,13 +515,19 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
defer f.lock.Unlock()
keys := make(sets.String, len(list))

// keep backwards compat for old clients
action := Sync
if f.emitDeltaTypeReplaced {
action = Replaced
}

for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil {
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
Expand Down Expand Up @@ -600,10 +661,14 @@ const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
// Replaced is emitted when we encountered watch errors and had to do a
// relist. We don't know if the replaced object has changed.
//
// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
// as well. Hence, Replaced is only emitted when the option
// EmitDeltaTypeReplaced is true.
Replaced DeltaType = "Replaced"
// Sync is for synthetic events during a periodic resync.
Sync DeltaType = "Sync"
)

Expand Down
72 changes: 72 additions & 0 deletions tools/cache/delta_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,24 @@ func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
}
}

func TestDeltaFIFO_Resync(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5)}
}),
)
f.Resync()

deltas := f.items["foo"]
if len(deltas) != 1 {
t.Fatalf("unexpected deltas length: %v", deltas)
}
if deltas[0].Type != Sync {
t.Errorf("unexpected delta: %v", deltas[0])
}
}

func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
Expand Down Expand Up @@ -384,6 +402,60 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
}
}

// TestDeltaFIFO_ReplaceMakesDeletionsReplaced is the same as the above test, but
// ensures that a Replaced DeltaType is emitted.
func TestDeltaFIFO_ReplaceMakesDeletionsReplaced(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
EmitDeltaTypeReplaced: true,
})

f.Delete(mkFifoObj("baz", 10))
f.Replace([]interface{}{mkFifoObj("foo", 6)}, "0")

expectedList := []Deltas{
{{Deleted, mkFifoObj("baz", 10)}},
{{Replaced, mkFifoObj("foo", 6)}},
// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
}

for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
}

// TestDeltaFIFO_ReplaceDeltaType checks that passing EmitDeltaTypeReplaced
// means that Replaced is correctly emitted.
func TestDeltaFIFO_ReplaceDeltaType(t *testing.T) {
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: literalListerGetter(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5)}
}),
EmitDeltaTypeReplaced: true,
})
f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")

expectedList := []Deltas{
{{Replaced, mkFifoObj("foo", 5)}},
}

for _, expected := range expectedList {
cur := Pop(f).(Deltas)
if e, a := expected, cur; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
}

func TestDeltaFIFO_UpdateResyncRace(t *testing.T) {
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
Expand Down
11 changes: 7 additions & 4 deletions tools/cache/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ type deleteNotification struct {
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})

cfg := &Config{
Queue: fifo,
Expand Down Expand Up @@ -478,19 +481,19 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := d.Type == Sync
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
Expand Down
69 changes: 68 additions & 1 deletion tools/cache/shared_informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (l *testListener) satisfiedExpectations() bool {
l.lock.RLock()
defer l.lock.RUnlock()

return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
}

func TestListenerResyncPeriods(t *testing.T) {
Expand Down Expand Up @@ -263,3 +263,70 @@ func TestSharedInformerInitializationRace(t *testing.T) {
go informer.Run(stop)
close(stop)
}

// TestSharedInformerWatchDisruption simulates a watch that was closed
// with updates to the store during that time. We ensure that handlers with
// resync and no resync see the expected state.
func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()

source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})

// create the shared informer and resync every 1s
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)

clock := clock.NewFakeClock(time.Now())
informer.clock = clock
informer.processor.clock = clock

// listener, never resync
listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod)

listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2")
informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
listeners := []*testListener{listenerNoResync, listenerResync}

stop := make(chan struct{})
defer close(stop)

go informer.Run(stop)

for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}

// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3"}})
source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2"}})

// Ensure that nobody saw any changes
for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}

for _, listener := range listeners {
listener.receivedItemNames = []string{}
}

listenerNoResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")

// This calls shouldSync, which deletes noResync from the list of syncingListeners
clock.Step(1 * time.Second)

// Simulate a connection loss (or even just a too-old-watch)
source.ResetWatch()

for _, listener := range listeners {
if !listener.ok() {
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
}
}
}

0 comments on commit b775e00

Please sign in to comment.