Skip to content

Commit

Permalink
sql,schemachanger: disallow concurrent execution for new schema changes
Browse files Browse the repository at this point in the history
This PR prevents new-style schema changes from running concurrently with
any other schema changes, in two ways:

1. If there are mutations in progress (from either new or old schema
   changes) when attempting to plan a new-style schema change, we wait
   and poll until there are no mutations, and then restart the
   transaction.
2. If we try to write an old-style schema change job while a new-style
   schema change is in progress on the table (detected via a new field
   on the table descriptor that holds the new-style schema change job
   ID), an error is returned. This effectively prevents all schema
   changes, even the ones without mutations.

Most of this commit consists of testing. Testing knobs for the new
schema changer are introduced. We also now accumulate the statements
involved in the schema change, as strings, in the schema changer state
in `extraTxnState`. The executor now takes an argument to inject more
relevant state into the testing knobs, including the aforementioned
statements.

Release justification: Non-production code change (the new schema
changer is disabled for 21.1)

Release note: None
  • Loading branch information
thoszhang committed Feb 27, 2021
1 parent be0383e commit d67cc55
Show file tree
Hide file tree
Showing 24 changed files with 1,271 additions and 435 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ ALL_TESTS = [
"//pkg/sql/schemachanger/scbuild:scbuild_test",
"//pkg/sql/schemachanger/scexec:scexec_test",
"//pkg/sql/schemachanger/scpb:scpb_test",
"//pkg/sql/schemachanger:schemachanger_test",
"//pkg/sql/sem/builtins:builtins_test",
"//pkg/sql/sem/tree/eval_test:eval_test_test",
"//pkg/sql/sem/tree:tree_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type TestingKnobs struct {
SQLExecutor ModuleTestingKnobs
SQLLeaseManager ModuleTestingKnobs
SQLSchemaChanger ModuleTestingKnobs
SQLNewSchemaChanger ModuleTestingKnobs
SQLTypeSchemaChanger ModuleTestingKnobs
GCJob ModuleTestingKnobs
PGWireTestingKnobs ModuleTestingKnobs
Expand Down
4 changes: 1 addition & 3 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ go_library(
"//pkg/sql/physicalplan",
"//pkg/sql/querycache",
"//pkg/sql/roleoption",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/schemachanger/scjob",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
Expand Down Expand Up @@ -232,7 +233,6 @@ go_library(

go_test(
name = "server_test",
size = "medium",
srcs = [
"admin_cluster_test.go",
"admin_test.go",
Expand Down Expand Up @@ -265,8 +265,6 @@ go_test(
],
data = glob(["testdata/**"]),
embed = [":server"],
shard_count = 16,
tags = ["broken_in_bazel"],
deps = [
"//pkg/base",
"//pkg/build",
Expand Down
6 changes: 6 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/optionalnodeliveness"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
Expand Down Expand Up @@ -544,6 +545,11 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
} else {
execCfg.SchemaChangerTestingKnobs = new(sql.SchemaChangerTestingKnobs)
}
if sqlNewSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLNewSchemaChanger; sqlNewSchemaChangerTestingKnobs != nil {
execCfg.NewSchemaChangerTestingKnobs = sqlNewSchemaChangerTestingKnobs.(*scexec.NewSchemaChangerTestingKnobs)
} else {
execCfg.NewSchemaChangerTestingKnobs = new(scexec.NewSchemaChangerTestingKnobs)
}
if sqlTypeSchemaChangerTestingKnobs := cfg.TestingKnobs.SQLTypeSchemaChanger; sqlTypeSchemaChangerTestingKnobs != nil {
execCfg.TypeSchemaChangerTestingKnobs = sqlTypeSchemaChangerTestingKnobs.(*sql.TypeSchemaChangerTestingKnobs)
} else {
Expand Down
819 changes: 430 additions & 389 deletions pkg/sql/catalog/descpb/structured.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,12 @@ message TableDescriptor {
// with the mutations list.
repeated MutationJob mutationJobs = 27 [(gogoproto.nullable) = false];

// The ID of the schema change job run by the new schema changer (in
// sql/schemachanger) for this table, if one exists. Only one such job can
// exist at a time.
optional int64 new_schema_change_job_id = 45 [(gogoproto.nullable) = false,
(gogoproto.customname) = "NewSchemaChangeJobID"];

message SequenceOpts {
option (gogoproto.equal) = true;
// How much to increment the sequence by when nextval() is called.
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,17 @@ func (desc *wrapper) ValidateSelf(ctx context.Context) error {
}
}

// Validate that the presence of MutationJobs (from the old schema changer)
// and the presence of a NewSchemaChangeJobID are mutually exclusive. (Note
// the jobs themselves can be running simultaneously, since a resumer can
// still be running after the schema change is complete from the point of view
// of the descriptor, in both the new and old schema change jobs.)
if len(desc.MutationJobs) > 0 && desc.NewSchemaChangeJobID != 0 {
return errors.AssertionFailedf(
"invalid concurrent new-style schema change job %d and old-style schema change jobs %v",
desc.NewSchemaChangeJobID, desc.MutationJobs)
}

if err := desc.validateTableIfTesting(ctx); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ var validationMap = []struct {
"Temporary": {status: thisFieldReferencesNoObjects},
"LocalityConfig": {status: iSolemnlySwearThisFieldIsValidated},
"PartitionAllBy": {status: iSolemnlySwearThisFieldIsValidated},
"NewSchemaChangeJobID": {status: iSolemnlySwearThisFieldIsValidated},
},
},
{
Expand Down
53 changes: 42 additions & 11 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2564,44 +2564,71 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) {
// runPreCommitStages is part of the new schema changer infrastructure to
// mutate descriptors prior to committing a SQL transaction.
func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
if len(ex.extraTxnState.schemaChangerState.nodes) == 0 {
scs := &ex.extraTxnState.schemaChangerState
if len(scs.nodes) == 0 {
return nil
}
executor := scexec.NewExecutor(
ex.planner.txn, &ex.extraTxnState.descCollection, ex.server.cfg.Codec,
nil /* backfiller */, nil, /* jobTracker */
nil /* backfiller */, nil /* jobTracker */, ex.server.cfg.NewSchemaChangerTestingKnobs,
)
after, err := runNewSchemaChanger(
ctx, scplan.PreCommitPhase,
ctx,
scplan.PreCommitPhase,
ex.extraTxnState.schemaChangerState.nodes,
executor,
scs.stmts,
)
if err != nil {
return err
}
scs := &ex.extraTxnState.schemaChangerState
scs.nodes = after
targetSlice := make([]*scpb.Target, len(scs.nodes))
states := make([]scpb.State, len(scs.nodes))
// TODO(ajwerner): It may be better in the future to have the builder be
// responsible for determining this set of descriptors. As of the time of
// writing, the descriptors to be "locked," descriptors that need schema
// change jobs, and descriptors with schema change mutations all coincide. But
// there are future schema changes to be implemented in the new schema changer
// (e.g., RENAME TABLE) for which this may no longer be true.
descIDSet := catalog.MakeDescriptorIDSet()
for i := range scs.nodes {
targetSlice[i] = scs.nodes[i].Target
states[i] = scs.nodes[i].State
descIDSet.Add(scs.nodes[i].Element().DescriptorID())
}
_, err = ex.planner.extendedEvalCtx.QueueJob(ctx, jobs.Record{
descIDs := descIDSet.Ordered()
job, err := ex.planner.extendedEvalCtx.QueueJob(ctx, jobs.Record{
Description: "Schema change job", // TODO(ajwerner): use const
Statement: "", // TODO(ajwerner): combine all of the DDL statements together
Statement: strings.Join(scs.stmts, "; "),
Username: ex.planner.User(),
DescriptorIDs: nil, // TODO(ajwerner): populate
Details: jobspb.NewSchemaChangeDetails{Targets: targetSlice},
DescriptorIDs: descIDs,
Details: jobspb.NewSchemaChangeDetails{
Targets: targetSlice,
},
Progress: jobspb.NewSchemaChangeProgress{States: states},
RunningStatus: "",
NonCancelable: false,
})
return err
if err != nil {
return err
}
// Write the job ID to the affected descriptors.
if err := scexec.UpdateDescriptorJobIDs(
ctx, ex.planner.Txn(), &ex.extraTxnState.descCollection, descIDs, jobspb.InvalidJobID, job.ID(),
); err != nil {
return err
}
log.Infof(ctx, "queued new schema change job %d using the new schema changer", job.ID())
return nil
}

func runNewSchemaChanger(
ctx context.Context, phase scplan.Phase, nodes []*scpb.Node, executor *scexec.Executor,
ctx context.Context,
phase scplan.Phase,
nodes []*scpb.Node,
executor *scexec.Executor,
stmts []string,
) (after []*scpb.Node, _ error) {
sc, err := scplan.MakePlan(nodes, scplan.Params{
ExecutionPhase: phase,
Expand All @@ -2612,7 +2639,11 @@ func runNewSchemaChanger(
}
after = nodes
for _, s := range sc.Stages {
if err := executor.ExecuteOps(ctx, s.Ops); err != nil {
if err := executor.ExecuteOps(ctx, s.Ops,
scexec.TestingKnobMetadata{
Statements: stmts,
Phase: phase,
}); err != nil {
return nil, err
}
after = s.After
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -422,6 +424,16 @@ func (p *planner) initiateDropTable(
return errors.Errorf("table %q is already being dropped", tableDesc.Name)
}

// Exit early with an error if the table is undergoing a new-style schema
// change, before we try to get job IDs and update job statuses later. See
// createOrUpdateSchemaChangeJob.
if tableDesc.NewSchemaChangeJobID != 0 {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"cannot perform a schema change on table %q while it is undergoing a new-style schema change",
tableDesc.GetName(),
)
}

// If the table is not interleaved, use the delayed GC mechanism to
// schedule usage of the more efficient ClearRange pathway. ClearRange will
// only work if the entire hierarchy of interleaved tables are dropped at
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
Expand Down Expand Up @@ -337,7 +338,6 @@ var experimentalUniqueWithoutIndexConstraintsMode = settings.RegisterBoolSetting
false,
)

// DistSQLClusterExecMode controls the cluster default for when DistSQL is used.
var experimentalUseNewSchemaChanger = settings.RegisterEnumSetting(
"sql.defaults.experimental_new_schema_changer.enabled",
"default value for experimental_use_new_schema_changer session setting;"+
Expand Down Expand Up @@ -787,6 +787,7 @@ type ExecutorConfig struct {
TestingKnobs ExecutorTestingKnobs
PGWireTestingKnobs *PGWireTestingKnobs
SchemaChangerTestingKnobs *SchemaChangerTestingKnobs
NewSchemaChangerTestingKnobs *scexec.NewSchemaChangerTestingKnobs
TypeSchemaChangerTestingKnobs *TypeSchemaChangerTestingKnobs
GCJobTestingKnobs *GCJobTestingKnobs
DistSQLRunTestingKnobs *execinfra.TestingKnobs
Expand Down
Loading

0 comments on commit d67cc55

Please sign in to comment.