Skip to content

Commit

Permalink
Merge pull request #110 from erikgrinaker/accept-more-raft-snaps
Browse files Browse the repository at this point in the history
Accept any snapshot that allows replication
  • Loading branch information
ahrtr authored Jan 10, 2024
2 parents 25e4bb1 + d87942f commit f1c02c9
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 30 deletions.
15 changes: 7 additions & 8 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,10 +1478,13 @@ func stepLeader(r *raft, m pb.Message) error {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
// TODO(tbg): we should also enter this branch if a snapshot is
// received that is below pr.PendingSnapshot but which makes it
// possible to use the log again.
case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
// Note that we don't take into account PendingSnapshot to
// enter this branch. No matter at which index a snapshot
// was actually applied, as long as this allows catching up
// the follower from the log, we will accept it. This gives
// systems more flexibility in how they implement snapshots;
// see the comments on PendingSnapshot.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
Expand Down Expand Up @@ -1560,10 +1563,6 @@ func stepLeader(r *raft, m pb.Message) error {
if pr.State != tracker.StateSnapshot {
return nil
}
// TODO(tbg): this code is very similar to the snapshot handling in
// MsgAppResp above. In fact, the code there is more correct than the
// code here and should likely be updated to match (or even better, the
// logic pulled into a newly created Progress state machine handler).
if !m.Reject {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
Expand Down
8 changes: 7 additions & 1 deletion rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
//
// Example:
//
// deliver-msgs <idx>
// deliver-msgs <idx> type=MsgApp drop=(2,3)
err = env.handleDeliverMsgs(t, d)
case "process-ready":
// Example:
Expand Down Expand Up @@ -149,6 +149,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
//
// forget-leader 1
err = env.handleForgetLeader(t, d)
case "send-snapshot":
// Sends a snapshot to a node. Takes the source and destination node.
// The message will be queued, but not delivered automatically.
//
// Example: send-snapshot 1 3
env.handleSendSnapshot(t, d)
case "propose":
// Propose an entry.
//
Expand Down
21 changes: 15 additions & 6 deletions rafttest/interaction_env_handler_deliver_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
)

func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData) error {
var typ raftpb.MessageType = -1
var rs []Recipient
for _, arg := range d.CmdArgs {
if len(arg.Vals) == 0 {
Expand All @@ -50,11 +51,19 @@ func (env *InteractionEnv) handleDeliverMsgs(t *testing.T, d datadriven.TestData
t.Fatalf("can't both deliver and drop msgs to %d", id)
}
rs = append(rs, Recipient{ID: id, Drop: true})
case "type":
var s string
arg.Scan(t, i, &s)
v, ok := raftpb.MessageType_value[s]
if !ok {
t.Fatalf("unknown message type %s", s)
}
typ = raftpb.MessageType(v)
}
}
}

if n := env.DeliverMsgs(rs...); n == 0 {
if n := env.DeliverMsgs(typ, rs...); n == 0 {
env.Output.WriteString("no messages\n")
}
return nil
Expand All @@ -66,14 +75,14 @@ type Recipient struct {
}

// DeliverMsgs goes through env.Messages and, depending on the Drop flag,
// delivers or drops messages to the specified Recipients. Returns the
// number of messages handled (i.e. delivered or dropped). A handled message
// is removed from env.Messages.
func (env *InteractionEnv) DeliverMsgs(rs ...Recipient) int {
// delivers or drops messages to the specified Recipients. Only messages of type
// typ are delivered (-1 for all types). Returns the number of messages handled
// (i.e. delivered or dropped). A handled message is removed from env.Messages.
func (env *InteractionEnv) DeliverMsgs(typ raftpb.MessageType, rs ...Recipient) int {
var n int
for _, r := range rs {
var msgs []raftpb.Message
msgs, env.Messages = splitMsgs(env.Messages, r.ID, r.Drop)
msgs, env.Messages = splitMsgs(env.Messages, r.ID, typ, r.Drop)
n += len(msgs)
for _, msg := range msgs {
if r.Drop {
Expand Down
11 changes: 5 additions & 6 deletions rafttest/interaction_env_handler_process_append_thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rafttest

import (
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -86,13 +87,11 @@ func processAppend(n *Node, st raftpb.HardState, ents []raftpb.Entry, snap raftp
return err
}
}
if err := s.Append(ents); err != nil {
return err
}
if !raft.IsEmptySnap(snap) {
if err := s.ApplySnapshot(snap); err != nil {
return err
if len(ents) > 0 {
return errors.New("can't apply snapshot and entries at the same time")
}
return s.ApplySnapshot(snap)
}
return nil
return s.Append(ents)
}
50 changes: 50 additions & 0 deletions rafttest/interaction_env_handler_send_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"testing"

"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

// handleSendSnapshot parses the node indexes given in the test directive and
// forwards them to SendSnapshot. Exactly two indexes are required: the first
// identifies the sending node and the second the receiving node. Any other
// count fails the test via require.Len.
func (env *InteractionEnv) handleSendSnapshot(t *testing.T, d datadriven.TestData) error {
	nodes := nodeIdxs(t, d)
	require.Len(t, nodes, 2)
	fromIdx, toIdx := nodes[0], nodes[1]
	return env.SendSnapshot(fromIdx, toIdx)
}

// SendSnapshot builds a MsgSnap carrying the snapshot currently reported by
// the node at fromIdx and queues it for the node at toIdx.
//
// fromIdx and toIdx are zero-based positions in env.Nodes; the message's From
// and To fields carry the corresponding one-based values (index + 1). The
// message term is the sender's current term as reported by BasicStatus.
//
// The message is only appended to env.Messages; this method does not deliver
// it. A description of the message is written to env.Output. An error is
// returned only if the sender's snapshot cannot be obtained.
func (env *InteractionEnv) SendSnapshot(fromIdx, toIdx int) error {
	snap, err := env.Nodes[fromIdx].Snapshot()
	if err != nil {
		return err
	}

	msg := raftpb.Message{
		Type:     raftpb.MsgSnap,
		Term:     env.Nodes[fromIdx].BasicStatus().Term,
		From:     uint64(fromIdx + 1),
		To:       uint64(toIdx + 1),
		Snapshot: &snap,
	}

	// Queue the message and record it in the test output.
	env.Messages = append(env.Messages, msg)
	// The write error is deliberately ignored, as in the rest of the output
	// handling for this message.
	_, _ = env.Output.WriteString(raft.DescribeMessage(msg, nil))
	return nil
}
10 changes: 6 additions & 4 deletions rafttest/interaction_env_handler_stabilize.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
id := rn.Status().ID
// NB: we grab the messages just to see whether to print the header.
// DeliverMsgs will do it again.
if msgs, _ := splitMsgs(env.Messages, id, false /* drop */); len(msgs) > 0 {
if msgs, _ := splitMsgs(env.Messages, id, -1 /* typ */, false /* drop */); len(msgs) > 0 {
fmt.Fprintf(env.Output, "> %d receiving messages\n", id)
env.withIndent(func() { env.DeliverMsgs(Recipient{ID: id}) })
env.withIndent(func() { env.DeliverMsgs(-1 /* typ */, Recipient{ID: id}) })
done = false
}
}
Expand Down Expand Up @@ -112,10 +112,12 @@ func (env *InteractionEnv) Stabilize(idxs ...int) error {
}
}

func splitMsgs(msgs []raftpb.Message, to uint64, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// splitMsgs extracts messages for the given recipient of the given type (-1 for
// all types) from msgs, and returns them along with the remainder of msgs.
func splitMsgs(msgs []raftpb.Message, to uint64, typ raftpb.MessageType, drop bool) (toMsgs []raftpb.Message, rmdr []raftpb.Message) {
// NB: this method does not reorder messages.
for _, msg := range msgs {
if msg.To == to && !(drop && isLocalMsg(msg)) {
if msg.To == to && !(drop && isLocalMsg(msg)) && (typ < 0 || msg.Type == typ) {
toMsgs = append(toMsgs, msg)
} else {
rmdr = append(rmdr, msg)
Expand Down
163 changes: 163 additions & 0 deletions testdata/snapshot_succeed_via_app_resp_behind.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# This is a variant of snapshot_succeed_via_app_resp in which the snapshot
# that is being sent is behind the PendingSnapshot index tracked by the leader.

# Turn off output during the setup of the test.
log-level none
----
ok

# Start with three nodes, but the third is disconnected from the log.
add-nodes 2 voters=(1,2,3) index=10
----
ok

add-nodes 1 voters=(1,2,3) index=5
----
ok

# Elect 1 as leader. We first stabilize 3 to process the vote, then stabilize 1
# and 2 to complete the leader election. We don't stabilize 3 after the
# election, so that it does not receive and process any MsgApp yet.
campaign 1
----
ok

process-ready 1
----
ok

stabilize 3
----
ok

stabilize 1 2
----
ok

log-level debug
----
ok

# We now have a leader at index 11 (it appended an empty entry when elected). 3
# is still at index 5, and has not received any MsgApp from the leader yet.
raft-state
----
1: StateLeader (Voter) Term:1 Lead:1
2: StateFollower (Voter) Term:1 Lead:1
3: StateFollower (Voter) Term:1 Lead:0

status 1
----
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateProbe match=0 next=11 paused inactive

raft-log 3
----
log is empty: first index=6, last index=5

# Send a manual snapshot from 1 to 3, which will be at index 11. This snapshot
# does not move 3 to StateSnapshot.
send-snapshot 1 3
----
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# Propose and commit an additional entry, which makes the leader's
# last index 12, beyond the snapshot it sent at index 11.
log-level none
----
ok

propose 1 "foo"
----
ok

stabilize 1 2
----
ok

log-level debug
----
ok

status 1
----
1: StateReplicate match=12 next=13
2: StateReplicate match=12 next=13
3: StateProbe match=0 next=11 paused inactive

# 3 now gets the first MsgApp the leader originally sent, trying to append entry
# 11, but this is rejected because the follower's log ends at index 5.
deliver-msgs 3 type=MsgApp
----
1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""]
DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1

# Note below that the RejectionHint is 5, which is below the leader's first
# index of 11. Once the leader receives this, it will move 3 into StateSnapshot
# with PendingSnapshot=lastIndex=12.
process-ready 3
----
Ready MustSync=false:
Lead:1 State:StateFollower
Messages:
3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5)

# 3 receives and applies the snapshot, but doesn't respond with MsgAppResp yet.
deliver-msgs 3
----
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
INFO log [committed=5, applied=5, applying=5, unstable.offset=6, unstable.offsetInProgress=6, len(unstable.Entries)=0] starts to restore snapshot [index: 11, term: 1]
INFO 3 switched to configuration voters=(1 2 3)
INFO 3 [commit: 11, lastindex: 11, lastterm: 1] restored snapshot [index: 11, term: 1]
INFO 3 [commit: 11] restored snapshot [index: 11, term: 1]


# 1 sees the MsgApp rejection and asks for a snapshot at index 12 (which is 1's
# current last index).
stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5)
DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10
DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6]
DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6]
DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# Drop the extra MsgSnap(index=12) that 1 just sent, to keep the test tidy.
deliver-msgs drop=(3)
----
dropped: 1->3 MsgSnap Term:1 Log:0/0 Snapshot: Index:12 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false

# 3 sends the affirmative MsgAppResp that resulted from applying the snapshot
# at index 11.
stabilize 3
----
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:11
Snapshot Index:11 Term:1 ConfState:Voters:[1 2 3] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false
Messages:
3->1 MsgAppResp Term:1 Log:0/11

stabilize 1
----
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/11
DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=12]
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/11 Commit:12 Entries:[1/12 EntryNormal "\"foo\""]

# 3 is in StateReplicate thanks to receiving the snapshot at index 11.
# This is despite its PendingSnapshot having been 12.
status 1
----
1: StateReplicate match=12 next=13
2: StateReplicate match=12 next=13
3: StateReplicate match=11 next=13 inflight=1
Loading

0 comments on commit f1c02c9

Please sign in to comment.