etcdserver: add e2e test to reproduce the incorrect hash issue when resuming scheduled compaction.

Check ScheduledCompactKeyName and FinishedCompactKeyName before writing the hash to the hash store. If they do not match, the previous scheduled compaction was interrupted, so the resumed compaction does not hash every revision since the last completed compaction and its hash value is invalid. In such cases we skip writing the hash to the hash store, which avoids raising an incorrect corruption alarm.

Signed-off-by: caojiamingalan <alan.c.19971111@gmail.com>
CaojiamingAlan committed May 31, 2023
1 parent bd5d0a5 commit ed987e5
Showing 5 changed files with 166 additions and 5 deletions.
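In essence, the commit compares the two meta-bucket markers before trusting a freshly computed compaction hash. Below is a minimal sketch of that predicate with illustrative parameter names; the real read helpers, UnsafeReadScheduledCompact and UnsafeReadFinishedCompact, appear in the kvstore.go diff that follows.

// prevCompactionCompleted reports whether the last scheduled compaction ran to
// completion. Both markers absent: a fresh store with nothing to resume. Both
// present and equal: the scheduled compaction finished. Any other combination:
// an interrupted compaction, whose resumed run hashes only part of the range.
func prevCompactionCompleted(scheduled, finished int64, scheduledFound, finishedFound bool) bool {
	return scheduled == finished && scheduledFound == finishedFound
}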
27 changes: 22 additions & 5 deletions server/storage/mvcc/kvstore.go
@@ -225,7 +225,17 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
j := schedule.NewJob("kvstore_compact", func(ctx context.Context) {
if ctx.Err() != nil {
@@ -238,7 +248,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}
close(ch)
})

@@ -248,17 +264,18 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
@@ -267,7 +284,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
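Note the ordering at both call sites above: checkPrevCompactionCompleted must read the markers before updateCompactRev runs, since updateCompactRev persists a new ScheduledCompactKeyName that would mask an earlier interruption (visible in the test expectations below, where the two range reads precede the put of the scheduled marker). A hedged sketch of the shared sequence:

// Snapshot the completion state first; updateCompactRev overwrites the scheduled marker.
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
	return ch, err
}
// The flag travels with the scheduled compaction job and gates s.hashes.Store(hash).
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)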
4 changes: 4 additions & 0 deletions server/storage/mvcc/kvstore_test.go
@@ -338,6 +338,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(lg, revision{1, 0}, false)
key2 := newTestKeyBytes(lg, revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}

s.Compact(traceutil.TODO(), 3)
@@ -349,6 +351,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{schema.Meta, schema.FinishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{schema.Meta, schema.ScheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{schema.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{schema.Key, key2}},
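The two empty rangeResp stubs added at the top of TestStoreCompact, together with the two new range actions in wact, correspond to the meta-bucket reads that checkPrevCompactionCompleted now performs before the compaction itself; empty results model a store with no prior compaction markers. Annotated for illustration:

// In the fake backend, each queued response answers one range call, in order:
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} // ScheduledCompactKeyName: not found
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}} // FinishedCompactKeyName: not found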
104 changes: 104 additions & 0 deletions tests/e2e/corrupt_test.go
@@ -17,6 +17,7 @@ package e2e
import (
"context"
"fmt"
"strings"
"testing"
"time"

@@ -193,3 +194,106 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: memberID}}, alarmResponse.Alarms)
}

func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
checkTime := time.Second
e2e.BeforeTest(t)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

slowCompactionNodeIndex := 1

// Start a new cluster, with compact hash check enabled.
t.Log("creating a new cluster with 1 node...")

dataDirPath := t.TempDir()
cfg := e2e.NewConfig(
e2e.WithKeepDataDir(true),
e2e.WithCompactHashCheckEnabled(true),
e2e.WithCompactHashCheckTime(checkTime),
e2e.WithClusterSize(3),
e2e.WithDataDirPath(dataDirPath),
e2e.WithLogLevel("info"),
)
epc, err := e2e.InitEtcdProcessCluster(t, cfg)
if err != nil {
t.Fatalf("could not init etcd process cluster (%v)", err)
}

// Assign a node a very slow compaction speed, so that its compaction can be interrupted.
err = epc.UpdateProcOptions(slowCompactionNodeIndex, t,
e2e.WithCompactionBatchLimit(1),
e2e.WithCompactionSleepInterval(1*time.Hour),
)
if err != nil {
t.Fatalf("could not update etcd process options (%v)", err)
}

epc, err = e2e.StartEtcdProcessCluster(ctx, epc, cfg)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}

t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

// Put 10 values to the same key, so that the compaction will drop some stale values.
t.Log("putting 10 values to the identical key...")
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
assert.NoError(t, err)
for i := 0; i < 10; i++ {
err := cc.Put(ctx, "key", fmt.Sprint(i), config.PutOptions{})
assert.NoError(t, err, "error on put")
}

t.Log("compaction started...")
_, err = cc.Compact(ctx, 5, config.CompactOption{})
assert.NoError(t, err, "error on compact")

err = epc.UpdateProcOptions(slowCompactionNodeIndex, t)
if err != nil {
t.Fatalf("could not update etcd process options (%v)", err)
}

t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
err = epc.Procs[slowCompactionNodeIndex].Restart(ctx)
if err != nil {
t.Fatalf("could not restart etcd process (%v)", err)
}

// Wait until the node finished compaction and the leader finished compaction hash check
_, err = epc.Procs[slowCompactionNodeIndex].Logs().ExpectWithContext(ctx, "finished scheduled compaction")
if err != nil {
t.Fatalf("can't get log indicating finished scheduled compaction")
}

leaderIndex := epc.WaitLeader(t)
_, err = epc.Procs[leaderIndex].Logs().ExpectFunc(ctx, func(s string) bool {
// NB: there are several possible branches in CompactHashCheck(),
// need to check each of them to confirm the check is finished,
// otherwise this ExpectFunc would be blocked.
// TODO: log "finished compaction hash check" on every possible branch
possibleLogs := []string{
"finished compaction hash check",
"successfully checked hash on whole cluster",
"Detected compaction hash mismatch",
}
for _, possibleLog := range possibleLogs {
if strings.Contains(s, possibleLog) {
return true
}
}
return false
})
if err != nil {
t.Fatalf("can't get log indicating finished compaction hash check")
}

alarmResponse, err := cc.AlarmList(ctx)
assert.NoError(t, err, "error on alarm list")
for _, alarm := range alarmResponse.Alarms {
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
t.Fatalf("there should be no corruption after resuming the compaction, but corruption detected")
}
}
t.Log("no corruption detected.")
}
35 changes: 35 additions & 0 deletions tests/framework/e2e/cluster.go
@@ -182,6 +182,7 @@ type EtcdProcessClusterConfig struct {
CompactHashCheckTime time.Duration
GoFailEnabled bool
CompactionBatchLimit int
CompactionSleepInterval time.Duration

WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
@@ -341,6 +342,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}

func WithCompactionSleepInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionSleepInterval = interval }
}

func WithWatchProcessNotifyInterval(interval time.Duration) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.WatchProcessNotifyInterval = interval }
}
@@ -582,6 +587,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}
if cfg.CompactionSleepInterval != 0 {
args = append(args, "--experimental-compaction-sleep-interval", cfg.CompactionSleepInterval.String())
}
if cfg.WarningUnaryRequestDuration != 0 {
args = append(args, "--warning-unary-request-duration", cfg.WarningUnaryRequestDuration.String())
}
@@ -813,6 +821,33 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
return proc.Start(ctx)
}

// UpdateProcOptions updates the options for a specific process. If no opts are given,
// the resulting config is identical to the cluster's.
func (epc *EtcdProcessCluster) UpdateProcOptions(i int, tb testing.TB, opts ...EPClusterOption) error {
cfg := *epc.Cfg
for _, opt := range opts {
opt(&cfg)
}
serverCfg := cfg.EtcdServerProcessConfig(tb, i)

var initialCluster []string
for _, p := range epc.Procs {
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String()))
}
epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "new")

proc, err := NewEtcdProcess(serverCfg)
if err != nil {
return err
}
err = epc.Procs[i].Close()
if err != nil {
return err
}
epc.Procs[i] = proc
return nil
}

func (epc *EtcdProcessCluster) Start(ctx context.Context) error {
return epc.start(func(ep EtcdProcess) error { return ep.Start(ctx) })
}
1 change: 1 addition & 0 deletions tests/framework/e2e/etcd_process.go
@@ -62,6 +62,7 @@ type EtcdProcess interface {

type LogsExpect interface {
ExpectWithContext(context.Context, string) (string, error)
ExpectFunc(ctx context.Context, f func(string) bool) (string, error)
Lines() []string
LineCount() int
}
