Skip to content

Commit

Permalink
Merge pull request #9982 from bdarnell/pagination
Browse files Browse the repository at this point in the history
raft: Introduce CommittedEntries pagination
  • Loading branch information
xiang90 authored Aug 11, 2018
2 parents 59e084d + 0a670b7 commit 11dd0b5
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 6 deletions.
20 changes: 15 additions & 5 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,27 @@ type raftLog struct {
applied uint64

logger Logger

maxMsgSize uint64
}

// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
// newLog returns log using the given storage and default options. It
// recovers the log to the state that it just commits and applies the
// latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
	// Delegate to the size-aware constructor, passing noLimit as the
	// max message size.
	defaultLog := newLogWithSize(storage, logger, noLimit)
	return defaultLog
}

// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
storage: storage,
logger: logger,
maxMsgSize: maxMsgSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, noLimit)
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
9 changes: 9 additions & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
// If we hit a size limit when loading CommittedEntries, clamp
// our HardState.Commit to what we're actually returning. This is
// also used as our cursor to resume for the next Ready batch.
if len(rd.CommittedEntries) > 0 {
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
if rd.HardState.Commit > lastCommit.Index {
rd.HardState.Commit = lastCommit.Index
}
}
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
Expand Down
95 changes: 95 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)

// readyWithTimeout selects from n.Ready() with a 1-second timeout. It
// panics on timeout, which is better than the indefinite wait that
// would occur if this channel were read without being wrapped in a
// select.
func readyWithTimeout(n Node) Ready {
	// Bound the wait with an explicit one-second timer and release it
	// as soon as this function returns.
	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		panic("timed out waiting for ready")
	case ready := <-n.Ready():
		return ready
	}
}

// TestNodeStep ensures that node.Step sends msgProp to propc chan
// and other kinds of messages to recvc chan.
func TestNodeStep(t *testing.T) {
Expand Down Expand Up @@ -831,3 +844,85 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
close(stop)
<-done
}

func TestAppendPagination(t *testing.T) {
const maxSizePerMsg = 2048
n := newNetworkWithConfig(func(c *Config) {
c.MaxSizePerMsg = maxSizePerMsg
}, nil, nil, nil)

seenFullMessage := false
// Inspect all messages to see that we never exceed the limit, but
// we do see messages of larger than half the limit.
n.msgHook = func(m raftpb.Message) bool {
if m.Type == raftpb.MsgApp {
size := 0
for _, e := range m.Entries {
size += len(e.Data)
}
if size > maxSizePerMsg {
t.Errorf("sent MsgApp that is too large: %d bytes", size)
}
if size > maxSizePerMsg/2 {
seenFullMessage = true
}
}
return true
}

n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})

// Partition the network while we make our proposals. This forces
// the entries to be batched into larger messages.
n.isolate(1)
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 5; i++ {
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
}
n.recover()

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
if !seenFullMessage {
t.Error("didn't see any messages more than half the max size; something is wrong with this test")
}
}

// TestCommitPagination checks that three 1000-byte proposals made with
// MaxSizePerMsg set to 2048 are handed back through
// Ready.CommittedEntries in two batches (two entries, then one).
func TestCommitPagination(t *testing.T) {
	storage := NewMemoryStorage()
	cfg := newTestConfig(1, []uint64{1}, 10, 1, storage)
	cfg.MaxSizePerMsg = 2048
	r := newRaft(cfg)
	n := newNode()
	go n.run(r)
	n.Campaign(context.TODO())

	// persist appends the batch's entries to storage and then calls
	// Advance on the node.
	persist := func(batch Ready) {
		storage.Append(batch.Entries)
		n.Advance()
	}

	initial := readyWithTimeout(&n)
	if got := len(initial.CommittedEntries); got != 1 {
		t.Fatalf("expected 1 (empty) entry, got %d", got)
	}
	persist(initial)

	payload := []byte(strings.Repeat("a", 1000))
	for sent := 0; sent != 3; sent++ {
		err := n.Propose(context.TODO(), payload)
		if err != nil {
			t.Fatal(err)
		}
	}

	// The 3 proposals will commit in two batches.
	firstBatch := readyWithTimeout(&n)
	if got := len(firstBatch.CommittedEntries); got != 2 {
		t.Fatalf("expected 2 entries in first batch, got %d", got)
	}
	persist(firstBatch)

	secondBatch := readyWithTimeout(&n)
	if got := len(secondBatch.CommittedEntries); got != 1 {
		t.Fatalf("expected 1 entry in second batch, got %d", got)
	}
	persist(secondBatch)
}
2 changes: 1 addition & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLog(c.Storage, c.Logger)
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand Down
9 changes: 9 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4055,6 +4055,10 @@ type network struct {
storage map[uint64]*MemoryStorage
dropm map[connem]float64
ignorem map[pb.MessageType]bool

// msgHook is called for each message sent. It may inspect the
// message and return true to send it or false to drop it.
msgHook func(pb.Message) bool
}

// newNetwork initializes a network from peers.
Expand Down Expand Up @@ -4173,6 +4177,11 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message {
continue
}
}
if nw.msgHook != nil {
if !nw.msgHook(m) {
continue
}
}
mm = append(mm, m)
}
return mm
Expand Down

0 comments on commit 11dd0b5

Please sign in to comment.