Skip to content

Commit

Permalink
Merge pull request #8806 from jpbetz/automated-cherry-pick-of-#8427-o…
Browse files Browse the repository at this point in the history
…rigin-release-3.1-1509570173

Automated cherry pick of #8427 to release 3.1 branch
  • Loading branch information
xiang90 authored Nov 10, 2017
2 parents 00d6d4a + e83f50e commit 3ab9894
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
15 changes: 15 additions & 0 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Unlock()
}

func (s *watchableStore) Restore(b backend.Backend) error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.store.Restore(b)
if err != nil {
return err
}

for wa := range s.synced.watchers {
s.unsynced.watchers.add(wa)
}
s.synced = newWatcherGroup()
return nil
}

// syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()
Expand Down
33 changes: 33 additions & 0 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,39 @@ func TestWatchFutureRev(t *testing.T) {
}
}

func TestWatchRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(testKey, nil, rev-1)

newStore.Restore(b)
select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
}
}

// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
Expand Down

0 comments on commit 3ab9894

Please sign in to comment.