Skip to content

Commit

Permalink
etcdserver: add e2e test to reproduce the incorrect hash issue when r…
Browse files Browse the repository at this point in the history
…esuming scheduled compaction

Signed-off-by: caojiamingalan <alan.c.19971111@gmail.com>
  • Loading branch information
CaojiamingAlan committed May 31, 2023
1 parent bd5d0a5 commit 24a92a4
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 5 deletions.
27 changes: 22 additions & 5 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -238,7 +248,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}
close(ch)
})

Expand All @@ -248,17 +264,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -267,7 +284,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
Expand Down
4 changes: 4 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(lg, revision{1, 0}, false)
key2 := newTestKeyBytes(lg, revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}

s.Compact(traceutil.TODO(), 3)
Expand All @@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{schema.Key, key2}},
Expand Down
104 changes: 104 additions & 0 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package e2e
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -193,3 +194,106 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}

func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

slowCompactionNodeIndex := 1

// Start a new cluster, with compact hash check enabled.
t.Log("creating a new cluster with 1 node...")

dataDirPath := t.TempDir()
cfg := e2e.NewConfig(
e2e.WithKeepDataDir(true),
e2e.WithCompactHashCheckEnabled(true),
e2e.WithCompactHashCheckTime(checkTime),
e2e.WithClusterSize(3),
e2e.WithDataDirPath(dataDirPath),
e2e.WithLogLevel("info"),
)
epc, err := e2e.InitEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not init etcd process cluster (%v)", err)
}

// Assign a node a very slow compaction speed, so that its compaction can be interrupted.
err = epc.UpdateProcOptions(slowCompactionNodeIndex, t,
e2e.WithCompactionBatchLimit(1),
e2e.WithCompactionSleepInterval(1*time.Hour),
)
if err != nil {
t.Fatalf("could not update etcd process options (%v)", err)
}

epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

// Put 10 identical keys to the cluster, so that the compaction will drop some stale values.
t.Log("putting 10 values to the identical key...")
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
assert.NoError(t, err)
for i := 0; i < 10; i++ {
err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}

t.Log("compaction started...")
_, err = cc.Compact(ctx, 5, config.CompactOption{})

err = epc.UpdateProcOptions(slowCompactionNodeIndex, t)
if err != nil {
t.Fatalf("could not update etcd process options (%v)", err)
}

t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
err = epc.Procs[slowCompactionNodeIndex].Restart(ctx)

// Wait until the node finished compaction and the leader finished compaction hash check
_, err = epc.Procs[slowCompactionNodeIndex].Logs().ExpectWithContext(ctx, "finished scheduled compaction")
if err != nil {
t.Fatalf("can't get log indicationg finished scheduled compaction")
}

leaderIndex := epc.WaitLeader(t)
_, err = epc.Procs[leaderIndex].Logs().ExpectFunc(ctx, func(s string) bool {
// NB: there are several possible branches in CompactHashCheck(),
// need to check each of them to confirm the check is finished,
// otherwise this ExpectFunc would be blocked.
// todo log "finished compaction hash check" on every possible branch
possibleLogs := []string{
"finished compaction hash check",
"successfully checked hash on whole cluster",
"Detected compaction hash mismatch",
}
for _, possibleLog := range possibleLogs {
if strings.Contains(s, possibleLog) {
return true
}
}
return false
})
if err != nil {
t.Fatalf("can't get log indicationg finished compaction hash check")
}

alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
for _, alarm := range alarmResponse.Alarms {
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
t.Fatalf("there should be no corruption after resuming the compaction, but corruption detected")
}
}
t.Log("no corruption detected.")
}
35 changes: 35 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration
GoFailEnabled bool
CompactionBatchLimit int
CompactionSleepInterval time.Duration

WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
Expand Down Expand Up @@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}

func WithCompactionSleepInterval(time time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = time }
}

func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
}
Expand Down Expand Up @@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}
if cfg.CompactionSleepInterval != 0 {
args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String())
}
if cfg.WarningUnaryRequestDuration != 0 {
args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String())
}
Expand Down Expand Up @@ -813,6 +821,33 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
return proc.Start(ctx)
}

// UpdateProcOptions updates the options for a specific process. If no opt is set, then the config is identical
// to the cluster.
func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...EPClusterOption) error {
cfg := *epc.Cfg
for _, opt := range opts {
opt(&cfg)
}
serverCfg := cfg.EtcdServerProcessConfig(tb, i)

var initialCluster []string
for _, p := range epc.Procs {
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String()))
}
epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new")

proc, err := NewEtcdProcess(serverCfg)
if err != nil {
return err
}
err = epc.Procs[i].Close()
if err != nil {
return err
}
epc.Procs[i] = proc
return nil
}

func (epc *EtcdProcessCluster) Start(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) })
}
Expand Down
1 change: 1 addition & 0 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type EtcdProcess interface {

type LogsExpect interface {
ExpectWithContext(context.Context, string) (string, error)
ExpectFunc(ctx context.Context, f func(string) bool) (string, error)
Lines() []string
LineCount() int
}
Expand Down

0 comments on commit 24a92a4

Please sign in to comment.