Skip to content

Commit

Permalink
etcdserver: backport check scheduledCompactKeyName and finishedCompac…
Browse files Browse the repository at this point in the history
…tKeyName before writing hash to release-3.5.

Fix etcd-io#15919.
Check ScheduledCompactKeyName and FinishedCompactKeyName
before writing hash to hashstore.
If they do not match, then it means this compaction has once been interrupted and its hash value is invalid. In such cases, we won't write the hash values to the hashstore, and avoids the incorrect corruption alarm.

Signed-off-by: caojiamingalan <alan.c.19971111@gmail.com>
  • Loading branch information
CaojiamingAlan committed Jul 15, 2023
1 parent 9ac1d73 commit 6ac9d94
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
36 changes: 32 additions & 4 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,27 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
return nil, compactMainRev, nil
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error) {
// checkPrevCompactionCompleted checks whether the previous scheduled compaction is completed.
func (s *store) checkPrevCompactionCompleted() bool {
tx := s.b.ReadTx()
tx.Lock()
defer tx.Unlock()
_, scheduledCompactBytes := tx.UnsafeRange(buckets.Meta, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
}

_, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0)
finishedCompact := int64(0)
if len(finishedCompactBytes) != 0 {
finishedCompact = bytesToRev(finishedCompactBytes[0]).main

}
return scheduledCompact == finishedCompact
}

func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
ch := make(chan struct{})
var j = func(ctx context.Context) {
if ctx.Err() != nil {
Expand All @@ -240,7 +260,13 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
s.compactBarrier(context.TODO(), ch)
return
}
s.hashes.Store(hash)
// Only store the hash value if the previous hash is completed, i.e. this compaction
// hashes every revision from last compaction. For more details, see #15919.
if prevCompactionCompleted {
s.hashes.Store(hash)
} else {
s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
}
close(ch)
}

Expand All @@ -250,17 +276,19 @@ func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-ch
}

func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
if err != nil {
return ch, err
}

return s.compact(traceutil.TODO(), rev, prevCompactRev)
return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
s.mu.Lock()

prevCompactionCompleted := s.checkPrevCompactionCompleted()
ch, prevCompactRev, err := s.updateCompactRev(rev)
trace.Step("check and update compact revision")
if err != nil {
Expand All @@ -269,7 +297,7 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
}
s.mu.Unlock()

return s.compact(trace, rev, prevCompactRev)
return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
}

func (s *store) Commit() {
Expand Down
4 changes: 4 additions & 0 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@ func TestStoreCompact(t *testing.T) {
fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
key1 := newTestKeyBytes(revision{1, 0}, false)
key2 := newTestKeyBytes(revision{2, 0}, false)
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}

s.Compact(traceutil.TODO(), 3)
Expand All @@ -344,6 +346,8 @@ func TestStoreCompact(t *testing.T) {
end := make([]byte, 8)
binary.BigEndian.PutUint64(end, uint64(4))
wact := []testutil.Action{
{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
{Name: "delete", Params: []interface{}{buckets.Key, key2}},
Expand Down
4 changes: 4 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type etcdProcessClusterConfig struct {
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
WatchProcessNotifyInterval time.Duration
CompactionBatchLimit int
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -339,6 +340,9 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.CompactionBatchLimit != 0 {
args = append(args, "--experimental-compaction-batch-limit", fmt.Sprintf("%d", cfg.CompactionBatchLimit))
}

etcdCfgs[i] = &etcdServerProcessConfig{
lg: lg,
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/corrupt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/datadir"
Expand Down Expand Up @@ -180,3 +181,61 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
assert.NoError(t, err, "error on alarm list")
assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: 0}}, alarmResponse.Alarms)
}

func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) {
checkTime := time.Second
BeforeTest(t)

slowCompactionNodeIndex := 1

// Start a new cluster, with compact hash check enabled.
t.Log("creating a new cluster with 3 nodes...")

epc, err := newEtcdProcessCluster(t, &etcdProcessClusterConfig{
clusterSize: 3,
keepDataDir: true,
CompactHashCheckEnabled: true,
CompactHashCheckTime: checkTime,
logLevel: "info",
CompactionBatchLimit: 1,
})
require.NoError(t, err)
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

// Put 200 identical keys to the cluster, so that the compaction will drop some stale values.
// We need a relatively big number here to make the compaction takes a non-trivial time, and we can interrupt it.
t.Log("putting 200 values to the identical key...")
cc := NewEtcdctl(epc.EndpointsV3(), clientNonTLS, false, false)

for i := 0; i < 200; i++ {
err = cc.Put("key", fmt.Sprint(i))
require.NoError(t, err, "error on put")
}

t.Log("compaction started...")
_, err = cc.Compact(200)

t.Logf("restart proc %d to interrupt its compaction...", slowCompactionNodeIndex)
err = epc.procs[slowCompactionNodeIndex].Restart()
require.NoError(t, err)

// Wait until the node finished compaction.
_, err = epc.procs[slowCompactionNodeIndex].Logs().Expect("finished scheduled compaction")
require.NoError(t, err, "can't get log indicating finished scheduled compaction")

// Wait for compaction hash check
time.Sleep(checkTime * 5)

alarmResponse, err := cc.AlarmList()
require.NoError(t, err, "error on alarm list")
for _, alarm := range alarmResponse.Alarms {
if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT {
t.Fatal("there should be no corruption after resuming the compaction, but corruption detected")
}
}
t.Log("no corruption detected.")
}

0 comments on commit 6ac9d94

Please sign in to comment.