Skip to content

Commit

Permalink
raft: let raft step return error when proposal is dropped to allow fa…
Browse files Browse the repository at this point in the history
…il-fast.
  • Loading branch information
absolute8511 committed Jan 11, 2018
1 parent 52f73c5 commit b5b998b
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 29 deletions.
63 changes: 63 additions & 0 deletions CODE_OF_CONDUCT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
## CoreOS Community Code of Conduct

### Contributor Code of Conduct

As contributors and maintainers of this project, and in the interest of
fostering an open and welcoming community, we pledge to respect all people who
contribute through reporting issues, posting feature requests, updating
documentation, submitting pull requests or patches, and other activities.

We are committed to making participation in this project a harassment-free
experience for everyone, regardless of level of experience, gender, gender
identity and expression, sexual orientation, disability, personal appearance,
body size, race, ethnicity, age, religion, or nationality.

Examples of unacceptable behavior by participants include:

* The use of sexualized language or imagery
* Personal attacks
* Trolling or insulting/derogatory comments
* Public or private harassment
* Publishing others' private information, such as physical or electronic addresses, without explicit permission
* Other unethical or unprofessional conduct.

Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct. By adopting this Code of Conduct,
project maintainers commit themselves to fairly and consistently applying these
principles to every aspect of managing this project. Project maintainers who do
not follow or enforce the Code of Conduct may be permanently removed from the
project team.

This code of conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community.

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting a project maintainer, Brandon Philips
<brandon.philips@coreos.com>, and/or Meghan Schofield
<meghan.schofield@coreos.com>.

This Code of Conduct is adapted from the Contributor Covenant
(http://contributor-covenant.org), version 1.2.0, available at
http://contributor-covenant.org/version/1/2/0/

### CoreOS Events Code of Conduct

CoreOS events are working conferences intended for professional networking and
collaboration in the CoreOS community. Attendees are expected to behave
according to professional standards and in accordance with their employer’s
policies on appropriate workplace behavior.

While at CoreOS events or related social networking opportunities, attendees
should not engage in discriminatory or offensive speech or actions including
but not limited to gender, sexuality, race, age, disability, or religion.
Speakers should be especially aware of these concerns.

CoreOS does not condone any statements by speakers contrary to these standards.
CoreOS reserves the right to deny entrance and/or eject from an event (without
refund) any individual found to be engaging in discriminatory or offensive
speech or actions.

Please bring any concerns to the immediate attention of designated on-site
staff, Brandon Philips <brandon.philips@coreos.com>, and/or Meghan Schofield
<meghan.schofield@coreos.com>.
10 changes: 8 additions & 2 deletions etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ GET gets the key or a range of keys [key, range_end) if range_end is given.

RPC: Range

RPC: Range

#### Options

- hex -- print out key and value as hex encode string
Expand Down Expand Up @@ -182,6 +184,8 @@ Removes the specified key or range of keys [key, range_end) if range_end is give

RPC: DeleteRange

RPC: DeleteRange

#### Options

- prefix -- delete keys by matching prefix
Expand Down Expand Up @@ -343,6 +347,8 @@ Watch watches events stream on keys or prefixes, [key or prefix, range_end) if r

RPC: Watch

RPC: Watch

#### Options

- hex -- print out key and value as hex encode string
Expand Down Expand Up @@ -1367,7 +1373,7 @@ RPC: UserRevokeRole

The approximate total number of keys transferred to the destination cluster, updated every 30 seconds.

#### Examples
- dest-cert -- TLS certificate file for destination cluster

```
./etcdctl make-mirror mirror.example.com:2379
Expand Down Expand Up @@ -1434,7 +1440,7 @@ Prints the version of etcdctl.

Prints etcd version and API version.

#### Examples
No output on success.

```bash
./etcdctl version
Expand Down
40 changes: 40 additions & 0 deletions etcdserver/etcdserverpb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mvcc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}

