Skip to content

Commit

Permalink
Merge #72383
Browse files Browse the repository at this point in the history
72383: kvserver: use AdminSplit in splitTestRange r=erikgrinaker a=tbg

`splitTestRange` was previously reaching into the store to finagle a
split. It is used by around a dozen tests. Prototyping around #72374
has shown that these tests frequently need patching up whenever we
adjust (improve) the store's replica handling.

This is a time suck and besides, we also want to be able to test
the Store from within the `kvserver` (not `kvserver_test`) package.
So if we can make that happen, and can use AdminSplit, that would
be preferable.

AdminSplit requires them to run a (somewhat) distributed multi-range
transaction. A first split would hit a single range, but after that the
split batch hits at least two ranges (meta2 and splitKey), and so we
need nontrivial DistSender-like functionality. Splits are also
nontrivial distributed transactions and so we need a TxnCoordSender.
Experience suggests that it's better to use the "real thing" and to
make sure it's configurable enough to fit the use case, rather than
whipping up half-baked replacements.

Luckily, it turned out that DistSender and TxnCoordSender are already
up to the task, and this commit adopts them in
`createTestStoreWithoutStart`, and changes `splitTestRange` to use
`AdminSplit`.

Release note: None


Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
  • Loading branch information
craig[bot] and tbg committed Nov 8, 2021
2 parents a0849b1 + 6ac194a commit f29118b
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 424 deletions.
48 changes: 27 additions & 21 deletions pkg/kv/kvserver/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,17 +479,23 @@ func TestGCQueueProcess(t *testing.T) {
ts3 := makeTS(now-intentAgeThreshold.Nanoseconds(), 0) // 2h old
ts4 := makeTS(now-(intentAgeThreshold.Nanoseconds()-1), 0) // 2h-1ns old
ts5 := makeTS(now-1e9, 0) // 1s old
key1 := roachpb.Key("a")
key2 := roachpb.Key("b")
key3 := roachpb.Key("c")
key4 := roachpb.Key("d")
key5 := roachpb.Key("e")
key6 := roachpb.Key("f")
key7 := roachpb.Key("g")
key8 := roachpb.Key("h")
key9 := roachpb.Key("i")
key10 := roachpb.Key("j")
key11 := roachpb.Key("k")
mkKey := func(suff string) roachpb.Key {
var k roachpb.Key
k = append(k, keys.ScratchRangeMin...)
k = append(k, suff...)
return k
}
key1 := mkKey("a")
key2 := mkKey("b")
key3 := mkKey("c")
key4 := mkKey("d")
key5 := mkKey("e")
key6 := mkKey("f")
key7 := mkKey("g")
key8 := mkKey("h")
key9 := mkKey("i")
key10 := mkKey("j")
key11 := mkKey("k")

data := []struct {
key roachpb.Key
Expand Down Expand Up @@ -582,16 +588,16 @@ func TestGCQueueProcess(t *testing.T) {
}

// The total size of the GC'able versions of the keys and values in Info.
// Key size: len("a") + MVCCVersionTimestampSize (13 bytes) = 14 bytes.
// Key size: len(scratch+"a") + MVCCVersionTimestampSize (13 bytes) = 15 bytes.
// Value size: len("value") + headerSize (5 bytes) = 10 bytes.
// key1 at ts1 (14 bytes) => "value" (10 bytes)
// key2 at ts1 (14 bytes) => "value" (10 bytes)
// key3 at ts1 (14 bytes) => "value" (10 bytes)
// key4 at ts1 (14 bytes) => "value" (10 bytes)
// key5 at ts1 (14 bytes) => "value" (10 bytes)
// key5 at ts2 (14 bytes) => delete (0 bytes)
// key10 at ts1 (14 bytes) => delete (0 bytes)
var expectedVersionsKeyBytes int64 = 7 * 14
// key1 at ts1 (15 bytes) => "value" (10 bytes)
// key2 at ts1 (15 bytes) => "value" (10 bytes)
// key3 at ts1 (15 bytes) => "value" (10 bytes)
// key4 at ts1 (15 bytes) => "value" (10 bytes)
// key5 at ts1 (15 bytes) => "value" (10 bytes)
// key5 at ts2 (15 bytes) => delete (0 bytes)
// key10 at ts1 (15 bytes) => delete (0 bytes)
var expectedVersionsKeyBytes int64 = 7 * 15
var expectedVersionsValBytes int64 = 5 * 10

// Call Run with dummy functions to get current Info.
Expand Down Expand Up @@ -670,7 +676,7 @@ func TestGCQueueProcess(t *testing.T) {
return err
}
for i, kv := range kvs {
log.VEventf(ctx, 1, "%d: %s", i, kv.Key)
t.Logf("%d: %s", i, kv.Key)
}
if len(kvs) != len(expKVs) {
return fmt.Errorf("expected length %d; got %d", len(expKVs), len(kvs))
Expand Down
256 changes: 55 additions & 201 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -48,8 +46,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand All @@ -59,7 +55,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -180,93 +175,38 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
) {
tc.TB = t
ctx := context.Background()
// Setup fake zone config handler.
config.TestingSetupZoneConfigHook(stopper)
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: cfg.AmbientCtx,
Config: &base.Config{Insecure: true},
Clock: cfg.Clock,
Stopper: stopper,
Settings: cfg.Settings,
})
grpcServer := rpc.NewServer(rpcContext) // never started
require.Nil(t, tc.gossip)
tc.gossip = gossip.NewTest(1, rpcContext, grpcServer, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
require.Nil(t, tc.transport)
dialer := nodedialer.New(rpcContext, gossip.AddressResolver(tc.gossip))
tc.transport = NewRaftTransport(cfg.AmbientCtx, cfg.Settings, dialer, grpcServer, stopper)

require.Nil(t, tc.engine)
disableSeparatedIntents :=
!cfg.Settings.Version.ActiveVersionOrEmpty(context.Background()).IsActive(
clusterversion.PostSeparatedIntentsMigration)
log.Infof(context.Background(), "engine creation is randomly setting disableSeparatedIntents: %t",
disableSeparatedIntents)

var err error
tc.engine, err = storage.Open(context.Background(),
storage.InMemory(),
storage.Attributes(roachpb.Attributes{Attrs: []string{"dc1", "mem"}}),
storage.MaxSize(1<<20),
storage.SetSeparatedIntents(disableSeparatedIntents),
storage.Settings(cfg.Settings))
require.NoError(t, err)
stopper.AddCloser(tc.engine)

require.Nil(t, tc.store)
cv := clusterversion.ClusterVersion{Version: bootstrapVersion}
cfg.Gossip = tc.gossip
cfg.Transport = tc.transport
cfg.StorePool = NewTestStorePool(cfg)
// Create a test sender without setting a store. This is to deal with the
// circular dependency between the test sender and the store. The actual
// store will be passed to the sender after it is created and bootstrapped.
factory := &testSenderFactory{}
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper)

require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv))
if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{
ClusterID: uuid.MakeV4(),
NodeID: 1,
StoreID: 1,
}); err != nil {
t.Fatal(err)
}
if err := clusterversion.Initialize(ctx, cv.Version, &cfg.Settings.SV); err != nil {
t.Fatal(err)
}
tc.store = NewStore(ctx, cfg, tc.engine, &roachpb.NodeDescriptor{NodeID: 1})
// Now that we have our actual store, monkey patch the factory used in cfg.DB.
factory.setStore(tc.store)
// We created the store without a real KV client, so it can't perform splits
// or merges.
tc.store.splitQueue.SetDisabled(true)
tc.store.mergeQueue.SetDisabled(true)

