Skip to content

Commit

Permalink
spanconfigsql{watcher,reconciler}: setup SQLWatcher to watch for pts …
Browse files Browse the repository at this point in the history
…updates

This change sets up a rangefeed on the `system.pts_records` table,
and adds this to the set of rangefeeds managed by the SQLWatcher. We
switch the unit emitted by the SQLWatcher to a union type called
`SQLUpdate` which captures either an update to a descriptor or to a
protected timestamp target. The zones and descriptor table rangefeeds
will continue to emit descriptor updates. The pts rangefeed will emit
descriptor updates for records that target schema objects, and pts target
updates for records that target cluster or tenants.

The SQLWatcher continues to dedup the SQLUpdates that it emits, so as to
ensure there is only one SQLUpdate per descriptor ID, and one SQLUpdate
per cluster or tenant target. The reconciler registers a handler that
is invoked everytime a batch of SQLUpdates are emitted by the SQLWatcher.
There is change in semantics in this part of the code, a future PR will teach
the reconciler to parse the pts target SQLUpdates, and in turn instruct the
SQLTranslator to generate the appropriate `SystemSpanConfigs`.

Note, this file moves the sqlwatcher tests into a ccl package so as to be
able to run backup statements to test the rangefeed on the pts table.

Informs: #73727

Release note: None
  • Loading branch information
