diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index a12e4024cefee..d71251c835d91 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -274,6 +274,7 @@ go_test( "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_golang_x_exp//slices", + "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/ddl/ddl_api_test.go b/ddl/ddl_api_test.go index a6a5a3187eb3a..b3053c792950d 100644 --- a/ddl/ddl_api_test.go +++ b/ddl/ddl_api_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/util/cmp" "github.com/stretchr/testify/require" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" ) func TestGetDDLJobs(t *testing.T) { @@ -219,6 +220,58 @@ func enQueueDDLJobs(t *testing.T, sess session.Session, txn kv.Transaction, jobT } } +func TestCreateViewConcurrently(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create table t (a int);") + tk.MustExec("create view v as select * from t;") + tk.MustExec("set global tidb_enable_metadata_lock = 1;") + tk.MustExec("set global tidb_enable_concurrent_ddl = 1;") + var ( + counterErr error + counter int + ) + ddl.OnCreateViewForTest = func(job *model.Job) { + counter++ + if counter > 1 { + counterErr = fmt.Errorf("create view job should not run concurrently") + return + } + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/onDDLCreateView", "return")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/onDDLCreateView")) + }() + + ddl.AfterDelivery2WorkerForTest = func(job *model.Job) { + if job.Type == model.ActionCreateView { + counter-- + } + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/afterDelivery2Worker", "return")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/afterDelivery2Worker")) + }() + + var eg errgroup.Group + for i := 0; i < 5; i++ { + eg.Go(func() error { + newTk := testkit.NewTestKit(t, store) + _, err := newTk.Exec("use test") + if err != nil { + return err + } + _, err = newTk.Exec("create or replace view v as select * from t;") + return err + }) + } + err := eg.Wait() + require.NoError(t, err) + require.NoError(t, counterErr) +} + func TestCreateDropCreateTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/job_table.go b/ddl/job_table.go index 4ccb604345a0b..6215e77e61196 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -204,6 +204,9 @@ func (d *ddl) loadDDLJobAndRun(sess *session, pool *workerPool, getJob func(*ses d.delivery2worker(wk, pool, job) } +// AfterDelivery2WorkerForTest is only used for test. +var AfterDelivery2WorkerForTest func(*model.Job) + // delivery2worker owns the worker, need to put it back to the pool in this function. func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { injectFailPointForGetJob(job) @@ -211,6 +214,9 @@ func (d *ddl) delivery2worker(wk *worker, pool *workerPool, job *model.Job) { d.wg.Run(func() { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() defer func() { + failpoint.Inject("afterDelivery2Worker", func() { + AfterDelivery2WorkerForTest(job) + }) d.runningJobs.remove(job) asyncNotify(d.ddlJobCh) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() diff --git a/ddl/placement_policy.go b/ddl/placement_policy.go index eedb4bbf09dcf..47bfe63b11784 100644 --- a/ddl/placement_policy.go +++ b/ddl/placement_policy.go @@ -118,7 +118,7 @@ func getPlacementPolicyByName(d *ddlCtx, t *meta.Meta, policyName model.CIStr) ( } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { // Use cached policy. policy, ok := is.PolicyByName(policyName) if ok { @@ -319,7 +319,7 @@ func checkPlacementPolicyNotInUse(d *ddlCtx, t *meta.Meta, policy *model.PolicyI return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return CheckPlacementPolicyNotInUseFromInfoSchema(is, policy) } diff --git a/ddl/schema.go b/ddl/schema.go index d9e86c30c5eaf..6635ce7468905 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -76,7 +76,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) } return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) diff --git a/ddl/table.go b/ddl/table.go index 0e836a2d41687..3dcdc172fac34 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -269,18 +269,28 @@ func repairTableOrViewWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tb return t.UpdateTable(schemaID, tbInfo) } +// OnCreateViewForTest is only used for test. +var OnCreateViewForTest func(*model.Job) + func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { schemaID := job.SchemaID tbInfo := &model.TableInfo{} var orReplace bool - var oldTbInfoID int64 - if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil { + var _placeholder int64 // oldTblInfoID + if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } tbInfo.State = model.StateNone - err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) + + oldTableID, err := findTableIDByName(d, t, schemaID, tbInfo.Name.L) + if infoschema.ErrTableNotExists.Equal(err) { + err = nil + } + failpoint.Inject("onDDLCreateView", func() { + OnCreateViewForTest(job) + }) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled @@ -303,13 +313,13 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) // none -> public tbInfo.State = model.StatePublic tbInfo.UpdateTS = t.StartTS - if oldTbInfoID > 0 && orReplace { - err = t.DropTableOrView(schemaID, oldTbInfoID) + if oldTableID > 0 && orReplace { + err = t.DropTableOrView(schemaID, oldTableID) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - err = t.GetAutoIDAccessors(schemaID, oldTbInfoID).Del() + err = t.GetAutoIDAccessors(schemaID, oldTableID).Del() if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -1454,7 +1464,7 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } @@ -1507,6 +1517,48 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string return nil } +func findTableIDByName(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) { + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return 0, err + } + is := d.infoCache.GetLatest() + if is != nil && is.SchemaMetaVersion() == currVer { + return findTableIDFromInfoSchema(is, schemaID, tableName) + } + + return findTableIDFromStore(t, schemaID, tableName) +} + +func findTableIDFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) (int64, error) { + schema, ok := is.SchemaByID(schemaID) + if !ok { + return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + tbl, err := is.TableByName(schema.Name, model.NewCIStr(tableName)) + if err != nil { + return 0, err + } + return tbl.Meta().ID, nil +} + +func findTableIDFromStore(t *meta.Meta, schemaID int64, tableName string) (int64, error) { + tbls, err := t.ListTables(schemaID) + if err != nil { + if meta.ErrDBNotExists.Equal(err) { + return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + return 0, errors.Trace(err) + } + for _, tbl := range tbls { + if tbl.Name.L == tableName { + return tbl.ID, nil + } + } + return 0, infoschema.ErrTableNotExists.FastGenByArgs(tableName) +} + // updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) ( ver int64, err error) { diff --git a/ddl/table_test.go b/ddl/table_test.go index 195c188298476..b293857867446 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -308,7 +308,8 @@ func TestCreateView(t *testing.T) { } ctx.SetValue(sessionctx.QueryString, "skip") err = d.DoDDLJob(ctx, job) - require.Error(t, err) + // The non-existing table id in job args will not be considered anymore. + require.NoError(t, err) } func checkTableCacheTest(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {