Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: support asynchronous storage writes #14627

Conversation

nvanbenschoten
Copy link
Contributor

@nvanbenschoten nvanbenschoten commented Oct 26, 2022

Fixes #12257.

This change adds opt-in support to raft to perform local storage writes asynchronously from the raft state machine handling loop.

Summary

A new AsyncStorageWrites configuration instructs the raft node to write to its local storage (raft log and state machine) using a request/response message passing interface instead of the default Ready/Advance function call interface. Local storage messages can be pipelined and processed asynchronously (with respect to Ready iteration), facilitating reduced interference between Raft proposals and increased batching of log appends and state machine application. As a result, use of asynchronous storage writes can reduce end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the Ready.Message slice will include new MsgStorageAppend and MsgStorageApply messages. The messages will target a LocalAppendThread and a LocalApplyThread, respectively. Messages to the same target must be reliably processed in order. In other words, they can't be dropped (like messages over the network) and those targeted at the same thread can't be reordered. Messages to different targets can be processed in any order.

MsgStorageAppend carries Raft log entries to append, election votes to persist, and snapshots to apply. All writes performed in response to a MsgStorageAppend are expected to be durable. The message assumes the role of the Entries, HardState, and Snapshot fields in Ready.

MsgStorageApply carries committed entries to apply. The message assumes the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be delivered after the corresponding storage write has been completed. These responses may target the same node or may target other nodes. The storage threads are not responsible for understanding the response messages, only for delivering them to the correct target after performing the storage write.

Design Considerations

  • There must be no regression for existing users that do not enable AsyncStorageWrites. For instance, CommittedEntries must not wait on unstable entries to be stabilized in cases where a follower is given committed entries in a MsgApp.
  • Asynchronous storage work should use a message passing interface, like the rest of this library.
  • The Raft leader and followers should behave symmetrically. Both should be able to use asynchronous storage writes for log appends and entry application.
  • The LocalAppendThread on a follower should be able to send MsgAppResp messages directly to the leader without passing back through the raft state machine handling loop.
  • The unstable log should remain true to its name. It should hold entries until they are stable and should not rely on an intermediate reliable cache.
  • Pseudo-targets should be assigned to messages that target the local storage systems to denote required ordering guarantees.
  • Code should be maximally unified across AsyncStorageWrites=false and AsyncStorageWrites=true. AsyncStorageWrites=false should be a special case of AsyncStorageWrites=true where the library hides the possibility of asynchrony.
  • It should be possible to apply snapshots asynchronously, even though a snapshot touches both the Raft log state and the state machine. The library should make this easy for users to handle by delaying all committed entries until after the snapshot has applied, so snapshot application can be handled by 1) flushing the apply thread, 2) sending the MsgStorageAppend that contains a snapshot to the LocalAppendThread to be applied.

Usage

When asynchronous storage writes is enabled, the responsibility of code using the library is different from what is presented in raft/doc.go (which has been updated to include a section about async storage writes). Users still read from the Node.Ready() channel. However, they process the updates it contains in a different manner. Users no longer consult the HardState, Entries, and Snapshot fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to indicate that they have processed all entries in the Ready (step 4 in doc.go). Instead, all local storage operations are also communicated through messages present in the Ready.Message slice.

The local storage messages come in two flavors. The first flavor is log append messages, which target a LocalAppendThread and carry Entries, HardState, and a Snapshot. The second flavor is entry application messages, which target a LocalApplyThread and carry CommittedEntries. Messages to the same target must be reliably processed in order. Messages to different targets can be processed in any order. Each local storage message carries a slice of response messages that must delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop will look something like this:

