From 324370688034a6cf887097c428934f988e83abb6 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Sun, 28 Aug 2022 04:22:12 +0800
Subject: [PATCH] fix the potential data loss for clusters with only one member

For a cluster with only one member, the raft always sends identical
unstable entries and committed entries to etcdserver, and etcd
responds to the client once it finishes (actually only partially) the
applying workflow.

When the client receives the response, it doesn't mean etcd has
already successfully saved the data to both BoltDB and the WAL,
because:
  1. etcd commits the boltDB transaction periodically instead of on each request;
  2. etcd saves WAL entries in parallel with applying the committed entries.

Accordingly, it may run into a situation of data loss when etcd
crashes immediately after responding to the client and before BoltDB
and the WAL have successfully saved the data to disk. Note that this
issue can only happen for clusters with only one member.

For clusters with multiple members, it isn't an issue, because etcd
will not commit & apply the data before it has been replicated to a
majority of the members. When the client receives the response, it
means the data must have been applied. It further means the data must
have been committed. Note: for clusters with multiple members, the
raft will never send identical unstable entries and committed entries
to etcdserver.

Signed-off-by: Benjamin Wang
---
 server/etcdserver/raft.go        |  69 +++++++++++++---
 server/etcdserver/raft_test.go   | 131 +++++++++++++++++++++++++++++--
 server/etcdserver/server.go     |  52 ++++++++----
 server/etcdserver/server_test.go |   4 +-
 4 files changed, 224 insertions(+), 32 deletions(-)

diff --git a/server/etcdserver/raft.go b/server/etcdserver/raft.go
index 704e45a7aac..128f35c0578 100644
--- a/server/etcdserver/raft.go
+++ b/server/etcdserver/raft.go
@@ -68,8 +68,10 @@ func init() {
 type toApply struct {
 	entries  []raftpb.Entry
 	snapshot raftpb.Snapshot
-	// notifyc synchronizes etcd server applies with the raft node
-	notifyc chan struct{}
+	// snapNotifyc synchronizes etcd server applies with the raft node
+	snapNotifyc chan struct{}
+	// walNotifyc synchronizes etcd server applies with the WAL persistence
+	walNotifyc chan struct{}
 }
 
 type raftNode struct {
@@ -200,11 +202,16 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					}
 				}
 
-				notifyc := make(chan struct{}, 1)
+				snapNotifyc := make(chan struct{}, 1)
+				var walNotifyc chan struct{}
+				if shouldWaitWALSync(rd) {
+					walNotifyc = make(chan struct{}, 1)
+				}
 				ap := toApply{
-					entries:  rd.CommittedEntries,
-					snapshot: rd.Snapshot,
-					notifyc:  notifyc,
+					entries:     rd.CommittedEntries,
+					snapshot:    rd.Snapshot,
+					snapNotifyc: snapNotifyc,
+					walNotifyc:  walNotifyc,
 				}
 
 				updateCommittedIndex(&ap, rh)
@@ -237,6 +244,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
 					r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
 				}
+
+				if walNotifyc != nil {
+					// etcdserver should wait for this notification before responding to the client.
+					walNotifyc <- struct{}{}
+				}
+
 				if !raft.IsEmptyHardState(rd.HardState) {
 					proposalsCommitted.Set(float64(rd.HardState.Commit))
 				}
@@ -252,7 +265,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				}
 
 				// etcdserver now claim the snapshot has been persisted onto the disk
-				notifyc <- struct{}{}
+				snapNotifyc <- struct{}{}
 
 				// gofail: var raftBeforeApplySnap struct{}
 				r.raftStorage.ApplySnapshot(rd.Snapshot)
@@ -272,7 +285,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 				msgs := r.processMessages(rd.Messages)
 
 				// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
-				notifyc <- struct{}{}
+				snapNotifyc <- struct{}{}
 
 				// Candidate or follower needs to wait for all pending configuration
 				// changes to be applied before sending messages.
@@ -293,7 +306,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					// to be in sync with scheduled config-change job
 					// (assume notifyc has cap of 1)
 					select {
-					case notifyc <- struct{}{}:
+					case snapNotifyc <- struct{}{}:
 					case <-r.stopped:
 						return
 					}
@@ -303,7 +316,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					r.transport.Send(msgs)
 				} else {
 					// leader already processed 'MsgSnap' and signaled
-					notifyc <- struct{}{}
+					snapNotifyc <- struct{}{}
 				}
 
 				r.Advance()
@@ -314,6 +327,42 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	}()
 }
 
+// For a cluster with only one member, the raft may send both the
+// unstable entries and committed entries to etcdserver, and there
+// may be overlapping log entries between them.
+//
+// etcd responds to the client once it finishes (actually only partially)
+// the applying workflow. But when the client receives the response,
+// it doesn't mean etcd has already successfully saved the data to
+// both BoltDB and the WAL, because:
+//  1. etcd commits the boltDB transaction periodically instead of on each request;
+//  2. etcd saves WAL entries in parallel with applying the committed entries.
+// Accordingly, it might run into a situation of data loss when etcd crashes
+// immediately after responding to the client and before BoltDB and the WAL
+// have successfully saved the data to disk.
+// Note that this issue can only happen for clusters with only one member.
+//
+// For clusters with multiple members, it isn't an issue, because etcd will
+// not commit & apply the data before it has been replicated to a majority of
+// the members. When the client receives the response, it means the data must
+// have been applied. It further means the data must have been committed.
+// Note: for clusters with multiple members, the raft will never send identical
+// unstable entries and committed entries to etcdserver.
+//
+// Refer to https://github.com/etcd-io/etcd/issues/14370.
+func shouldWaitWALSync(rd raft.Ready) bool {
+	if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
+		return false
+	}
+
+	// Check if there is overlap between unstable and committed entries,
+	// assuming that their index and term are only incrementing.
+	lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
+	firstUnstableEntry := rd.Entries[0]
+	return lastCommittedEntry.Term > firstUnstableEntry.Term ||
+		(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
+}
+
 func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) {
 	var ci uint64
 	if len(ap.entries) != 0 {
diff --git a/server/etcdserver/raft_test.go b/server/etcdserver/raft_test.go
index 8e5585ad0c9..7d26ff28db3 100644
--- a/server/etcdserver/raft_test.go
+++ b/server/etcdserver/raft_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"go.etcd.io/etcd/client/pkg/v3/types"
 	"go.etcd.io/etcd/pkg/v3/pbutil"
 	"go.etcd.io/etcd/raft/v3"
@@ -190,6 +191,52 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 
 // TestConfigChangeBlocksApply ensures toApply blocks if committed entries contain config-change.
 func TestConfigChangeBlocksApply(t *testing.T) {
+	testEtcdserverAndRaftInteraction(t, testConfigChangeBlockApply)
+}
+
+func TestWALSyncNotBlockApply(t *testing.T) {
+	testEtcdserverAndRaftInteraction(t, testWALSyncNotBlockApply)
+}
+
+func TestWALSyncBlockApply(t *testing.T) {
+	testEtcdserverAndRaftInteraction(t, testWALSyncBlockApply)
+}
+
+func testConfigChangeBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
+	n.readyc <- raft.Ready{
+		SoftState:        &raft.SoftState{RaftState: raft.StateFollower},
+		CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
+	}
+	return <-srv.r.applyc
+}
+
+func testWALSyncNotBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
+	n.readyc <- raft.Ready{
+		SoftState:        &raft.SoftState{RaftState: raft.StateFollower},
+		CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
+	}
+	ap := <-srv.r.applyc
+	if ap.walNotifyc != nil {
+		t.Error("unexpected ap.walNotifyc, expected nil")
+	}
+	return ap
+}
+
+func testWALSyncBlockApply(t *testing.T, srv *EtcdServer, n *readyNode) toApply {
+	n.readyc <- raft.Ready{
+		SoftState:        &raft.SoftState{RaftState: raft.StateFollower},
+		Entries:          []raftpb.Entry{{Type: raftpb.EntryConfChange}},
+		CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
+	}
+	ap := <-srv.r.applyc
+	if ap.walNotifyc == nil {
+		t.Error("unexpected ap.walNotifyc, expected not nil")
+	}
+	assert.NotNil(t, ap.walNotifyc)
+	return ap
+}
+
+func testEtcdserverAndRaftInteraction(t *testing.T, testFunc func(*testing.T, *EtcdServer, *readyNode) toApply) {
 	n := newNopReadyNode()
 
 	r := newRaftNode(raftNodeConfig{
@@ -208,11 +255,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
 	})
 	defer srv.r.Stop()
 
-	n.readyc <- raft.Ready{
-		SoftState:        &raft.SoftState{RaftState: raft.StateFollower},
-		CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
-	}
-	ap := <-srv.r.applyc
+	ap := testFunc(t, srv, n)
 
 	continueC := make(chan struct{})
 	go func() {
@@ -228,7 +271,7 @@ func TestConfigChangeBlocksApply(t *testing.T) {
 	}
 
 	// finish toApply, unblock raft routine
-	<-ap.notifyc
+	<-ap.snapNotifyc
 
 	select {
 	case <-continueC:
@@ -317,3 +360,79 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) {
 		}
 	}
 }
+
+func TestShouldWaitWALSync(t *testing.T) {
+	testcases := []struct {
+		name             string
+		unstableEntries  []raftpb.Entry
+		committedEntries []raftpb.Entry
+		expectedResult   bool
+	}{
+		{
+			name:             "both entries are nil",
+			unstableEntries:  nil,
+			committedEntries: nil,
+			expectedResult:   false,
+		},
+		{
+			name:             "both entries are empty slices",
+			unstableEntries:  []raftpb.Entry{},
+			committedEntries: []raftpb.Entry{},
+			expectedResult:   false,
+		},
+		{
+			name:             "one nil and the other empty",
+			unstableEntries:  nil,
+			committedEntries: []raftpb.Entry{},
+			expectedResult:   false,
+		},
+		{
+			name:             "one nil and the other has data",
+			unstableEntries:  nil,
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "one empty and the other has data",
+			unstableEntries:  []raftpb.Entry{},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "has different term and index",
+			unstableEntries:  []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "has identical data",
+			unstableEntries:  []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   true,
+		},
+		{
+			name: "has overlapping entries",
+			unstableEntries: []raftpb.Entry{
+				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
+				{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
+				{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
+			},
+			committedEntries: []raftpb.Entry{
+				{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
+				{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
+				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
+			},
+			expectedResult: true,
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			shouldWALSync := shouldWaitWALSync(raft.Ready{
+				Entries:          tc.unstableEntries,
+				CommittedEntries: tc.committedEntries,
+			})
+			assert.Equal(t, tc.expectedResult, shouldWALSync)
+		})
+	}
+}
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 99a2159d993..0706cbbf5f4 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -927,7 +927,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
 	// wait for the raft routine to finish the disk writes before triggering a
 	// snapshot. or applied index might be greater than the last index in raft
 	// storage, since the raft routine might be slower than toApply routine.
- <-apply.notifyc + <-apply.snapNotifyc s.triggerSnapshot(ep) select { @@ -975,7 +975,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { } // wait for raftNode to persist snapshot onto the disk - <-toApply.notifyc + <-toApply.snapNotifyc newbe, err := serverstorage.OpenSnapshotBackend(s.Cfg, s.snapshotter, toApply.snapshot, s.beHooks) if err != nil { @@ -1122,8 +1122,9 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) { if len(ents) == 0 { return } + var shouldstop bool - if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop { + if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.walNotifyc); shouldstop { go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster")) } } @@ -1792,8 +1793,10 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { func (s *EtcdServer) apply( es []raftpb.Entry, confState *raftpb.ConfState, + walNotifyc chan struct{}, ) (appliedt uint64, appliedi uint64, shouldStop bool) { s.lg.Debug("Applying entries", zap.Int("num-entries", len(es))) + walNotified := false for i := range es { e := es[i] s.lg.Debug("Applying entry", @@ -1802,7 +1805,16 @@ func (s *EtcdServer) apply( zap.Stringer("type", e.Type)) switch e.Type { case raftpb.EntryNormal: - s.applyEntryNormal(&e) + postApplyFunc := s.applyEntryNormal(&e) + if postApplyFunc != nil { + // wait for the WAL entries to be synced. Note we only need + // to wait for the notification once. + if walNotifyc != nil && !walNotified { + walNotified = true + <-walNotifyc + } + postApplyFunc() + } s.setAppliedIndex(e.Index) s.setTerm(e.Term) @@ -1823,6 +1835,12 @@ func (s *EtcdServer) apply( s.setAppliedIndex(e.Index) s.setTerm(e.Term) shouldStop = shouldStop || removedSelf + // wait for the WAL entries to be synced. Note we only need + // to wait for the notification once. + if walNotifyc != nil && !walNotified { + walNotified = true + <-walNotifyc + } s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err}) default: @@ -1838,7 +1856,7 @@ func (s *EtcdServer) apply( } // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer -func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { +func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) func() { shouldApplyV3 := membership.ApplyV2storeOnly applyV3Performed := false var ar *apply.Result @@ -1870,7 +1888,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if s.isLeader() { s.lessor.Promote(s.Cfg.ElectionTimeout()) } - return + + return nil } var raftReq pb.InternalRaftRequest @@ -1879,15 +1898,17 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { rp := &r pbutil.MustUnmarshal(rp, e.Data) s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) - s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3)) - return + return func() { + s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3)) + } } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) - s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3)) - return + return func() { + s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3)) + } } id := raftReq.ID @@ -1909,16 +1930,17 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // do not re-toApply applied entries. 
if !shouldApplyV3 { - return + return nil } if ar == nil { - return + return nil } if ar.Err != errors.ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { - s.w.Trigger(id, ar) - return + return func() { + s.w.Trigger(id, ar) + } } lg := s.Logger() @@ -1938,6 +1960,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a}) s.w.Trigger(id, ar) }) + + return nil } func noSideEffect(r *pb.InternalRaftRequest) bool { diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index f60f73f4d36..8f1cdbca882 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -688,7 +688,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { Data: pbutil.MustMarshal(cc), }} - _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) + _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, nil) consistIndex := srv.consistIndex.ConsistentIndex() assert.Equal(t, uint64(2), appliedi) @@ -762,7 +762,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { ents = append(ents, ent) } - _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}) + _, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, nil) if !shouldStop { t.Errorf("shouldStop = %t, want %t", shouldStop, true) }
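
Note (not part of the patch): the standalone sketch below illustrates the synchronization idea the patch introduces, reduced to its essentials. It is a minimal model, not etcd's actual API; the names entry, overlaps, and the local walNotifyc are illustrative stand-ins for raftpb.Entry, shouldWaitWALSync, and the channel threaded through toApply.

package main

import (
	"fmt"
	"time"
)

// entry mirrors the two raftpb.Entry fields the overlap check needs.
type entry struct{ Term, Index uint64 }

// overlaps reports whether the committed entries overlap the unstable
// entries, i.e. the single-member condition shouldWaitWALSync detects.
// It assumes terms and indexes are monotonically increasing.
func overlaps(unstable, committed []entry) bool {
	if len(committed) == 0 || len(unstable) == 0 {
		return false
	}
	last, first := committed[len(committed)-1], unstable[0]
	return last.Term > first.Term ||
		(last.Term == first.Term && last.Index >= first.Index)
}

func main() {
	// Single-member case: the same entry shows up as both unstable and committed.
	unstable := []entry{{Term: 4, Index: 10}}
	committed := []entry{{Term: 4, Index: 10}}

	var walNotifyc chan struct{}
	if overlaps(unstable, committed) {
		// Buffered so the raft loop never blocks on a slow applier.
		walNotifyc = make(chan struct{}, 1)
	}

	// Raft loop: persist the entries to the WAL first, then signal.
	go func() {
		time.Sleep(10 * time.Millisecond) // stand-in for the WAL fsync
		if walNotifyc != nil {
			walNotifyc <- struct{}{}
		}
	}()

	// Apply loop: apply the entry, but gate the client response on WAL durability.
	if walNotifyc != nil {
		<-walNotifyc
	}
	fmt.Println("respond to client: data is applied and durable in the WAL")
}

The buffered channel mirrors the patch's design choice: the raft goroutine sends the notification without blocking after storage.Save, while the apply path waits at most once per Ready batch before triggering the response watcher.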