Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
Update etcd
Browse files Browse the repository at this point in the history
Picks up etcd-io/etcd#9982 and etcd-io/etcd#9985 (and no other changes
to packages we use).
  • Loading branch information
bdarnell committed Aug 13, 2018
1 parent d5cea79 commit d18a2a3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
20 changes: 15 additions & 5 deletions github.com/coreos/etcd/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,27 @@ type raftLog struct {
applied uint64

logger Logger

maxMsgSize uint64
}

// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
// newLog returns log using the given storage and default options. It
// recovers the log to the state that it just commits and applies the
// latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
return newLogWithSize(storage, logger, noLimit)
}

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
storage: storage,
logger: logger,
maxMsgSize: maxMsgSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, noLimit)
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
9 changes: 9 additions & 0 deletions github.com/coreos/etcd/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
// If we hit a size limit when loadaing CommittedEntries, clamp
// our HardState.Commit to what we're actually returning. This is
// also used as our cursor to resume for the next Ready batch.
if len(rd.CommittedEntries) > 0 {
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
if rd.HardState.Commit > lastCommit.Index {
rd.HardState.Commit = lastCommit.Index
}
}
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
Expand Down
36 changes: 29 additions & 7 deletions github.com/coreos/etcd/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLog(c.Storage, c.Logger)
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand Down Expand Up @@ -441,30 +441,43 @@ func (r *raft) getProgress(id uint64) *Progress {
return r.learnerPrs[id]
}

// sendAppend sends RPC, with entries to the given peer.
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
r.maybeSendAppend(to, true)
}

// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
pr := r.getProgress(to)
if pr.IsPaused() {
return
return false
}
m := pb.Message{}
m.To = to

term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return
return false
}

m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return
return false
}
panic(err) // TODO(bdarnell)
}
Expand Down Expand Up @@ -498,6 +511,7 @@ func (r *raft) sendAppend(to uint64) {
}
}
r.send(m)
return true
}

// sendHeartbeat sends an empty MsgApp
Expand Down Expand Up @@ -1020,10 +1034,18 @@ func stepLeader(r *raft, m pb.Message) error {
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
// If we were paused before, this node may be missing the
// latest commit index, so send it.
r.sendAppend(m.From)
}
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.maybeSendAppend(m.From, false) {
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
Expand Down

0 comments on commit d18a2a3

Please sign in to comment.