for {
	select {
	case <-s.Ticker:
		n.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				toAppend <- m
			case raft.LocalApplyThread:
				toApply <-m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}

Usage of Asynchronous Storage Writes will typically also contain a pair of storage handler threads, one for log writes (append) and one for entry application to the local state machine (apply). Those will look something like:

// append thread
go func() {
	for {
		select {
		case m := <-toAppend:
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}

// apply thread
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}

Compatibility

The library remains backwards compatible with existing users and the change does not introduce any breaking changes. Users that do not set AsyncStorageWrites to true in the Config struct will not notice a difference with this change. This is despite the fact that the existing "synchronous storage writes" interface was adapted to share a majority of the same code. For instance, Node.Advance has been adapted to transparently acknowledge an asynchronous log append attempt and an asynchronous state machine application attempt, internally using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to a process and nodes using asynchronous storage writes appear to behave no differently from the outside. Clusters are free to mix nodes running with and without asynchronous storage writes.

Performance

The bulk of the performance evaluation of this functionality thus far has been done with rafttoy, a benchmarking harness developed to experiment with Raft proposal pipeline optimization. The harness can be used to run single-node benchmarks or multi-node benchmarks. It supports plugable raft logs, storage engines, network transports, and pipeline implementations.

To evaluate this change, we fixed the raft log (etcd/wal), storage engine (pebble), and network transport (grpc). We then built (nvanbenschoten/rafttoy#3) a pipeline implementation on top of the new asynchronous storage writes functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:

  • basic (P1): baseline stock raft usage, similar to the code in doc.go
  • parallel append + early ack (P2): CockroachDB's current pipeline, which includes two significant variations to the basic pipeline. The first is that it sends MsgApp messages to followers before writing to local Raft log (see commit for explanation), allowing log appends to occur in parallel across replicas. The second is that it acknowledges committed log entries before applying them (see commit for explanation).
  • async append + async apply + early ack (P3): A pipelining using asynchronous storage writes with a separate append thread and a separate apply thread. Also uses the same early acknowledgement optimization from above to ack committed entries before handing them to the apply thread.

All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput). The testing used an open-loop workload to increase the rate of new raft proposals until a saturation point was reached.

Throughput vs latency of Raft proposal pipeline implementations

The comparison demonstrates two different benefits of asynchronous storage writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms, P2's average latency was 7.3ms, and P3's average latency was 5.2ms. This is a reduction in average latency of 29% from the optimized pipeline that does not use asynchronous storage writes. This matches the expectations outlined in cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. In this test, P1 and P2 topped out at 30MB/s, while P3 could push up to 52MB/s, an increase in maximum throughput of 73%. This is because asynchronous storage writes can improve batching for both log appends and log application. In this experiment, we saw the average append batch size under saturation increase from 928 to 1542, which is a similar ratio to the increase in peak throughput. We see a similar difference for apply batch sizes.

There is more benchmarking to do. For instance, we'll need to thoroughly verify that this change does not negatively impact the performance of users of this library that do not use asynchronous storage writes.

cc. @tbg @bdarnell

@ahrtr
Copy link
Member

ahrtr commented Oct 26, 2022

Just a reminder, big PR is hard to review. Please consider to deliver a design doc firstly, and breakdown the PR into small ones as possible as you can.

@nvanbenschoten
Copy link
Contributor Author

Yes, I completely agree. This is just a draft PR that includes a sequence of preparatory refactors. I intend to pull those out into separate PRs once the high-level architecture presented here reaches a consensus (no pun intended). The core of this change is really just the last three commits in the PR (two of which are small).

I'll be talking with @tbg and @bdarnell about the approach over the next few days. I'd also be happy to talk through it with you @ahrtr at your convenience, as etcd may want to adopt this interaction model as well.

@serathius
Copy link
Member

serathius commented Oct 27, 2022

Note: I'm working on proposal to limit performance improvements until etcd introduces protective measure to prevent critical reliability issues like we have seen this year.

For this proposal I would really want to see a test plan that would explain what risk this change introduces and how we are preventing them.

@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/asyncRaftLogMsg branch 3 times, most recently from 94ce527 to d953f70 Compare October 27, 2022 20:04
@nvanbenschoten
Copy link
Contributor Author

@serathius thanks for raising the point about testing. I think there are two aspects to the risks posed by this change.


The first aspect is the risk to users who do not enable the new AsyncStorageWrites configuration. The risk to these users is not negligible, but it should be small and contained. This is because there is almost no visible behavior change for those users. One way to see this is that very few existing tests inside the raft package needed to change in response to this change, and no tests outside of the raft package needed to change. In fact, the only externally visible change for these users comes from c05e48f, which parallels #14413 and keeps the rest of the async storage writes change focused. I think it makes sense to pull that preliminary commit into a separate PR and talk through whether we need to add more testing than the commit already adds to gain more confidence in the change.

There is also a risk that some of the internal refactors made to unify logic between the AsyncStorageWrites: true and AsyncStorageWrites: false configs are faulty. The fact that we don't see any impact on tests outside of the raft package gives us some confidence that no such bugs were introduced. Something else we can do to stress the change is upgrade CockroachDB to use this code (with AsyncStorageWrites: false) and stress its layered test suite. Together, I think that gives us good confidence in the correctness of these reactors.

There's also the performance aspect of these reactors. Are we regressing performance for users where AsyncStorageWrites: false? I'll need to run some benchmarks to confirm that we're not.


The second aspect is that there are risks of bugs that would specifically affect users that do enable the new AsyncStorageWrites configuration. There's a higher likelihood of this, as async storage writes introduces new states and requires additional progress tracking.

The design of this change fought back against these new states resulting in undertested code by unifying the majority of the logic between the AsyncStorageWrites: true and AsyncStorageWrites: false configs. This means that the new progress tracking is exercised with AsyncStorageWrites: false, so it's not dead code when async storage writes are not used. Still, there are interactions that can only occur with async storage writes due to different orderings between remote responses, log write acknowledgment, and log application acknowledgment.

I think a form of randomized testing could help give us more confidence that we handle these new states correctly, regardless of the ordering of responses. I'll look into this.

@codecov-commenter
Copy link

codecov-commenter commented Oct 27, 2022

Codecov Report

Merging #14627 (fe4eb46) into main (e24402d) will decrease coverage by 0.15%.
The diff coverage is 87.58%.

@@            Coverage Diff             @@
##             main   #14627      +/-   ##
==========================================
- Coverage   75.70%   75.54%   -0.16%     
==========================================
  Files         457      459       +2     
  Lines       37300    37562     +262     
==========================================
+ Hits        28239    28378     +139     
- Misses       7309     7392      +83     
- Partials     1752     1792      +40     
Flag Coverage Δ
all 75.54% <87.58%> (-0.16%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
raft/rafttest/interaction_env.go 100.00% <ø> (ø)
...t/interaction_env_handler_process_append_thread.go 69.56% <69.56%> (ø)
.../rafttest/interaction_env_handler_process_ready.go 72.97% <70.00%> (-0.25%) ⬇️
...st/interaction_env_handler_process_apply_thread.go 80.00% <80.00%> (ø)
raft/raft.go 89.37% <82.45%> (-0.65%) ⬇️
raft/rafttest/interaction_env_handler_stabilize.go 92.85% <88.23%> (-7.15%) ⬇️
raft/rawnode.go 85.85% <92.50%> (+9.99%) ⬆️
raft/log.go 79.09% <94.59%> (+0.98%) ⬆️
raft/log_unstable.go 95.95% <100.00%> (+2.02%) ⬆️
raft/node.go 86.19% <100.00%> (-1.53%) ⬇️
... and 42 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

raft/log.go Outdated Show resolved Hide resolved
raft/log_unstable.go Show resolved Hide resolved
raft/log.go Show resolved Hide resolved
raft/log.go Outdated Show resolved Hide resolved
raft/node.go Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/asyncRaftLogMsg branch 4 times, most recently from 7ede50c to fe4eb46 Compare November 2, 2022 23:16
@xiang90
Copy link
Contributor

xiang90 commented Nov 3, 2022

@nvanbenschoten

It seems that even with this change, we are far away from saturating the I/O and Compute resources. Do you get a chance to see why? I am asking since I want to make sure we are trying to get both the short-term and long-term direction right :)

raft/rafttest/interaction_env_handler_add_nodes.go Outdated Show resolved Hide resolved
raft/util.go Show resolved Hide resolved
@nvanbenschoten
Copy link
Contributor Author

It seems that even with this change, we are far away from saturating the I/O and Compute resources. Do you get a chance to see why? I am asking since I want to make sure we are trying to get both the short-term and long-term direction right :)

@xiang90 I re-ran the rafttoy benchmark setup that I posted above to determine why and where we were hitting the limiting bottleneck. As you called out, ~50MB/s of Raft proposals shouldn't be saturating compute, network, or I/O on this cluster1. It turns out that we were approaching disk write I/O saturation. This was due to the high degree of write amplification in the LSM that the test was using as the replicated state machine's storage engine. In other words, LSM compactions of applied entries were responsible for most disk write I/O.

I constructed a new setup using an in-memory replicated state machine storage engine and etcd's WAL for the Raft log. This meant that all disk I/O came from Raft log manipulation, so there would be no write amplification in the test. I also increased the entry size to 4KB so that I could try to saturate the hardware with < 1M proposals/s.

Throughput vs  latency of Raft proposal pipeline implementations

With this setup, we can saturate the hardware. Asynchronous storage writes allow nodes to write continuously to their Raft log in a tight loop, with minimal interference between entry log writes and entries at different stages of the Raft pipeline. While the basic pipeline maxes out at 340 MB/s worth of proposals, the async storage writes pipeline maxes out at 583 MB/s.

However, we don't saturate compute or disk I/O. Instead, with async storage writes, we can saturate the 10 Gigabit network between the Raft leader and its two followers (10Gb/s / 8 = 1.25GB/s = 625MB/s per follower). Experimentally, 600 MB/s is about as much as I can push from the leader VM to both follower VMs concurrently using iperf3.

Footnotes

  1. a 3 node AWS cluster of m5.4xlarge instances with gp3 EBS volumes (16000 IOPS, 1GB/s throughput)

@xiang90
Copy link
Contributor

xiang90 commented Nov 4, 2022

@nvanbenschoten

Thanks for the detailed analysis. That makes a lot of sense!

I was surprised that we could only do 30MB/s before the optimization as I did some perf analysis in 2017 with a very different result. I thought there might be something significant changed.

@xiang90
Copy link
Contributor

xiang90 commented Nov 4, 2022

@nvanbenschoten

Awesome work, by the way! HUGE improvements.

Copy link
Contributor

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial review but maybe 50 comments is a good stopping point anyway?

Looks like the following commits should be uncontroversial; maybe you want to land them separately sooner rather than later?

  • raft: delete unused Ready.containsUpdates method
  • raft: clean up IsLocalMsg and IsResponseMsg logic
  • raft: don't apply entries when applying snapshot
  • raft: remove IsEmptySnap check from raftLog.hasPendingSnapshot
  • raft: clarify conditions in unstable.stableTo
  • raft: rename raftLog.nextEnts to raftLog.nextCommittedEnts
  • raft: make Message.Snapshot nullable, halve struct size

Also, for the fix-ups to this PR, mind keeping them in a suffix of fixup commits for now, because I don't think I'll be able to find them again should they be squashed. Probably you're doing that anyway but can't hurt to mention it.

raft/log.go Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
raft/raft_test.go Show resolved Hide resolved
raft/raft_test.go Outdated Show resolved Hide resolved
raft/log.go Outdated Show resolved Hide resolved
raft/raft.go Outdated Show resolved Hide resolved
raft/raft.go Outdated Show resolved Hide resolved
raft/raft.go Outdated Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
Copy link
Contributor

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finishing up first pass.

raft/raft.go Show resolved Hide resolved
raft/raft.go Show resolved Hide resolved
raft/raft.go Outdated Show resolved Hide resolved
raft/rawnode.go Show resolved Hide resolved
raft/rawnode.go Outdated Show resolved Hide resolved
raft/rawnode_test.go Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
raft/node.go Outdated Show resolved Hide resolved
@tbg
Copy link
Contributor

tbg commented Nov 14, 2022

I know it's tempting but please don't rebase, I'm worried we'll lose a lot of the review state. Mind just continuing the commit history with fixup commits (git commit --fixup :/^raft:.support.asynchronous for example) and no reordering so I can keep my sanity? Looks like there are a slew of little fixes and edits to come.

The review is pretty large but a lot of it is cosmetic. A few TODOs remain that should be addressed before merge, I tried to point them all out via comments. I'm in favor of deferring anything that doesn't need to be done now, given the size of this PR.

The main aspect of this PR that still needs a resolution is the "To/From" field use #14627 (comment), feels like reviewers are generally skeptical of this. Maybe we should schedule sync time so you can try to convince me again (or vice versa).

I also need to understand the uncommitted size tracking better https://github.com/etcd-io/etcd/pull/14627/files/1abfc66b165d234251dd590051629428380cbcc2..1ccb57531dfedf2f6094d4adb2b9f486a6425007#diff-b9adbc46e4a317ffbb3d11a66c38d6b9af41a09170d77d87efbd96d115da452f

The LogTerm abuse also seems unsavory - would not mind this as a follow-up fix - but then we have to worry about users picking up this first pass and then having to migrate to the next, so better to keep it simple.

@tbg tbg requested review from tbg and removed request for tbg November 14, 2022 11:39
This commit adds new proto fields and message types for the upcoming
async storage writes functionality. These proto changes are not yet
used.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Fixes etcd-io#12257.

This change adds opt-in support to raft to perform local storage writes
asynchronously from the raft state machine handling loop.

A new AsyncStorageWrites configuration instructs the raft node to write to its
local storage (raft log and state machine) using a request/response message
passing interface instead of the default `Ready`/`Advance` function call
interface. Local storage messages can be pipelined and processed asynchronously
(with respect to `Ready` iteration), facilitating reduced interference between
Raft proposals and increased batching of log appends and state machine
application. As a result, use of asynchronous storage writes can reduce
end-to-end commit latency and increase maximum throughput.

When AsyncStorageWrites is enabled, the `Ready.Message` slice will include new
`MsgStorageAppend` and `MsgStorageApply` messages. The messages will target a
`LocalAppendThread` and a `LocalApplyThread`, respectively. Messages to the same
target must be reliably processed in order. In other words, they can't be
dropped (like messages over the network) and those targeted at the same thread
can't be reordered. Messages to different targets can be processed in any order.

`MsgStorageAppend` carries Raft log entries to append, election votes to persist,
and snapshots to apply. All writes performed in response to a `MsgStorageAppend`
are expected to be durable. The message assumes the role of the Entries,
HardState, and Snapshot fields in Ready.

`MsgStorageApply` carries committed entries to apply. The message assumes
the role of the CommittedEntries field in Ready.

Local messages each carry one or more response messages which should be
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes. The storage
threads are not responsible for understanding the response messages, only
for delivering them to the correct target after performing the storage
write.

\## Design Considerations

- There must be no regression for existing users that do not enable `AsyncStorageWrites`.
  For instance, CommittedEntries must not wait on unstable entries to be stabilized in
  cases where a follower is given committed entries in a MsgApp.
- Asynchronous storage work should use a message passing interface, like the
  rest of this library.
- The Raft leader and followers should behave symmetrically. Both should be able
  to use asynchronous storage writes for log appends and entry application.
- The LocalAppendThread on a follower should be able to send MsgAppResp messages
  directly to the leader without passing back through the raft state machine
  handling loop.
- The `unstable` log should remain true to its name. It should hold entries
  until they are stable and should not rely on an intermediate reliable cache.
- Pseudo-targets should be assigned to messages that target the local storage
  systems to denote required ordering guarantees.
- Code should be maximally unified across `AsyncStorageWrites=false` and
  `AsyncStorageWrites=true`. `AsyncStorageWrites=false` should be a special case of
  `AsyncStorageWrites=true` where the library hides the possibility of asynchrony.
- It should be possible to apply snapshots asynchronously, even though a
  snapshot touches both the Raft log state and the state machine. The library
  should make this easy for users to handle by delaying all committed entries
  until after the snapshot has applied, so snapshot application can be handled
  by 1) flushing the apply thread, 2) sending the `MsgStorageAppend` that contains
  a snapshot to the `LocalAppendThread` to be applied.

