Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: sync entries to disk in parallel with followers #19229

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3430,6 +3430,43 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

// Separate the MsgApp messages from all other Raft message types so that we
// can take advantage of the optimization discussed in the Raft thesis under
// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
// optimization suggests that instead of a leader writing new log entries to
// disk before replicating them to its followers, the leader can instead
// write the entries to disk in parallel with replicating to its followers
// and them writing to their disks.
//
// Here, we invoke this optimization by:
// 1. sending all MsgApps.
// 2. syncing all entries and Raft state to disk.
// 3. sending all other messages.
//
// Since this is all handled in handleRaftReadyRaftMuLocked, we're assured
// that even though we may sync new entries to disk after sending them in
// MsgApps to followers, we'll always have them synced to disk before we
// process followers' MsgAppResps for the corresponding entries because this
// entire method requires RaftMu to be locked. This is a requirement because
// etcd/raft does not support commit quorums that do not include the leader,
// even though the Raft thesis states that this would technically be safe:
// > The leader may even commit an entry before it has been written to its
// > own disk, if a majority of followers have written it to their disks;
// > this is still safe.
//
// However, MsgApps are also used to inform followers of committed entries
// through the Commit index that they contain. Because the optimization
// sends all MsgApps before syncing to disk, we may send out a commit index
// in a MsgApp that we have not ourselves written in HardState.Commit. This
// is ok, because the Commit index can be treated as volatile state, as is
// supported by raft.MustSync. The Raft thesis corroborates this, stating in
// section: `3.8 Persisted state and server restarts` that:
// > Other state variables are safe to lose on a restart, as they can all be
// > recreated. The most interesting example is the commit index, which can
// > safely be reinitialized to zero on a restart.
mgsApps, otherMsgs := splitMsgApps(rd.Messages)
r.sendRaftMessages(ctx, mgsApps)

// Use a more efficient write-only batch because we don't need to do any
// reads from the batch. Any reads are performed via the "distinct" batch
// which passes the reads through to the underlying DB.
Expand Down Expand Up @@ -3504,10 +3541,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// Update protected state (last index, last term, raft log size and raft
// leader ID) and set raft log entry cache. We clear any older, uncommitted
// log entries and cache the latest ones.
//
// Note also that we're likely to send messages related to the Entries we
// just appended, and these entries need to be inlined when sending them to
// followers - populating the cache here saves a lot of that work.
r.mu.Lock()
r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries)
r.mu.lastIndex = lastIndex
Expand All @@ -3519,7 +3552,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
r.mu.Unlock()

r.sendRaftMessages(ctx, rd.Messages)
r.sendRaftMessages(ctx, otherMsgs)

for _, e := range rd.CommittedEntries {
switch e.Type {
Expand Down Expand Up @@ -3651,6 +3684,20 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, "", nil
}

// splitMsgApps splits the Raft message slice into two slices, one containing
// MsgApps and one containing all other message types. Each slice retains the
// relative ordering between messages in the original slice.
// splitMsgApps splits the Raft message slice into two slices, one containing
// MsgApps and one containing all other message types. Each slice retains the
// relative ordering between messages in the original slice.
//
// The partition is performed in place: msgs is reordered so that all MsgApps
// come first, and both returned slices alias its backing array. No allocation
// is performed.
func splitMsgApps(msgs []raftpb.Message) (msgApps, otherMsgs []raftpb.Message) {
	splitIdx := 0
	for i := range msgs {
		if msgs[i].Type != raftpb.MsgApp {
			continue
		}
		// Rotate the MsgApp into position splitIdx, shifting the intervening
		// non-MsgApp messages right by one. This keeps the partition stable:
		// both halves preserve their original relative order. (A plain swap
		// of msgs[i] and msgs[splitIdx] would reorder the non-MsgApps, e.g.
		// [other1, other2, app1] would come back as [other2, other1].)
		app := msgs[i]
		copy(msgs[splitIdx+1:i+1], msgs[splitIdx:i])
		msgs[splitIdx] = app
		splitIdx++
	}
	return msgs[:splitIdx], msgs[splitIdx:]
}

func fatalOnRaftReadyErr(ctx context.Context, expl string, err error) {
// Mimic the behavior in processRaft.
log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
Expand Down
92 changes: 92 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8704,6 +8704,98 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
}
}

