From e83f50ec7c5d66108cff969335430ec711c6ef83 Mon Sep 17 00:00:00 2001 From: fengshaobao 00231050 Date: Thu, 17 Aug 2017 20:38:34 +0800 Subject: [PATCH] mvcc: sending events after restore Fixes: #8411 --- mvcc/watchable_store.go | 15 +++++++++++++++ mvcc/watchable_store_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index dbb79bcb693..028e05b9ad8 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -258,6 +258,21 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Unlock() } +func (s *watchableStore) Restore(b backend.Backend) error { + s.mu.Lock() + defer s.mu.Unlock() + err := s.store.Restore(b) + if err != nil { + return err + } + + for wa := range s.synced.watchers { + s.unsynced.watchers.add(wa) + } + s.synced = newWatcherGroup() + return nil +} + // syncWatchersLoop syncs the watcher in the unsynced map every 100ms. func (s *watchableStore) syncWatchersLoop() { defer s.wg.Done() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 42f69ee7561..42d2ae57151 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -294,6 +294,39 @@ func TestWatchFutureRev(t *testing.T) { } } +func TestWatchRestore(t *testing.T) { + b, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(b, &lease.FakeLessor{}, nil) + defer cleanup(s, b, tmpPath) + + testKey := []byte("foo") + testValue := []byte("bar") + rev := s.Put(testKey, testValue, lease.NoLease) + + newBackend, newPath := backend.NewDefaultTmpBackend() + newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) + defer cleanup(newStore, newBackend, newPath) + + w := newStore.NewWatchStream() + w.Watch(testKey, nil, rev-1) + + newStore.Restore(b) + select { + case resp := <-w.Chan(): + if resp.Revision != rev { + t.Fatalf("rev = %d, want %d", resp.Revision, rev) + } + if len(resp.Events) != 1 { + t.Fatalf("failed to get events from the response") + } + if resp.Events[0].Kv.ModRevision != rev { + t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev) + } + case <-time.After(time.Second): + t.Fatal("failed to receive event in 1 second.") + } +} + // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend()