Skip to content

Commit

Permalink
Merge pull request kubernetes#18233 from tnozicka/backport-watch-cach…
Browse files Browse the repository at this point in the history
…e-fix-58547

Automatic merge from submit-queue (batch tested with PRs 18233, 18068, 18228, 18227).

UPSTREAM: 58547: Send correct resource version for delete events from watch cache

Backport of kubernetes#58547

Watch cache was returning incorrect (old) ResourceVersion on "deleted" events breaking informers that were going back in time. This fixes it.

/assign @liggitt
/cc @mfojtik

Fixes openshift/origin#17581 openshift/origin#16003 and likely others

Origin-commit: 042a63f8c1effc2fb911ce2cf494458872e9f8a3
  • Loading branch information
k8s-publishing-bot committed Jan 25, 2018
2 parents 65524d4 + d2bc2c0 commit c37a4d0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 60 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
Expand Down
37 changes: 22 additions & 15 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
c.Lock()
defer c.Unlock()
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)

c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
Expand Down Expand Up @@ -786,22 +786,24 @@ func (c *errWatcher) Stop() {
// cacherWatch implements watch.Interface
type cacheWatcher struct {
sync.Mutex
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter watchFilterFunc
stopped bool
forget func(bool)
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter watchFilterFunc
stopped bool
forget func(bool)
versioner Versioner
}

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
}
go watcher.process(initEvents, resourceVersion)
return watcher
Expand Down Expand Up @@ -890,7 +892,12 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
case curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
case !curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Deleted, Object: event.PrevObject.DeepCopyObject()}
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
}

// We need to ensure that if we put event X to the c.result, all
Expand Down
118 changes: 73 additions & 45 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package storage

import (
"fmt"
"reflect"
"strconv"
"sync"
"testing"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -48,7 +52,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
}
// set the size of the buffer of w.result to 0, so that the writes to
// w.result is blocked.
w := newCacheWatcher(0, 0, initEvents, filter, forget)
w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{})
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock()
Expand All @@ -73,79 +77,88 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) {
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 1,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}},
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
},
},
// properly handle ignoring changes prior to the filter, then getting added, then deleted
{
events: []*watchCacheEvent{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 1,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 2,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 3,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
ObjFields: fields.Set{"spec.nodeName": "host"},
ResourceVersion: 4,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}},
PrevObjFields: fields.Set{"spec.nodeName": "host"},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 5,
},
{
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
Type: watch.Modified,
PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}},
PrevObjFields: fields.Set{"spec.nodeName": ""},
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}},
ObjFields: fields.Set{"spec.nodeName": ""},
ResourceVersion: 6,
},
},
expected: []watch.Event{
{Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}},
{Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}},
{Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}},
},
},
}
Expand All @@ -157,7 +170,7 @@ TestCase:
for j := range testCase.events {
testCase.events[j].ResourceVersion = uint64(j) + 1
}
w := newCacheWatcher(0, 0, testCase.events, filter, forget)
w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{})
ch := w.ResultChan()
for j, event := range testCase.expected {
e := <-ch
Expand All @@ -175,3 +188,18 @@ TestCase:
w.Stop()
}
}

type testVersioner struct{}

func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error {
return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
}
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
return 0, fmt.Errorf("unimplemented")
}

0 comments on commit c37a4d0

Please sign in to comment.