Skip to content

Commit

Permalink
raft: let raft step return error when proposal is dropped to allow fa…
Browse files Browse the repository at this point in the history
…il-fast.
  • Loading branch information
absolute8511 committed Jan 8, 2018
1 parent 7617b92 commit 3bc5754
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 49 deletions.
54 changes: 40 additions & 14 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type Node interface {
Campaign(ctx context.Context) error
// Propose proposes that data be appended to the log.
Propose(ctx context.Context, data []byte) error
ProposeWithCancel(ctx context.Context, cancel context.CancelFunc, data []byte) error
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
Expand Down Expand Up @@ -224,10 +225,15 @@ func RestartNode(c *Config) Node {
return &n
}

type messageWithCancel struct {
m pb.Message
cancel context.CancelFunc
}

// node is the canonical implementation of the Node interface
type node struct {
propc chan pb.Message
recvc chan pb.Message
propc chan messageWithCancel
recvc chan messageWithCancel
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
Expand All @@ -242,8 +248,8 @@ type node struct {

func newNode() node {
return node{
propc: make(chan pb.Message),
recvc: make(chan pb.Message),
propc: make(chan messageWithCancel),
recvc: make(chan messageWithCancel),
confc: make(chan pb.ConfChange),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
Expand Down Expand Up @@ -271,7 +277,7 @@ func (n *node) Stop() {
}

func (n *node) run(r *raft) {
var propc chan pb.Message
var propc chan messageWithCancel
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei, prevLastUnstablet uint64
Expand Down Expand Up @@ -314,13 +320,21 @@ func (n *node) run(r *raft) {
// TODO: maybe buffer the config propose if there exists one (the way
// described in raft dissertation)
// Currently it is dropped in Step silently.
case m := <-propc:
case mc := <-propc:
m := mc.m
m.From = r.id
r.Step(m)
case m := <-n.recvc:
err := r.Step(m)
if err == ErrProposalDropped && mc.cancel != nil {
mc.cancel()
}
case mc := <-n.recvc:
m := mc.m
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m) // raft never returns an error
err := r.Step(m) // raft never returns an error
if err == ErrProposalDropped && mc.cancel != nil {
mc.cancel()
}
}
case cc := <-n.confc:
if cc.NodeID == None {
Expand Down Expand Up @@ -409,6 +423,13 @@ func (n *node) Propose(ctx context.Context, data []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func (n *node) ProposeWithCancel(ctx context.Context, cancel context.CancelFunc, data []byte) error {
return n.stepWithCancel(ctx, cancel, pb.Message{
Type: pb.MsgProp,
Entries: []pb.Entry{{Data: data}},
})
}

func (n *node) Step(ctx context.Context, m pb.Message) error {
// ignore unexpected local messages receiving over network
if IsLocalMsg(m.Type) {
Expand All @@ -429,13 +450,18 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) step(ctx context.Context, m pb.Message) error {
return n.stepWithCancel(ctx, nil, m)
}

func (n *node) stepWithCancel(ctx context.Context, cancel context.CancelFunc, m pb.Message) error {
mc := messageWithCancel{m: m, cancel: cancel}
ch := n.recvc
if m.Type == pb.MsgProp {
if mc.m.Type == pb.MsgProp {
ch = n.propc
}

select {
case ch <- m:
case ch <- mc:
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -478,7 +504,7 @@ func (n *node) Status() Status {

func (n *node) ReportUnreachable(id uint64) {
select {
case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
case n.recvc <- messageWithCancel{m: pb.Message{Type: pb.MsgUnreachable, From: id}}:
case <-n.done:
}
}
Expand All @@ -487,15 +513,15 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
rej := status == SnapshotFailure

select {
case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
case n.recvc <- messageWithCancel{m: pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}}:
case <-n.done:
}
}

func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
select {
// manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
case n.recvc <- messageWithCancel{m: pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}}:
case <-n.done:
case <-ctx.Done():
}
Expand Down
114 changes: 107 additions & 7 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"reflect"
"strings"
"testing"
"time"

Expand All @@ -30,8 +31,8 @@ import (
func TestNodeStep(t *testing.T) {
for i, msgn := range raftpb.MessageType_name {
n := &node{
propc: make(chan raftpb.Message, 1),
recvc: make(chan raftpb.Message, 1),
propc: make(chan messageWithCancel, 1),
recvc: make(chan messageWithCancel, 1),
}
msgt := raftpb.MessageType(i)
n.Step(context.TODO(), raftpb.Message{Type: msgt})
Expand Down Expand Up @@ -64,7 +65,7 @@ func TestNodeStep(t *testing.T) {
func TestNodeStepUnblock(t *testing.T) {
// a node without buffer to block step
n := &node{
propc: make(chan raftpb.Message),
propc: make(chan messageWithCancel),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -109,8 +110,9 @@ func TestNodeStepUnblock(t *testing.T) {
// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestNodePropose(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down Expand Up @@ -147,8 +149,9 @@ func TestNodePropose(t *testing.T) {
// It also ensures that ReadState can be read out through ready chan.
func TestNodeReadIndex(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

Expand Down Expand Up @@ -214,7 +217,10 @@ func TestDisableProposalForwarding(t *testing.T) {
}

// send proposal to r3(follower) where DisableProposalForwarding is true
r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
err := r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
if err != ErrProposalDropped {
t.Fatalf("should return drop proposal error while disable proposal forwarding")
}

// verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
if len(r3.msgs) != 0 {
Expand Down Expand Up @@ -284,8 +290,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
// to the underlying raft.
func TestNodeProposeConfig(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down Expand Up @@ -426,6 +433,99 @@ func TestBlockProposal(t *testing.T) {
}
}

func TestNodeDropPropose(t *testing.T) {
msgs := []raftpb.Message{}
droppingMsg := []byte("test_dropping")
normalMsg := []byte("normal_message")
normalDoneCh := make(chan bool)
dropStep := func(r *raft, m raftpb.Message) error {
if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) {
t.Logf("dropping message: %v", m.String())
return ErrProposalDropped
}
if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(normalMsg)) {
close(normalDoneCh)
}
msgs = append(msgs, m)
return nil
}

n := newNode()
s := NewMemoryStorage()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
go n.run(r)
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
s.Append(rd.Entries)
// change the step function to dropStep until this raft becomes leader
if rd.SoftState.Lead == r.id {
r.step = dropStep
n.Advance()
break
}
n.Advance()
}

proposalTimeout := time.Second
ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout)
// propose with cancel should be cancelled earyly if dropped
err := n.ProposeWithCancel(ctx, cancel, droppingMsg)
if err != nil {
t.Errorf("should propose success: %v", err)
}
select {
case <-ctx.Done():
if ctx.Err() != context.Canceled {
t.Errorf("should cancel propose for dropped proposal with cancel")
}
case <-time.After(proposalTimeout / 2):
t.Errorf("should return early for dropped proposal")
}
cancel()

ctx, cancel = context.WithTimeout(context.Background(), time.Second)
// other propose should wait until timeout if dropped
err = n.Propose(ctx, droppingMsg)
if err != nil {
t.Errorf("should propose success: %v", err)
}
select {
case <-ctx.Done():
if ctx.Err() != context.DeadlineExceeded {
t.Errorf("should timeout propose for dropped proposal with no cancel")
}
case <-time.After(proposalTimeout * 2):
t.Errorf("should return early for dropped proposal")
}
cancel()

ctx, cancel = context.WithTimeout(context.Background(), proposalTimeout)
err = n.ProposeWithCancel(ctx, cancel, normalMsg)
if err != nil {
t.Errorf("should propose success: %v", err)
}
select {
case <-ctx.Done():
t.Errorf("should not fail for normal proposal: %v", ctx.Err())
case <-time.After(proposalTimeout):
t.Errorf("should return early for normal proposal")
case <-normalDoneCh:
}
cancel()

n.Stop()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
}
if msgs[0].Type != raftpb.MsgProp {
t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
}
if !bytes.Equal(msgs[0].Entries[0].Data, normalMsg) {
t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, normalMsg)
}
}

// TestNodeTick ensures that node.Tick() will increase the
// elapsed of the underlying raft state machine.
func TestNodeTick(t *testing.T) {
Expand Down
Loading

0 comments on commit 3bc5754

Please sign in to comment.