Skip to content

Commit

Permalink
kvserver: give testContext a real raft transport
Browse files Browse the repository at this point in the history
Some testContext tests add peers, so the raft transport does see
some use. The transport doesn't have to work properly in these tests;
it just must not crash, which this change achieves.

This is essentially a re-do of #69730 which was lost in #72383.

Release note: None
  • Loading branch information
tbg committed Dec 20, 2022
1 parent fc04d8e commit 6b5cb4c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 32 deletions.
58 changes: 28 additions & 30 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,26 +658,31 @@ func (r *Replica) AdminMerge(
return errors.Errorf("ranges not collocated; %s != %s", lReplicas, rReplicas)
}

// Ensure that every current replica of the LHS has been initialized.
// Otherwise there is a rare race where the replica GC queue can GC a
// replica of the RHS too early. The comment on
// TestStoreRangeMergeUninitializedLHSFollower explains the situation in full.
if err := waitForReplicasInit(
ctx, r.store.cfg.NodeDialer, origLeftDesc.RangeID, origLeftDesc.Replicas().Descriptors(),
); err != nil {
return errors.Wrap(err, "waiting for all left-hand replicas to initialize")
}
// Out of an abundance of caution, also ensure that replicas of the RHS have
// all been initialized. If for whatever reason the initial upreplication
// snapshot for a NON_VOTER on the RHS fails, it will have to get picked up
// by the raft snapshot queue to upreplicate and may be uninitialized at
// this point. As such, if we send a subsume request to the RHS in this sort
// of state, we will wastefully and unintentionally block all traffic on it
// for 5 seconds.
if err := waitForReplicasInit(
ctx, r.store.cfg.NodeDialer, rightDesc.RangeID, rightDesc.Replicas().Descriptors(),
); err != nil {
return errors.Wrap(err, "waiting for all right-hand replicas to initialize")
disableWaitForReplicasInTesting := r.store.TestingKnobs() != nil &&
r.store.TestingKnobs().DisableMergeWaitForReplicasInit

if !disableWaitForReplicasInTesting {
// Ensure that every current replica of the LHS has been initialized.
// Otherwise there is a rare race where the replica GC queue can GC a
// replica of the RHS too early. The comment on
// TestStoreRangeMergeUninitializedLHSFollower explains the situation in full.
if err := waitForReplicasInit(
ctx, r.store.cfg.NodeDialer, origLeftDesc.RangeID, origLeftDesc.Replicas().Descriptors(),
); err != nil {
return errors.Wrap(err, "waiting for all left-hand replicas to initialize")
}
// Out of an abundance of caution, also ensure that replicas of the RHS have
// all been initialized. If for whatever reason the initial upreplication
// snapshot for a NON_VOTER on the RHS fails, it will have to get picked up
// by the raft snapshot queue to upreplicate and may be uninitialized at
// this point. As such, if we send a subsume request to the RHS in this sort
// of state, we will wastefully and unintentionally block all traffic on it
// for 5 seconds.
if err := waitForReplicasInit(
ctx, r.store.cfg.NodeDialer, rightDesc.RangeID, rightDesc.Replicas().Descriptors(),
); err != nil {
return errors.Wrap(err, "waiting for all right-hand replicas to initialize")
}
}

mergeReplicas := lReplicas.Descriptors()
Expand Down Expand Up @@ -750,6 +755,9 @@ func (r *Replica) AdminMerge(

err = contextutil.RunWithTimeout(ctx, "waiting for merge application", mergeApplicationTimeout,
func(ctx context.Context) error {
if disableWaitForReplicasInTesting {
return nil
}
return waitForApplication(ctx, r.store.cfg.NodeDialer, rightDesc.RangeID, mergeReplicas,
rhsSnapshotRes.LeaseAppliedIndex)
})
Expand Down Expand Up @@ -820,11 +828,6 @@ func waitForApplication(
replicas []roachpb.ReplicaDescriptor,
leaseIndex uint64,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
repl := repl // copy for goroutine
Expand Down Expand Up @@ -854,11 +857,6 @@ func waitForReplicasInit(
rangeID roachpb.RangeID,
replicas []roachpb.ReplicaDescriptor,
) error {
if dialer == nil && len(replicas) == 1 {
// This early return supports unit tests (testContext{}) that also
// want to perform merges.
return nil
}
return contextutil.RunWithTimeout(ctx, "wait for replicas init", 5*time.Second, func(ctx context.Context) error {
g := ctxgroup.WithContext(ctx)
for _, repl := range replicas {
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (tc *testContext) Start(ctx context.Context, t testing.TB, stopper *stop.St
// testContext tests like to move the manual clock around and assume that they can write at past
// timestamps.
cfg.TestingKnobs.DontCloseTimestamps = true
cfg.TestingKnobs.DisableMergeWaitForReplicasInit = true
tc.StartWithStoreConfig(ctx, t, stopper, cfg)
}

Expand Down Expand Up @@ -14008,7 +14009,7 @@ func TestStoreTenantMetricsAndRateLimiterRefcount(t *testing.T) {
Key: leftRepl.Desc().StartKey.AsRawKey(),
},
}, "testing")
require.Nil(t, pErr)
require.NoError(t, pErr.GoError())

// The store metrics no longer track tenant 123.
require.Equal(t,
Expand Down
10 changes: 9 additions & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,15 @@ func createTestStoreWithoutStart(
eng := storage.NewDefaultInMemForTesting()
stopper.AddCloser(eng)
require.Nil(t, cfg.Transport)
cfg.Transport = NewDummyRaftTransport(cfg.Settings, cfg.AmbientCtx.Tracer)

require.NotNil(t, cfg.Gossip) // was set above already
// Even though testContext is fundamentally a single-store test, some tests
// will try config changes, etc, so we will see some use of the transport
// and it's important that this doesn't cause crashes. Just set up the
// "real thing" since it's straightforward enough.
cfg.NodeDialer = nodedialer.New(rpcContext, gossip.AddressResolver(cfg.Gossip))
cfg.Transport = NewRaftTransport(cfg.AmbientCtx, cfg.Settings, cfg.Tracer(), cfg.NodeDialer, server, stopper)

stores := NewStores(cfg.AmbientCtx, cfg.Clock)
nodeDesc := &roachpb.NodeDescriptor{NodeID: 1}

Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ type StoreTestingKnobs struct {
// Replica.executeReadOnlyBatch after checks have successfully determined
// execution can proceed but a storage snapshot has not been acquired.
PreStorageSnapshotButChecksCompleteInterceptor func(replica *Replica)

// DisableMergeWaitForReplicasInit skips the waitForReplicasInit calls
// and the subsequent waitForApplication call during merges. Useful for
// testContext tests that want to use merges.
DisableMergeWaitForReplicasInit bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit 6b5cb4c

Please sign in to comment.