\## Usage

When asynchronous storage writes is enabled, the responsibility of code using
the library is different from what is presented in raft/doc.go (which has been
updated to include a section about async storage writes). Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 in doc.go). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 in doc.go).
Instead, all local storage operations are also communicated through messages
present in the Ready.Message slice.

The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order. Each local storage message carries a slice of response messages that
must delivered after the corresponding storage write has been completed.

With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:

```go
for {
	select {
	case <-s.Ticker:
		n.Tick()
	case rd := <-s.Node.Ready():
		for _, m := range rd.Messages {
			switch m.To {
			case raft.LocalAppendThread:
				toAppend <- m
			case raft.LocalApplyThread:
				toApply <-m
			default:
				sendOverNetwork(m)
			}
		}
	case <-s.done:
		return
	}
}
```

Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:

```go
// append thread
go func() {
	for {
		select {
		case m := <-toAppend:
			saveToStorage(m.State, m.Entries, m.Snapshot)
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}

// apply thread
go func() {
	for {
		select {
		case m := <-toApply:
			for _, entry := range m.CommittedEntries {
				process(entry)
				if entry.Type == raftpb.EntryConfChange {
					var cc raftpb.ConfChange
					cc.Unmarshal(entry.Data)
					s.Node.ApplyConfChange(cc)
				}
			}
			send(m.Responses)
		case <-s.done:
			return
		}
	}
}
```