tmpb, berr := tmptx.CreateBucketIfNotExists(next)
tmpb.FillPercent = 0.9 // for seq write in for each
if berr != nil {
return berr
}
Expand Down
9 changes: 6 additions & 3 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ func TestNodeStepUnblock(t *testing.T) {
// TestNodePropose ensures that node.Propose sends the given proposal to the underlying raft.
func TestNodePropose(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down Expand Up @@ -147,8 +148,9 @@ func TestNodePropose(t *testing.T) {
// It also ensures that ReadState can be read out through ready chan.
func TestNodeReadIndex(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}

Expand Down Expand Up @@ -284,8 +286,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
// to the underlying raft.
func TestNodeProposeConfig(t *testing.T) {
msgs := []raftpb.Message{}
appendStep := func(r *raft, m raftpb.Message) {
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}

n := newNode()
Expand Down
50 changes: 29 additions & 21 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
campaignTransfer CampaignType = "CampaignTransfer"
)

var ErrProposalDropped = errors.New("raft proposal dropped")

// lockedRand is a small wrapper around rand.Rand to provide
// synchronization. Only the methods needed by the code are exposed
// (e.g. Intn).
Expand Down Expand Up @@ -872,25 +874,28 @@ func (r *raft) Step(m pb.Message) error {
}

default:
r.step(r, m)
err := r.step(r, m)
if err != nil {
return err
}
}
return nil
}

type stepFunc func(r *raft, m pb.Message)
type stepFunc func(r *raft, m pb.Message) error

func stepLeader(r *raft, m pb.Message) {
func stepLeader(r *raft, m pb.Message) error {
// These message types do not require any progress for m.From.
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return
return nil
case pb.MsgCheckQuorum:
if !r.checkQuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
}
return
return nil
case pb.MsgProp:
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
Expand All @@ -899,11 +904,11 @@ func stepLeader(r *raft, m pb.Message) {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return
return ErrProposalDropped
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return
return ErrProposalDropped
}

for i, e := range m.Entries {
Expand All @@ -919,7 +924,7 @@ func stepLeader(r *raft, m pb.Message) {
}
r.appendEntry(m.Entries...)
r.bcastAppend()
return
return nil
case pb.MsgReadIndex:
if r.quorum() > 1 {
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
Expand All @@ -946,14 +951,14 @@ func stepLeader(r *raft, m pb.Message) {
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
}

return
return nil
}

// All other message types require a progress for m.From (pr).
pr := r.getProgress(m.From)
if pr == nil {
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
return
return nil
}
switch m.Type {
case pb.MsgAppResp:
Expand Down Expand Up @@ -1009,12 +1014,12 @@ func stepLeader(r *raft, m pb.Message) {
}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return
return nil
}

ackCount := r.readOnly.recvAck(m)
if ackCount < r.quorum() {
return
return nil
}

rss := r.readOnly.advance(m)
Expand All @@ -1028,7 +1033,7 @@ func stepLeader(r *raft, m pb.Message) {
}
case pb.MsgSnapStatus:
if pr.State != ProgressStateSnapshot {
return
return nil
}
if !m.Reject {
pr.becomeProbe()
Expand Down Expand Up @@ -1060,14 +1065,14 @@ func stepLeader(r *raft, m pb.Message) {
if lastLeadTransferee == leadTransferee {
r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
r.id, r.Term, leadTransferee, leadTransferee)
return
return nil
}
r.abortLeaderTransfer()
r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
}
if leadTransferee == r.id {
r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
return
return nil
}
// Transfer leadership to third party.
r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
Expand All @@ -1081,11 +1086,12 @@ func stepLeader(r *raft, m pb.Message) {
r.sendAppend(leadTransferee)
}
}
return nil
}

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) {
func stepCandidate(r *raft, m pb.Message) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
Expand All @@ -1098,7 +1104,7 @@ func stepCandidate(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
r.handleAppendEntries(m)
Expand All @@ -1125,9 +1131,10 @@ func stepCandidate(r *raft, m pb.Message) {
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}

func stepFollower(r *raft, m pb.Message) {
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
Expand All @@ -1154,7 +1161,7 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgTransferLeader:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
return
return nil
}
m.To = r.lead
r.send(m)
Expand All @@ -1171,17 +1178,18 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgReadIndex:
if r.lead == None {
r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
return
return nil
}
m.To = r.lead
r.send(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
return
return nil
}
r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
}
return nil
}

func (r *raft) handleAppendEntries(m pb.Message) {
Expand Down
3 changes: 2 additions & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ func testUpdateTermFromMessage(t *testing.T, state StateType) {
// Reference: section 5.1
func TestRejectStaleTermMessage(t *testing.T) {
called := false
fakeStep := func(r *raft, m pb.Message) {
fakeStep := func(r *raft, m pb.Message) error {
called = true
return nil
}
r := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r.step = fakeStep
Expand Down
Loading

0 comments on commit b5b998b

Please sign in to comment.