// TestSplitMsgApps verifies that splitMsgApps partitions a Raft message slice
// into MsgApps and all other message types, preserving relative order.
func TestSplitMsgApps(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Helpers to build messages distinguishable by index.
	newMsg := func(typ raftpb.MessageType, idx uint64) raftpb.Message {
		return raftpb.Message{Index: idx, Type: typ}
	}
	msgApp := func(idx uint64) raftpb.Message { return newMsg(raftpb.MsgApp, idx) }
	otherMsg := func(idx uint64) raftpb.Message { return newMsg(raftpb.MsgVote, idx) }

	// formatMsgs renders a message slice as "[{Type:Index} ...]" for subtest
	// names and failure output.
	formatMsgs := func(msgs []raftpb.Message) string {
		parts := make([]string, 0, len(msgs))
		for _, m := range msgs {
			parts = append(parts, fmt.Sprintf("{%s:%d}", m.Type, m.Index))
		}
		return fmt.Sprint(parts)
	}

	testCases := []struct {
		in, wantApps, wantOthers []raftpb.Message
	}{
		// No msgs.
		{
			in:         []raftpb.Message{},
			wantApps:   []raftpb.Message{},
			wantOthers: []raftpb.Message{},
		},
		// Only msgApps.
		{
			in:         []raftpb.Message{msgApp(1)},
			wantApps:   []raftpb.Message{msgApp(1)},
			wantOthers: []raftpb.Message{},
		},
		{
			in:         []raftpb.Message{msgApp(1), msgApp(2)},
			wantApps:   []raftpb.Message{msgApp(1), msgApp(2)},
			wantOthers: []raftpb.Message{},
		},
		{
			in:         []raftpb.Message{msgApp(2), msgApp(1)},
			wantApps:   []raftpb.Message{msgApp(2), msgApp(1)},
			wantOthers: []raftpb.Message{},
		},
		// Only otherMsgs.
		{
			in:         []raftpb.Message{otherMsg(1)},
			wantApps:   []raftpb.Message{},
			wantOthers: []raftpb.Message{otherMsg(1)},
		},
		{
			in:         []raftpb.Message{otherMsg(1), otherMsg(2)},
			wantApps:   []raftpb.Message{},
			wantOthers: []raftpb.Message{otherMsg(1), otherMsg(2)},
		},
		{
			in:         []raftpb.Message{otherMsg(2), otherMsg(1)},
			wantApps:   []raftpb.Message{},
			wantOthers: []raftpb.Message{otherMsg(2), otherMsg(1)},
		},
		// Mixed msgApps and otherMsgs.
		{
			in:         []raftpb.Message{msgApp(1), otherMsg(2)},
			wantApps:   []raftpb.Message{msgApp(1)},
			wantOthers: []raftpb.Message{otherMsg(2)},
		},
		{
			in:         []raftpb.Message{otherMsg(1), msgApp(2)},
			wantApps:   []raftpb.Message{msgApp(2)},
			wantOthers: []raftpb.Message{otherMsg(1)},
		},
		{
			in:         []raftpb.Message{msgApp(1), otherMsg(2), msgApp(3)},
			wantApps:   []raftpb.Message{msgApp(1), msgApp(3)},
			wantOthers: []raftpb.Message{otherMsg(2)},
		},
		{
			in:         []raftpb.Message{otherMsg(1), msgApp(2), otherMsg(3)},
			wantApps:   []raftpb.Message{msgApp(2)},
			wantOthers: []raftpb.Message{otherMsg(1), otherMsg(3)},
		},
	}
	for _, tc := range testCases {
		// Capture the subtest name before splitMsgApps reorders tc.in in place.
		name := formatMsgs(tc.in)
		t.Run(name, func(t *testing.T) {
			gotApps, gotOthers := splitMsgApps(tc.in)
			appsMatch := reflect.DeepEqual(gotApps, tc.wantApps)
			othersMatch := reflect.DeepEqual(gotOthers, tc.wantOthers)
			if !appsMatch || !othersMatch {
				t.Errorf("expected splitMsgApps(%s)=%s/%s, found %s/%s", name,
					formatMsgs(tc.wantApps), formatMsgs(tc.wantOthers),
					formatMsgs(gotApps), formatMsgs(gotOthers))
			}
		})
	}
}

type testQuiescer struct {
numProposals int
status *raft.Status
Expand Down