require.Nil(t, tc.repl)
if err := WriteInitialClusterData(
ctx, tc.store.Engine(),
nil, /* initialValues */
bootstrapVersion,
1 /* numStores */, nil /* splits */, cfg.Clock.PhysicalNow(),
cfg.TestingKnobs,
); err != nil {
t.Fatal(err)
}
if err := tc.store.Start(ctx, stopper); err != nil {
t.Fatal(err)
}
tc.store.WaitForInit()
tc.repl, err = tc.store.GetReplica(1)
if err != nil {
t.Fatal(err)
}
tc.rangeID = tc.repl.RangeID

if err := tc.initConfigs(t); err != nil {
// NB: this also sets up fake zone config handlers via TestingSetupZoneConfigHook.
//
// TODO(tbg): the above is not good, figure out which tests need this and make them
// call it directly.
//
// NB: split queue, merge queue, and scanner are also disabled.
store := createTestStoreWithoutStart(
t, stopper, testStoreOpts{
createSystemRanges: false,
bootstrapVersion: bootstrapVersion,
}, &cfg,
)
if err := store.Start(ctx, stopper); err != nil {
t.Fatal(err)
}
store.WaitForInit()
repl, err := store.GetReplica(1)
require.NoError(t, err)
tc.repl = repl
tc.rangeID = repl.RangeID
tc.gossip = store.cfg.Gossip
tc.transport = store.cfg.Transport
tc.engine = store.engine
tc.store = store
// TODO(tbg): see if this is needed. Would like to remove it.
require.NoError(t, tc.initConfigs(t))
}

