From dcf7d5f89bf253faca4371aab7dc388f406cfbb4 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Wed, 20 Jul 2022 11:16:04 -0400 Subject: [PATCH] schematelemetry: add job, schedule, controller and executor This commit adds all the boilerplate for automatically scheduling SQL schema telemetry jobs, by default on a daily basis. This commit also adds a couple of builtins to create the job schedule and also to immediately create a schema telemetry logging job. Fixes #84284. Release note (sql change): this change ensures the existence of a new scheduled job for collecting SQL schema telemetry. By default it runs daily but this can be adjusted via the sql.schema.telemetry.recurrence cluster setting. The schedule can also be paused via PAUSE SCHEDULE followed by its ID, which can be retrieved by SELECT * FROM [SHOW SCHEDULES] WHERE label = 'sql-schema-telemetry'. --- .../settings/settings-for-tenants.txt | 4 +- docs/generated/settings/settings.html | 4 +- docs/generated/sql/functions.md | 4 + pkg/BUILD.bazel | 4 + pkg/base/testing_knobs.go | 1 + pkg/clusterversion/cockroach_versions.go | 7 + pkg/clusterversion/key_string.go | 5 +- pkg/gen/protobuf.bzl | 1 + pkg/server/BUILD.bazel | 1 + pkg/server/server.go | 1 + pkg/server/server_sql.go | 4 + pkg/sql/BUILD.bazel | 1 + pkg/sql/catalog/schematelemetry/BUILD.bazel | 48 ++- pkg/sql/catalog/schematelemetry/main_test.go | 31 ++ .../schematelemetry/scheduled_job_executor.go | 131 ++++++++ .../schematelemetry/schema_telemetry_event.go | 4 - .../schematelemetry/schema_telemetry_job.go | 97 ++++++ .../schematelemetry/schema_telemetry_test.go | 108 +++++++ .../schematelemetrycontroller/BUILD.bazel | 46 +++ .../schematelemetrycontroller/controller.go | 289 ++++++++++++++++++ .../schema_telemetry.proto | 19 ++ pkg/sql/conn_executor.go | 21 ++ pkg/sql/distsql/server.go | 1 + pkg/sql/exec_util.go | 11 + pkg/sql/execinfra/server_config.go | 4 + pkg/sql/planner.go | 5 + .../builtins/builtinconstants/constants.go | 3 + pkg/sql/sem/builtins/builtins.go | 65 ++++ pkg/sql/sem/eval/context.go | 2 + pkg/sql/sem/eval/deps.go | 8 + pkg/sql/sem/tree/show.go | 7 + pkg/sql/sqltelemetry/schema.go | 5 + pkg/ts/catalog/chart_catalog.go | 8 + pkg/upgrade/upgrades/BUILD.bazel | 7 + .../ensure_sql_schema_telemetry_schedule.go | 26 ++ ...sure_sql_schema_telemetry_schedule_test.go | 87 ++++++ pkg/upgrade/upgrades/upgrades.go | 6 + 37 files changed, 1066 insertions(+), 10 deletions(-) create mode 100644 pkg/sql/catalog/schematelemetry/main_test.go create mode 100644 pkg/sql/catalog/schematelemetry/scheduled_job_executor.go create mode 100644 pkg/sql/catalog/schematelemetry/schema_telemetry_job.go create mode 100644 pkg/sql/catalog/schematelemetry/schema_telemetry_test.go create mode 100644 pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel create mode 100644 pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go create mode 100644 pkg/sql/catalog/schematelemetry/schematelemetrycontroller/schema_telemetry.proto create mode 100644 pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule.go create mode 100644 pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index bcca1bf17cfb..ab803a21585c 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -248,6 +248,8 @@ sql.multiple_modifications_of_table.enabled boolean false if true, allow stateme sql.multiregion.drop_primary_region.enabled boolean true allows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabled boolean true enable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabled boolean false if enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +sql.schema.telemetry.recurrence string @daily cron-tab recurrence for SQL schema telemetry job +sql.schema_telemetry.job.enabled boolean true whether the schema telemetry job is enabled sql.spatial.experimental_box2d_comparison_operators.enabled boolean false enables the use of certain experimental box2d comparison operators sql.stats.automatic_collection.enabled boolean true automatic statistics collection mode sql.stats.automatic_collection.fraction_stale_rows float 0.2 target fraction of stale rows per table that will trigger a statistics refresh @@ -283,4 +285,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-28 set the active cluster version in the format '.' +version version 22.1-30 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 5d1e60b4d3a9..baa13520021d 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -179,6 +179,8 @@ sql.multiregion.drop_primary_region.enabledbooleantrueallows dropping the PRIMARY REGION of a database if it is the last region sql.notices.enabledbooleantrueenable notices in the server/client protocol being sent sql.optimizer.uniqueness_checks_for_gen_random_uuid.enabledbooleanfalseif enabled, uniqueness checks may be planned for mutations of UUID columns updated with gen_random_uuid(); otherwise, uniqueness is assumed due to near-zero collision probability +sql.schema.telemetry.recurrencestring@dailycron-tab recurrence for SQL schema telemetry job +sql.schema_telemetry.job.enabledbooleantruewhether the schema telemetry job is enabled sql.spatial.experimental_box2d_comparison_operators.enabledbooleanfalseenables the use of certain experimental box2d comparison operators sql.stats.automatic_collection.enabledbooleantrueautomatic statistics collection mode sql.stats.automatic_collection.fraction_stale_rowsfloat0.2target fraction of stale rows per table that will trigger a statistics refresh @@ -214,6 +216,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-28set the active cluster version in the format '.' +versionversion22.1-30set the active cluster version in the format '.' diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index afba41bdded1..59550df415dd 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -3023,6 +3023,8 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)

Volatile crdb_internal.create_session_revival_token() → bytes

Generate a token that can be used to create a new session for the current user.

Volatile +crdb_internal.create_sql_schema_telemetry_job() → int

This function is used to create a schema telemetry job instance.

+
Volatile crdb_internal.decode_cluster_setting(setting: string, value: string) → string

Decodes the given encoded value for a cluster setting.

Immutable crdb_internal.deserialize_session(session: bytes) → bool

This function deserializes the serialized variables into the current session.

@@ -3120,6 +3122,8 @@ table. Returns an error if validation fails.

Immutable crdb_internal.round_decimal_values(val: decimal[], scale: int) → decimal[]

This function is used internally to round decimal array values during mutations.

Stable +crdb_internal.schedule_sql_schema_telemetry() → bool

This function is used to create the schema telemetry job schedule.

+
Volatile crdb_internal.schedule_sql_stats_compaction() → bool

This function is used to start a SQL stats compaction job.

Volatile crdb_internal.serialize_session() → bytes

This function serializes the variables in the current session.

diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 2467fb1e2e05..e09f97d525f2 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -255,6 +255,7 @@ ALL_TESTS = [ "//pkg/sql/catalog/resolver:resolver_test", "//pkg/sql/catalog/schemadesc:schemadesc_test", "//pkg/sql/catalog/schemaexpr:schemaexpr_test", + "//pkg/sql/catalog/schematelemetry:schematelemetry_test", "//pkg/sql/catalog/seqexpr:seqexpr_disallowed_imports_test", "//pkg/sql/catalog/seqexpr:seqexpr_test", "//pkg/sql/catalog/systemschema_test:systemschema_test_test", @@ -1246,7 +1247,9 @@ GO_TARGETS = [ "//pkg/sql/catalog/schemadesc:schemadesc_test", "//pkg/sql/catalog/schemaexpr:schemaexpr", "//pkg/sql/catalog/schemaexpr:schemaexpr_test", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller:schematelemetrycontroller", "//pkg/sql/catalog/schematelemetry:schematelemetry", + "//pkg/sql/catalog/schematelemetry:schematelemetry_test", "//pkg/sql/catalog/seqexpr:seqexpr", "//pkg/sql/catalog/seqexpr:seqexpr_test", "//pkg/sql/catalog/systemschema:systemschema", @@ -2351,6 +2354,7 @@ GET_X_DATA_TARGETS = [ "//pkg/sql/catalog/schemadesc:get_x_data", "//pkg/sql/catalog/schemaexpr:get_x_data", "//pkg/sql/catalog/schematelemetry:get_x_data", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller:get_x_data", "//pkg/sql/catalog/seqexpr:get_x_data", "//pkg/sql/catalog/systemschema:get_x_data", "//pkg/sql/catalog/systemschema_test:get_x_data", diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 6deeba297c5c..893356cf2dda 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -38,6 +38,7 @@ type TestingKnobs struct { JobsTestingKnobs ModuleTestingKnobs BackupRestore ModuleTestingKnobs TTL ModuleTestingKnobs + SchemaTelemetry ModuleTestingKnobs Streaming ModuleTestingKnobs UpgradeManager ModuleTestingKnobs IndexUsageStatsKnobs ModuleTestingKnobs diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index d4ea07a7b45e..471d8915553c 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -327,6 +327,9 @@ const ( AlterSystemSQLInstancesAddLocality // SystemExternalConnectionsTable adds system.external_connections table. SystemExternalConnectionsTable + // SQLSchemaTelemetryScheduledJobs adds an automatic schedule for SQL schema + // telemetry logging jobs. + SQLSchemaTelemetryScheduledJobs // ************************************************* // Step (1): Add new versions here. @@ -564,6 +567,10 @@ var versionsSingleton = keyedVersions{ Key: SystemExternalConnectionsTable, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 28}, }, + { + Key: SQLSchemaTelemetryScheduledJobs, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 30}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index f3256a9d1f96..9b8b17d099dc 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -58,11 +58,12 @@ func _() { _ = x[EnablePredicateProjectionChangefeed-47] _ = x[AlterSystemSQLInstancesAddLocality-48] _ = x[SystemExternalConnectionsTable-49] + _ = x[SQLSchemaTelemetryScheduledJobs-50] } -const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTable" +const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessBackupDoesNotOverwriteLatestAndCheckpointEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedIncrementalBackupSubdirEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableSQLSchemaTelemetryScheduledJobs" -var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 643, 673, 684, 715, 738, 762, 786, 808, 820, 846, 860, 881, 899, 904, 913, 928, 962, 996, 1018, 1038, 1057, 1090, 1109, 1129, 1150, 1185, 1219, 1249} +var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 643, 673, 684, 715, 738, 762, 786, 808, 820, 846, 860, 881, 899, 904, 913, 928, 962, 996, 1018, 1038, 1057, 1090, 1109, 1129, 1150, 1185, 1219, 1249, 1280} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 6cc1421a0552..a3f7af754cbe 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -37,6 +37,7 @@ PROTOBUF_SRCS = [ "//pkg/settings:settings_go_proto", "//pkg/sql/catalog/catpb:catpb_go_proto", "//pkg/sql/catalog/descpb:descpb_go_proto", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller:schematelemetrycontroller_go_proto", "//pkg/sql/contentionpb:contentionpb_go_proto", "//pkg/sql/execinfrapb:execinfrapb_go_proto", "//pkg/sql/inverted:inverted_go_proto", diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 23e951e9ab43..035deb6669db 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -161,6 +161,7 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/catalog/hydratedtables", "//pkg/sql/catalog/lease", + "//pkg/sql/catalog/schematelemetry", "//pkg/sql/catalog/systemschema", "//pkg/sql/clusterunique", "//pkg/sql/colexec", diff --git a/pkg/server/server.go b/pkg/server/server.go index 15b1d431359c..964faf3f9376 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -60,6 +60,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" + _ "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry" // register schedules declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" _ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 00b85c0cfb20..87a9ae0793ac 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -873,6 +873,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if ttlKnobs := cfg.TestingKnobs.TTL; ttlKnobs != nil { execCfg.TTLTestingKnobs = ttlKnobs.(*sql.TTLTestingKnobs) } + if schemaTelemetryKnobs := cfg.TestingKnobs.SchemaTelemetry; schemaTelemetryKnobs != nil { + execCfg.SchemaTelemetryTestingKnobs = schemaTelemetryKnobs.(*sql.SchemaTelemetryTestingKnobs) + } if sqlStatsKnobs := cfg.TestingKnobs.SQLStatsKnobs; sqlStatsKnobs != nil { execCfg.SQLStatsTestingKnobs = sqlStatsKnobs.(*sqlstats.TestingKnobs) } @@ -913,6 +916,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ) distSQLServer.ServerConfig.SQLStatsController = pgServer.SQLServer.GetSQLStatsController() + distSQLServer.ServerConfig.SchemaTelemetryController = pgServer.SQLServer.GetSchemaTelemetryController() distSQLServer.ServerConfig.IndexUsageStatsController = pgServer.SQLServer.GetIndexUsageStatsController() // We use one BytesMonitor for all InternalExecutor's created by the diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 972077241df6..a74eced5736e 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -319,6 +319,7 @@ go_library( "//pkg/sql/catalog/resolver", "//pkg/sql/catalog/schemadesc", "//pkg/sql/catalog/schemaexpr", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller", "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", diff --git a/pkg/sql/catalog/schematelemetry/BUILD.bazel b/pkg/sql/catalog/schematelemetry/BUILD.bazel index e7317d699c79..c5283b189146 100644 --- a/pkg/sql/catalog/schematelemetry/BUILD.bazel +++ b/pkg/sql/catalog/schematelemetry/BUILD.bazel @@ -1,26 +1,70 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "schematelemetry", - srcs = ["schema_telemetry_event.go"], + srcs = [ + "scheduled_job_executor.go", + "schema_telemetry_event.go", + "schema_telemetry_job.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry", visibility = ["//visibility:public"], deps = [ + "//pkg/jobs", + "//pkg/jobs/jobspb", "//pkg/kv", + "//pkg/scheduledjobs", + "//pkg/security/username", + "//pkg/server/telemetry", + "//pkg/settings", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller", "//pkg/sql/descmetadata", "//pkg/sql/schemachanger/scdecomp", "//pkg/sql/schemachanger/scpb", "//pkg/sql/schemachanger/screl", + "//pkg/sql/sem/tree", + "//pkg/sql/sqltelemetry", + "//pkg/sql/sqlutil", "//pkg/util/hlc", "//pkg/util/log/eventpb", + "//pkg/util/metric", + "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//types", ], ) +go_test( + name = "schematelemetry_test", + srcs = [ + "main_test.go", + "schema_telemetry_test.go", + ], + deps = [ + "//pkg/base", + "//pkg/jobs", + "//pkg/jobs/jobstest", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller", + "//pkg/sql/sem/builtins/builtinconstants", + "//pkg/sql/sem/tree", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", + ], +) + get_x_data(name = "get_x_data") diff --git a/pkg/sql/catalog/schematelemetry/main_test.go b/pkg/sql/catalog/schematelemetry/main_test.go new file mode 100644 index 000000000000..2205f9b466a8 --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package schematelemetry_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +//go:generate ../../../util/leaktest/add-leaktest.sh *_test.go + +func TestMain(m *testing.M) { + securityassets.SetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go b/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go new file mode 100644 index 000000000000..36cdcd997c6c --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/scheduled_job_executor.go @@ -0,0 +1,131 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package schematelemetry + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/scheduledjobs" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/metric" +) + +type schemaTelemetryExecutor struct { + metrics schemaTelemetryMetrics +} + +var _ jobs.ScheduledJobController = (*schemaTelemetryExecutor)(nil) +var _ jobs.ScheduledJobExecutor = (*schemaTelemetryExecutor)(nil) + +type schemaTelemetryMetrics struct { + *jobs.ExecutorMetrics +} + +var _ metric.Struct = &schemaTelemetryMetrics{} + +// MetricStruct implements metric.Struct interface. +func (m *schemaTelemetryMetrics) MetricStruct() {} + +// OnDrop implements the jobs.ScheduledJobController interface. +func (s schemaTelemetryExecutor) OnDrop( + ctx context.Context, + scheduleControllerEnv scheduledjobs.ScheduleControllerEnv, + env scheduledjobs.JobSchedulerEnv, + schedule *jobs.ScheduledJob, + txn *kv.Txn, + descsCol *descs.Collection, +) error { + return nil +} + +// ExecuteJob implements the jobs.ScheduledJobController interface. +func (s schemaTelemetryExecutor) ExecuteJob( + ctx context.Context, + cfg *scheduledjobs.JobExecutionConfig, + env scheduledjobs.JobSchedulerEnv, + sj *jobs.ScheduledJob, + txn *kv.Txn, +) (err error) { + defer func() { + if err == nil { + s.metrics.NumStarted.Inc(1) + } else { + s.metrics.NumFailed.Inc(1) + } + }() + p, cleanup := cfg.PlanHookMaker("invoke-schema-telemetry", txn, username.NodeUserName()) + execCfg := p.(sql.PlanHookState).ExecCfg() + defer cleanup() + _, err = schematelemetrycontroller.CreateSchemaTelemetryJob(ctx, execCfg.JobRegistry, txn, jobs.CreatedByScheduledJobs, sj.ScheduleID()) + return err +} + +// NotifyJobTermination implements the jobs.ScheduledJobController interface. +func (s schemaTelemetryExecutor) NotifyJobTermination( + ctx context.Context, + jobID jobspb.JobID, + jobStatus jobs.Status, + details jobspb.Details, + env scheduledjobs.JobSchedulerEnv, + sj *jobs.ScheduledJob, + ex sqlutil.InternalExecutor, + txn *kv.Txn, +) error { + switch jobStatus { + case jobs.StatusFailed: + jobs.DefaultHandleFailedRun(sj, "SQL schema telemetry job failed") + s.metrics.NumFailed.Inc(1) + return nil + case jobs.StatusSucceeded: + s.metrics.NumSucceeded.Inc(1) + } + sj.SetScheduleStatus(string(jobStatus)) + return nil +} + +// Metrics implements the jobs.ScheduledJobController interface. +func (s schemaTelemetryExecutor) Metrics() metric.Struct { + return &s.metrics +} + +// GetCreateScheduleStatement implements the jobs.ScheduledJobController interface. +func (s schemaTelemetryExecutor) GetCreateScheduleStatement( + ctx context.Context, + env scheduledjobs.JobSchedulerEnv, + txn *kv.Txn, + descsCol *descs.Collection, + sj *jobs.ScheduledJob, + ex sqlutil.InternalExecutor, +) (string, error) { + return "SELECT crdb_internal.schedule_sql_schema_telemetry()", nil +} + +func init() { + jobs.RegisterScheduledJobExecutorFactory( + tree.ScheduledSchemaTelemetryExecutor.InternalName(), + func() (jobs.ScheduledJobExecutor, error) { + m := jobs.MakeExecutorMetrics(tree.ScheduledSchemaTelemetryExecutor.InternalName()) + return &schemaTelemetryExecutor{ + metrics: schemaTelemetryMetrics{ + ExecutorMetrics: &m, + }, + }, nil + }, + ) +} diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_event.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_event.go index 9682045b9ebe..0474d6da95c2 100644 --- a/pkg/sql/catalog/schematelemetry/schema_telemetry_event.go +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_event.go @@ -78,10 +78,6 @@ func buildLogEvents( return events, nil } -// Keep the linter happy. -// TODO(postamar): use this function -var _ = buildLogEvents - // CollectClusterSchemaForTelemetry returns a projection of the cluster's SQL // schema as of the provided system time, suitably filtered for the purposes of // schema telemetry. diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go new file mode 100644 index 000000000000..d666a06cfaeb --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_job.go @@ -0,0 +1,97 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package schematelemetry + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/errors" +) + +var ( + jobEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.schema_telemetry.job.enabled", + "whether the schema telemetry job is enabled", + true, + ).WithPublic() +) + +type schemaTelemetryResumer struct { + job *jobs.Job + st *cluster.Settings +} + +var _ jobs.Resumer = (*schemaTelemetryResumer)(nil) + +// Resume implements the jobs.Resumer interface. +func (t schemaTelemetryResumer) Resume(ctx context.Context, execCtx interface{}) error { + p := execCtx.(sql.JobExecContext) + + if enabled := jobEnabled.Get(p.ExecCfg().SV()); !enabled { + return errors.Newf("schema telemetry jobs are currently disabled by CLUSTER SETTING %s", jobEnabled.Key()) + } + + telemetry.Inc(sqltelemetry.SchemaTelemetryExecuted) + + var knobs sql.SchemaTelemetryTestingKnobs + if k := p.ExecCfg().SchemaTelemetryTestingKnobs; k != nil { + knobs = *k + } + + aostDuration := -time.Second * 30 + if knobs.AOSTDuration != nil { + aostDuration = *knobs.AOSTDuration + } + events, err := buildLogEvents(ctx, p.ExecCfg(), aostDuration) + if err != nil || len(events) == 0 { + return err + } + + return p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return sql.InsertEventRecord( + ctx, + p.ExecCfg().InternalExecutor, + txn, + int32(p.ExecCfg().NodeID.SQLInstanceID()), /* reportingID */ + sql.LogEverywhere, + 0, /* targetID */ + events..., + ) + }) +} + +// OnFailOrCancel implements the jobs.Resumer interface. +func (t schemaTelemetryResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { + return nil +} + +func init() { + jobs.RegisterConstructor( + jobspb.TypeAutoSchemaTelemetry, + func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + return &schemaTelemetryResumer{ + job: job, + st: settings, + } + }, + jobs.DisablesTenantCostControl, + ) +} diff --git a/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go new file mode 100644 index 000000000000..d4132bcb8958 --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/schema_telemetry_test.go @@ -0,0 +1,108 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package schematelemetry_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func makeTestServerArgs() (args base.TestServerArgs) { + args.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{ + JobSchedulerEnv: jobstest.NewJobSchedulerTestEnv( + jobstest.UseSystemTables, + timeutil.Now(), + tree.ScheduledSchemaTelemetryExecutor, + ), + } + aostDuration := time.Nanosecond + args.Knobs.SchemaTelemetry = &sql.SchemaTelemetryTestingKnobs{ + AOSTDuration: &aostDuration, + } + return args +} + +var ( + qExists = fmt.Sprintf(` + SELECT recurrence, count(*) + FROM [SHOW SCHEDULES] + WHERE label = '%s' + AND schedule_status = 'ACTIVE' + GROUP BY recurrence`, + schematelemetrycontroller.SchemaTelemetryScheduleName) + + qID = fmt.Sprintf(` + SELECT id + FROM [SHOW SCHEDULES] + WHERE label = '%s' + AND schedule_status = 'ACTIVE'`, + schematelemetrycontroller.SchemaTelemetryScheduleName) + + qSet = fmt.Sprintf(`SET CLUSTER SETTING %s = '* * * * *'`, + schematelemetrycontroller.SchemaTelemetryRecurrence.Key()) + + qJob = fmt.Sprintf(`SELECT %s()`, + builtinconstants.CreateSchemaTelemetryJobBuiltinName) +) + +const qHasLogs = `SELECT sign(count(*)) FROM system.eventlog WHERE "eventType" = 'schema'` + +func TestSchemaTelemetrySchedule(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, makeTestServerArgs()) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(db) + + tdb.CheckQueryResultsRetry(t, qExists, [][]string{{"@daily", "1"}}) + tdb.ExecSucceedsSoon(t, qSet) + tdb.CheckQueryResultsRetry(t, qExists, [][]string{{"* * * * *", "1"}}) +} + +func TestSchemaTelemetryJob(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, makeTestServerArgs()) + defer s.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(db) + + // Pause the existing schema telemetry schedule so that it doesn't interfere. + res := tdb.QueryStr(t, qID) + require.NotEmpty(t, res) + require.NotEmpty(t, res[0]) + id := res[0][0] + tdb.ExecSucceedsSoon(t, fmt.Sprintf("PAUSE SCHEDULE %s", id)) + // Check that the eventlog table has no schema telemetry log entries. + tdb.CheckQueryResults(t, qHasLogs, [][]string{{"0"}}) + // Run a schema telemetry job and wait for the logs to appear. + tdb.Exec(t, qJob) + tdb.CheckQueryResultsRetry(t, qHasLogs, [][]string{{"1"}}) +} diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel new file mode 100644 index 000000000000..6df1ed3ee82b --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel @@ -0,0 +1,46 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "schematelemetrycontroller_proto", + srcs = ["schema_telemetry.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], +) + +go_proto_library( + name = "schematelemetrycontroller_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller", + proto = ":schematelemetrycontroller_proto", + visibility = ["//visibility:public"], +) + +go_library( + name = "schematelemetrycontroller", + srcs = ["controller.go"], + embed = [":schematelemetrycontroller_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller", + visibility = ["//visibility:public"], + deps = [ + "//pkg/clusterversion", + "//pkg/jobs", + "//pkg/jobs/jobspb", + "//pkg/kv", + "//pkg/scheduledjobs", + "//pkg/security/username", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql/sem/tree", + "//pkg/sql/sessiondata", + "//pkg/sql/sqlutil", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_gogo_protobuf//types", + "@com_github_robfig_cron_v3//:cron", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go new file mode 100644 index 000000000000..5a5d30e6b4a4 --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go @@ -0,0 +1,289 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package schematelemetrycontroller + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/scheduledjobs" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + pbtypes "github.com/gogo/protobuf/types" + "github.com/robfig/cron/v3" +) + +// SchemaTelemetryScheduleName is the name of the schema telemetry schedule. +const SchemaTelemetryScheduleName = "sql-schema-telemetry" + +// SchemaTelemetryRecurrence is the cron-tab string specifying the recurrence +// for schema telemetry job. +var SchemaTelemetryRecurrence = settings.RegisterValidatedStringSetting( + settings.TenantWritable, + "sql.schema.telemetry.recurrence", + "cron-tab recurrence for SQL schema telemetry job", + "@daily", /* defaultValue */ + func(_ *settings.Values, s string) error { + if _, err := cron.ParseStandard(s); err != nil { + return errors.Wrap(err, "invalid cron expression") + } + return nil + }, +).WithPublic() + +// ErrDuplicatedSchedules indicates that there is already a schedule for SQL +// schema telemetry jobs existing in the system.scheduled_jobs table. +var ErrDuplicatedSchedules = errors.New("creating multiple schema telemetry schedules is disallowed") + +// ErrVersionGate indicates that SQL schema telemetry jobs or schedules are +// not supported by the current cluster version. +var ErrVersionGate = errors.New("SQL schema telemetry jobs or schedules not supported by current cluster version") + +// Controller implements the SQL Schema telemetry subsystem control plane. +// This exposes administrative interfaces that can be consumed by other parts +// of the database (e.g. status server, builtins) to control the behavior of the +// SQL schema telemetry subsystem. +type Controller struct { + scheduleController + jr *jobs.Registry +} + +// scheduleController is distinct from Controller which implements the full +// control plane, because tenant migrations don't have access to the jobs +// registry. +type scheduleController struct { + db *kv.DB + ie sqlutil.InternalExecutor + st *cluster.Settings +} + +// NewController is a constructor for *Controller. +// +// This constructor needs to be called in the sql package when creating a new +// sql.Server. This is the reason why it and the definition of the Controller +// object live in their own package separate from schematelemetry. +func NewController( + db *kv.DB, ie sqlutil.InternalExecutor, st *cluster.Settings, jr *jobs.Registry, +) *Controller { + return &Controller{ + scheduleController: scheduleController{ + db: db, + ie: ie, + st: st, + }, + jr: jr, + } +} + +// CreateSchemaTelemetryJob implements eval.SchemaTelemetryController. +func (c *Controller) CreateSchemaTelemetryJob( + ctx context.Context, createdByName string, createdByID int64, +) (id int64, _ error) { + if !c.st.Version.IsActive(ctx, clusterversion.SQLSchemaTelemetryScheduledJobs) { + return 0, ErrVersionGate + } + if err := c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + jobID, err := CreateSchemaTelemetryJob(ctx, c.jr, txn, createdByName, createdByID) + id = int64(jobID) + return err + }); err != nil { + return 0, err + } + c.jr.NotifyToAdoptJobs() + return id, nil +} + +// CreateSchemaTelemetryJob creates a new schema telemetry job. +func CreateSchemaTelemetryJob( + ctx context.Context, jr *jobs.Registry, txn *kv.Txn, createdByName string, createdByID int64, +) (jobID jobspb.JobID, err error) { + record := jobs.Record{ + Description: "SQL schema telemetry", + Username: username.NodeUserName(), + Details: jobspb.SchemaTelemetryDetails{}, + Progress: jobspb.SchemaTelemetryProgress{}, + CreatedBy: &jobs.CreatedByInfo{ + ID: createdByID, + Name: createdByName, + }, + } + jobID = jr.MakeJobID() + _, err = jr.CreateAdoptableJobWithTxn(ctx, record, jobID, txn) + return jobID, err +} + +// CreateSchemaTelemetrySchedule implements eval.SchemaTelemetryController. +func (sc *scheduleController) CreateSchemaTelemetrySchedule(ctx context.Context) error { + return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if !sc.st.Version.IsActive(ctx, clusterversion.SQLSchemaTelemetryScheduledJobs) { + return ErrVersionGate + } + _, err := CreateSchemaTelemetrySchedule(ctx, sc.ie, txn, sc.st) + return err + }) +} + +// CreateSchemaTelemetrySchedule registers the schema telemetry job with +// the scheduled job subsystem so that the schema telemetry job can be run +// periodically. This is done during the cluster startup upgrade. +func CreateSchemaTelemetrySchedule( + ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, st *cluster.Settings, +) (*jobs.ScheduledJob, error) { + id, err := GetSchemaTelemetryScheduleID(ctx, ie, txn) + if err != nil { + return nil, err + } + if id != 0 { + return nil, ErrDuplicatedSchedules + } + + scheduledJob := jobs.NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv) + + schedule := SchemaTelemetryRecurrence.Get(&st.SV) + if err := scheduledJob.SetSchedule(schedule); err != nil { + return nil, err + } + + scheduledJob.SetScheduleDetails(jobspb.ScheduleDetails{ + Wait: jobspb.ScheduleDetails_SKIP, + OnError: jobspb.ScheduleDetails_RETRY_SCHED, + }) + + scheduledJob.SetScheduleLabel(SchemaTelemetryScheduleName) + scheduledJob.SetOwner(username.NodeUserName()) + + args, err := pbtypes.MarshalAny(&ScheduledSchemaTelemetryExecutionArgs{}) + if err != nil { + return nil, err + } + scheduledJob.SetExecutionDetails( + tree.ScheduledSchemaTelemetryExecutor.InternalName(), + jobspb.ExecutionArguments{Args: args}, + ) + + scheduledJob.SetScheduleStatus(string(jobs.StatusPending)) + if err = scheduledJob.Create(ctx, ie, txn); err != nil { + return nil, err + } + + return scheduledJob, nil +} + +// GetSchemaTelemetryScheduleID returns the ID of the schema telemetry schedule +// if it exists, 0 if it does not exist yet. +func GetSchemaTelemetryScheduleID( + ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, +) (id int64, _ error) { + row, err := ie.QueryRowEx( + ctx, + "check-existing-schema-telemetry-schedule", + txn, + sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, + `SELECT schedule_id FROM system.scheduled_jobs WHERE schedule_name = $1 ORDER BY schedule_id ASC LIMIT 1`, + SchemaTelemetryScheduleName, + ) + if err != nil || row == nil { + return 0, err + } + if len(row) != 1 { + return 0, errors.AssertionFailedf("unexpectedly received %d columns", len(row)) + } + // Defensively check the type. + v, ok := tree.AsDInt(row[0]) + if !ok { + return 0, errors.AssertionFailedf("unexpectedly received non-integer value %v", row[0]) + } + return int64(v), nil +} + +// EnsureScheduleAtStartup ensures that a SQL schema telemetry job schedule +// exists during sql.Server startup. +func (sc *scheduleController) EnsureScheduleAtStartup(ctx context.Context) { + if !sc.st.Version.IsActive(ctx, clusterversion.SQLSchemaTelemetryScheduledJobs) { + log.Infof(ctx, "%s", ErrVersionGate.Error()) + return + } + if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := sc.ensureSchedule(ctx, txn) + return err + }); err != nil { + log.Errorf(ctx, "failed to ensure existence of SQL schema telemetry schedule: %s", err) + } +} + +// RegisterClusterSettingHook ensures that the SQL schema telemetry schedule +// recurrence is updated following changes to the corresponding cluster setting. +func (sc *scheduleController) RegisterClusterSettingHook(ctx context.Context) { + SchemaTelemetryRecurrence.SetOnChange(&sc.st.SV, func(ctx context.Context) { + if !sc.st.Version.IsActive(ctx, clusterversion.SQLSchemaTelemetryScheduledJobs) { + log.Infof(ctx, "%s", ErrVersionGate.Error()) + return + } + sc.onClusterSettingChange(ctx) + }) +} + +func (sc *scheduleController) ensureSchedule( + ctx context.Context, txn *kv.Txn, +) (sj *jobs.ScheduledJob, _ error) { + id, err := GetSchemaTelemetryScheduleID(ctx, sc.ie, txn) + if err != nil { + return nil, err + } + if id == 0 { + sj, err = CreateSchemaTelemetrySchedule(ctx, sc.ie, txn, sc.st) + } else { + sj, err = jobs.LoadScheduledJob(ctx, scheduledjobs.ProdJobSchedulerEnv, id, sc.ie, txn) + } + if err != nil { + return nil, err + } + return sj, nil +} + +func (sc *scheduleController) onClusterSettingChange(ctx context.Context) { + if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + sj, err := sc.ensureSchedule(ctx, txn) + if err != nil || sj == nil { + return err + } + cronExpr := SchemaTelemetryRecurrence.Get(&sc.st.SV) + if err = sj.SetSchedule(cronExpr); err != nil { + return err + } + sj.SetScheduleStatus(string(jobs.StatusPending)) + return sj.Update(ctx, sc.ie, txn) + }); err != nil { + log.Errorf(ctx, "failed to change SQL schema telemetry schedule recurrence: %s", err) + } +} + +// EnsureScheduleAndRegisterClusterSettingHook is a convenient entry point +// for the migration associated with the SQL schema telemetry version. +func EnsureScheduleAndRegisterClusterSettingHook( + ctx context.Context, db *kv.DB, ie sqlutil.InternalExecutor, st *cluster.Settings, +) error { + sc := scheduleController{db: db, ie: ie, st: st} + sc.RegisterClusterSettingHook(ctx) + return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := sc.ensureSchedule(ctx, txn) + return err + }) +} diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/schema_telemetry.proto b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/schema_telemetry.proto new file mode 100644 index 000000000000..aec525b6cea4 --- /dev/null +++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/schema_telemetry.proto @@ -0,0 +1,19 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.sql; +option go_package = "schematelemetrycontroller"; + +// ScheduledSchemaTelemetryExecutionArgs is the arguments to the scheduled +// schema telemetry job. This is required to support SHOW SCHEDULE queries. +message ScheduledSchemaTelemetryExecutionArgs { + +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index e3b4d43eea93..7d62fc75b5ae 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" "github.com/cockroachdb/cockroach/pkg/sql/execstats" @@ -263,6 +264,10 @@ type Server struct { // sqlStatsController is the control-plane interface for sqlStats. sqlStatsController *persistedsqlstats.Controller + // schemaTelemetryController is the control-plane interface for schema + // telemetry. + schemaTelemetryController *schematelemetrycontroller.Controller + // indexUsageStatsController is the control-plane interface for // indexUsageStats. indexUsageStatsController *idxusage.Controller @@ -417,6 +422,12 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { s.sqlStats = persistedSQLStats s.sqlStatsController = persistedSQLStats.GetController(cfg.SQLStatusServer) + schemaTelemetryIEMonitor := MakeInternalExecutorMemMonitor(MemoryMetrics{}, s.GetExecutorConfig().Settings) + schemaTelemetryIEMonitor.StartNoReserved(context.Background(), s.GetBytesMonitor()) + schemaTelemetryIE := MakeInternalExecutor(s, MemoryMetrics{}, schemaTelemetryIEMonitor) + s.schemaTelemetryController = schematelemetrycontroller.NewController( + s.cfg.DB, &schemaTelemetryIE, s.cfg.Settings, s.cfg.JobRegistry, + ) s.indexUsageStatsController = idxusage.NewController(cfg.SQLStatusServer) return s } @@ -502,6 +513,9 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { s.sqlStats.Start(ctx, stopper) + s.schemaTelemetryController.EnsureScheduleAtStartup(ctx) + s.schemaTelemetryController.RegisterClusterSettingHook(ctx) + // reportedStats is periodically cleared to prevent too many SQL Stats // accumulated in the reporter when the telemetry server fails. // Usually it is telemetry's reporter's job to clear the reporting SQL Stats. @@ -518,6 +532,12 @@ func (s *Server) GetSQLStatsController() *persistedsqlstats.Controller { return s.sqlStatsController } +// GetSchemaTelemetryController returns the schematelemetryschedule.Controller +// for current sql.Server's schema telemetry. +func (s *Server) GetSchemaTelemetryController() *schematelemetrycontroller.Controller { + return s.schemaTelemetryController +} + // GetIndexUsageStatsController returns the idxusage.Controller for current // sql.Server's index usage stats. func (s *Server) GetIndexUsageStatsController() *idxusage.Controller { @@ -2709,6 +2729,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo SessionDataStack: ex.sessionDataStack, ReCache: ex.server.reCache, SQLStatsController: ex.server.sqlStatsController, + SchemaTelemetryController: ex.server.schemaTelemetryController, IndexUsageStatsController: ex.server.indexUsageStatsController, ConsistencyChecker: p.execCfg.ConsistencyChecker, RangeProber: p.execCfg.RangeProber, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 230c93970554..4f7c81dd18b4 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -364,6 +364,7 @@ func (ds *ServerImpl) setupFlow( Txn: leafTxn, SQLLivenessReader: ds.ServerConfig.SQLLivenessReader, SQLStatsController: ds.ServerConfig.SQLStatsController, + SchemaTelemetryController: ds.ServerConfig.SchemaTelemetryController, IndexUsageStatsController: ds.ServerConfig.IndexUsageStatsController, } evalCtx.SetStmtTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.StmtTimestampNanos)) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 63d0cd9c70c6..36c76ddd50b1 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1198,6 +1198,7 @@ type ExecutorConfig struct { EvalContextTestingKnobs eval.TestingKnobs TenantTestingKnobs *TenantTestingKnobs TTLTestingKnobs *TTLTestingKnobs + SchemaTelemetryTestingKnobs *SchemaTelemetryTestingKnobs BackupRestoreTestingKnobs *BackupRestoreTestingKnobs StreamingTestingKnobs *StreamingTestingKnobs SQLStatsTestingKnobs *sqlstats.TestingKnobs @@ -1533,6 +1534,16 @@ type TTLTestingKnobs struct { // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. func (*TTLTestingKnobs) ModuleTestingKnobs() {} +// SchemaTelemetryTestingKnobs contains testing knobs for schema telemetry. +type SchemaTelemetryTestingKnobs struct { + // AOSTDuration changes the AOST timestamp duration to add to the + // current time. + AOSTDuration *time.Duration +} + +// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. +func (*SchemaTelemetryTestingKnobs) ModuleTestingKnobs() {} + // BackupRestoreTestingKnobs contains knobs for backup and restore behavior. type BackupRestoreTestingKnobs struct { // CaptureResolvedTableDescSpans allows for intercepting the spans which are diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index b84aa3927f62..1a48ccb20a02 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -174,6 +174,10 @@ type ServerConfig struct { // introduce dependency on the sql package. SQLStatsController eval.SQLStatsController + // SchemaTelemetryController is an interface used by the builtins to create a + // job schedule for schema telemetry jobs. + SchemaTelemetryController eval.SchemaTelemetryController + // IndexUsageStatsController is an interface used to reset index usage stats without // the need to introduce dependency on the sql package. IndexUsageStatsController eval.IndexUsageStatsController diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 0e770dc3a4d9..390402cdc223 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/sql/evalcatalog" "github.com/cockroachdb/cockroach/pkg/sql/idxusage" @@ -444,11 +445,13 @@ func internalExtendedEvalCtx( var indexUsageStats *idxusage.LocalIndexUsageStats var sqlStatsController eval.SQLStatsController + var schemaTelemetryController eval.SchemaTelemetryController var indexUsageStatsController eval.IndexUsageStatsController if execCfg.InternalExecutor != nil { if execCfg.InternalExecutor.s != nil { indexUsageStats = execCfg.InternalExecutor.s.indexUsageStats sqlStatsController = execCfg.InternalExecutor.s.sqlStatsController + schemaTelemetryController = execCfg.InternalExecutor.s.schemaTelemetryController indexUsageStatsController = execCfg.InternalExecutor.s.indexUsageStatsController } else { // If the indexUsageStats is nil from the sql.Server, we create a dummy @@ -458,6 +461,7 @@ func internalExtendedEvalCtx( Setting: execCfg.Settings, }) sqlStatsController = &persistedsqlstats.Controller{} + schemaTelemetryController = &schematelemetrycontroller.Controller{} indexUsageStatsController = &idxusage.Controller{} } } @@ -474,6 +478,7 @@ func internalExtendedEvalCtx( StmtTimestamp: stmtTimestamp, TxnTimestamp: txnTimestamp, SQLStatsController: sqlStatsController, + SchemaTelemetryController: schemaTelemetryController, IndexUsageStatsController: indexUsageStatsController, StmtDiagnosticsRequestInserter: execCfg.StmtDiagnosticsRecorder.InsertRequest, }, diff --git a/pkg/sql/sem/builtins/builtinconstants/constants.go b/pkg/sql/sem/builtins/builtinconstants/constants.go index 85df6367238d..3e9f30735e1b 100644 --- a/pkg/sql/sem/builtins/builtinconstants/constants.go +++ b/pkg/sql/sem/builtins/builtinconstants/constants.go @@ -69,6 +69,9 @@ const ( // RehomeRowBuiltinName is the name for the builtin that rehomes a row to the // user's gateway region, defaulting to the database primary region. RehomeRowBuiltinName = "rehome_row" + // CreateSchemaTelemetryJobBuiltinName is the name for the builtin that + // creates a job that logs SQL schema telemetry. + CreateSchemaTelemetryJobBuiltinName = "crdb_internal.create_sql_schema_telemetry_job" ) // NodeIDBits is the number of bits stored in the lower portion of diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 31ee2b326d9e..1f62d3a5cdec 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -6793,6 +6793,71 @@ table's zone configuration this will return NULL.`, }, ), + "crdb_internal.schedule_sql_schema_telemetry": makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategorySystemInfo, + }, + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Bool), + Fn: func(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + if evalCtx.SchemaTelemetryController == nil { + return nil, errors.AssertionFailedf("schema telemetry controller not set") + } + ctx := evalCtx.Ctx() + // The user must be an admin to use this builtin. + isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) + if err != nil { + return nil, err + } + if !isAdmin { + return nil, errInsufficientPriv + } + if err := evalCtx.SchemaTelemetryController.CreateSchemaTelemetrySchedule(ctx); err != nil { + return tree.DNull, err + } + return tree.DBoolTrue, nil + }, + Info: "This function is used to create the schema telemetry job schedule.", + Volatility: volatility.Volatile, + }, + ), + + builtinconstants.CreateSchemaTelemetryJobBuiltinName: makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategorySystemInfo, + }, + tree.Overload{ + Types: tree.ArgTypes{}, + ReturnType: tree.FixedReturnType(types.Int), + Fn: func(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + if evalCtx.SchemaTelemetryController == nil { + return nil, errors.AssertionFailedf("schema telemetry controller not set") + } + ctx := evalCtx.Ctx() + // The user must be an admin to use this builtin. + isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) + if err != nil { + return nil, err + } + if !isAdmin { + return nil, errInsufficientPriv + } + id, err := evalCtx.SchemaTelemetryController.CreateSchemaTelemetryJob( + ctx, + builtinconstants.CreateSchemaTelemetryJobBuiltinName, + int64(evalCtx.NodeID.SQLInstanceID()), + ) + if err != nil { + return tree.DNull, err + } + return tree.NewDInt(tree.DInt(id)), nil + }, + Info: "This function is used to create a schema telemetry job instance.", + Volatility: volatility.Volatile, + }, + ), + "crdb_internal.revalidate_unique_constraints_in_all_tables": makeBuiltin( tree.FunctionProperties{ Category: builtinconstants.CategorySystemInfo, diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 6e046b25c081..7a3d6b1f52db 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -188,6 +188,8 @@ type Context struct { SQLStatsController SQLStatsController + SchemaTelemetryController SchemaTelemetryController + IndexUsageStatsController IndexUsageStatsController // CompactEngineSpan is used to force compaction of a span in a store. diff --git a/pkg/sql/sem/eval/deps.go b/pkg/sql/sem/eval/deps.go index dfedf9b1778f..5d77b044177f 100644 --- a/pkg/sql/sem/eval/deps.go +++ b/pkg/sql/sem/eval/deps.go @@ -524,6 +524,14 @@ type SQLStatsController interface { CreateSQLStatsCompactionSchedule(ctx context.Context) error } +// SchemaTelemetryController is an interface embedded in EvalCtx which can be +// used by the builtins to create a job schedule for schema telemetry jobs. +// This interface is introduced to avoid circular dependency. +type SchemaTelemetryController interface { + CreateSchemaTelemetrySchedule(ctx context.Context) error + CreateSchemaTelemetryJob(ctx context.Context, createdByName string, createdByID int64) (int64, error) +} + // IndexUsageStatsController is an interface embedded in EvalCtx which can be // used by the builtins to reset index usage stats in the cluster. This interface // is introduced to avoid circular dependency. diff --git a/pkg/sql/sem/tree/show.go b/pkg/sql/sem/tree/show.go index a4a726869b4f..4d6e1de8a1d2 100644 --- a/pkg/sql/sem/tree/show.go +++ b/pkg/sql/sem/tree/show.go @@ -794,6 +794,10 @@ const ( // ScheduledRowLevelTTLExecutor is an executor responsible for the cleanup // of rows on row level TTL tables. ScheduledRowLevelTTLExecutor + + // ScheduledSchemaTelemetryExecutor is an executor responsible for the logging + // of schema telemetry. + ScheduledSchemaTelemetryExecutor ) var scheduleExecutorInternalNames = map[ScheduledJobExecutorType]string{ @@ -801,6 +805,7 @@ var scheduleExecutorInternalNames = map[ScheduledJobExecutorType]string{ ScheduledBackupExecutor: "scheduled-backup-executor", ScheduledSQLStatsCompactionExecutor: "scheduled-sql-stats-compaction-executor", ScheduledRowLevelTTLExecutor: "scheduled-row-level-ttl-executor", + ScheduledSchemaTelemetryExecutor: "scheduled-schema-telemetry-executor", } // InternalName returns an internal executor name. @@ -818,6 +823,8 @@ func (t ScheduledJobExecutorType) UserName() string { return "SQL STATISTICS" case ScheduledRowLevelTTLExecutor: return "ROW LEVEL TTL" + case ScheduledSchemaTelemetryExecutor: + return "SCHEMA TELEMETRY" } return "unsupported-executor" } diff --git a/pkg/sql/sqltelemetry/schema.go b/pkg/sql/sqltelemetry/schema.go index 009ee2932bb6..8bb77fd6c9ea 100644 --- a/pkg/sql/sqltelemetry/schema.go +++ b/pkg/sql/sqltelemetry/schema.go @@ -98,6 +98,11 @@ var ( TempObjectCleanerDeletionCounter = telemetry.GetCounterOnce("sql.schema.temp_object_cleaner.num_cleaned") ) +var ( + // SchemaTelemetryExecuted is incremented when a schema telemetry job has executed. + SchemaTelemetryExecuted = telemetry.GetCounterOnce("sql.schema.telemetry.job_executed") +) + // SchemaNewColumnTypeQualificationCounter is to be incremented every time // a new qualification is used for a newly created column. func SchemaNewColumnTypeQualificationCounter(qual string) telemetry.Counter { diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index a1e647613e2d..9f7c1f480ca9 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -2321,6 +2321,14 @@ var charts = []sectionDescription{ "jobs.auto_schema_telemetry.resume_retry_error", }, }, + { + Title: "Scheduled Jobs Statistics", + Metrics: []string{ + "schedules.scheduled-schema-telemetry-executor.succeeded", + "schedules.scheduled-schema-telemetry-executor.started", + "schedules.scheduled-schema-telemetry-executor.failed", + }, + }, }, }, { diff --git a/pkg/upgrade/upgrades/BUILD.bazel b/pkg/upgrade/upgrades/BUILD.bazel index a53bf7c8c4cf..d34aa27a0b36 100644 --- a/pkg/upgrade/upgrades/BUILD.bazel +++ b/pkg/upgrade/upgrades/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "alter_table_protected_timestamp_records.go", "comment_on_index_migration.go", "descriptor_utils.go", + "ensure_sql_schema_telemetry_schedule.go", "migrate_span_configs.go", "precondition_before_starting_an_upgrade.go", "public_schema_migration.go", @@ -41,6 +42,7 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/catalog/resolver", "//pkg/sql/catalog/schemadesc", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller", "//pkg/sql/catalog/seqexpr", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", @@ -71,6 +73,7 @@ go_test( "builtins_test.go", "comment_on_index_migration_external_test.go", "descriptor_utils_test.go", + "ensure_sql_schema_telemetry_schedule_test.go", "helpers_test.go", "main_test.go", "migrate_span_configs_test.go", @@ -88,6 +91,8 @@ go_test( deps = [ "//pkg/base", "//pkg/clusterversion", + "//pkg/jobs", + "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/rangefeed", @@ -109,9 +114,11 @@ go_test( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/desctestutils", + "//pkg/sql/catalog/schematelemetry/schematelemetrycontroller", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/privilege", + "//pkg/sql/sem/builtins/builtinconstants", "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", "//pkg/sql/sqlutil", diff --git a/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule.go b/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule.go new file mode 100644 index 000000000000..afdf20651302 --- /dev/null +++ b/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule.go @@ -0,0 +1,26 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" + "github.com/cockroachdb/cockroach/pkg/upgrade" +) + +func ensureSQLSchemaTelemetrySchedule( + ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps, _ *jobs.Job, +) error { + return schematelemetrycontroller.EnsureScheduleAndRegisterClusterSettingHook(ctx, d.DB, d.InternalExecutor, d.Settings) +} diff --git a/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go b/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go new file mode 100644 index 000000000000..81b6d8679e07 --- /dev/null +++ b/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go @@ -0,0 +1,87 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package upgrades_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +func TestSchemaTelemetrySchedule(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var args base.TestServerArgs + args.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{ + JobSchedulerEnv: jobstest.NewJobSchedulerTestEnv( + jobstest.UseSystemTables, + timeutil.Now(), + tree.ScheduledSchemaTelemetryExecutor, + ), + } + aostDuration := time.Nanosecond + args.Knobs.SchemaTelemetry = &sql.SchemaTelemetryTestingKnobs{ + AOSTDuration: &aostDuration, + } + args.Knobs.Server = &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: clusterversion.ByKey(clusterversion.SQLSchemaTelemetryScheduledJobs - 1), + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args}) + defer tc.Stopper().Stop(ctx) + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + qExists := fmt.Sprintf(` + SELECT recurrence, count(*) + FROM [SHOW SCHEDULES] + WHERE label = '%s' + GROUP BY recurrence`, + schematelemetrycontroller.SchemaTelemetryScheduleName) + + qJob := fmt.Sprintf(`SELECT %s()`, + builtinconstants.CreateSchemaTelemetryJobBuiltinName) + + // Check that there is no schema telemetry schedule and that creating schema + // telemetry jobs is not possible. + tdb.CheckQueryResults(t, qExists, [][]string{}) + tdb.ExpectErr(t, schematelemetrycontroller.ErrVersionGate.Error(), qJob) + + // Upgrade the cluster. + tdb.Exec(t, `SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.SQLSchemaTelemetryScheduledJobs).String()) + + // Check that the schedule now exists and that jobs can be created. + tdb.Exec(t, qJob) + tdb.CheckQueryResultsRetry(t, qExists, [][]string{{"@daily", "1"}}) + + // Check that the schedule can have its recurrence altered. + tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING %s = '* * * * *'`, + schematelemetrycontroller.SchemaTelemetryRecurrence.Key())) + tdb.CheckQueryResultsRetry(t, qExists, [][]string{{"* * * * *", "1"}}) +} diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index 627a9f1cdcae..15570749b785 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -140,6 +140,12 @@ var upgrades = []upgrade.Upgrade{ NoPrecondition, systemExternalConnectionsTableMigration, ), + upgrade.NewTenantUpgrade( + "add default SQL schema telemetry schedule", + toCV(clusterversion.SQLSchemaTelemetryScheduledJobs), + NoPrecondition, + ensureSQLSchemaTelemetrySchedule, + ), } func init() {