Skip to content

Commit

Permalink
raft: Avoid scanning raft log in becomeLeader
Browse files Browse the repository at this point in the history
Scanning the uncommitted portion of the raft log to determine whether
there are any pending config changes can be expensive. In
cockroachdb/cockroach#18601, we've seen that a new leader can spend so
much time scanning its log post-election that it fails to send
its first heartbeats in time to prevent a second election from
starting immediately.

Instead of tracking whether a pending config change exists with a
boolean, this commit tracks the latest log index at which a pending
config change *could* exist. This is a less expensive solution to
the problem, and the impact of false positives should be minimal since
a newly-elected leader should be able to quickly commit the tail of
its log.
  • Loading branch information
bdarnell committed Apr 17, 2018
1 parent a9b9ef5 commit 4a7f0fd
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 63 deletions.
2 changes: 0 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ func (n *node) run(r *raft) {
}
case cc := <-n.confc:
if cc.NodeID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
Expand All @@ -342,7 +341,6 @@ func (n *node) run(r *raft) {
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
r.resetPendingConf()
default:
panic("unexpected conf type")
}
Expand Down
6 changes: 5 additions & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
n.Tick()
case rd := <-n.Ready():
s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries {
rdyEntries = append(rdyEntries, e)
switch e.Type {
Expand All @@ -356,10 +357,13 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
n.ApplyConfChange(cc)
applyConfChan <- struct{}{}
applied = true
}
}
n.Advance()
if applied {
applyConfChan <- struct{}{}
}
}
}
}()
Expand Down
36 changes: 20 additions & 16 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,13 @@ type raft struct {
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// New configuration is ignored if there exists unapplied configuration.
pendingConf bool
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64

readOnly *readOnly

Expand Down Expand Up @@ -528,7 +533,7 @@ func (r *raft) reset(term uint64) {
r.prs[id].Match = r.raftLog.lastIndex()
}
}
r.pendingConf = false
r.pendingConfIndex = 0
r.readOnly = newReadOnly(r.readOnly.option)
}

Expand Down Expand Up @@ -632,12 +637,13 @@ func (r *raft) becomeLeader() {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}

nconf := numOfPendingConf(ents)
if nconf > 1 {
panic("unexpected multiple uncommitted config entry")
}
if nconf == 1 {
r.pendingConf = true
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
}

r.appendEntry(pb.Entry{Data: nil})
Expand Down Expand Up @@ -843,11 +849,13 @@ func stepLeader(r *raft, m pb.Message) {

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
r.pendingConf = true
}
}
r.appendEntry(m.Entries...)
Expand Down Expand Up @@ -1186,7 +1194,6 @@ func (r *raft) promotable() bool {
}

func (r *raft) addNode(id uint64) {
r.pendingConf = false
if _, ok := r.prs[id]; ok {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
Expand All @@ -1202,7 +1209,6 @@ func (r *raft) addNode(id uint64) {

func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false

// do not try to commit or abort transferring if there is no nodes in the cluster.
if len(r.prs) == 0 {
Expand All @@ -1220,8 +1226,6 @@ func (r *raft) removeNode(id uint64) {
}
}

func (r *raft) resetPendingConf() { r.pendingConf = false }

func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
}
Expand Down
61 changes: 19 additions & 42 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2499,8 +2499,8 @@ func TestStepConfig(t *testing.T) {
if g := r.raftLog.lastIndex(); g != index+1 {
t.Errorf("index = %d, want %d", g, index+1)
}
if !r.pendingConf {
t.Errorf("pendingConf = %v, want true", r.pendingConf)
if r.pendingConfIndex != index+1 {
t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
}
}

Expand All @@ -2514,7 +2514,7 @@ func TestStepIgnoreConfig(t *testing.T) {
r.becomeLeader()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
index := r.raftLog.lastIndex()
pendingConf := r.pendingConf
pendingConfIndex := r.pendingConfIndex
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
ents, err := r.raftLog.entries(index+1, noLimit)
Expand All @@ -2524,57 +2524,39 @@ func TestStepIgnoreConfig(t *testing.T) {
if !reflect.DeepEqual(ents, wents) {
t.Errorf("ents = %+v, want %+v", ents, wents)
}
if r.pendingConf != pendingConf {
t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf)
if r.pendingConfIndex != pendingConfIndex {
t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
}
}

// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag
// TestNewLeaderPendingConfig tests that new leader sets its pendingConfigIndex
// based on uncommitted entries.
func TestRecoverPendingConfig(t *testing.T) {
func TestNewLeaderPendingConfig(t *testing.T) {
tests := []struct {
entType pb.EntryType
wpending bool
addEntry bool
wpendingIndex uint64
}{
{pb.EntryNormal, false},
{pb.EntryConfChange, true},
{false, 0},
{true, 1},
}
for i, tt := range tests {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: tt.entType})
if tt.addEntry {
r.appendEntry(pb.Entry{Type: pb.EntryNormal})
}
r.becomeCandidate()
r.becomeLeader()
if r.pendingConf != tt.wpending {
t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending)
if r.pendingConfIndex != tt.wpendingIndex {
t.Errorf("#%d: pendingConfIndex = %d, want %d",
i, r.pendingConfIndex, tt.wpendingIndex)
}
}
}

// TestRecoverDoublePendingConfig tests that new leader will panic if
// there exist two uncommitted config entries.
func TestRecoverDoublePendingConfig(t *testing.T) {
func() {
defer func() {
if err := recover(); err == nil {
t.Errorf("expect panic, but nothing happens")
}
}()
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.becomeCandidate()
r.becomeLeader()
}()
}

// TestAddNode tests that addNode could update pendingConf and nodes correctly.
// TestAddNode tests that addNode could update nodes correctly.
func TestAddNode(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes()
wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) {
Expand All @@ -2586,7 +2568,6 @@ func TestAddNode(t *testing.T) {
// immediately when checkQuorum is set.
func TestAddNodeCheckQuorum(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.checkQuorum = true

r.becomeCandidate()
Expand Down Expand Up @@ -2617,15 +2598,11 @@ func TestAddNodeCheckQuorum(t *testing.T) {
}
}

// TestRemoveNode tests that removeNode could update pendingConf, nodes and
// TestRemoveNode tests that removeNode could update nodes and
// and removed list correctly.
func TestRemoveNode(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w)
Expand Down
2 changes: 0 additions & 2 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
// ApplyConfChange applies a config change to the local node.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.NodeID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes()}
}
switch cc.Type {
Expand All @@ -178,7 +177,6 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
case pb.ConfChangeRemoveNode:
rn.raft.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
rn.raft.resetPendingConf()
default:
panic("unexpected conf type")
}
Expand Down

0 comments on commit 4a7f0fd

Please sign in to comment.