func (tc *testContext) Sender() kv.Sender {
Expand Down Expand Up @@ -1223,6 +1163,12 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) {
tc.manualClock.Increment(11 + int64(tc.Clock().MaxOffset())) // advance time
now = tc.Clock().NowAsClockTimestamp()

ch := tc.gossip.RegisterSystemConfigChannel()
select {
case <-ch:
default:
}

// Give lease to this range.
if err := sendLeaseRequest(tc.repl, &roachpb.Lease{
Start: now.ToTimestamp().Add(11, 0).UnsafeToClockTimestamp(),
Expand All @@ -1237,16 +1183,19 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
cfg := tc.gossip.GetSystemConfig()
if cfg == nil {
return errors.Errorf("expected system config to be set")
sysCfg := tc.gossip.GetSystemConfig()
if sysCfg == nil {
return errors.Errorf("no system config yet")
}
numValues := len(cfg.Values)
if numValues != 1 {
return errors.Errorf("num config values != 1; got %d", numValues)
var found bool
for _, cur := range sysCfg.Values {
if key.Equal(cur.Key) {
found = true
break
}
}
if k := cfg.Values[numValues-1].Key; !k.Equal(key) {
return errors.Errorf("invalid key for config value (%q != %q)", k, key)
if !found {
return errors.Errorf("key %s not found in SystemConfig", key)
}
return nil
})
Expand Down Expand Up @@ -1548,112 +1497,6 @@ func TestReplicaGossipAllConfigs(t *testing.T) {
}
}

// TestReplicaNoGossipConfig verifies that certain commands (e.g.,
// reads, writes in uncommitted transactions) do not trigger gossip.
//
// The test sends three requests against the table prefix for
// keys.MaxReservedDescID: a transactional put, the EndTxn that commits that
// transaction, and a get sent with an empty header. After each request it
// checks that tc.gossip.GetSystemConfig() returns a non-nil config that
// contains no values.
func TestReplicaNoGossipConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
tc.Start(t, stopper)

// Write some arbitrary data in the system span (up to, but not including MaxReservedID+1)
key := keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID)

txn := newTransaction("test", key, 1 /* userPriority */, tc.Clock())
// h attaches txn to the put and the EndTxn in the test cases below.
h := roachpb.Header{Txn: txn}
req1 := putArgs(key, []byte("foo"))
// The second return value of endTxnArgs is discarded; the test cases use h
// as the header for this request instead.
req2, _ := endTxnArgs(txn, true /* commit */)
req2.LockSpans = []roachpb.Span{{Key: key}}
req3 := getArgs(key)

// Requests are sent in this order. The final get uses an empty header, so
// it is not sent as part of txn.
testCases := []struct {
req roachpb.Request
h roachpb.Header
}{
{&req1, h},
{&req2, h},
{&req3, roachpb.Header{}},
}

for i, test := range testCases {
// NOTE(review): this is also called for the non-transactional get;
// presumably harmless, confirm against assignSeqNumsForReqs.
assignSeqNumsForReqs(txn, test.req)
if _, pErr := kv.SendWrappedWith(context.Background(), tc.Sender(), test.h, test.req); pErr != nil {
t.Fatal(pErr)
}

// System config is not gossiped.
cfg := tc.gossip.GetSystemConfig()
if cfg == nil {
t.Fatal("config not set")
}
// Errorf (not Fatal) is used here, so the remaining test cases still run
// after a failure is recorded.
if len(cfg.Values) != 0 {
t.Errorf("System config was gossiped at #%d", i)
}
}
}

// TestReplicaNoGossipFromNonLeader verifies that a non-lease holder replica
// does not gossip configurations.
//
// The test writes and commits a transactional put to the table prefix for
// keys.MaxReservedDescID, reads the key back, moves the manual clock to the
// lease expiry so that the lease status becomes EXPIRED, and then calls
// redirectOnOrAcquireLease. It finally reads the raw gossip info stored under
// gossip.KeySystemConfig and fails if it contains any values.
func TestReplicaNoGossipFromNonLeader(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(t, stopper)

// Write some arbitrary data in the system span (up to, but not including MaxReservedID+1)
key := keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID)

txn := newTransaction("test", key, 1 /* userPriority */, tc.Clock())
// The put carries a nil value; only the key matters for this test.
req1 := putArgs(key, nil)

assignSeqNumsForReqs(txn, &req1)
if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{
Txn: txn,
}, &req1); pErr != nil {
t.Fatal(pErr)
}

