diff --git a/node/raft.go b/node/raft.go index b6c4189..5d4e59f 100644 --- a/node/raft.go +++ b/node/raft.go @@ -444,7 +444,11 @@ func (rc *raftNode) startRaft(ds DataStorage, standalone bool) error { if rc.join { startPeers = nil } - rc.node = raft.StartNode(c, startPeers, isLearner) + if len(startPeers) == 0 { + rc.node = raft.RestartNode(c) + } else { + rc.node = raft.StartNode(c, startPeers, isLearner) + } } rc.initForTransport() rc.wgServe.Add(1) diff --git a/raft/bootstrap.go b/raft/bootstrap.go new file mode 100644 index 0000000..56d1199 --- /dev/null +++ b/raft/bootstrap.go @@ -0,0 +1,97 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "errors" + + pb "github.com/youzan/ZanRedisDB/raft/raftpb" +) + +// Bootstrap initializes the RawNode for first use by appending configuration +// changes for the supplied peers. This method returns an error if the Storage +// is nonempty. +// +// It is recommended that instead of calling this method, applications bootstrap +// their state manually by setting up a Storage that has a first index > 1 and +// which stores the desired ConfState as its InitialState. +func (rn *RawNode) Bootstrap(peers []Peer) error { + if len(peers) == 0 { + return errors.New("must provide at least one peer to Bootstrap") + } + lastIndex, err := rn.raft.raftLog.storage.LastIndex() + if err != nil { + return err + } + + if lastIndex != 0 { + return errors.New("can't bootstrap a nonempty Storage") + } + + // We've faked out initial entries above, but nothing has been + // persisted. Start with an empty HardState (thus the first Ready will + // emit a HardState update for the app to persist). + rn.prevHardSt = emptyState + + // TODO(tbg): remove StartNode and give the application the right tools to + // bootstrap the initial membership in a cleaner way. + rn.raft.becomeFollower(1, None) + ents := make([]pb.Entry, len(peers)) + for i, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, ReplicaID: peer.ReplicaID, + NodeGroup: pb.Group{NodeId: peer.NodeID, Name: rn.raft.group.Name, GroupId: rn.raft.group.GroupId, + RaftReplicaId: peer.ReplicaID}, + Context: peer.Context} + data, err := cc.Marshal() + if err != nil { + return err + } + // TODO(tbg): this should append the ConfChange for the own node first + // and also call applyConfChange below for that node first. Otherwise + // we have a Raft group (for a little while) that doesn't have itself + // in its config, which is bad. + // This whole way of setting things up is rickety. The app should just + // populate the initial ConfState appropriately and then all of this + // goes away. + e := pb.Entry{ + Type: pb.EntryConfChange, + Term: 1, + Index: uint64(i + 1), + Data: data, + } + ents[i] = e + } + rn.raft.raftLog.append(ents...) + + // Now apply them, mainly so that the application can call Campaign + // immediately after StartNode in tests. Note that these nodes will + // be added to raft twice: here and when the application's Ready + // loop calls ApplyConfChange. The calls to addNode must come after + // all calls to raftLog.append so progress.next is set after these + // bootstrapping entries (it is an error if we try to append these + // entries since they have already been committed). + // We do not set raftLog.applied so the application will be able + // to observe all conf changes via Ready.CommittedEntries. + // + // TODO(bdarnell): These entries are still unstable; do we need to preserve + // the invariant that committed < unstable? + rn.raft.raftLog.committed = uint64(len(ents)) + for _, peer := range peers { + grp := pb.Group{NodeId: peer.NodeID, Name: rn.raft.group.Name, GroupId: rn.raft.group.GroupId, + RaftReplicaId: peer.ReplicaID} + rn.raft.addNode(peer.ReplicaID, grp) + } + return nil +} diff --git a/raft/node.go b/raft/node.go index 7b31010..17a242a 100644 --- a/raft/node.go +++ b/raft/node.go @@ -211,68 +211,37 @@ type Peer struct { } type prevState struct { - havePrevLastUnstablei bool - prevLastUnstablei uint64 - prevLastUnstablet uint64 - prevSnapi uint64 - prevSoftSt *SoftState - prevHardSt pb.HardState - prevLead uint64 + prevLead uint64 } -func newPrevState(r *raft) *prevState { +func newPrevState(r *RawNode) *prevState { return &prevState{ - prevSoftSt: r.softState(), - prevHardSt: emptyState, - prevLead: None, + prevLead: None, } } // StartNode returns a new Node given configuration and a list of raft peers. // It appends a ConfChangeAddNode entry for each given peer to the initial log. func StartNode(c *Config, peers []Peer, isLearner bool) Node { - if isLearner { - c.learners = append(c.learners, c.Group) - } - r := newRaft(c) - // become the follower at term 1 and apply initial configuration - // entries of term 1 - r.becomeFollower(1, None) - for _, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, ReplicaID: peer.ReplicaID, - NodeGroup: pb.Group{NodeId: peer.NodeID, Name: r.group.Name, GroupId: r.group.GroupId, - RaftReplicaId: peer.ReplicaID}, - Context: peer.Context} - d, err := cc.Marshal() - if err != nil { - panic("unexpected marshal error") - } - e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} - r.raftLog.append(e) - } - // Mark these initial entries as committed. - // TODO(bdarnell): These entries are still unstable; do we need to preserve - // the invariant that committed < unstable? - r.raftLog.committed = r.raftLog.lastIndex() - // Now apply them, mainly so that the application can call Campaign - // immediately after StartNode in tests. Note that these nodes will - // be added to raft twice: here and when the application's Ready - // loop calls ApplyConfChange. The calls to addNode must come after - // all calls to raftLog.append so progress.next is set after these - // bootstrapping entries (it is an error if we try to append these - // entries since they have already been committed). - // We do not set raftLog.applied so the application will be able - // to observe all conf changes via Ready.CommittedEntries. - for _, peer := range peers { - r.addNode(peer.ReplicaID, pb.Group{NodeId: peer.NodeID, Name: r.group.Name, GroupId: r.group.GroupId, - RaftReplicaId: peer.ReplicaID}) + //if isLearner { + // c.learners = append(c.learners, c.Group) + //} + if len(peers) == 0 { + panic("no peers given; use RestartNode instead") + } + //r := newRaft(c) + + rn, err := NewRawNode(c) + if err != nil { + panic(err) } + rn.Bootstrap(peers) n := newNode() n.logger = c.Logger - n.r = r - n.prevS = newPrevState(r) - off := max(r.raftLog.applied+1, r.raftLog.firstIndex()) + n.r = rn + n.prevS = newPrevState(rn) + off := max(rn.raft.raftLog.applied+1, rn.raft.raftLog.firstIndex()) n.lastSteppedIndex = off n.NotifyEventCh() return &n @@ -283,13 +252,16 @@ func StartNode(c *Config, peers []Peer, isLearner bool) Node { // If the caller has an existing state machine, pass in the last log index that // has been applied to it; otherwise use zero. func RestartNode(c *Config) Node { - r := newRaft(c) + rn, err := NewRawNode(c) + if err != nil { + panic(err) + } n := newNode() n.logger = c.Logger - n.r = r - n.prevS = newPrevState(r) - off := max(r.raftLog.applied+1, r.raftLog.firstIndex()) + n.r = rn + n.prevS = newPrevState(rn) + off := max(rn.raft.raftLog.applied+1, rn.raft.raftLog.firstIndex()) n.lastSteppedIndex = off n.NotifyEventCh() return &n @@ -306,7 +278,7 @@ type node struct { stop chan struct{} status chan chan Status eventNotifyCh chan bool - r *raft + r *RawNode prevS *prevState newReadyFunc func(*raft, *SoftState, pb.HardState, bool) Ready needAdvance bool @@ -358,16 +330,16 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) { if busySnap && m.Type == pb.MsgApp { // ignore msg app while busy snapshot } else { - n.handleReceivedMessage(n.r, m) + n.handleReceivedMessage(n.r.raft, m) } msgs[i].Entries = nil } - if n.handleTicks(n.r) { + if n.handleTicks(n.r.raft) { hasEvent = true } - needHandleProposal := n.handleLeaderUpdate(n.r) + needHandleProposal := n.handleLeaderUpdate(n.r.raft) var ev bool - ev, needHandleProposal = n.handleConfChanged(n.r, needHandleProposal) + ev, needHandleProposal = n.handleConfChanged(n.r.raft, needHandleProposal) if ev { hasEvent = ev } @@ -375,12 +347,12 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) { props := n.propQ.Get() for _, p := range props { hasEvent = true - n.handleProposal(n.r, p) + n.handleProposal(n.r.raft, p) } } - n.handleStatus(n.r) + n.handleStatus(n.r.raft) _ = hasEvent - rd := n.newReadyFunc(n.r, n.prevS.prevSoftSt, n.prevS.prevHardSt, moreEntriesToApply) + rd := n.r.readyWithoutAccept(moreEntriesToApply) if rd.containsUpdates() { n.needAdvance = true var stepIndex uint64 @@ -391,8 +363,8 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) { fi := rd.CommittedEntries[0].Index if n.lastSteppedIndex != 0 && fi > n.lastSteppedIndex+1 { e := fmt.Sprintf("raft.node: %x(%v) index not continued: %v, %v, %v, snap:%v, prev: %v, logs: %v ", - n.r.id, n.r.group, fi, n.lastSteppedIndex, stepIndex, rd.Snapshot.Metadata.String(), n.prevS, - n.r.raftLog.String()) + n.r.raft.id, n.r.raft.group, fi, n.lastSteppedIndex, stepIndex, rd.Snapshot.Metadata.String(), n.prevS, + n.r.raft.raftLog.String()) n.logger.Error(e) } stepIndex = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index @@ -404,10 +376,10 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) { } func (n *node) DebugString() string { - ents := n.r.raftLog.allEntries() + ents := n.r.raft.raftLog.allEntries() e := fmt.Sprintf("raft.node: %x(%v) index not continued: %v, prev: %v, logs: %v, %v ", - n.r.id, n.r.group, n.lastSteppedIndex, n.prevS, len(ents), - n.r.raftLog.String()) + n.r.raft.id, n.r.raft.group, n.lastSteppedIndex, n.prevS, len(ents), + n.r.raft.raftLog.String()) return e } @@ -475,35 +447,8 @@ func (n *node) addReqMessageToQueue(req pb.Message) { } func (n *node) Advance(rd Ready) { - if rd.SoftState != nil { - n.prevS.prevSoftSt = rd.SoftState - } - if len(rd.Entries) > 0 { - n.prevS.prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index - n.prevS.prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term - n.prevS.havePrevLastUnstablei = true - } - if !IsEmptyHardState(rd.HardState) { - n.prevS.prevHardSt = rd.HardState - } - if !IsEmptySnap(rd.Snapshot) { - n.prevS.prevSnapi = rd.Snapshot.Metadata.Index - } - - n.r.msgs = nil - n.r.readStates = nil - - appliedI := rd.appliedCursor() - if appliedI != 0 { - // since the committed entries may less than the hard commit index due to the - // limit for buffer len, we should not use the hard state commit index. - n.r.raftLog.appliedTo(appliedI) - } - if n.prevS.havePrevLastUnstablei { - n.r.raftLog.stableTo(n.prevS.prevLastUnstablei, n.prevS.prevLastUnstablet) - n.prevS.havePrevLastUnstablei = false - } - n.r.raftLog.stableSnapTo(n.prevS.prevSnapi) + n.r.acceptReady(rd) + n.r.Advance(rd) n.needAdvance = false } @@ -512,7 +457,7 @@ func (n *node) ConfChangedCh() <-chan pb.ConfChange { } func (n *node) HandleConfChanged(cc pb.ConfChange) { - n.processConfChanged(n.r, cc, true) + n.processConfChanged(n.r.raft, cc, true) } func (n *node) handleConfChanged(r *raft, needHandleProposal bool) (bool, bool) { diff --git a/raft/node_bench_test.go b/raft/node_bench_test.go index 8316f22..243ceff 100644 --- a/raft/node_bench_test.go +++ b/raft/node_bench_test.go @@ -28,7 +28,7 @@ func BenchmarkOneNode(b *testing.B) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + r := newTestRawNode(1, []uint64{1}, 10, 1, s) n.r = r n.prevS = newPrevState(r) defer n.Stop() diff --git a/raft/node_test.go b/raft/node_test.go index 1862280..b695dcc 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -136,9 +136,10 @@ func TestNodePropose(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - n.r = r - n.prevS = newPrevState(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n.r = rn + r := rn.raft + n.prevS = newPrevState(rn) n.NotifyEventCh() n.Campaign(context.TODO()) for { @@ -189,10 +190,11 @@ func TestNodeReadIndex(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft r.readStates = wrs - n.r = r - n.prevS = newPrevState(r) + n.r = rn + n.prevS = newPrevState(rn) n.Campaign(context.TODO()) time.Sleep(time.Millisecond) @@ -347,9 +349,10 @@ func TestNodeProposeConfig(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - n.r = r - n.prevS = newPrevState(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + r := rn.raft + n.r = rn + n.prevS = newPrevState(rn) n.NotifyEventCh() n.Campaign(context.TODO()) for { @@ -398,9 +401,9 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - n.r = r - n.prevS = newPrevState(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n.r = rn + n.prevS = newPrevState(rn) n.Campaign(context.TODO()) rdyEntries := make([]raftpb.Entry, 0) ticker := time.NewTicker(time.Millisecond * 100) @@ -476,9 +479,10 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) { // who is the current leader. func TestBlockProposal(t *testing.T) { n := newNode() - r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) - n.r = r - n.prevS = newPrevState(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage()) + n.r = rn + r := rn.raft + n.prevS = newPrevState(rn) defer closeAndFreeRaft(r) defer n.Stop() @@ -512,9 +516,10 @@ func TestNodeTick(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - n.r = r - n.prevS = newPrevState(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n.r = rn + r := rn.raft + n.prevS = newPrevState(rn) elapsed := r.electionElapsed n.Tick() for len(n.tickc) != 0 { @@ -533,9 +538,9 @@ func TestNodeStop(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) - n.r = r - n.prevS = newPrevState(r) + rn := newTestRawNode(1, []uint64{1}, 10, 1, s) + n.r = rn + n.prevS = newPrevState(rn) n.NotifyEventCh() donec := make(chan struct{}) @@ -654,7 +659,9 @@ func TestNodeStart(t *testing.T) { n.Advance(g) } - n.Campaign(ctx) + if err := n.Campaign(ctx); err != nil { + t.Fatal(err) + } rd, _ := n.StepNode(true, false) storage.Append(rd.Entries) n.Advance(rd) @@ -685,10 +692,12 @@ func TestNodeRestart(t *testing.T) { st := raftpb.HardState{Term: 1, Commit: 1} want := Ready{ - HardState: st, + // No HardState is emitted because there was no change. + HardState: raftpb.HardState{}, // commit up to index commit index in st CommittedEntries: entries[:st.Commit], - MustSync: true, + // MustSync is false because no HardState or new entries are provided. + MustSync: false, } storage := NewMemoryStorage() @@ -753,10 +762,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) { st := raftpb.HardState{Term: 1, Commit: 3} want := Ready{ - HardState: st, + // No HardState is emitted because nothing changed relative to what is + // already persisted. + HardState: raftpb.HardState{}, // commit up to index commit index in st CommittedEntries: entries, - MustSync: true, + // MustSync is only true when there is a new HardState or new entries; + // neither is the case here. + MustSync: false, } s := NewMemoryStorage() @@ -902,7 +915,7 @@ func TestNodeProposeAddLearnerNode(t *testing.T) { n := newNode() s := NewMemoryStorage() defer s.Close() - r := newTestRaft(1, []uint64{1}, 10, 1, s) + r := newTestRawNode(1, []uint64{1}, 10, 1, s) n.r = r n.prevS = newPrevState(r) n.Campaign(context.TODO()) @@ -1003,7 +1016,10 @@ func TestCommitPagination(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxSizePerMsg = 2048 - r := newRaft(cfg) + r, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } n := newNode() n.r = r n.prevS = newPrevState(r) @@ -1090,7 +1106,10 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) { // this and *will* return it (which is how the Commit index ended up being 10 initially). cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 - r := newRaft(cfg) + r, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } n := newNode() defer s.Close() n.r = r @@ -1114,7 +1133,10 @@ func TestNodeCommitEntriesWhileNoMoreApply(t *testing.T) { s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) cfg.MaxSizePerMsg = 2048 - r := newRaft(cfg) + r, err := NewRawNode(cfg) + if err != nil { + t.Fatal(err) + } n := newNode() n.r = r n.prevS = newPrevState(r) diff --git a/raft/raft_test.go b/raft/raft_test.go index 460f123..191b269 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4600,3 +4600,14 @@ func newTestLearnerRaft(id uint64, peers []uint64, learners []uint64, election, cfg.learners = peerGrps return newRaft(cfg) } + +// newTestRawNode sets up a RawNode with the given peers. The configuration will +// not be reflected in the Storage. +func newTestRawNode(id uint64, peers []uint64, election, heartbeat int, storage Storage) *RawNode { + cfg := newTestConfig(id, peers, election, heartbeat, storage) + rn, err := NewRawNode(cfg) + if err != nil { + panic(err) + } + return rn +} diff --git a/raft/rawnode.go b/raft/rawnode.go index ad0ce4a..3773327 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -36,38 +36,14 @@ type RawNode struct { prevHardSt pb.HardState } -func (rn *RawNode) newReady() Ready { - return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt, true) -} - -func (rn *RawNode) commitReady(rd Ready) { - if rd.SoftState != nil { - rn.prevSoftSt = rd.SoftState - } - if !IsEmptyHardState(rd.HardState) { - rn.prevHardSt = rd.HardState - } - // If entries were applied (or a snapshot), update our cursor for - // the next Ready. Note that if the current HardState contains a - // new Commit index, this does not mean that we're also applying - // all of the new entries due to commit pagination by size. - if index := rd.appliedCursor(); index > 0 { - rn.raft.raftLog.appliedTo(index) - } - if len(rd.Entries) > 0 { - e := rd.Entries[len(rd.Entries)-1] - rn.raft.raftLog.stableTo(e.Index, e.Term) - } - if !IsEmptySnap(rd.Snapshot) { - rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) - } - if len(rd.ReadStates) != 0 { - rn.raft.readStates = nil - } -} - -// NewRawNode returns a new RawNode given configuration and a list of raft peers. -func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { +// NewRawNode instantiates a RawNode from the given configuration. +// +// See Bootstrap() for bootstrapping an initial state; this replaces the former +// 'peers' argument to this method (with identical behavior). However, It is +// recommended that instead of calling Bootstrap, applications bootstrap their +// state manually by setting up a Storage that has a first index > 1 and which +// stores the desired ConfState as its InitialState. +func NewRawNode(config *Config) (*RawNode, error) { if config.ID == 0 { panic("config.ID must not be zero") } @@ -75,44 +51,10 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { rn := &RawNode{ raft: r, } - lastIndex, err := config.Storage.LastIndex() - if err != nil { - panic(err) // TODO(bdarnell) - } - // If the log is empty, this is a new RawNode (like StartNode); otherwise it's - // restoring an existing RawNode (like RestartNode). - // TODO(bdarnell): rethink RawNode initialization and whether the application needs - // to be able to tell us when it expects the RawNode to exist. - if lastIndex == 0 { - r.becomeFollower(1, None) - ents := make([]pb.Entry, len(peers)) - for i, peer := range peers { - cc := pb.ConfChange{Type: pb.ConfChangeAddNode, - ReplicaID: peer.ReplicaID, - NodeGroup: pb.Group{NodeId: peer.NodeID, GroupId: r.group.GroupId, RaftReplicaId: peer.ReplicaID}, - Context: peer.Context} - data, err := cc.Marshal() - if err != nil { - panic("unexpected marshal error") - } - - ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} - } - r.raftLog.append(ents...) - r.raftLog.committed = uint64(len(ents)) - for _, peer := range peers { - r.addNode(peer.ReplicaID, - pb.Group{NodeId: peer.NodeID, GroupId: r.group.GroupId, RaftReplicaId: peer.ReplicaID}) - } - } // Set the initial hard and soft states after performing all initialization. rn.prevSoftSt = r.softState() - if lastIndex == 0 { - rn.prevHardSt = emptyState - } else { - rn.prevHardSt = r.hardState() - } + rn.prevHardSt = r.hardState() return rn, nil } @@ -200,13 +142,35 @@ func (rn *RawNode) Step(m pb.Message) error { return ErrStepPeerNotFound } -// Ready returns the current point-in-time state of this RawNode. -func (rn *RawNode) Ready() Ready { - rd := rn.newReady() - rn.raft.msgs = nil +// Ready returns the outstanding work that the application needs to handle. This +// includes appending and applying entries or a snapshot, updating the HardState, +// and sending messages. The returned Ready() *must* be handled and subsequently +// passed back via Advance(). +func (rn *RawNode) Ready(moreEntriesToApply bool) Ready { + rd := rn.readyWithoutAccept(moreEntriesToApply) + rn.acceptReady(rd) return rd } +// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there +// is no obligation that the Ready must be handled. +func (rn *RawNode) readyWithoutAccept(moreEntriesToApply bool) Ready { + return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt, moreEntriesToApply) +} + +// acceptReady is called when the consumer of the RawNode has decided to go +// ahead and handle a Ready. Nothing must alter the state of the RawNode between +// this call and the prior call to Ready(). +func (rn *RawNode) acceptReady(rd Ready) { + if rd.SoftState != nil { + rn.prevSoftSt = rd.SoftState + } + if len(rd.ReadStates) != 0 { + rn.raft.readStates = nil + } + rn.raft.msgs = nil +} + // HasReady called when RawNode user need to check if any Ready pending. // Checking logic in this method should be consistent with Ready.containsUpdates(). func (rn *RawNode) HasReady() bool { @@ -232,7 +196,10 @@ func (rn *RawNode) HasReady() bool { // Advance notifies the RawNode that the application has applied and saved progress in the // last Ready results. func (rn *RawNode) Advance(rd Ready) { - rn.commitReady(rd) + if !IsEmptyHardState(rd.HardState) { + rn.prevHardSt = rd.HardState + } + rn.raft.advance(rd) } // Status returns the current status of the given group. diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 3b478fa..b5f82dc 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -17,6 +17,7 @@ package raft import ( "bytes" "fmt" + "math" "reflect" "testing" @@ -26,20 +27,44 @@ import ( // TestRawNodeStep ensures that RawNode.Step ignore local message. func TestRawNodeStep(t *testing.T) { for i, msgn := range raftpb.MessageType_name { - s := NewMemoryStorage() - defer s.Close() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{NodeID: 1, ReplicaID: 1}}) - if err != nil { - t.Fatal(err) - } - msgt := raftpb.MessageType(i) - err = rawNode.Step(raftpb.Message{Type: msgt}) - // LocalMsg should be ignored. - if IsLocalMsg(msgt) { - if err != ErrStepLocalMsg { - t.Errorf("%d: step should ignore %s", msgt, msgn) + t.Run(msgn, func(t *testing.T) { + s := NewMemoryStorage() + defer s.Close() + s.SetHardState(raftpb.HardState{Term: 1, Commit: 1}) + s.Append([]raftpb.Entry{{Term: 1, Index: 1}}) + peerGrps := make([]*raftpb.Group, 0) + for _, pid := range []uint64{1} { + grp := raftpb.Group{ + NodeId: pid, + RaftReplicaId: pid, + GroupId: 1, + } + peerGrps = append(peerGrps, &grp) } - } + if err := s.ApplySnapshot(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{ + ConfState: raftpb.ConfState{ + Nodes: []uint64{1}, + Groups: peerGrps, + }, + Index: 1, + Term: 1, + }}); err != nil { + t.Fatal(err) + } + + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s)) + if err != nil { + t.Fatal(err) + } + msgt := raftpb.MessageType(i) + err = rawNode.Step(raftpb.Message{Type: msgt}) + // LocalMsg should be ignored. + if IsLocalMsg(msgt) { + if err != ErrStepLocalMsg { + t.Errorf("%d: step should ignore %s", msgt, msgn) + } + } + }) } } @@ -52,17 +77,10 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { s := NewMemoryStorage() defer s.Close() var err error - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{NodeID: 1, ReplicaID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - s.Append(rd.Entries) - rawNode.Advance(rd) - - if d := rawNode.Ready(); d.MustSync || !IsEmptyHardState(d.HardState) || len(d.Entries) > 0 { - t.Fatalf("expected empty hard state with must-sync=false: %#v", d) - } rawNode.Campaign() proposed := false @@ -71,11 +89,14 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { ccdata []byte ) for { - rd = rawNode.Ready() + rd := rawNode.Ready(true) s.Append(rd.Entries) + rawNode.Advance(rd) // Once we are the leader, propose a command and a ConfChange. if !proposed && rd.SoftState.Lead == rawNode.raft.id { - rawNode.Propose([]byte("somedata")) + if err = rawNode.Propose([]byte("somedata")); err != nil { + t.Fatal(err) + } grp := raftpb.Group{ NodeId: 1, @@ -90,16 +111,13 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { rawNode.ProposeConfChange(cc) proposed = true - } - rawNode.Advance(rd) - - // Exit when we have four entries: one ConfChange, one no-op for the election, - // our proposed command and proposed ConfChange. - lastIndex, err = s.LastIndex() - if err != nil { - t.Fatal(err) - } - if lastIndex >= 4 { + } else if proposed { + // We proposed last cycle, which means we appended the conf change + // in this cycle. + lastIndex, err = s.LastIndex() + if err != nil { + t.Fatal(err) + } break } } @@ -127,17 +145,17 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { func TestRawNodeProposeAddDuplicateNode(t *testing.T) { s := NewMemoryStorage() defer s.Close() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), []Peer{{NodeID: 1, ReplicaID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, s)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() + rd := rawNode.Ready(true) s.Append(rd.Entries) rawNode.Advance(rd) rawNode.Campaign() for { - rd = rawNode.Ready() + rd = rawNode.Ready(true) s.Append(rd.Entries) if rd.SoftState.Lead == rawNode.raft.id { rawNode.Advance(rd) @@ -148,7 +166,7 @@ func TestRawNodeProposeAddDuplicateNode(t *testing.T) { proposeConfChangeAndApply := func(cc raftpb.ConfChange) { rawNode.ProposeConfChange(cc) - rd = rawNode.Ready() + rd = rawNode.Ready(true) s.Append(rd.Entries) for _, entry := range rd.CommittedEntries { if entry.Type == raftpb.EntryConfChange { @@ -211,8 +229,8 @@ func TestRawNodeReadIndex(t *testing.T) { s := NewMemoryStorage() defer s.Close() - c := newTestConfig(1, nil, 10, 1, s) - rawNode, err := NewRawNode(c, []Peer{{NodeID: 1, ReplicaID: 1}}) + c := newTestConfig(1, []uint64{1}, 10, 1, s) + rawNode, err := NewRawNode(c) if err != nil { t.Fatal(err) } @@ -222,7 +240,7 @@ func TestRawNodeReadIndex(t *testing.T) { if !hasReady { t.Errorf("HasReady() returns %t, want %t", hasReady, true) } - rd := rawNode.Ready() + rd := rawNode.Ready(true) if !reflect.DeepEqual(rd.ReadStates, wrs) { t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs) } @@ -236,7 +254,7 @@ func TestRawNodeReadIndex(t *testing.T) { wrequestCtx := []byte("somedata2") rawNode.Campaign() for { - rd = rawNode.Ready() + rd = rawNode.Ready(true) s.Append(rd.Entries) if rd.SoftState.Lead == rawNode.raft.id { @@ -270,71 +288,119 @@ func TestRawNodeReadIndex(t *testing.T) { // TestNodeStop from node_test.go has no equivalent in rawNode because there is // no goroutine in RawNode. -// TestRawNodeStart ensures that a node can be started correctly. The node should -// start with correct configuration change entries, and can accept and commit -// proposals. +// TestRawNodeStart ensures that a node can be started correctly. Note that RawNode +// requires the application to bootstrap the state, i.e. it does not accept peers +// and will not create faux configuration change entries. func TestRawNodeStart(t *testing.T) { + want := Ready{ + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: raftpb.HardState{Term: 1, Commit: 3, Vote: 1}, + Entries: []raftpb.Entry{ + {Term: 1, Index: 2, Data: nil}, + {Term: 1, Index: 3, Data: []byte("foo")}, + }, + CommittedEntries: []raftpb.Entry{ + {Term: 1, Index: 2, Data: nil}, + {Term: 1, Index: 3, Data: []byte("foo")}}, + MustSync: true, + } + + storage := NewRealMemoryStorage() + defer storage.Close() + storage.ents[0].Index = 1 + + // TODO(tbg): this is a first prototype of what bootstrapping could look + // like (without the annoying faux ConfChanges). We want to persist a + // ConfState at some index and make sure that this index can't be reached + // from log position 1, so that followers are forced to pick up the + // ConfState in order to move away from log position 1 (unless they got + // bootstrapped in the same way already). Failing to do so would mean that + // followers diverge from the bootstrapped nodes and don't learn about the + // initial config. + // + // NB: this is exactly what CockroachDB does. The Raft log really begins at + // index 10, so empty followers (at index 1) always need a snapshot first. + type appenderStorage interface { + Storage + ApplySnapshot(raftpb.Snapshot) error + } + bootstrap := func(storage appenderStorage, cs raftpb.ConfState) error { + if len(cs.Nodes) == 0 { + return fmt.Errorf("no voters specified") + } + fi, err := storage.FirstIndex() + if err != nil { + return err + } + if fi < 2 { + return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap") + } + if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil { + // TODO(tbg): match exact error + return fmt.Errorf("should not have been able to load first index") + } + li, err := storage.LastIndex() + if err != nil { + return err + } + if _, err = storage.Entries(li, li, math.MaxUint64); err == nil { + return fmt.Errorf("should not have been able to load last index") + } + hs, ics, err := storage.InitialState() + if err != nil { + return err + } + if !IsEmptyHardState(hs) { + return fmt.Errorf("HardState not empty") + } + if len(ics.Nodes) != 0 { + return fmt.Errorf("ConfState not empty") + } + + meta := raftpb.SnapshotMetadata{ + Index: 1, + Term: 0, + ConfState: cs, + } + snap := raftpb.Snapshot{Metadata: meta} + return storage.ApplySnapshot(snap) + } + grp := raftpb.Group{ NodeId: 1, GroupId: 1, RaftReplicaId: 1, } - cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, ReplicaID: 1, NodeGroup: grp} - ccdata, err := cc.Marshal() - if err != nil { - t.Fatalf("unexpected marshal error: %v", err) - } - wants := []Ready{ - { - HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0}, - Entries: []raftpb.Entry{ - {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - }, - CommittedEntries: []raftpb.Entry{ - {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - }, - MustSync: true, - }, - { - HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1}, - Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}}, - MustSync: true, - }, + if err := bootstrap(storage, raftpb.ConfState{Nodes: []uint64{1}, Groups: []*raftpb.Group{&grp}}); err != nil { + t.Fatal(err) } - storage := NewMemoryStorage() - defer storage.Close() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{NodeID: 1, ReplicaID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() - t.Logf("rd %v", rd) - if !reflect.DeepEqual(rd, wants[0]) { - t.Fatalf("#%d: g = %+v,\n w %+v", 1, rd, wants[0]) - } else { - storage.Append(rd.Entries) - rawNode.Advance(rd) + + if rawNode.HasReady() { + t.Fatalf("unexpected ready: %+v", rawNode.Ready(true)) } - storage.Append(rd.Entries) - rawNode.Advance(rd) rawNode.Campaign() - rd = rawNode.Ready() + rawNode.Propose([]byte("foo")) + if !rawNode.HasReady() { + t.Fatal("expected a Ready") + } + rd := rawNode.Ready(true) storage.Append(rd.Entries) rawNode.Advance(rd) - rawNode.Propose([]byte("foo")) - if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) { - t.Errorf("#%d: g = %+v,\n w %+v", 2, rd, wants[1]) - } else { - storage.Append(rd.Entries) - rawNode.Advance(rd) + rd.SoftState, want.SoftState = nil, nil + + if !reflect.DeepEqual(rd, want) { + t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want) } if rawNode.HasReady() { - t.Errorf("unexpected Ready: %+v", rawNode.Ready()) + t.Errorf("unexpected Ready: %+v", rawNode.Ready(true)) } } @@ -356,17 +422,17 @@ func TestRawNodeRestart(t *testing.T) { defer storage.Close() storage.SetHardState(st) storage.Append(entries) - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), nil) + rawNode, err := NewRawNode(newTestConfig(1, []uint64{1}, 10, 1, storage)) if err != nil { t.Fatal(err) } - rd := rawNode.Ready() + rd := rawNode.Ready(true) if !reflect.DeepEqual(rd, want) { t.Errorf("g = %+v,\n w %+v", rd, want) } rawNode.Advance(rd) if rawNode.HasReady() { - t.Errorf("unexpected Ready: %+v", rawNode.Ready()) + t.Errorf("unexpected Ready: %+v", rawNode.Ready(true)) } } @@ -405,11 +471,11 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { s.SetHardState(st) s.ApplySnapshot(snap) s.Append(entries) - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s), nil) + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, s)) if err != nil { t.Fatal(err) } - if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) { + if rd := rawNode.Ready(true); !reflect.DeepEqual(rd, want) { t.Errorf("g = %+v,\n w %+v", rd, want) } else { rawNode.Advance(rd) @@ -425,7 +491,7 @@ func TestRawNodeRestartFromSnapshot(t *testing.T) { func TestRawNodeStatus(t *testing.T) { storage := NewMemoryStorage() defer storage.Close() - rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage), []Peer{{NodeID: 1, ReplicaID: 1}}) + rawNode, err := NewRawNode(newTestConfig(1, nil, 10, 1, storage)) if err != nil { t.Fatal(err) } @@ -488,13 +554,13 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { Data: []byte("boom"), }) - rawNode, err := NewRawNode(cfg, []Peer{{NodeID: 1, ReplicaID: 1}}) + rawNode, err := NewRawNode(cfg) if err != nil { t.Fatal(err) } for highestApplied := uint64(0); highestApplied != 11; { - rd := rawNode.Ready() + rd := rawNode.Ready(true) n := len(rd.CommittedEntries) if n == 0 { t.Fatalf("stopped applying entries at index %d", highestApplied) @@ -583,3 +649,37 @@ func BenchmarkStatusProgress(b *testing.B) { }) } } + +func TestRawNodeConsumeReady(t *testing.T) { + // Check that readyWithoutAccept() does not call acceptReady (which resets + // the messages) but Ready() does. + s := NewMemoryStorage() + rn := newTestRawNode(1, []uint64{1}, 3, 1, s) + m1 := raftpb.Message{Context: []byte("foo")} + m2 := raftpb.Message{Context: []byte("bar")} + + // Inject first message, make sure it's visible via readyWithoutAccept. + rn.raft.msgs = append(rn.raft.msgs, m1) + rd := rn.readyWithoutAccept(true) + if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) { + t.Fatalf("expected only m1 sent, got %+v", rd.Messages) + } + if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) { + t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs) + } + // Now call Ready() which should move the message into the Ready (as opposed + // to leaving it in both places). + rd = rn.Ready(true) + if len(rn.raft.msgs) > 0 { + t.Fatalf("messages not reset: %+v", rn.raft.msgs) + } + if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) { + t.Fatalf("expected only m1 sent, got %+v", rd.Messages) + } + // Add a message to raft to make sure that Advance() doesn't drop it. + rn.raft.msgs = append(rn.raft.msgs, m2) + rn.Advance(rd) + if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) { + t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs) + } +}