From d2bc2c023b3026eeca1b328a4740ccc7e8f0f165 Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Tue, 23 Jan 2018 15:42:19 +0100 Subject: [PATCH] UPSTREAM: 58547: Send correct resource version for delete events from watch cache Origin-commit: 6f1df5683547bd6c5b0bc6f73cc4be92d637e928 --- .../src/k8s.io/apiserver/pkg/storage/BUILD | 1 + .../k8s.io/apiserver/pkg/storage/cacher.go | 37 +++--- .../pkg/storage/cacher_whitebox_test.go | 118 +++++++++++------- 3 files changed, 96 insertions(+), 60 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/BUILD index 1838e84d85ce3..b8ade8bf9b95a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/BUILD @@ -21,6 +21,7 @@ go_test( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go index e3c787d08420b..3ff0d5f9677b7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher.go @@ -338,7 +338,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, c.Lock() defer c.Unlock() forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) - watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget) + watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) c.watcherIdx++ @@ -786,22 +786,24 @@ func (c *errWatcher) Stop() { // cacherWatch implements watch.Interface type cacheWatcher struct { sync.Mutex - input chan *watchCacheEvent - result chan watch.Event - done chan struct{} - filter watchFilterFunc - stopped bool - forget func(bool) + input chan *watchCacheEvent + result chan watch.Event + done chan struct{} + filter watchFilterFunc + stopped bool + forget func(bool) + versioner Versioner } -func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool)) *cacheWatcher { +func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher { watcher := &cacheWatcher{ - input: make(chan *watchCacheEvent, chanSize), - result: make(chan watch.Event, chanSize), - done: make(chan struct{}), - filter: filter, - stopped: false, - forget: forget, + input: make(chan *watchCacheEvent, chanSize), + result: make(chan watch.Event, chanSize), + done: make(chan struct{}), + filter: filter, + stopped: false, + forget: forget, + versioner: versioner, } go watcher.process(initEvents, resourceVersion) return watcher @@ -890,7 +892,12 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { case curObjPasses && oldObjPasses: watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} case !curObjPasses && oldObjPasses: - watchEvent = watch.Event{Type: watch.Deleted, Object: event.PrevObject.DeepCopyObject()} + // return a delete event with the previous object content, but with the event's resource version + oldObj := event.PrevObject.DeepCopyObject() + if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil { + utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err)) + } + watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj} } // We need to ensure that if we put event X to the c.result, all diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go index 3ef62e3fd5826..c1fc53898ac22 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher_whitebox_test.go @@ -17,15 +17,19 @@ limitations under the License. package storage import ( + "fmt" "reflect" + "strconv" "sync" "testing" "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -48,7 +52,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w := newCacheWatcher(0, 0, initEvents, filter, forget) + w := newCacheWatcher(0, 0, initEvents, filter, forget, testVersioner{}) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -73,28 +77,31 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) { { events: []*watchCacheEvent{ { - Type: watch.Added, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 1, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 2, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 3, }, }, expected: []watch.Event{ {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, - {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}}, {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, }, }, @@ -102,50 +109,56 @@ func TestCacheWatcherHandlesFiltering(t *testing.T) { { events: []*watchCacheEvent{ { - Type: watch.Added, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Added, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 1, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 2, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 3, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, - ObjFields: fields.Set{"spec.nodeName": "host"}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + ObjFields: fields.Set{"spec.nodeName": "host"}, + ResourceVersion: 4, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, - PrevObjFields: fields.Set{"spec.nodeName": "host"}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}, + PrevObjFields: fields.Set{"spec.nodeName": "host"}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 5, }, { - Type: watch.Modified, - PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, - PrevObjFields: fields.Set{"spec.nodeName": ""}, - Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, - ObjFields: fields.Set{"spec.nodeName": ""}, + Type: watch.Modified, + PrevObject: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}, + PrevObjFields: fields.Set{"spec.nodeName": ""}, + Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6"}}, + ObjFields: fields.Set{"spec.nodeName": ""}, + ResourceVersion: 6, }, }, expected: []watch.Event{ {Type: watch.Added, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3"}}}, {Type: watch.Modified, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, - {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4"}}}, + {Type: watch.Deleted, Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5"}}}, }, }, } @@ -157,7 +170,7 @@ TestCase: for j := range testCase.events { testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, 0, testCase.events, filter, forget) + w := newCacheWatcher(0, 0, testCase.events, filter, forget, testVersioner{}) ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch @@ -175,3 +188,18 @@ TestCase: w.Stop() } } + +type testVersioner struct{} + +func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { + return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10)) +} +func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error { + return fmt.Errorf("unimplemented") +} +func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { + return fmt.Errorf("unimplemented") +} +func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { + return 0, fmt.Errorf("unimplemented") +}