\## Compatibility

The library remains backwards compatible with existing users and the change does
not introduce any breaking changes. Users that do not set `AsyncStorageWrites`
to true in the `Config` struct will not notice a difference with this change.
This is despite the fact that the existing "synchronous storage writes"
interface was adapted to share a majority of the same code. For instance,
`Node.Advance` has been adapted to transparently acknowledge an asynchronous log
append attempt and an asynchronous state machine application attempt, internally
using the same message passing mechanism introduced in this change.

The change has no cross-version compatibility concerns. All changes are local to
a process and nodes using asynchronous storage writes appear to behave no
differently from the outside. Clusters are free to mix nodes running with and
without asynchronous storage writes.

\## Performance

The bulk of the performance evaluation of this functionality thus far has been
done with [rafttoy](https://github.com/nvanbenschoten/rafttoy), a benchmarking
harness developed to experiment with Raft proposal pipeline optimization. The
harness can be used to run single-node benchmarks or multi-node benchmarks. It
supports plugable raft logs, storage engines, network transports, and pipeline
implementations.

To evaluate this change, we fixed the raft log (`etcd/wal`), storage engine
(`pebble`), and network transport (`grpc`). We then built (nvanbenschoten/rafttoy#3)
a pipeline implementation on top of the new asynchronous storage writes
functionality and compared it against two other pipeline implementations.

The three pipeline implementations we compared were:
- **basic** (P1): baseline stock raft usage, similar to the code in `doc.go`
- **parallel append + early ack** (P2): CockroachDB's current pipeline, which includes
  two significant variations to the basic pipeline. The first is that it sends
  MsgApp messages to followers before writing to local Raft log (see [commit](cockroachdb/cockroach@b67eb69)
  for explanation), allowing log appends to occur in parallel across replicas.
  The second is that it acknowledges committed log entries before applying them
  (see [commit](cockroachdb/cockroach@87aaea7)
  for explanation).
- **async append + async apply + early ack** (P3): A pipelining using asynchronous storage
  writes with a separate append thread and a separate apply thread. Also uses the same
  early acknowledgement optimization from above to ack committed entries before handing
  them to the apply thread.

All testing was performed on a 3 node AWS cluster of m5.4xlarge instances with
gp3 EBS volumes (16000 IOPS, 1GB/s throughput).

![Throughput vs latency of Raft proposal pipeline implementations](https://user-images.githubusercontent.com/5438456/197925200-11352c09-569b-460c-ae42-effbf407c4e5.svg)

The comparison demonstrates two different benefits of asynchronous storage
writes.

The first is that it reduces end-to-end latency of proposals by 20-25%. For
instance, when serving 16MB/s of write traffic, P1's average latency was 13.2ms,
P2's average latency was 7.3ms, and P3's average latency was 5.24ms. This is a
reduction in average latency of 28% from the optimized pipeline that does not
use asynchronous storage writes. This matches expectations outlined in
cockroachdb/cockroach#17500.

The second is that it increases the maximum throughput at saturation. This is
because asynchronous storage writes can improve batching for both log appends
and log application. In this experiment, we saw the average append batch size
under saturation increase from 928 to 1542, which is a similar ratio to the
increase in peak throughput. We see a similar difference for apply batch sizes.

There is more benchmarking to do. For instance, we'll need to thoroughly verify
that this change does not negatively impact the performance of users of this
library that do not use asynchronous storage writes.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This commit makes it more clear that the asyncStorageWrites handling is
entirely local to RawNode and that the raft object always operates in
"async storage" mode.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This commit adds a new data-driven test the reproduces a scenario
similar to the one described in newStorageAppendRespMsg, exercising a
few interesting interactions between asynchronous storage writes, term
changes, and log truncation.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Pure code movement.

Eliminates asyncStorageWrites handling in node.go.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This commit removes certain cases where `MsgStorageAppendResp` messages
were attached as responses to a `MsgStorageAppend` message, even when
the response contained no useful information. The most common case where
this comes up is when the HardState changes but no new entries are
appended to the log.

Avoiding the response in these cases eliminates useless work.

Additionally, if the HardState does not include a new vote and only
includes a new Commit then there will be no response messages on the on
`MsgStorageAppend`. Users of this library can use this condition to
determine when an fsync is not necessary, similar to how it used to use
the `Ready.MustSync` flag.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This avoids a call to stable `Storage`. It turns a regression in firstIndex/op
from 2 to 3 (or 5 to 7) into an improvement from 2 to 1 (or 5 to 3).

```
name                     old firstIndex/op  new firstIndex/op  delta
RawNode/single-voter-10          3.00 ± 0%          1.00 ± 0%  -66.67%  (p=0.000 n=10+10)
RawNode/two-voters-10            7.00 ± 0%          3.00 ± 0%  -57.14%  (p=0.000 n=10+10)
```

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
@nvanbenschoten
Copy link
Contributor Author

Thanks @tbg for the thorough review! I'm still working through your comments and trying to thin out the threads. Please don't bother taking a look until I ping here, at which point we can bottom out on the remaining few threads.

I've also spent portions of the past week integrating this change back into CockroachDB. I'll be posting results in cockroachdb/cockroach#17500 later. The high-level summary is that integration of async storage writes (log appends only, not yet entry application) into cockroach does provide the same 20-40% average and tail latency improvements that we had simulated above. However, it doesn't provide the throughput improvements at the top end because cockroach becomes CPU bound earlier than rafttoy and fails to benefit much from the added opportunities for batching. That roughly matches expectations, as the latency win was always the goal here and the possibility of a throughput win in cockroach was speculative. I think it's also possible that the batching is more impactful for entry application than for log appends, so pulling that async in cockroach (at some later point) will still provide a throughput win.

This commit fixes the interactions between commit entry pagination and async
storage writes. The pagination now properly applies across multiple Ready
structs, acting as a limit on outstanding committed entries that have yet to be
acked through a MsgStorageApplyResp message.

The commit also resolves an abuse of the LogTerm field in MsgStorageApply{Resp}.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
@xiang90
Copy link
Contributor

xiang90 commented Dec 13, 2022

Thanks @tbg for the thorough review! I'm still working through your comments and trying to thin out the threads. Please don't bother taking a look until I ping here, at which point we can bottom out on the remaining few threads.

Amazing work! I assume this would also help etcd with tail latency.

@tbg
Copy link
Contributor

tbg commented Dec 21, 2022

etcd-io/raft#8

@tbg tbg closed this Dec 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

Proposal: support fully control fsync frequency in raft
8 participants