Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfig: introduce the spanconfig.SQLWatcher #69661

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ ALL_TESTS = [
"//pkg/settings:settings_test",
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catformat:catformat_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (t *testServerShim) DistSenderI() interface{} { panic(unsuppor
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcher() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ go_library(
"//pkg/spanconfig/spanconfigkvaccessor",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/spanconfig/spanconfigsqltranslator",
"//pkg/spanconfig/spanconfigsqlwatcher",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
8 changes: 8 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -842,13 +843,20 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set.
spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
sqlTranslator := spanconfigsqltranslator.New(execCfg, codec)
sqlWatcher := spanconfigsqlwatcher.New(
codec,
cfg.Settings,
cfg.rangeFeedFactory,
cfg.stopper,
)
spanConfigMgr = spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
cfg.stopper,
cfg.Settings,
cfg.spanConfigAccessor,
sqlWatcher,
sqlTranslator,
spanConfigKnobs,
)
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,16 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} {
return ts.sqlServer.spanconfigMgr.SQLTranslator
}

// SpanConfigSQLWatcher is part of TestServerInterface.
func (ts *TestServer) SpanConfigSQLWatcher() interface{} {
if ts.sqlServer.spanconfigMgr == nil {
panic(
"span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs",
)
}
return ts.sqlServer.spanconfigMgr.SQLWatcher
}

// SQLServer is part of TestServerInterface.
func (ts *TestServer) SQLServer() interface{} {
return ts.PGServer().SQLServer
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/base",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/util/hlc",
],
Expand Down
46 changes: 41 additions & 5 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
Expand Down Expand Up @@ -77,6 +78,45 @@ func FullTranslate(
return s.Translate(ctx, descpb.IDs{keys.RootNamespaceID})
}

// SQLWatcher can be used to watch for events on system.zones and
// system.descriptors.
type SQLWatcher interface {
// WatchForSQLUpdates watches for changes to zones and descriptors starting at
// the given timestamp (exclusive) by establishing rangefeeds over
// system.zones and system.descriptors. Consumption of these rangefeed events
// happens asynchronously.
//
// It periodically calls the handle function with a list of SQLWatcherEvents
// and a monotonically increasing checkpointTS. The list of SQLWatcherEvents
// is guaranteed to include all events on system.zones and system.descriptors
// in the window (previous checkpointTS, checkpointTS]. All calls to handle
// are executed serially.
//
// The checkpointTS can be stored and used as the startTS to subsequent calls
// to WatchForSQLUpdates. The previous checkpointTS above is the same as
// startTS the very first time handle is called.
WatchForSQLUpdates(ctx context.Context, startTS hlc.Timestamp, handle SQLWatcherHandleFunc) error

// Close closes the rangefeeds and asynchronous tasks created by the
// SQLWatcher and waits for them to shut down. Close is idempotent.
Close()
}

// SQLWatcherHandleFunc is the type of the handler function expected by the
// SQLWatcher.
type SQLWatcherHandleFunc func(ids []SQLWatcherEvent, checkpointTS hlc.Timestamp) error

// SQLWatcherEvent captures the ID and type of a descriptor or zone that the
// SQLWatcher has observed change.
type SQLWatcherEvent struct {
// ID of the descriptor/zone that has changed.
ID descpb.ID

// DescriptorType of the descriptor/zone that has changed. Could be either the
// specific type or catalog.Any.
DescriptorType catalog.DescriptorType
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the clusters span
Expand All @@ -86,11 +126,7 @@ type ReconciliationDependencies interface {

SQLTranslator

// TODO(arul): We'll also want access to a "SQLWatcher", something that
// watches for changes to system.{descriptors, zones} to feed IDs to the
// SQLTranslator. These interfaces will be used by the "Reconciler to perform
// full/partial reconciliation, checkpoint the span config job, and update KV
// with the tenants span config state.
SQLWatcher
}

// Store is a data structure used to store spans and their corresponding
Expand Down
1 change: 1 addition & 0 deletions pkg/spanconfig/spanconfigmanager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/testspanconfig",
"//pkg/sql",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand Down
3 changes: 3 additions & 0 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Manager struct {
knobs *spanconfig.TestingKnobs

spanconfig.KVAccessor
spanconfig.SQLWatcher
spanconfig.SQLTranslator
}

Expand All @@ -73,6 +74,7 @@ func New(
stopper *stop.Stopper,
settings *cluster.Settings,
kvAccessor spanconfig.KVAccessor,
sqlWatcher spanconfig.SQLWatcher,
sqlTranslator spanconfig.SQLTranslator,
knobs *spanconfig.TestingKnobs,
) *Manager {
Expand All @@ -86,6 +88,7 @@ func New(
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcher: sqlWatcher,
SQLTranslator: sqlTranslator,
knobs: knobs,
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerCreatedJobInterceptor: func(jobI interface{}) {
Expand Down Expand Up @@ -162,6 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) {
Expand Down Expand Up @@ -237,6 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
Expand Down
62 changes: 62 additions & 0 deletions pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanconfigsqlwatcher",
srcs = [
"event_buffer.go",
"sql_watcher.go",
"zones_decoder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/roachpb:with-mocks",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "spanconfigsqlwatcher_test",
srcs = [
"event_buffer_test.go",
"main_test.go",
"sql_watcher_test.go",
"zones_decoder_test.go",
],
embed = [":spanconfigsqlwatcher"],
deps = [
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/protoutil",
"@com_github_gogo_protobuf//proto",
"@com_github_stretchr_testify//require",
],
)
Loading