kvserver: force-campaign leaseholder on leader removal #104969

Merged (3 commits) on Jun 16, 2023
3 changes: 2 additions & 1 deletion pkg/base/config.go
@@ -256,7 +256,8 @@ var (
// partition from stealing away leadership from an established leader
// (assuming they have an up-to-date log, which they do with a read-only
// workload). With asymmetric or partial network partitions, this can allow an
// unreachable node to steal away leadership, leading to range unavailability.
// unreachable node to steal leadership away from the leaseholder, leading to
// range unavailability if the leaseholder can no longer reach the leader.
//
// The asymmetric partition concerns have largely been addressed by RPC
// dialback (see rpc.dialback.enabled), but the partial partition concerns
258 changes: 258 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
@@ -5933,6 +5933,161 @@ func TestRaftSnapshotsWithMVCCRangeKeysEverywhere(t *testing.T) {
}
}

// TestRaftCampaignPreVoteCheckQuorum tests that campaignLocked() respects
// PreVote+CheckQuorum: peers won't grant prevotes while there is an active
// leader, so the campaign fails.
func TestRaftCampaignPreVoteCheckQuorum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive, so skip under deadlock detector and stressrace.
skip.UnderDeadlock(t)
skip.UnderStressRace(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RaftEnableCheckQuorum: true,
RaftTickInterval: 100 * time.Millisecond, // speed up test
},
},
})
defer tc.Stopper().Stop(ctx)

logStatus := func(s *raft.Status) {
t.Helper()
require.NotNil(t, s)
t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
}

sender := tc.GetFirstStoreFromServer(t, 0).TestSender()

// Create a range, upreplicate it, and replicate a write.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
_, pErr := kv.SendWrapped(ctx, sender, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{1, 1, 1})

repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)
repl2, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
require.NoError(t, err)
repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)
repls := []*kvserver.Replica{repl1, repl2, repl3}

// Make sure n1 is leader.
initialStatus := repl1.RaftStatus()
require.Equal(t, raft.StateLeader, initialStatus.RaftState)
logStatus(initialStatus)
t.Logf("n1 is leader")

// Campaign n3. It shouldn't win prevotes, reverting to follower
// in the current term.
repl3.Campaign(ctx)
t.Logf("n3 campaigning")

require.Eventually(t, func() bool {
status := repl3.RaftStatus()
logStatus(status)
return status.RaftState == raft.StateFollower
}, 10*time.Second, 500*time.Millisecond)
t.Logf("n3 reverted to follower")

// n1 should still be the leader in the same term, with n2 and n3 followers.
for _, repl := range repls {
st := repl.RaftStatus()
logStatus(st)
if st.ID == 1 {
require.Equal(t, raft.StateLeader, st.RaftState)
} else {
require.Equal(t, raft.StateFollower, st.RaftState)
}
require.Equal(t, initialStatus.Term, st.Term)
}
}

// TestRaftForceCampaignPreVoteCheckQuorum tests that forceCampaignLocked()
// ignores PreVote+CheckQuorum, transitioning directly to candidate and bumping
// the term. It may not actually win or hold onto leadership, but bumping the
// term is proof enough that it called an election.
func TestRaftForceCampaignPreVoteCheckQuorum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive, so skip under deadlock detector and stressrace.
skip.UnderDeadlock(t)
skip.UnderStressRace(t)

ctx := context.Background()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RaftEnableCheckQuorum: true,
RaftTickInterval: 200 * time.Millisecond, // speed up test
RaftHeartbeatIntervalTicks: 10, // allow n3 to win the election
RaftElectionTimeoutTicks: 20,
},
},
})
defer tc.Stopper().Stop(ctx)

logStatus := func(s *raft.Status) {
t.Helper()
require.NotNil(t, s)
t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
}

sender := tc.GetFirstStoreFromServer(t, 0).TestSender()

// Create a range, upreplicate it, and replicate a write.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
_, pErr := kv.SendWrapped(ctx, sender, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{1, 1, 1})

repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)
repl2, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
require.NoError(t, err)
repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)
repls := []*kvserver.Replica{repl1, repl2, repl3}

// Make sure n1 is leader.
initialStatus := repl1.RaftStatus()
require.Equal(t, raft.StateLeader, initialStatus.RaftState)
logStatus(initialStatus)
t.Logf("n1 is leader in term %d", initialStatus.Term)

// Force-campaign n3. It may not win or hold onto leadership, but it's enough
// to know that it bumped the term.
repl3.ForceCampaign(ctx)
t.Logf("n3 campaigning")

var leaderStatus *raft.Status
require.Eventually(t, func() bool {
for _, repl := range repls {
st := repl.RaftStatus()
logStatus(st)
if st.Term <= initialStatus.Term {
return false
}
if st.RaftState == raft.StateLeader {
leaderStatus = st
}
}
return leaderStatus != nil
}, 10*time.Second, 500*time.Millisecond)
t.Logf("n%d is leader, with bumped term %d", leaderStatus.ID, leaderStatus.Term)
}

// TestRaftPreVote tests that Raft PreVote works properly, including the recent
// leader check only enabled via CheckQuorum. Specifically, a replica that's
// partitioned away from the leader (or restarted) should not be able to call an
@@ -6324,3 +6479,106 @@ func TestRaftCheckQuorum(t *testing.T) {
})
})
}

// TestRaftLeaderRemovesItself tests that when a raft leader removes itself via
// a conf change the leaseholder campaigns for leadership, ignoring
// PreVote+CheckQuorum and transitioning directly to candidate.
//
// We set up three replicas:
//
// n1: Raft leader
// n2: follower
// n3: follower + leaseholder
//
// We disable leader following the leaseholder (which would otherwise transfer
// leadership if we happened to tick during the conf change), and then remove n1
// from the range. n3 should acquire leadership.
//
// We set a very long election timeout, so that in practice the only way n3 can
// become leader is by campaigning explicitly. Furthermore, it must skip
// pre-votes, since with PreVote+CheckQuorum n2 wouldn't vote for it (it would
// think n1 was still the leader).
func TestRaftLeaderRemovesItself(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Timing-sensitive, so skip under deadlock detector and stressrace.
skip.UnderDeadlock(t)
skip.UnderStressRace(t)

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
RaftEnableCheckQuorum: true,
RaftTickInterval: 100 * time.Millisecond, // speed up test
// Set a large election timeout. We don't want replicas to call
// elections due to timeouts, we want them to campaign and obtain
// votes despite PreVote+CheckQuorum.
RaftElectionTimeoutTicks: 200,
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableLeaderFollowsLeaseholder: true, // the leader should stay put
},
},
},
})
defer tc.Stopper().Stop(ctx)

logStatus := func(s *raft.Status) {
t.Helper()
require.NotNil(t, s)
t.Logf("n%d %s at term=%d commit=%d", s.ID, s.RaftState, s.Term, s.Commit)
}

send1 := tc.GetFirstStoreFromServer(t, 0).TestSender()
send3 := tc.GetFirstStoreFromServer(t, 2).TestSender()

// Create a range, upreplicate it, and replicate a write.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
_, pErr := kv.SendWrapped(ctx, send1, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{1, 1, 1})

repl1, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
require.NoError(t, err)
repl2, err := tc.GetFirstStoreFromServer(t, 1).GetReplica(desc.RangeID)
require.NoError(t, err)
repl3, err := tc.GetFirstStoreFromServer(t, 2).GetReplica(desc.RangeID)
require.NoError(t, err)

// Move the lease to n3, and make sure everyone has applied it.
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(2))
require.Eventually(t, func() bool {
lease, _ := repl3.GetLease()
return lease.Replica.ReplicaID == repl3.ReplicaID()
}, 10*time.Second, 500*time.Millisecond)
_, pErr = kv.SendWrapped(ctx, send3, incrementArgs(key, 1))
require.NoError(t, pErr.GoError())
tc.WaitForValues(t, key, []int64{2, 2, 2})
t.Logf("n3 has lease")

// Make sure n1 is still leader.
st := repl1.RaftStatus()
require.Equal(t, raft.StateLeader, st.RaftState)
logStatus(st)

// Remove n1 and wait for n3 to become leader.
tc.RemoveVotersOrFatal(t, key, tc.Target(0))
t.Logf("n1 removed from range")

require.Eventually(t, func() bool {
logStatus(repl2.RaftStatus())
logStatus(repl3.RaftStatus())
if repl3.RaftStatus().RaftState == raft.StateLeader {
t.Logf("n3 is leader")
return true
}
return false
}, 10*time.Second, 500*time.Millisecond)
}
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -282,6 +282,20 @@ func (r *Replica) LastAssignedLeaseIndex() kvpb.LeaseAppliedIndex {
return r.mu.proposalBuf.LastAssignedLeaseIndexRLocked()
}

// Campaign campaigns the replica.
func (r *Replica) Campaign(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.campaignLocked(ctx)
}

// ForceCampaign force-campaigns the replica.
func (r *Replica) ForceCampaign(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.forceCampaignLocked(ctx)
}

// LastAssignedLeaseIndexRLocked is like LastAssignedLeaseIndex, but requires
// b.mu to be held in read mode.
func (b *propBuf) LastAssignedLeaseIndexRLocked() kvpb.LeaseAppliedIndex {
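For readers following along without the rest of the PR: the campaignLocked and forceCampaignLocked methods wrapped by the test helpers above live in replica_raft.go, which is outside this diff. Below is a minimal sketch of the distinction, assuming the etcd-io/raft RawNode API; the field and helper names (internalRaftGroup, enqueueRaftUpdateCheck) are illustrative, not necessarily the merged code. The key idea: campaignLocked goes through the normal prevote path, while forceCampaignLocked steps a local MsgTimeoutNow, which raft handles by transitioning directly to candidate and bumping the term.

// Sketch only, under the assumptions stated above.

// campaignLocked campaigns via the regular prevote path. With
// PreVote+CheckQuorum enabled, peers that have recently heard from a leader
// reject the prevote, so this cannot depose an established leader.
func (r *Replica) campaignLocked(ctx context.Context) {
	if r.mu.internalRaftGroup == nil {
		return
	}
	if err := r.mu.internalRaftGroup.Campaign(); err != nil {
		log.VEventf(ctx, 1, "failed to campaign: %s", err)
	}
	// Unlike a bare RawNode.Campaign(), also enqueue the range on the Raft
	// scheduler so the generated vote requests are processed and sent.
	r.store.enqueueRaftUpdateCheck(r.RangeID)
}

// forceCampaignLocked bypasses PreVote+CheckQuorum by stepping a local
// MsgTimeoutNow (the message used for leadership transfers), so the replica
// transitions directly to candidate and bumps its term.
func (r *Replica) forceCampaignLocked(ctx context.Context) {
	if r.mu.internalRaftGroup == nil {
		return
	}
	msg := raftpb.Message{To: uint64(r.replicaID), Type: raftpb.MsgTimeoutNow}
	if err := r.mu.internalRaftGroup.Step(msg); err != nil {
		log.VEventf(ctx, 1, "failed to force-campaign: %s", err)
	}
	r.store.enqueueRaftUpdateCheck(r.RangeID)
}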
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -162,6 +162,7 @@ type proposer interface {
// The following require the proposer to hold an exclusive lock.
withGroupLocked(func(proposerRaft) error) error
registerProposalLocked(*ProposalData)
campaignLocked(ctx context.Context)
Review comment (Member):
a comment might be helpful - most of the callers have a *RawNode* in scope, but this method also enqueues to the scheduler.

Reply from @erikgrinaker (Contributor, Author), Jun 15, 2023:
Is that problematic though? This was mostly to avoid having to write the comment all over again at the call site (and yes, I realize the interface will mask the comment anyway, but presumably people would go to the actual implementation and read it there).
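
For illustration only, a doc comment along the lines the reviewer is asking for might read (hypothetical wording, not text from the PR):

// campaignLocked campaigns for raft leadership, if the replica is eligible.
// Unlike calling Campaign() directly on the RawNode, it also enqueues the
// range on the Raft scheduler so that the resulting vote requests are
// processed and sent.
campaignLocked(ctx context.Context)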

// rejectProposalWithRedirectLocked rejects a proposal and redirects the
// proposer to try it on another node. This is used to sometimes reject lease
// acquisitions when another replica is the leader; the intended consequence
@@ -670,9 +671,7 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
b.p.rejectProposalWithRedirectLocked(ctx, p, li.leader)
if b.p.shouldCampaignOnRedirect(raftGroup) {
log.VEventf(ctx, 2, "campaigning because Raft leader not live in node liveness map")
if err := raftGroup.Campaign(); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
b.p.campaignLocked(ctx)
}
return true
}
Expand Down Expand Up @@ -1340,6 +1339,10 @@ func (rp *replicaProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool
)
}

func (rp *replicaProposer) campaignLocked(ctx context.Context) {
(*Replica)(rp).campaignLocked(ctx)
}

func (rp *replicaProposer) flowControlHandle(ctx context.Context) kvflowcontrol.Handle {
handle, found := rp.mu.replicaFlowControlIntegration.handle()
if !found {
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -236,6 +236,12 @@ func (t *testProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool {
return t.leaderNotLive
}

func (t *testProposer) campaignLocked(ctx context.Context) {
if err := t.raftGroup.Campaign(); err != nil {
panic(err)
}
}

func (t *testProposer) rejectProposalWithRedirectLocked(
_ context.Context, _ *ProposalData, redirectTo roachpb.ReplicaID,
) {