diff --git a/raft/log.go b/raft/log.go index c3036d3c90d..a3be7d48674 100644 --- a/raft/log.go +++ b/raft/log.go @@ -38,17 +38,27 @@ type raftLog struct { applied uint64 logger Logger + + maxMsgSize uint64 } -// newLog returns log using the given storage. It recovers the log to the state -// that it just commits and applies the latest snapshot. +// newLog returns log using the given storage and default options. It +// recovers the log to the state that it just commits and applies the +// latest snapshot. func newLog(storage Storage, logger Logger) *raftLog { + return newLogWithSize(storage, logger, noLimit) +} + +// newLogWithSize returns a log using the given storage and max +// message size. +func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, + storage: storage, + logger: logger, + maxMsgSize: maxMsgSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, noLimit) + ents, err := l.slice(off, l.committed+1, l.maxMsgSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } diff --git a/raft/node.go b/raft/node.go index b24ba609f38..57a974fcfe6 100644 --- a/raft/node.go +++ b/raft/node.go @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt + // If we hit a size limit when loading CommittedEntries, clamp + // our HardState.Commit to what we're actually returning. This is + // also used as our cursor to resume for the next Ready batch. 
+ if len(rd.CommittedEntries) > 0 { + lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1] + if rd.HardState.Commit > lastCommit.Index { + rd.HardState.Commit = lastCommit.Index + } + } } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot diff --git a/raft/node_test.go b/raft/node_test.go index e8c3132a739..e2002208f8d 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -26,6 +26,19 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) +// readyWithTimeout selects from n.Ready() with a 1-second timeout. It +// panics on timeout, which is better than the indefinite wait that +// would occur if this channel were read without being wrapped in a +// select. +func readyWithTimeout(n Node) Ready { + select { + case rd := <-n.Ready(): + return rd + case <-time.After(time.Second): + panic("timed out waiting for ready") + } +} + // TestNodeStep ensures that node.Step sends msgProp to propc chan // and other kinds of messages to recvc chan. func TestNodeStep(t *testing.T) { @@ -831,3 +844,85 @@ func TestNodeProposeAddLearnerNode(t *testing.T) { close(stop) <-done } + +func TestAppendPagination(t *testing.T) { + const maxSizePerMsg = 2048 + n := newNetworkWithConfig(func(c *Config) { + c.MaxSizePerMsg = maxSizePerMsg + }, nil, nil, nil) + + seenFullMessage := false + // Inspect all messages to see that we never exceed the limit, but + // we do see messages larger than half the limit. + n.msgHook = func(m raftpb.Message) bool { + if m.Type == raftpb.MsgApp { + size := 0 + for _, e := range m.Entries { + size += len(e.Data) + } + if size > maxSizePerMsg { + t.Errorf("sent MsgApp that is too large: %d bytes", size) + } + if size > maxSizePerMsg/2 { + seenFullMessage = true + } + } + return true + } + + n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup}) + + // Partition the network while we make our proposals. This forces + // the entries to be batched into larger messages. 
+ n.isolate(1) + blob := []byte(strings.Repeat("a", 1000)) + for i := 0; i < 5; i++ { + n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}}) + } + n.recover() + + // After the partition recovers, tick the clock to wake everything + // back up and send the messages. + n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat}) + if !seenFullMessage { + t.Error("didn't see any messages more than half the max size; something is wrong with this test") + } +} + +func TestCommitPagination(t *testing.T) { + s := NewMemoryStorage() + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + cfg.MaxSizePerMsg = 2048 + r := newRaft(cfg) + n := newNode() + go n.run(r) + n.Campaign(context.TODO()) + + rd := readyWithTimeout(&n) + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + + blob := []byte(strings.Repeat("a", 1000)) + for i := 0; i < 3; i++ { + if err := n.Propose(context.TODO(), blob); err != nil { + t.Fatal(err) + } + } + + // The 3 proposals will commit in two batches. 
+ rd = readyWithTimeout(&n) + if len(rd.CommittedEntries) != 2 { + t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + rd = readyWithTimeout(&n) + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() +} diff --git a/raft/raft.go b/raft/raft.go index ae91498bc10..7de19f94890 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -302,7 +302,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLog(c.Storage, c.Logger) + raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) diff --git a/raft/raft_test.go b/raft/raft_test.go index a34bf3fa68f..46bdefbcf2d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4055,6 +4055,10 @@ type network struct { storage map[uint64]*MemoryStorage dropm map[connem]float64 ignorem map[pb.MessageType]bool + + // msgHook is called for each message sent. It may inspect the + // message and return true to send it or false to drop it. + msgHook func(pb.Message) bool } // newNetwork initializes a network from peers. @@ -4173,6 +4177,11 @@ func (nw *network) filter(msgs []pb.Message) []pb.Message { continue } } + if nw.msgHook != nil { + if !nw.msgHook(m) { + continue + } + } mm = append(mm, m) } return mm