
raft: use RawNode for node's event loop
It has always bugged me that any new feature essentially needed to be
tested twice due to the two ways in which apps can use raft (`*node` and
`*RawNode`). Due to upcoming testing work for joint consensus, now is a
good time to rectify this somewhat.

This commit removes most logic from `(*node).run` and uses `*RawNode`
internally. This simplifies the logic and also led (via debugging) to
some insight into how the semantics of the two approaches differ, which is
now documented in the comments.
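For reference, the two modes look roughly like this (a sketch only; `runNodeLoop`, `stepRawNode`, `saveToStorage`, `send`, and `process` are hypothetical application hooks, not part of the package):

```go
// Mode 1: *node (StartNode/RestartNode). Channel-based: the event loop
// runs in its own goroutine and the app talks to it over channels.
func runNodeLoop(n raft.Node, ticker *time.Ticker, done chan struct{}) {
	for {
		select {
		case <-ticker.C:
			n.Tick()
		case rd := <-n.Ready():
			saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // app hook
			send(rd.Messages)                                    // app hook
			process(rd.CommittedEntries)                         // app hook
			n.Advance()
		case <-done:
			return
		}
	}
}

// Mode 2: *RawNode (NewRawNode). Synchronous: the caller owns the loop
// and polls for work.
func stepRawNode(rn *raft.RawNode) {
	rn.Tick()
	if rn.HasReady() {
		rd := rn.Ready()
		saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // app hook
		send(rd.Messages)                                    // app hook
		process(rd.CommittedEntries)                         // app hook
		rn.Advance(rd)
	}
}
```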
tbg committed Jul 15, 2019
1 parent 5a734e7 commit 6a9d89d
Showing 5 changed files with 172 additions and 166 deletions.
116 changes: 24 additions & 92 deletions raft/node.go
@@ -198,51 +198,15 @@ type Peer struct {
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: r.raftLog.lastIndex() + 1,
Data: d,
}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
rn, err := NewRawNode(c, peers)
if err != nil {
panic(err)
}

n := newNode()
n.logger = c.Logger
go n.run(r)

go n.run(rn)
return &n
}

@@ -251,12 +215,7 @@ func StartNode(c *Config, peers []Peer) Node {
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
r := newRaft(c)

n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
return StartNode(c, nil)
}
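App-side, the two entry points keep their existing contract: pass the initial membership on first boot, and nothing on restart, where log and membership are recovered from Storage. A hedged sketch (IDs and Config values are illustrative):

```go
storage := raft.NewMemoryStorage()
c := &raft.Config{
	ID:              0x01,
	ElectionTick:    10,
	HeartbeatTick:   1,
	Storage:         storage,
	MaxSizePerMsg:   4096,
	MaxInflightMsgs: 256,
}

// First boot: seed the configuration with the initial peer set.
n := raft.StartNode(c, []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}})

// Restart (state already in storage): n := raft.RestartNode(c)
_ = n
```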

type msgWithResult struct {
@@ -310,30 +269,30 @@ func (n *node) Stop() {
<-n.done
}

func (n *node) run(r *raft) {
func (n *node) run(rn *RawNode) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

r := rn.raft

lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState

for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
} else if rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
readyc = n.readyc
}

if lead != r.lead {
@@ -382,40 +341,13 @@ func (n *node) run(r *raft) {
case <-n.done:
}
case <-n.tickc:
r.tick()
rn.Tick()
case readyc <- rd:
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}

r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
rn.commitReady(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
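The new `acceptReady`/`commitReady` pair mirrors the public `RawNode` cycle: `acceptReady` runs once a Ready has been handed to the application, and `commitReady` runs once the application signals (via the advance channel) that it is done with it. A standalone `RawNode` user drives the same cycle directly, roughly as below (the handling functions are placeholders again):

```go
for {
	// A real application would block on ticks and incoming messages here;
	// only the Ready handling is sketched.
	if !rn.HasReady() {
		continue
	}
	rd := rn.Ready()
	saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // persist first
	send(rd.Messages)                                    // then ship messages
	process(rd.CommittedEntries)                         // then apply
	rn.Advance(rd) // marks entries stable/applied, like commitReady above
}
```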
4 changes: 2 additions & 2 deletions raft/node_bench_test.go
@@ -26,8 +26,8 @@ func BenchmarkOneNode(b *testing.B) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)

defer n.Stop()

88 changes: 56 additions & 32 deletions raft/node_test.go
@@ -132,9 +132,12 @@ func TestNodePropose(t *testing.T) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
if err := n.Campaign(context.TODO()); err != nil {
t.Fatal(err)
}
for {
rd := <-n.Ready()
s.Append(rd.Entries)
@@ -172,10 +175,11 @@ func TestNodeReadIndex(t *testing.T) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
r.readStates = wrs

go n.run(r)
go n.run(rn)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@@ -309,8 +313,9 @@ func TestNodeProposeConfig(t *testing.T) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@@ -347,8 +352,8 @@ func TestNodeProposeConfig(t *testing.T) {
func TestNodeProposeAddDuplicateNode(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n.Campaign(context.TODO())
rdyEntries := make([]raftpb.Entry, 0)
ticker := time.NewTicker(time.Millisecond * 100)
@@ -422,8 +427,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
// who is the current leader.
func TestBlockProposal(t *testing.T) {
n := newNode()
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, NewMemoryStorage())
go n.run(rn)
defer n.Stop()

errc := make(chan error, 1)
@@ -463,8 +468,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
@@ -497,8 +503,9 @@ func TestNodeProposeWaitDropped(t *testing.T) {
func TestNodeTick(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
r := rn.raft
go n.run(rn)
elapsed := r.electionElapsed
n.Tick()

@@ -517,11 +524,11 @@ func TestNodeTick(t *testing.T) {
func TestNodeStop(t *testing.T) {
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
donec := make(chan struct{})

go func() {
n.run(r)
n.run(rn)
close(donec)
}()

@@ -618,7 +625,9 @@ func TestNodeStart(t *testing.T) {
n.Advance()
}

n.Campaign(ctx)
if err := n.Campaign(ctx); err != nil {
t.Fatal(err)
}
rd := <-n.Ready()
storage.Append(rd.Entries)
n.Advance()
@@ -646,10 +655,12 @@ func TestNodeRestart(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 1}

want := Ready{
HardState: st,
// No HardState is emitted because there was no change.
HardState: raftpb.HardState{},
// commit up to the commit index in st
CommittedEntries: entries[:st.Commit],
MustSync: true,
// MustSync is false because no HardState or new entries are provided.
MustSync: false,
}

storage := NewMemoryStorage()
@@ -691,10 +702,14 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 3}

want := Ready{
HardState: st,
// No HardState is emitted because nothing changed relative to what is
// already persisted.
HardState: raftpb.HardState{},
// commit up to the commit index in st
CommittedEntries: entries,
MustSync: true,
// MustSync is only true when there is a new HardState or new entries;
// neither is the case here.
MustSync: false,
}

s := NewMemoryStorage()
@@ -800,8 +815,8 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
defer ticker.Stop()
n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
rn := newTestRawNode(1, []uint64{1}, 10, 1, s)
go n.run(rn)
n.Campaign(context.TODO())
stop := make(chan struct{})
done := make(chan struct{})
@@ -895,9 +910,12 @@ func TestCommitPagination(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxCommittedSizePerReady = 2048
r := newRaft(cfg)
rn, err := NewRawNode(cfg, nil)
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(r)
go n.run(rn)
n.Campaign(context.TODO())

rd := readyWithTimeout(&n)
@@ -984,9 +1002,12 @@ func TestNodeCommitPaginationAfterRestart(t *testing.T) {
// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1

r := newRaft(cfg)
rn, err := NewRawNode(cfg, nil)
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(r)
go n.run(rn)
defer n.Stop()

rd := readyWithTimeout(&n)
@@ -1011,9 +1032,12 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxUncommittedEntriesSize = maxEntrySize
r := newRaft(cfg)
rn, err := NewRawNode(cfg, nil)
if err != nil {
t.Fatal(err)
}
n := newNode()
go n.run(r)
go n.run(rn)
defer n.Stop()
n.Campaign(context.TODO())

@@ -1028,14 +1052,14 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
// committing anything. These proposals should not cause the leader's
// log to grow indefinitely.
for i := 0; i < 1024; i++ {
n.Propose(context.TODO(), data)
_ = n.Propose(context.TODO(), data)
}

// Check the size of leader's uncommitted log tail. It should not exceed the
// MaxUncommittedEntriesSize limit.
checkUncommitted := func(exp uint64) {
t.Helper()
if a := r.uncommittedSize; exp != a {
if a := rn.raft.uncommittedSize; exp != a {
t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
}
}
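The flipped `MustSync` expectations in `TestNodeRestart` and `TestNodeRestartFromSnapshot` above follow from the package's `MustSync` helper, which (paraphrased from raft/node.go in this version of the package) requires a synchronous write only for new entries or a changed vote or term:

```go
// MustSync returns true if the hard state and count of Raft entries
// indicate that a synchronous write to persistent storage is required.
func MustSync(st, prevst pb.HardState, entsnum int) bool {
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}
```

On restart nothing in the HardState changes and no new entries are appended, so the Ready carries only CommittedEntries and `MustSync` is false.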