Skip to content

Commit

Permalink
Merge pull request #32 from pavelkalinnikov/rm-unlimited-read
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg authored Jun 7, 2023
2 parents e5a2a1e + 1df7629 commit 515b142
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 16 deletions.
26 changes: 26 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,32 @@ func (l *raftLog) restore(s pb.Snapshot) {
l.unstable.restore(s)
}

// scan visits all log entries in the [lo, hi) range, returning them via the
// given callback. The callback can be invoked multiple times, with consecutive
// sub-ranges of the requested range. Returns up to pageSize bytes worth of
// entries at a time. May return more if a single entry size exceeds the limit.
//
// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
// error (possibly after passing some entries through the callback).
//
// If the callback returns an error, scan terminates and returns this error
// immediately. This can be used to stop the scan early ("break" the loop).
func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.Entry) error) error {
for lo < hi {
ents, err := l.slice(lo, hi, pageSize)
if err != nil {
return err
} else if len(ents) == 0 {
return fmt.Errorf("got 0 entries in [%d, %d)", lo, hi)
}
if err := v(ents); err != nil {
return err
}
lo += uint64(len(ents))
}
return nil
}

// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
Expand Down
58 changes: 58 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,6 +980,64 @@ func TestSlice(t *testing.T) {
}
}

func TestScan(t *testing.T) {
offset := uint64(47)
num := uint64(20)
last := offset + num
half := offset + num/2
entries := func(from, to uint64) []pb.Entry {
res := make([]pb.Entry, 0, to-from)
for i := from; i < to; i++ {
res = append(res, pb.Entry{Index: i, Term: i})
}
return res
}
entrySize := entsSize(entries(half, half+1))

storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(pb.Snapshot{
Metadata: pb.SnapshotMetadata{Index: offset}}))
require.NoError(t, storage.Append(entries(offset+1, half)))
l := newLog(storage, raftLogger)
l.append(entries(half, last)...)

// Test that scan() returns the same entries as slice(), on all inputs.
for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} {
for lo := offset + 1; lo < last; lo++ {
for hi := lo; hi <= last; hi++ {
var got []pb.Entry
require.NoError(t, l.scan(lo, hi, pageSize, func(e []pb.Entry) error {
got = append(got, e...)
require.True(t, len(e) == 1 || entsSize(e) <= pageSize)
return nil
}))
want, err := l.slice(lo, hi, noLimit)
require.NoError(t, err)
require.Equal(t, want, got, "scan() and slice() mismatch on [%d, %d) @ %d", lo, hi, pageSize)
}
}
}

// Test that the callback error is propagated to the caller.
iters := 0
require.ErrorIs(t, errBreak, l.scan(offset+1, half, 0, func([]pb.Entry) error {
iters++
if iters == 2 {
return errBreak
}
return nil
}))
require.Equal(t, 2, iters)

// Test that we max out the limit, and not just always return a single entry.
// NB: this test works only because the requested range length is even.
require.NoError(t, l.scan(offset+1, offset+11, entrySize*2, func(ents []pb.Entry) error {
require.Len(t, ents, 2)
require.Equal(t, entrySize*2, entsSize(ents))
return nil
}))
}

func mustTerm(term uint64, err error) uint64 {
if err != nil {
panic(err)
Expand Down
49 changes: 33 additions & 16 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,19 +913,46 @@ func (r *raft) hup(t CampaignType) {
r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
return
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
if r.hasUnappliedConfChanges() {
r.logger.Warningf("%x cannot campaign at term %d since there are still pending configuration changes to apply", r.id, r.Term)
return
}

r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}

// errBreak is a sentinel error used to break a callback-based loop.
var errBreak = errors.New("break")

func (r *raft) hasUnappliedConfChanges() bool {
if r.raftLog.applied >= r.raftLog.committed { // in fact applied == committed
return false
}
found := false
// Scan all unapplied committed entries to find a config change. Paginate the
// scan, to avoid a potentially unlimited memory spike.
lo, hi := r.raftLog.applied+1, r.raftLog.committed+1
// Reuse the maxApplyingEntsSize limit because it is used for similar purposes
// (limiting the read of unapplied committed entries) when raft sends entries
// via the Ready struct for application.
// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
// outside the raft package.
pageSize := r.raftLog.maxApplyingEntsSize
if err := r.raftLog.scan(lo, hi, pageSize, func(ents []pb.Entry) error {
for i := range ents {
if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
found = true
return errBreak
}
}
return nil
}); err != nil && err != errBreak {
r.logger.Panicf("error scanning unapplied entries [%d, %d): %v", lo, hi, err)
}
return found
}

// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
Expand Down Expand Up @@ -1985,16 +2012,6 @@ func (r *raft) reduceUncommittedSize(s entryPayloadSize) {
}
}

func numOfPendingConf(ents []pb.Entry) int {
n := 0
for i := range ents {
if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
n++
}
}
return n
}

func releasePendingReadIndexMessages(r *raft) {
if len(r.pendingReadIndexMessages) == 0 {
// Fast path for the common case to avoid a call to storage.LastIndex()
Expand Down

0 comments on commit 515b142

Please sign in to comment.