// Commit the transaction, using the header returned by endTxnArgs.
req2, h := endTxnArgs(txn, true /* commit */)
req2.LockSpans = []roachpb.Span{{Key: key}}
assignSeqNumsForReqs(txn, &req2)
if _, pErr := tc.SendWrappedWith(h, &req2); pErr != nil {
t.Fatal(pErr)
}
// Execute a get to resolve the intent.
// The read is issued at the transaction's write timestamp.
req3 := getArgs(key)
if _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: txn.WriteTimestamp}, &req3); pErr != nil {
t.Fatal(pErr)
}

// Increment the clock's timestamp to expire the range lease.
tc.manualClock.Set(leaseExpiry(tc.repl))
if tc.repl.CurrentLeaseStatus(ctx).State != kvserverpb.LeaseState_EXPIRED {
t.Fatal("range lease should have been expired")
}

// Make sure the information for db1 is not gossiped. Since obtaining
// a lease updates the gossiped information, we do that.
// NOTE(review): nothing named "db1" appears in this test; the comment
// presumably refers to the key written above. Confirm.
if _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil {
t.Fatal(pErr)
}
// Fetch the raw gossip info. GetSystemConfig is based on callbacks at
// modification time. But we're checking for _not_ gossiped, so there should
// be no callbacks. Easier to check the raw info.
var cfg config.SystemConfigEntries
err := tc.gossip.GetInfoProto(gossip.KeySystemConfig, &cfg)
if err != nil {
t.Fatal(err)
}
if len(cfg.Values) != 0 {
t.Fatalf("non-lease holder gossiped the system config")
}
}

func getArgs(key []byte) roachpb.GetRequest {
return roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down Expand Up @@ -7063,10 +6906,21 @@ func TestReplicaLoadSystemConfigSpanIntent(t *testing.T) {
return err
}

if len(cfg.Values) != 1 || !bytes.Equal(cfg.Values[0].Key, keys.SystemConfigSpan.Key) {
return errors.Errorf("expected only key %s in SystemConfigSpan map: %+v", keys.SystemConfigSpan.Key, cfg)
var found bool
for _, cur := range cfg.Values {
if !cur.Key.Equal(keys.SystemConfigSpan.Key) {
continue
}
if !v.EqualTagAndData(cur.Value) {
continue
}
found = true
break
}
return nil
if found {
return nil
}
return errors.New("recent write not found in gossiped SystemConfig")
})
}

Expand Down
Loading

0 comments on commit f29118b

Please sign in to comment.