Skip to content

Commit

Permalink
Merge pull request #8 from nvanbenschoten/nvanbenschoten/asyncRaftLogMsg
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg authored Dec 21, 2022
2 parents 91eb751 + 09c91d8 commit 65a0bf3
Show file tree
Hide file tree
Showing 41 changed files with 3,731 additions and 621 deletions.
1 change: 1 addition & 0 deletions diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func mustTemp(pre, body string) string {
func ltoa(l *raftLog) string {
s := fmt.Sprintf("lastIndex: %d\n", l.lastIndex())
s += fmt.Sprintf("applied: %d\n", l.applied)
s += fmt.Sprintf("applying: %d\n", l.applying)
for i, e := range l.allEntries() {
s += fmt.Sprintf("#%d: %+v\n", i, e)
}
Expand Down
100 changes: 100 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,94 @@ given ID MUST be used only once even if the old node has been removed.
This means that for example IP addresses make poor node IDs since they
may be reused. Node IDs must be non-zero.
# Usage with Asynchronous Storage Writes
The library can be configured with an alternate interface for local storage
writes that can provide better performance in the presence of high proposal
concurrency by minimizing interference between proposals. This feature is called
AsynchronousStorageWrites, and can be enabled using the flag on the Config
struct with the same name.
When Asynchronous Storage Writes is enabled, the responsibility of code using
the library is different from what was presented above. Users still read from
the Node.Ready() channel. However, they process the updates it contains in a
different manner. Users no longer consult the HardState, Entries, and Snapshot
fields (steps 1 and 3 above). They also no longer call Node.Advance() to
indicate that they have processed all entries in the Ready (step 4 above).
Instead, all local storage operations are also communicated through messages
present in the Ready.Message slice.
The local storage messages come in two flavors. The first flavor is log append
messages, which target a LocalAppendThread and carry Entries, HardState, and a
Snapshot. The second flavor is entry application messages, which target a
LocalApplyThread and carry CommittedEntries. Messages to the same target must be
reliably processed in order. Messages to different targets can be processed in
any order.
Each local storage message carries a slice of response messages that must
delivered after the corresponding storage write has been completed. These
responses may target the same node or may target other nodes.
With Asynchronous Storage Writes enabled, the total state machine handling loop
will look something like this:
for {
select {
case <-s.Ticker:
n.Tick()
case rd := <-s.Node.Ready():
for _, m := range rd.Messages {
switch m.To {
case raft.LocalAppendThread:
toAppend <- m
case raft.LocalApplyThread:
toApply <-m
default:
sendOverNetwork(m)
}
}
case <-s.done:
return
}
}
Usage of Asynchronous Storage Writes will typically also contain a pair of
storage handler threads, one for log writes (append) and one for entry
application to the local state machine (apply). Those will look something like:
// append thread
go func() {
for {
select {
case m := <-toAppend:
saveToStorage(m.State, m.Entries, m.Snapshot)
send(m.Responses)
case <-s.done:
return
}
}
}
// apply thread
go func() {
for {
select {
case m := <-toApply:
for _, entry := range m.CommittedEntries {
process(entry)
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
s.Node.ApplyConfChange(cc)
}
}
send(m.Responses)
case <-s.done:
return
}
}
}
# Implementation notes
This implementation is up to date with the final Raft thesis
Expand Down Expand Up @@ -295,5 +383,17 @@ stale log entries:
that the follower that sent this 'MsgUnreachable' is not reachable, often
indicating 'MsgApp' is lost. When follower's progress state is replicate,
the leader sets it back to probe.
'MsgStorageAppend' is a message from a node to its local append storage
thread to write entries, hard state, and/or a snapshot to stable storage.
The message will carry one or more responses, one of which will be a
'MsgStorageAppendResp' back to itself. The responses can also contain
'MsgAppResp', 'MsgVoteResp', and 'MsgPreVoteResp' messages. Used with
AsynchronousStorageWrites.
'MsgStorageApply' is a message from a node to its local apply storage
thread to apply committed entries. The message will carry one response,
which will be a 'MsgStorageApplyResp' back to itself. Used with
AsynchronousStorageWrites.
*/
package raft
Loading

0 comments on commit 65a0bf3

Please sign in to comment.