From b67eb696f6a064d6a3a18517c1ed5525faa931e8 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Thu, 7 Dec 2017 18:29:20 -0500
Subject: [PATCH] storage: sync entries to disk in parallel with followers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Referenced in #17500.

This change implements the optimization in the Raft thesis under the
section: 10.2.1 Writing to the leader’s disk in parallel. The
optimization allows the leader to sync new entries to its disk after it
has sent the corresponding `MsgApp` messages, instead of before.

Here, we invoke this optimization by:
1. sending all MsgApps.
2. syncing all entries and Raft state to disk.
3. sending all other messages.

Release note (performance improvement): Raft followers now write to
their disks in parallel with the leader.
---
 pkg/storage/replica.go      | 57 +++++++++++++++++++++--
 pkg/storage/replica_test.go | 92 +++++++++++++++++++++++++++++++++++++
 2 files changed, 144 insertions(+), 5 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 711fb3d6787e..3de4f3fc9cbf 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -3430,6 +3430,43 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		}
 	}
 
+	// Separate the MsgApp messages from all other Raft message types so that we
+	// can take advantage of the optimization discussed in the Raft thesis under
+	// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
+	// optimization suggests that instead of a leader writing new log entries to
+	// disk before replicating them to its followers, the leader can instead
+	// write the entries to disk in parallel with replicating to its followers
+	// and them writing to their disks.
+	//
+	// Here, we invoke this optimization by:
+	// 1. sending all MsgApps.
+	// 2. syncing all entries and Raft state to disk.
+	// 3. sending all other messages.
+	//
+	// Since this is all handled in handleRaftReadyRaftMuLocked, we're assured
+	// that even though we may sync new entries to disk after sending them in
+	// MsgApps to followers, we'll always have them synced to disk before we
+	// process followers' MsgAppResps for the corresponding entries because this
+	// entire method requires RaftMu to be locked. This is a requirement because
+	// etcd/raft does not support commit quorums that do not include the leader,
+	// even though the Raft thesis states that this would technically be safe:
+	// > The leader may even commit an entry before it has been written to its
+	// > own disk, if a majority of followers have written it to their disks;
+	// > this is still safe.
+	//
+	// However, MsgApps are also used to inform followers of committed entries
+	// through the Commit index that they contain. Because the optimization
+	// sends all MsgApps before syncing to disk, we may send out a commit index
+	// in a MsgApp that we have not ourselves written in HardState.Commit. This
+	// is ok, because the Commit index can be treated as volatile state, as is
+	// supported by raft.MustSync. The Raft thesis corroborates this, stating in
+	// section: `3.8 Persisted state and server restarts` that:
+	// > Other state variables are safe to lose on a restart, as they can all be
+	// > recreated. The most interesting example is the commit index, which can
+	// > safely be reinitialized to zero on a restart.
+	msgApps, otherMsgs := splitMsgApps(rd.Messages)
+	r.sendRaftMessages(ctx, msgApps)
+
 	// Use a more efficient write-only batch because we don't need to do any
 	// reads from the batch. Any reads are performed via the "distinct" batch
 	// which passes the reads through to the underlying DB.
@@ -3504,10 +3541,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	// Update protected state (last index, last term, raft log size and raft
 	// leader ID) and set raft log entry cache. We clear any older, uncommitted
 	// log entries and cache the latest ones.
-	//
-	// Note also that we're likely to send messages related to the Entries we
-	// just appended, and these entries need to be inlined when sending them to
-	// followers - populating the cache here saves a lot of that work.
 	r.mu.Lock()
 	r.store.raftEntryCache.addEntries(r.RangeID, rd.Entries)
 	r.mu.lastIndex = lastIndex
@@ -3519,7 +3552,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	}
 	r.mu.Unlock()
 
-	r.sendRaftMessages(ctx, rd.Messages)
+	r.sendRaftMessages(ctx, otherMsgs)
 
 	for _, e := range rd.CommittedEntries {
 		switch e.Type {
@@ -3651,6 +3684,20 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	return stats, "", nil
 }
 
+// splitMsgApps splits the Raft message slice into two slices, one containing
+// MsgApps and one containing all other message types. The MsgApps retain
+// their relative ordering from the original slice.
+func splitMsgApps(msgs []raftpb.Message) (msgApps, otherMsgs []raftpb.Message) {
+	splitIdx := 0
+	for i, msg := range msgs {
+		if msg.Type == raftpb.MsgApp {
+			msgs[i], msgs[splitIdx] = msgs[splitIdx], msgs[i]
+			splitIdx++
+		}
+	}
+	return msgs[:splitIdx], msgs[splitIdx:]
+}
+
 func fatalOnRaftReadyErr(ctx context.Context, expl string, err error) {
 	// Mimic the behavior in processRaft.
 	log.Fatalf(ctx, "%s: %s", log.Safe(expl), err) // TODO(bdarnell)
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 16e4b423539c..cd864f727fc8 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -8704,6 +8704,98 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
 	}
 }
 
+func TestSplitMsgApps(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	msgApp := func(idx uint64) raftpb.Message {
+		return raftpb.Message{Index: idx, Type: raftpb.MsgApp}
+	}
+	otherMsg := func(idx uint64) raftpb.Message {
+		return raftpb.Message{Index: idx, Type: raftpb.MsgVote}
+	}
+	formatMsgs := func(msgs []raftpb.Message) string {
+		strs := make([]string, len(msgs))
+		for i, msg := range msgs {
+			strs[i] = fmt.Sprintf("{%s:%d}", msg.Type, msg.Index)
+		}
+		return fmt.Sprint(strs)
+	}
+
+	testCases := []struct {
+		msgsIn, msgAppsOut, otherMsgsOut []raftpb.Message
+	}{
+		// No msgs.
+		{
+			msgsIn:       []raftpb.Message{},
+			msgAppsOut:   []raftpb.Message{},
+			otherMsgsOut: []raftpb.Message{},
+		},
+		// Only msgApps.
+		{
+			msgsIn:       []raftpb.Message{msgApp(1)},
+			msgAppsOut:   []raftpb.Message{msgApp(1)},
+			otherMsgsOut: []raftpb.Message{},
+		},
+		{
+			msgsIn:       []raftpb.Message{msgApp(1), msgApp(2)},
+			msgAppsOut:   []raftpb.Message{msgApp(1), msgApp(2)},
+			otherMsgsOut: []raftpb.Message{},
+		},
+		{
+			msgsIn:       []raftpb.Message{msgApp(2), msgApp(1)},
+			msgAppsOut:   []raftpb.Message{msgApp(2), msgApp(1)},
+			otherMsgsOut: []raftpb.Message{},
+		},
+		// Only otherMsgs.
+		{
+			msgsIn:       []raftpb.Message{otherMsg(1)},
+			msgAppsOut:   []raftpb.Message{},
+			otherMsgsOut: []raftpb.Message{otherMsg(1)},
+		},
+		{
+			msgsIn:       []raftpb.Message{otherMsg(1), otherMsg(2)},
+			msgAppsOut:   []raftpb.Message{},
+			otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(2)},
+		},
+		{
+			msgsIn:       []raftpb.Message{otherMsg(2), otherMsg(1)},
+			msgAppsOut:   []raftpb.Message{},
+			otherMsgsOut: []raftpb.Message{otherMsg(2), otherMsg(1)},
+		},
+		// Mixed msgApps and otherMsgs.
+		{
+			msgsIn:       []raftpb.Message{msgApp(1), otherMsg(2)},
+			msgAppsOut:   []raftpb.Message{msgApp(1)},
+			otherMsgsOut: []raftpb.Message{otherMsg(2)},
+		},
+		{
+			msgsIn:       []raftpb.Message{otherMsg(1), msgApp(2)},
+			msgAppsOut:   []raftpb.Message{msgApp(2)},
+			otherMsgsOut: []raftpb.Message{otherMsg(1)},
+		},
+		{
+			msgsIn:       []raftpb.Message{msgApp(1), otherMsg(2), msgApp(3)},
+			msgAppsOut:   []raftpb.Message{msgApp(1), msgApp(3)},
+			otherMsgsOut: []raftpb.Message{otherMsg(2)},
+		},
+		{
+			msgsIn:       []raftpb.Message{otherMsg(1), msgApp(2), otherMsg(3)},
+			msgAppsOut:   []raftpb.Message{msgApp(2)},
+			otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(3)},
+		},
+	}
+	for _, c := range testCases {
+		inStr := formatMsgs(c.msgsIn)
+		t.Run(inStr, func(t *testing.T) {
+			msgAppsRes, otherMsgsRes := splitMsgApps(c.msgsIn)
+			if !reflect.DeepEqual(msgAppsRes, c.msgAppsOut) || !reflect.DeepEqual(otherMsgsRes, c.otherMsgsOut) {
+				t.Errorf("expected splitMsgApps(%s)=%s/%s, found %s/%s", inStr, formatMsgs(c.msgAppsOut),
+					formatMsgs(c.otherMsgsOut), formatMsgs(msgAppsRes), formatMsgs(otherMsgsRes))
+			}
+		})
+	}
+}
+
 type testQuiescer struct {
 	numProposals int
 	status *raft.Status
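Illustrative sketch (not part of the patch): the standalone Go program below mirrors the send-MsgApps / sync / send-the-rest ordering and the in-place swap partition added above. The message struct, the msgType constants, and the main demo are hypothetical stand-ins for raftpb.Message and the Replica code paths; only the partition logic follows splitMsgApps as introduced in this commit.

package main

import "fmt"

type msgType int

const (
	typeMsgApp msgType = iota // stand-in for raftpb.MsgApp
	typeOther                 // stand-in for every other Raft message type
)

type message struct {
	typ   msgType
	index uint64
}

// splitMsgApps partitions msgs in place, mirroring the helper added to
// pkg/storage/replica.go: MsgApps are swapped to the front, keeping their
// relative order, and no new slice is allocated.
func splitMsgApps(msgs []message) (msgApps, otherMsgs []message) {
	splitIdx := 0
	for i, msg := range msgs {
		if msg.typ == typeMsgApp {
			msgs[i], msgs[splitIdx] = msgs[splitIdx], msgs[i]
			splitIdx++
		}
	}
	return msgs[:splitIdx], msgs[splitIdx:]
}

func main() {
	ready := []message{{typeOther, 1}, {typeMsgApp, 2}, {typeOther, 3}}

	msgApps, otherMsgs := splitMsgApps(ready)

	// 1. Send MsgApps first so followers can start their disk writes.
	fmt.Println("send before sync:", msgApps)
	// 2. Sync the new entries and HardState to the leader's disk (stubbed here).
	fmt.Println("sync entries and Raft state to disk")
	// 3. Send the remaining messages only after the local sync has completed.
	fmt.Println("send after sync:", otherMsgs)
}

Because handleRaftReadyRaftMuLocked holds raftMu for the entire sequence, any MsgAppResp for these entries is processed only after the leader's own sync completes, which is what keeps step 1 safe.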