Skip to content

Commit

Permalink
fix the data inconsistency by moving the SetConsistentIndex into the …
Browse files Browse the repository at this point in the history
…transaction lock
  • Loading branch information
ahrtr committed Mar 28, 2022
1 parent be29295 commit c14ab30
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 7 deletions.
14 changes: 12 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,8 @@ func (s *EtcdServer) apply(

// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.setUnlockCallback(e.Index, e.Term)
//s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}

Expand All @@ -1795,13 +1796,22 @@ func (s *EtcdServer) apply(
return appliedt, appliedi, shouldStop
}

func (s *EtcdServer) setUnlockCallback(index, term uint64) {
if s.be != nil {
s.be.SetOnPreUnlockFunc(func() {
s.consistIndex.SetConsistentIndex(index, term)
})
}
}

// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
s.setUnlockCallback(e.Index, e.Term)
//s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",
Expand Down
8 changes: 3 additions & 5 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
}}

_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
assert.Equal(t, uint64(2), appliedi)
consistIndex := srv.consistIndex.ConsistentIndex()
if consistIndex != appliedi {
t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi)
}

t.Run("verify-backend", func(t *testing.T) {
tx := be.BatchTx()
Expand All @@ -698,9 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx))
})
rindex, rterm := schema.ReadConsistentIndex(be.BatchTx())
rindex, _ := schema.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)

}

func realisticRaftNode(lg *zap.Logger) *raftNode {
Expand Down
17 changes: 17 additions & 0 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ type Backend interface {
Defrag() error
ForceCommit()
Close() error

// SetOnPreUnlockFunc sets a callback function which is called before unlocking the batchTx.
SetOnPreUnlockFunc(func())
OnPreUnlock()
}

type Snapshot interface {
Expand Down Expand Up @@ -119,9 +123,22 @@ type backend struct {

hooks Hooks

onUnlock func()

lg *zap.Logger
}

func (b *backend) SetOnPreUnlockFunc(f func()) {
b.onUnlock = f
}

func (b *backend) OnPreUnlock() {
if b.onUnlock != nil {
b.onUnlock()
b.onUnlock = nil
}
}

type BackendConfig struct {
// Path is the file path to the backend file.
Path string
Expand Down
2 changes: 2 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,8 @@ func (b *fakeBackend) Snapshot() backend.Snapshot
func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil }
func (b *fakeBackend) Close() error { return nil }
func (b *fakeBackend) SetOnPreUnlockFunc(func()) {}
func (b *fakeBackend) OnPreUnlock() {}

type indexGetResp struct {
rev revision
Expand Down
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (tw *storeTxnWrite) End() {
tw.s.revMu.Lock()
tw.s.currentRev++
}
tw.s.b.OnPreUnlock()
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions server/storage/schema/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
s.be.OnPreUnlock()
tx.UnsafePut(Members, mkey, mvalue)
}

Expand All @@ -73,6 +74,7 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) {
tx := s.be.BatchTx()
tx.Lock()
defer tx.Unlock()
s.be.OnPreUnlock()
tx.UnsafeDelete(Members, mkey)
tx.UnsafePut(MembersRemoved, mkey, []byte("removed"))
}
Expand Down

0 comments on commit c14ab30

Please sign in to comment.