Skip to content

Commit

Permalink
raft: async leader vote
Browse files Browse the repository at this point in the history
This commit introduces an intermediate state that delays the
acknowledgement of a node's self-vote during an election until
that vote has been durably persisted (i.e. on the next call to
Advance). This change can be viewed as the election counterpart
to #14413.

This is an intermediate state that limits code movement for the
rest of the async storage writes change.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
nvanbenschoten committed Oct 27, 2022
1 parent e41a12d commit c05e48f
Show file tree
Hide file tree
Showing 15 changed files with 207 additions and 26 deletions.
23 changes: 21 additions & 2 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
// know who is the current leader; node will accept proposal when it knows
// who is the current leader.
func TestBlockProposal(t *testing.T) {
rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
go n.run()
defer n.Stop()
Expand All @@ -412,6 +413,9 @@ func TestBlockProposal(t *testing.T) {
}

n.Campaign(context.TODO())
rd := <-n.Ready()
s.Append(rd.Entries)
n.Advance()
select {
case err := <-errc:
if err != nil {
Expand Down Expand Up @@ -607,9 +611,14 @@ func TestNodeStart(t *testing.T) {
}

{
// Persist vote.
rd := <-n.Ready()
storage.Append(rd.Entries)
n.Advance()
// Append empty entry.
rd = <-n.Ready()
storage.Append(rd.Entries)
n.Advance()
}

n.Propose(ctx, []byte("foo"))
Expand Down Expand Up @@ -744,8 +753,12 @@ func TestNodeAdvance(t *testing.T) {
defer cancel()

n.Campaign(ctx)
// Persist vote.
rd := readyWithTimeout(n)
// Commit empty entry.
storage.Append(rd.Entries)
n.Advance()
// Append empty entry.
rd = readyWithTimeout(n)
storage.Append(rd.Entries)
n.Advance()

Expand Down Expand Up @@ -899,9 +912,15 @@ func TestCommitPagination(t *testing.T) {
defer cancel()
n.Campaign(ctx)

// Persist vote.
rd := readyWithTimeout(n)
s.Append(rd.Entries)
n.Advance()
// Append empty entry.
rd = readyWithTimeout(n)
s.Append(rd.Entries)
n.Advance()
// Apply empty entry.
rd = readyWithTimeout(n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
Expand Down
38 changes: 27 additions & 11 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ type raft struct {

msgs []pb.Message

// voteSelfOnAdvance is a marker that the local raft node should vote for
// itself upon its next call to advance. This is not meant to be the final
// approach towards handling self-votes, but it's a useful intermediate
// point to get all tests working and to write some additional tests that
// demonstrate possible race conditions when self-voting is asynchronous.
// This is replaced in a later commit.
voteSelfOnAdvance pb.Message

// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
Expand Down Expand Up @@ -594,6 +602,7 @@ func (r *raft) advance(rd Ready) {
if !IsEmptySnap(rd.Snapshot) {
r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
}
r.maybeVoteForSelf()
}

// maybeCommit attempts to advance the commit index. Returns true if
Expand All @@ -604,6 +613,22 @@ func (r *raft) maybeCommit() bool {
return r.raftLog.maybeCommit(mci, r.Term)
}

// maybeVoteForSelf attempts to inform a (pre-)candidate node that its
// vote for itself has been made durable and can now be counted towards
// the active election, if one is still ongoing. Returns true if the
// node was informed of a self-vote.
func (r *raft) maybeVoteForSelf() bool {
if r.voteSelfOnAdvance.Type == 0 {
return false
}
voteMsg := r.voteSelfOnAdvance
// NB: Clear the voteSelfOnAdvance marker before calling Step.
// Step may re-set the marker and cause us to loop.
r.voteSelfOnAdvance = pb.Message{}
_ = r.Step(voteMsg)
return true
}

func (r *raft) reset(term uint64) {
if r.Term != term {
r.Term = term
Expand Down Expand Up @@ -818,16 +843,6 @@ func (r *raft) campaign(t CampaignType) {
voteMsg = pb.MsgVote
term = r.Term
}
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
var ids []uint64
{
idMap := r.prs.Voters.IDs()
Expand All @@ -839,6 +854,7 @@ func (r *raft) campaign(t CampaignType) {
}
for _, id := range ids {
if id == r.id {
r.voteSelfOnAdvance = pb.Message{To: id, From: id, Term: term, Type: voteRespMsgType(voteMsg)}
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
Expand All @@ -848,7 +864,7 @@ func (r *raft) campaign(t CampaignType) {
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
r.send(pb.Message{To: id, Term: term, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}

Expand Down
2 changes: 2 additions & 0 deletions raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) {
for i := 1; i < 2*et; i++ {
r.tick()
}
r.maybeVoteForSelf()

if r.Term != 2 {
t.Errorf("term = %d, want 2", r.Term)
Expand Down Expand Up @@ -218,6 +219,7 @@ func TestLeaderElectionInOneRoundRPC(t *testing.T) {
r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(idsBySize(tt.size)...)))

r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
r.maybeVoteForSelf()
for id, vote := range tt.votes {
r.Step(pb.Message{From: id, To: 1, Term: r.Term, Type: pb.MsgVoteResp, Reject: !vote})
}
Expand Down
96 changes: 96 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func mustAppendEntry(r *raft, ents ...pb.Entry) {
type stateMachine interface {
Step(m pb.Message) error
readMessages() []pb.Message
maybeVoteForSelf() bool
}

func (r *raft) readMessages() []pb.Message {
Expand Down Expand Up @@ -372,6 +373,7 @@ func TestLearnerPromotion(t *testing.T) {
setRandomizedElectionTimeout(n1, n1.electionTimeout)
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
n1.maybeVoteForSelf()
}

if n1.state != StateLeader {
Expand All @@ -393,6 +395,7 @@ func TestLearnerPromotion(t *testing.T) {
setRandomizedElectionTimeout(n2, n2.electionTimeout)
for i := 0; i < n2.electionTimeout; i++ {
n2.tick()
n2.maybeVoteForSelf()
}

nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
Expand Down Expand Up @@ -690,6 +693,7 @@ func TestLearnerLogReplication(t *testing.T) {
setRandomizedElectionTimeout(n1, n1.electionTimeout)
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
n1.maybeVoteForSelf()
}

nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
Expand Down Expand Up @@ -1741,6 +1745,7 @@ func testCandidateResetTerm(t *testing.T, mt pb.MessageType) {
c.resetRandomizedElectionTimeout()
for i := 0; i < c.randomizedElectionTimeout; i++ {
c.tick()
c.maybeVoteForSelf()
}

if c.state != StateCandidate {
Expand All @@ -1763,6 +1768,90 @@ func testCandidateResetTerm(t *testing.T, mt pb.MessageType) {
}
}

func TestCandidateSelfVoteAfterLostElection(t *testing.T) {
testCandidateSelfVoteAfterLostElection(t, false)
}

func TestCandidateSelfVoteAfterLostElectionPreVote(t *testing.T) {
testCandidateSelfVoteAfterLostElection(t, true)
}

func testCandidateSelfVoteAfterLostElection(t *testing.T, preVote bool) {
sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.preVote = preVote

// n1 calls an election.
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
voteMsg := sm.voteSelfOnAdvance

// n1 hears that n2 already won the election before it has had a
// change to sync its vote to disk and account for its self-vote.
// Becomes a follower.
sm.Step(pb.Message{From: 2, To: 1, Term: sm.Term, Type: pb.MsgHeartbeat})
if sm.state != StateFollower {
t.Errorf("state = %v, want %v", sm.state, StateFollower)
}

// n1 remains a follower even after its self-vote is delivered.
sm.Step(voteMsg)
if sm.state != StateFollower {
t.Errorf("state = %v, want %v", sm.state, StateFollower)
}

// Its self-vote does not make its way to its ProgressTracker.
granted, _, _ := sm.prs.TallyVotes()
if granted != 0 {
t.Errorf("granted = %v, want %v", granted, 0)
}
}

func TestCandidateDeliversPreCandidateSelfVoteAfterBecomingCandidate(t *testing.T) {
sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.preVote = true

// n1 calls an election.
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
preVoteMsg := sm.voteSelfOnAdvance
if sm.state != StatePreCandidate {
t.Errorf("state = %v, want %v", sm.state, StatePreCandidate)
}

// n1 receives pre-candidate votes from both other peers before
// voting for itself. n1 becomes a candidate.
// NB: pre-vote messages carry the local term + 1.
sm.Step(pb.Message{From: 2, To: 1, Term: sm.Term + 1, Type: pb.MsgPreVoteResp})
sm.Step(pb.Message{From: 3, To: 1, Term: sm.Term + 1, Type: pb.MsgPreVoteResp})
if sm.state != StateCandidate {
t.Errorf("state = %v, want %v", sm.state, StateCandidate)
}

// n1 remains a candidate even after its delayed pre-vote self-vote is
// delivered.
sm.Step(preVoteMsg)
voteMsg := sm.voteSelfOnAdvance
if sm.state != StateCandidate {
t.Errorf("state = %v, want %v", sm.state, StateCandidate)
}

// Its pre-vote self-vote does not make its way to its ProgressTracker.
granted, _, _ := sm.prs.TallyVotes()
if granted != 0 {
t.Errorf("granted = %v, want %v", granted, 0)
}

// A single vote from n2 does not move n1 to the leader.
sm.Step(pb.Message{From: 2, To: 1, Term: sm.Term, Type: pb.MsgVoteResp})
if sm.state != StateCandidate {
t.Errorf("state = %v, want %v", sm.state, StateCandidate)
}

// n1 becomes the leader once its self-vote is received.
sm.Step(voteMsg)
if sm.state != StateLeader {
t.Errorf("state = %v, want %v", sm.state, StateLeader)
}
}

func TestLeaderStepdownWhenQuorumActive(t *testing.T) {
sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))

Expand Down Expand Up @@ -3372,11 +3461,15 @@ func testCampaignWhileLeader(t *testing.T, preVote bool) {
// We don't call campaign() directly because it comes after the check
// for our current state.
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
for r.maybeVoteForSelf() {
}
if r.state != StateLeader {
t.Errorf("expected single-node election to become leader but got %s", r.state)
}
term := r.Term
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
for r.maybeVoteForSelf() {
}
if r.state != StateLeader {
t.Errorf("expected to remain leader but got %s", r.state)
}
Expand Down Expand Up @@ -4744,6 +4837,8 @@ func (nw *network) send(msgs ...pb.Message) {
nw.t.Log(DescribeMessage(m, nil))
}
p.Step(m)
for p.maybeVoteForSelf() {
}
msgs = append(msgs[1:], nw.filter(p.readMessages())...)
}
}
Expand Down Expand Up @@ -4810,6 +4905,7 @@ type blackHole struct{}

func (blackHole) Step(pb.Message) error { return nil }
func (blackHole) readMessages() []pb.Message { return nil }
func (blackHole) maybeVoteForSelf() bool { return false }

var nopStepper = &blackHole{}

Expand Down
12 changes: 11 additions & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ func TestRawNodeJointAutoLeave(t *testing.T) {
rd = rawNode.Ready()
t.Log(DescribeReady(rd, nil))
s.Append(rd.Entries)
rawNode.Advance(rd)
rd = rawNode.Ready()
t.Log(DescribeReady(rd, nil))
s.Append(rd.Entries)
// Check that the right ConfChange comes out.
if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
t.Fatalf("expected exactly one more entry, got %+v", rd)
Expand Down Expand Up @@ -743,11 +747,14 @@ func TestRawNodeStart(t *testing.T) {
t.Fatalf("unexpected ready: %+v", rawNode.Ready())
}
rawNode.Campaign()
rd := rawNode.Ready()
storage.Append(rd.Entries)
rawNode.Advance(rd)
rawNode.Propose([]byte("foo"))
if !rawNode.HasReady() {
t.Fatal("expected a Ready")
}
rd := rawNode.Ready()
rd = rawNode.Ready()
if !reflect.DeepEqual(entries, rd.Entries) {
t.Fatalf("expected to see entries\n%s, not\n%s", DescribeEntries(entries, nil), DescribeEntries(rd.Entries, nil))
}
Expand Down Expand Up @@ -861,6 +868,9 @@ func TestRawNodeStatus(t *testing.T) {
if err := rn.Campaign(); err != nil {
t.Fatal(err)
}
rd := rn.Ready()
s.Append(rd.Entries)
rn.Advance(rd)
status := rn.Status()
if status.Lead != 1 {
t.Fatal("not lead")
Expand Down
3 changes: 2 additions & 1 deletion raft/testdata/campaign.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ campaign 1
----
INFO 1 is starting a new election at term 0
INFO 1 became candidate at term 1
INFO 1 received MsgVoteResp from 1 at term 1
INFO 1 [logterm: 1, index: 2] sent MsgVote request to 2 at term 1
INFO 1 [logterm: 1, index: 2] sent MsgVote request to 3 at term 1

Expand All @@ -31,6 +30,8 @@ stabilize
Messages:
1->2 MsgVote Term:1 Log:1/2
1->3 MsgVote Term:1 Log:1/2
INFO 1 received MsgVoteResp from 1 at term 1
INFO 1 has received 1 MsgVoteResp votes and 0 vote rejections
> 2 receiving messages
1->2 MsgVote Term:1 Log:1/2
INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1]
Expand Down
3 changes: 2 additions & 1 deletion raft/testdata/campaign_learner_must_vote.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ campaign 2
----
INFO 2 is starting a new election at term 1
INFO 2 became candidate at term 2
INFO 2 received MsgVoteResp from 2 at term 2
INFO 2 [logterm: 1, index: 4] sent MsgVote request to 1 at term 2
INFO 2 [logterm: 1, index: 4] sent MsgVote request to 3 at term 2

Expand All @@ -67,6 +66,8 @@ HardState Term:2 Vote:2 Commit:4
Messages:
2->1 MsgVote Term:2 Log:1/4
2->3 MsgVote Term:2 Log:1/4
INFO 2 received MsgVoteResp from 2 at term 2
INFO 2 has received 1 MsgVoteResp votes and 0 vote rejections

# n2 is now campaigning while n1 is down (does not respond). The latest config
# has n3 as a voter, but n3 doesn't even have the corresponding conf change in
Expand Down
Loading

0 comments on commit c05e48f

Please sign in to comment.