Skip to content

Commit

Permalink
test: fix TestRestoreContinueUnfinishedCompaction
Browse files Browse the repository at this point in the history
The original testcase uses `return` statement which skips `restore`
case. It's aimed to enable `restore` testcase.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Mar 16, 2023
1 parent c498ac0 commit 41e3667
Showing 1 changed file with 52 additions and 47 deletions.
99 changes: 52 additions & 47 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,58 +480,63 @@ func TestRestoreDelete(t *testing.T) {
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
tests := []string{"recreate", "restore"}
for _, test := range tests {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)

// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
UnsafeSetScheduledCompact(tx, 2)
tx.Unlock()

s0.Close()

var s *store
switch test {
case "recreate":
s = NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
case "restore":
s0.Restore(b)
s = s0
}
test := test

// wait for scheduled compaction to be finished
time.Sleep(100 * time.Millisecond)
t.Run(test, func(t *testing.T) {
b, tmpPath := betesting.NewDefaultTmpBackend(t)
s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
}
// check the key in backend is deleted
revbytes := newRevBytes()
revToBytes(revision{main: 1}, revbytes)

// The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO.
for i := 0; i < 5; i++ {
tx := s.b.BatchTx()
s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)

// write scheduled compaction, but not do compaction
rbytes := newRevBytes()
revToBytes(revision{main: 2}, rbytes)
tx := s0.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0)
UnsafeSetScheduledCompact(tx, 2)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)
continue

var s *store
switch test {
case "recreate":
s0.Close()
s = NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
case "restore":
// TODO(fuweid): store doesn't support to restore
// from a closed status because there is no lock
// for `Close` or action to mark it is closed.
s0.Restore(b)
s = s0
}
// FIXME(fuweid): it doesn't test restore one?
return
}
cleanup(s, b, tmpPath)
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
defer cleanup(s, b, tmpPath)

// wait for scheduled compaction to be finished
time.Sleep(100 * time.Millisecond)

if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
}
// check the key in backend is deleted
revbytes := newRevBytes()
revToBytes(revision{main: 1}, revbytes)

// The disk compaction is done asynchronously and requires more time on slow disk.
// try 5 times for CI with slow IO.
for i := 0; i < 5; i++ {
tx := s.b.BatchTx()
tx.Lock()
ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0)
tx.Unlock()
if len(ks) != 0 {
time.Sleep(100 * time.Millisecond)
continue
}
return
}
t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
})
}
}

Expand Down

0 comments on commit 41e3667

Please sign in to comment.