Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: use AdminSplit in splitTestRange #72383

Merged
merged 10 commits into from
Nov 8, 2021
48 changes: 27 additions & 21 deletions pkg/kv/kvserver/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,17 +479,23 @@ func TestGCQueueProcess(t *testing.T) {
ts3 := makeTS(now-intentAgeThreshold.Nanoseconds(), 0) // 2h old
ts4 := makeTS(now-(intentAgeThreshold.Nanoseconds()-1), 0) // 2h-1ns old
ts5 := makeTS(now-1e9, 0) // 1s old
key1 := roachpb.Key("a")
key2 := roachpb.Key("b")
key3 := roachpb.Key("c")
key4 := roachpb.Key("d")
key5 := roachpb.Key("e")
key6 := roachpb.Key("f")
key7 := roachpb.Key("g")
key8 := roachpb.Key("h")
key9 := roachpb.Key("i")
key10 := roachpb.Key("j")
key11 := roachpb.Key("k")
mkKey := func(suff string) roachpb.Key {
var k roachpb.Key
k = append(k, keys.ScratchRangeMin...)
k = append(k, suff...)
return k
}
key1 := mkKey("a")
key2 := mkKey("b")
key3 := mkKey("c")
key4 := mkKey("d")
key5 := mkKey("e")
key6 := mkKey("f")
key7 := mkKey("g")
key8 := mkKey("h")
key9 := mkKey("i")
key10 := mkKey("j")
key11 := mkKey("k")

data := []struct {
key roachpb.Key
Expand Down Expand Up @@ -582,16 +588,16 @@ func TestGCQueueProcess(t *testing.T) {
}

// The total size of the GC'able versions of the keys and values in Info.
// Key size: len("a") + MVCCVersionTimestampSize (13 bytes) = 14 bytes.
// Key size: len(scratch+"a") + MVCCVersionTimestampSize (13 bytes) = 15 bytes.
// Value size: len("value") + headerSize (5 bytes) = 10 bytes.
// key1 at ts1 (14 bytes) => "value" (10 bytes)
// key2 at ts1 (14 bytes) => "value" (10 bytes)
// key3 at ts1 (14 bytes) => "value" (10 bytes)
// key4 at ts1 (14 bytes) => "value" (10 bytes)
// key5 at ts1 (14 bytes) => "value" (10 bytes)
// key5 at ts2 (14 bytes) => delete (0 bytes)
// key10 at ts1 (14 bytes) => delete (0 bytes)
var expectedVersionsKeyBytes int64 = 7 * 14
// key1 at ts1 (15 bytes) => "value" (10 bytes)
// key2 at ts1 (15 bytes) => "value" (10 bytes)
// key3 at ts1 (15 bytes) => "value" (10 bytes)
// key4 at ts1 (15 bytes) => "value" (10 bytes)
// key5 at ts1 (15 bytes) => "value" (10 bytes)
// key5 at ts2 (15 bytes) => delete (0 bytes)
// key10 at ts1 (15 bytes) => delete (0 bytes)
var expectedVersionsKeyBytes int64 = 7 * 15
var expectedVersionsValBytes int64 = 5 * 10

// Call Run with dummy functions to get current Info.
Expand Down Expand Up @@ -670,7 +676,7 @@ func TestGCQueueProcess(t *testing.T) {
return err
}
for i, kv := range kvs {
log.VEventf(ctx, 1, "%d: %s", i, kv.Key)
t.Logf("%d: %s", i, kv.Key)
}
if len(kvs) != len(expKVs) {
return fmt.Errorf("expected length %d; got %d", len(expKVs), len(kvs))
Expand Down
263 changes: 57 additions & 206 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand All @@ -48,8 +46,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand All @@ -59,7 +55,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
Expand Down Expand Up @@ -180,93 +175,38 @@ func (tc *testContext) StartWithStoreConfigAndVersion(
) {
tc.TB = t
ctx := context.Background()
// Setup fake zone config handler.
config.TestingSetupZoneConfigHook(stopper)
rpcContext := rpc.NewContext(rpc.ContextOptions{
TenantID: roachpb.SystemTenantID,
AmbientCtx: cfg.AmbientCtx,
Config: &base.Config{Insecure: true},
Clock: cfg.Clock,
Stopper: stopper,
Settings: cfg.Settings,
})
grpcServer := rpc.NewServer(rpcContext) // never started
require.Nil(t, tc.gossip)
tc.gossip = gossip.NewTest(1, rpcContext, grpcServer, stopper, metric.NewRegistry(), zonepb.DefaultZoneConfigRef())
require.Nil(t, tc.transport)
dialer := nodedialer.New(rpcContext, gossip.AddressResolver(tc.gossip))
tc.transport = NewRaftTransport(cfg.AmbientCtx, cfg.Settings, dialer, grpcServer, stopper)

require.Nil(t, tc.engine)
disableSeparatedIntents :=
!cfg.Settings.Version.ActiveVersionOrEmpty(context.Background()).IsActive(
clusterversion.PostSeparatedIntentsMigration)
log.Infof(context.Background(), "engine creation is randomly setting disableSeparatedIntents: %t",
disableSeparatedIntents)

var err error
tc.engine, err = storage.Open(context.Background(),
storage.InMemory(),
storage.Attributes(roachpb.Attributes{Attrs: []string{"dc1", "mem"}}),
storage.MaxSize(1<<20),
storage.SetSeparatedIntents(disableSeparatedIntents),
storage.Settings(cfg.Settings))
require.NoError(t, err)
stopper.AddCloser(tc.engine)

require.Nil(t, tc.store)
cv := clusterversion.ClusterVersion{Version: bootstrapVersion}
cfg.Gossip = tc.gossip
cfg.Transport = tc.transport
cfg.StorePool = NewTestStorePool(cfg)
// Create a test sender without setting a store. This is to deal with the
// circular dependency between the test sender and the store. The actual
// store will be passed to the sender after it is created and bootstrapped.
factory := &testSenderFactory{}
cfg.DB = kv.NewDB(cfg.AmbientCtx, factory, cfg.Clock, stopper)

require.NoError(t, WriteClusterVersion(ctx, tc.engine, cv))
if err := InitEngine(ctx, tc.engine, roachpb.StoreIdent{
ClusterID: uuid.MakeV4(),
NodeID: 1,
StoreID: 1,
}); err != nil {
t.Fatal(err)
}
if err := clusterversion.Initialize(ctx, cv.Version, &cfg.Settings.SV); err != nil {
t.Fatal(err)
}
tc.store = NewStore(ctx, cfg, tc.engine, &roachpb.NodeDescriptor{NodeID: 1})
// Now that we have our actual store, monkey patch the factory used in cfg.DB.
factory.setStore(tc.store)
// We created the store without a real KV client, so it can't perform splits
// or merges.
tc.store.splitQueue.SetDisabled(true)
tc.store.mergeQueue.SetDisabled(true)

require.Nil(t, tc.repl)
if err := WriteInitialClusterData(
ctx, tc.store.Engine(),
nil, /* initialValues */
bootstrapVersion,
1 /* numStores */, nil /* splits */, cfg.Clock.PhysicalNow(),
cfg.TestingKnobs,
); err != nil {
t.Fatal(err)
}
if err := tc.store.Start(ctx, stopper); err != nil {
t.Fatal(err)
}
tc.store.WaitForInit()
tc.repl, err = tc.store.GetReplica(1)
if err != nil {
t.Fatal(err)
}
tc.rangeID = tc.repl.RangeID

if err := tc.initConfigs(t); err != nil {
// NB: this also sets up fake zone config handlers via TestingSetupZoneConfigHook.
//
// TODO(tbg): the above is not good, figure out which tests need this and make them
// call it directly.
//
// NB: split queue, merge queue, and scanner are also disabled.
store := createTestStoreWithoutStart(
t, stopper, testStoreOpts{
createSystemRanges: false,
bootstrapVersion: bootstrapVersion,
}, &cfg,
)
if err := store.Start(ctx, stopper); err != nil {
t.Fatal(err)
}
store.WaitForInit()
repl, err := store.GetReplica(1)
require.NoError(t, err)
tc.repl = repl
tc.rangeID = repl.RangeID
tc.gossip = store.cfg.Gossip
tc.transport = store.cfg.Transport
tc.engine = store.engine
tc.store = store
// TODO(tbg): see if this is needed. Would like to remove it.
require.NoError(t, tc.initConfigs(t))
}

func (tc *testContext) Sender() kv.Sender {
Expand Down Expand Up @@ -1223,6 +1163,12 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) {
tc.manualClock.Increment(11 + int64(tc.Clock().MaxOffset())) // advance time
now = tc.Clock().NowAsClockTimestamp()

ch := tc.gossip.RegisterSystemConfigChannel()
select {
case <-ch:
default:
}

// Give lease to this range.
if err := sendLeaseRequest(tc.repl, &roachpb.Lease{
Start: now.ToTimestamp().Add(11, 0).UnsafeToClockTimestamp(),
Expand All @@ -1236,20 +1182,20 @@ func TestReplicaGossipConfigsOnLease(t *testing.T) {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
cfg := tc.gossip.GetSystemConfig()
if cfg == nil {
return errors.Errorf("expected system config to be set")
}
numValues := len(cfg.Values)
if numValues != 1 {
return errors.Errorf("num config values != 1; got %d", numValues)
}
if k := cfg.Values[numValues-1].Key; !k.Equal(key) {
return errors.Errorf("invalid key for config value (%q != %q)", k, key)
select {
case <-ch:
case <-time.After(testutils.DefaultSucceedsSoonDuration):
t.Fatalf("no SystemConfig gossiped after lease")
}
sysCfg := tc.gossip.GetSystemConfig()
var found bool
for _, cur := range sysCfg.Values {
if key.Equal(cur.Key) {
found = true
break
}
return nil
})
}
require.True(t, found, "key %s not found in SystemConfig")
}

// TestReplicaTSCacheLowWaterOnLease verifies that the low water mark
Expand Down Expand Up @@ -1548,112 +1494,6 @@ func TestReplicaGossipAllConfigs(t *testing.T) {
}
}

// TestReplicaNoGossipConfig verifies that certain commands (e.g.,
// reads, writes in uncommitted transactions) do not trigger gossip.
func TestReplicaNoGossipConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
tc.Start(t, stopper)

// Write some arbitrary data in the system span (up to, but not including MaxReservedID+1)
key := keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID)

txn := newTransaction("test", key, 1 /* userPriority */, tc.Clock())
h := roachpb.Header{Txn: txn}
req1 := putArgs(key, []byte("foo"))
req2, _ := endTxnArgs(txn, true /* commit */)
req2.LockSpans = []roachpb.Span{{Key: key}}
req3 := getArgs(key)

testCases := []struct {
req roachpb.Request
h roachpb.Header
}{
{&req1, h},
{&req2, h},
{&req3, roachpb.Header{}},
}

for i, test := range testCases {
assignSeqNumsForReqs(txn, test.req)
if _, pErr := kv.SendWrappedWith(context.Background(), tc.Sender(), test.h, test.req); pErr != nil {
t.Fatal(pErr)
}

// System config is not gossiped.
cfg := tc.gossip.GetSystemConfig()
if cfg == nil {
t.Fatal("config not set")
}
if len(cfg.Values) != 0 {
t.Errorf("System config was gossiped at #%d", i)
}
}
}

// TestReplicaNoGossipFromNonLeader verifies that a non-lease holder replica
// does not gossip configurations.
func TestReplicaNoGossipFromNonLeader(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(t, stopper)

// Write some arbitrary data in the system span (up to, but not including MaxReservedID+1)
key := keys.SystemSQLCodec.TablePrefix(keys.MaxReservedDescID)

txn := newTransaction("test", key, 1 /* userPriority */, tc.Clock())
req1 := putArgs(key, nil)

assignSeqNumsForReqs(txn, &req1)
if _, pErr := kv.SendWrappedWith(ctx, tc.Sender(), roachpb.Header{
Txn: txn,
}, &req1); pErr != nil {
t.Fatal(pErr)
}

req2, h := endTxnArgs(txn, true /* commit */)
req2.LockSpans = []roachpb.Span{{Key: key}}
assignSeqNumsForReqs(txn, &req2)
if _, pErr := tc.SendWrappedWith(h, &req2); pErr != nil {
t.Fatal(pErr)
}
// Execute a get to resolve the intent.
req3 := getArgs(key)
if _, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: txn.WriteTimestamp}, &req3); pErr != nil {
t.Fatal(pErr)
}

// Increment the clock's timestamp to expire the range lease.
tc.manualClock.Set(leaseExpiry(tc.repl))
if tc.repl.CurrentLeaseStatus(ctx).State != kvserverpb.LeaseState_EXPIRED {
t.Fatal("range lease should have been expired")
}

// Make sure the information for db1 is not gossiped. Since obtaining
// a lease updates the gossiped information, we do that.
if _, pErr := tc.repl.redirectOnOrAcquireLease(ctx); pErr != nil {
t.Fatal(pErr)
}
// Fetch the raw gossip info. GetSystemConfig is based on callbacks at
// modification time. But we're checking for _not_ gossiped, so there should
// be no callbacks. Easier to check the raw info.
var cfg config.SystemConfigEntries
err := tc.gossip.GetInfoProto(gossip.KeySystemConfig, &cfg)
if err != nil {
t.Fatal(err)
}
if len(cfg.Values) != 0 {
t.Fatalf("non-lease holder gossiped the system config")
}
}

func getArgs(key []byte) roachpb.GetRequest {
return roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down Expand Up @@ -7063,10 +6903,21 @@ func TestReplicaLoadSystemConfigSpanIntent(t *testing.T) {
return err
}

if len(cfg.Values) != 1 || !bytes.Equal(cfg.Values[0].Key, keys.SystemConfigSpan.Key) {
return errors.Errorf("expected only key %s in SystemConfigSpan map: %+v", keys.SystemConfigSpan.Key, cfg)
var found bool
for _, cur := range cfg.Values {
if !cur.Key.Equal(keys.SystemConfigSpan.Key) {
continue
}
if !v.EqualTagAndData(cur.Value) {
continue
}
found = true
break
}
return nil
if found {
return nil
}
return errors.New("recent write not found in gossiped SystemConfig")
})
}

Expand Down
Loading