adityamaru committed Feb 1, 2022
1 parent c4f15d6 commit e0ce909
Show file tree
Hide file tree
Showing 10 changed files with 651 additions and 139 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ ALL_TESTS = [
"//pkg/ccl/spanconfigccl/spanconfigcomparedccl:spanconfigcomparedccl_test",
"//pkg/ccl/spanconfigccl/spanconfigreconcilerccl:spanconfigreconcilerccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl:spanconfigsqlwatcherccl_test",
"//pkg/ccl/sqlproxyccl/denylist:denylist_test",
"//pkg/ccl/sqlproxyccl/idle:idle_test",
"//pkg/ccl/sqlproxyccl/tenant:tenant_test",
Expand Down
38 changes: 38 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "spanconfigsqlwatcherccl_test",
srcs = [
"main_test.go",
"sqlwatcher_test.go",
],
deps = [
"//pkg/base",
"//pkg/ccl/backupccl",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/cloud/impl:cloudimpl",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigsqlwatcher",
"//pkg/sql/catalog/descpb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
34 changes: 34 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package spanconfigsqlwatcherccl

import (
"os"
"testing"

_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
defer utilccl.TestingEnableEnterprise()()
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package spanconfigsqlwatcher_test
package spanconfigsqlwatcherccl

import (
"context"
Expand All @@ -18,9 +16,14 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -42,8 +45,9 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
stmt string
expectedIDs descpb.IDs
stmt string
expectedIDs descpb.IDs
expectedPTSUpdates []spanconfig.ProtectedTimestampUpdate
}{
{
stmt: "CREATE TABLE t()",
Expand Down Expand Up @@ -101,15 +105,46 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
// One ID each for the enum and the array type.
expectedIDs: descpb.IDs{66, 67},
},
// Test that pts updates are seen.
{
stmt: "BACKUP TABLE t,t2 INTO 'nodelocal://1/foo'",
expectedIDs: descpb.IDs{54, 55},
},
{
stmt: "BACKUP DATABASE d INTO 'nodelocal://1/foo'",
expectedIDs: descpb.IDs{56},
},
{
stmt: "BACKUP TABLE d.* INTO 'nodelocal://1/foo'",
expectedIDs: descpb.IDs{56},
},
{
stmt: "BACKUP INTO 'nodelocal://1/foo'",
expectedPTSUpdates: []spanconfig.ProtectedTimestampUpdate{{ClusterTarget: true,
TenantTarget: roachpb.TenantID{}}},
},
{
stmt: `
SELECT crdb_internal.create_tenant(2);
BACKUP TENANT 2 INTO 'nodelocal://1/foo'`,
expectedPTSUpdates: []spanconfig.ProtectedTimestampUpdate{{ClusterTarget: false,
TenantTarget: roachpb.TenantID{2}}},
},
}

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
ExternalIODir: dir,
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ManagerDisableJobCreation: true, // disable the automatic job creation.
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes.
ProtectedTS: &protectedts.TestingKnobs{
EnableProtectedTimestampForMultiTenant: true,
},
},
},
})
Expand All @@ -134,10 +169,12 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {

var mu struct {
syncutil.Mutex
receivedIDs map[descpb.ID]struct{}
lastCheckpoint hlc.Timestamp
receivedIDs map[descpb.ID]struct{}
receivedTargets map[spanconfig.ProtectedTimestampUpdate]struct{}
lastCheckpoint hlc.Timestamp
}
mu.receivedIDs = make(map[descpb.ID]struct{})
mu.receivedTargets = make(map[spanconfig.ProtectedTimestampUpdate]struct{})

var wg sync.WaitGroup
watcherStartTS := ts.Clock().Now()
Expand All @@ -148,15 +185,20 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
defer wg.Done()

_ = sqlWatcher.WatchForSQLUpdates(watcherCtx, watcherStartTS,
func(ctx context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
func(ctx context.Context, updates []spanconfig.SQLUpdate, checkpointTS hlc.Timestamp) error {
mu.Lock()
defer mu.Unlock()

require.True(t, mu.lastCheckpoint.LessEq(checkpointTS))
mu.lastCheckpoint = checkpointTS

for _, update := range updates {
mu.receivedIDs[update.ID] = struct{}{}
if update.IsDescriptorUpdate() {
mu.receivedIDs[update.GetDescriptorUpdate().ID] = struct{}{}
} else if update.IsProtectedTimestampUpdate() {
ptsUpdate := update.GetProtectedTimestampUpdate()
mu.receivedTargets[ptsUpdate] = struct{}{}
}
}
return nil
})
Expand All @@ -179,12 +221,18 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) {
// Rangefeed events aren't guaranteed to be in any particular order for
// different keys.
mu.Lock()
require.Equal(t, len(tc.expectedPTSUpdates), len(mu.receivedTargets))
require.Equal(t, len(tc.expectedIDs), len(mu.receivedIDs))
for _, id := range tc.expectedIDs {
_, seen := mu.receivedIDs[id]
require.True(t, seen)
delete(mu.receivedIDs, id)
}
for _, ptsUpdate := range tc.expectedPTSUpdates {
_, seen := mu.receivedTargets[ptsUpdate]
require.True(t, seen)
delete(mu.receivedTargets, ptsUpdate)
}
mu.Unlock()
}

Expand Down Expand Up @@ -244,11 +292,11 @@ func TestSQLWatcherMultiple(t *testing.T) {

receivedIDs := make(map[descpb.ID]struct{})
err := sqlWatcher.WatchForSQLUpdates(ctx, beforeStmtTS,
func(_ context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
func(_ context.Context, updates []spanconfig.SQLUpdate, checkpointTS hlc.Timestamp) error {
onCheckpoint(checkpointTS)

for _, update := range updates {
receivedIDs[update.ID] = struct{}{}
receivedIDs[update.GetDescriptorUpdate().ID] = struct{}{}
}
return nil
})
Expand Down Expand Up @@ -363,10 +411,11 @@ func TestSQLWatcherOnEventError(t *testing.T) {
startTS := ts.Clock().Now()
tdb.Exec(t, "CREATE TABLE t()")

err := sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error {
t.Fatal("handler should never run")
return nil
})
err := sqlWatcher.WatchForSQLUpdates(ctx, startTS,
func(context.Context, []spanconfig.SQLUpdate, hlc.Timestamp) error {
t.Fatal("handler should never run")
return nil
})
require.Error(t, err)
require.True(t, testutils.IsError(err, "boom"))
}
Expand Down Expand Up @@ -431,10 +480,11 @@ func TestSQLWatcherHandlerError(t *testing.T) {
// Wrap the call to WatchForSQLUpdates in a SucceedsSoon to ensure it
// evaluates within 45 seconds.
testutils.SucceedsSoon(t, func() error {
err := sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error {
atomic.AddInt32(&numCalled, 1)
return errors.New("handler error")
})
err := sqlWatcher.WatchForSQLUpdates(ctx, startTS,
func(context.Context, []spanconfig.SQLUpdate, hlc.Timestamp) error {
atomic.AddInt32(&numCalled, 1)
return errors.New("handler error")
})
require.Error(t, err)
require.True(t, testutils.IsError(err, "handler error"))
return nil
Expand Down Expand Up @@ -498,11 +548,11 @@ func TestWatcherReceivesNoopCheckpoints(t *testing.T) {

receivedIDs := make(map[descpb.ID]struct{})
err := sqlWatcher.WatchForSQLUpdates(ctx, beforeStmtTS,
func(_ context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error {
func(_ context.Context, updates []spanconfig.SQLUpdate, checkpointTS hlc.Timestamp) error {
onCheckpoint(checkpointTS)

for _, update := range updates {
receivedIDs[update.ID] = struct{}{}
receivedIDs[update.GetDescriptorUpdate().ID] = struct{}{}
}
return nil
})
Expand Down
Loading

0 comments on commit e0ce909

Please sign in to comment.