From 06d339cb2f8eb82fd53f3707786a5f900fd44aac Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 31 May 2024 17:12:32 +0800 Subject: [PATCH 1/7] ddl: get latest old table ID before replace view --- pkg/ddl/ddl.go | 6 ----- pkg/ddl/table.go | 60 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index ef17d29ccd316..c3e283ecbc1da 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -65,7 +65,6 @@ import ( "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/gcutil" "github.com/pingcap/tidb/pkg/util/generic" - "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/tikv/client-go/v2/tikvrpc" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -388,11 +387,6 @@ type ddlCtx struct { // reorgCtx is used for reorganization. reorgCtx reorgContexts - // backfillCtx is used for backfill workers. - backfillCtx struct { - syncutil.RWMutex - jobCtxMap map[int64]*JobContext - } jobCtx struct { sync.RWMutex diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 218701d5b8bcb..eabae7889dcad 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -308,22 +308,26 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) schemaID := job.SchemaID tbInfo := &model.TableInfo{} var orReplace bool - var oldTbInfoID int64 - if err := job.DecodeArgs(tbInfo, &orReplace, &oldTbInfoID); err != nil { + var placeholder any // oldTblInfoID + if err := job.DecodeArgs(tbInfo, &orReplace, &placeholder); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err) } tbInfo.State = model.StateNone - err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) + + oldTableID, err := findOldTableID(d, t, schemaID, tbInfo.Name.L) if err != nil { + shouldContinue := false if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled return ver, errors.Trace(err) + } else if infoschema.ErrTableNotExists.Equal(err) { + shouldContinue = true } else if !infoschema.ErrTableExists.Equal(err) { return ver, errors.Trace(err) } - if !orReplace { + if !orReplace && !shouldContinue { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -337,13 +341,13 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) // none -> public tbInfo.State = model.StatePublic tbInfo.UpdateTS = t.StartTS - if oldTbInfoID > 0 && orReplace { - err = t.DropTableOrView(schemaID, job.SchemaName, oldTbInfoID, tbInfo.Name.L) + if oldTableID > 0 && orReplace { + err = t.DropTableOrView(schemaID, job.SchemaName, oldTableID, tbInfo.Name.L) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - err = t.GetAutoIDAccessors(schemaID, oldTbInfoID).Del() + err = t.GetAutoIDAccessors(schemaID, oldTableID).Del() if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -1601,6 +1605,48 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string return nil } +func findOldTableID(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) { + // Try to use memory schema info to check first. + currVer, err := t.GetSchemaVersion() + if err != nil { + return 0, err + } + is := d.infoCache.GetLatest() + if is.SchemaMetaVersion() == currVer { + return findTableIDFromInfoSchema(is, schemaID, tableName) + } + + return findTableIDFromStore(t, schemaID, tableName) +} + +func findTableIDFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) (int64, error) { + schema, ok := is.SchemaByID(schemaID) + if !ok { + return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + tbl, err := is.TableByName(schema.Name, model.NewCIStr(tableName)) + if err != nil { + return 0, err + } + return tbl.Meta().ID, nil +} + +func findTableIDFromStore(t *meta.Meta, schemaID int64, tableName string) (int64, error) { + tbls, err := t.ListSimpleTables(schemaID) + if err != nil { + if meta.ErrDBNotExists.Equal(err) { + return 0, infoschema.ErrDatabaseNotExists.GenWithStackByArgs("") + } + return 0, errors.Trace(err) + } + for _, tbl := range tbls { + if tbl.Name.L == tableName { + return tbl.ID, nil + } + } + return 0, infoschema.ErrTableNotExists.FastGenByArgs(tableName) +} + // updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) ( ver int64, err error) { From afd3edfd325286becaf00a4a7d50de57ef4d59f4 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 31 May 2024 17:23:37 +0800 Subject: [PATCH 2/7] extract ErrTableNotExists error handling --- pkg/ddl/table.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index eabae7889dcad..7779b30b61ce6 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -317,17 +317,15 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) tbInfo.State = model.StateNone oldTableID, err := findOldTableID(d, t, schemaID, tbInfo.Name.L) + err = ignoreTableNotExistsErr(err) if err != nil { - shouldContinue := false if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled return ver, errors.Trace(err) - } else if infoschema.ErrTableNotExists.Equal(err) { - shouldContinue = true } else if !infoschema.ErrTableExists.Equal(err) { return ver, errors.Trace(err) } - if !orReplace && !shouldContinue { + if !orReplace { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -366,6 +364,13 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } } +func ignoreTableNotExistsErr(err error) error { + if infoschema.ErrTableNotExists.Equal(err) { + return nil + } + return err +} + func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, job.SchemaID) if err != nil { From b00e265e6b7c011feb86d34365812afea77bb6ff Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 31 May 2024 17:34:13 +0800 Subject: [PATCH 3/7] update bazel --- pkg/ddl/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index d1651a502a240..7b4eb0f9988d5 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -164,7 +164,6 @@ go_library( "//pkg/util/sqlexec", "//pkg/util/sqlkiller", "//pkg/util/stringutil", - "//pkg/util/syncutil", "//pkg/util/tiflash", "//pkg/util/timeutil", "//pkg/util/topsql", From 56c559f8cab95dfee76ddd974e54d0520c1b086c Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 31 May 2024 18:16:42 +0800 Subject: [PATCH 4/7] fix test TestCreateView --- pkg/ddl/table_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/ddl/table_test.go b/pkg/ddl/table_test.go index 61fc39a943de4..13d37572e5cef 100644 --- a/pkg/ddl/table_test.go +++ b/pkg/ddl/table_test.go @@ -309,7 +309,8 @@ func TestCreateView(t *testing.T) { } ctx.SetValue(sessionctx.QueryString, "skip") err = d.DoDDLJob(ctx, job) - require.Error(t, err) + // The non-existing table id in job args will not be considered anymore. + require.NoError(t, err) } func checkTableCacheTest(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) { From c52830e6a04a744499545d1f2e0fd4f322643f6d Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 3 Jun 2024 15:13:31 +0800 Subject: [PATCH 5/7] address comment and add a test --- pkg/ddl/BUILD.bazel | 1 + pkg/ddl/ddl_api_test.go | 45 +++++++++++++++++++++++++++++++++++++ pkg/ddl/job_table.go | 1 + pkg/ddl/placement_policy.go | 4 ++-- pkg/ddl/schema.go | 2 +- pkg/ddl/table.go | 22 ++++++++---------- 6 files changed, 59 insertions(+), 16 deletions(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 7b4eb0f9988d5..b7ef1dfe102dd 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -339,6 +339,7 @@ go_test( "@com_github_tikv_client_go_v2//util", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", + "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/pkg/ddl/ddl_api_test.go b/pkg/ddl/ddl_api_test.go index be1d6c029a8a3..95615202f74c9 100644 --- a/pkg/ddl/ddl_api_test.go +++ b/pkg/ddl/ddl_api_test.go @@ -21,6 +21,7 @@ import ( "slices" "sync" "testing" + "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestGetDDLJobs(t *testing.T) { @@ -151,6 +153,49 @@ func enQueueDDLJobs(t *testing.T, sess sessiontypes.Session, txn kv.Transaction, } } +func TestCreateViewConcurrently(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + tk.MustExec("create table t (a int);") + tk.MustExec("create view v as select * from t;") + var ( + counterErr error + counter int + ) + failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) { + counter++ + if counter > 1 { + counterErr = fmt.Errorf("create view job should not run concurrently") + return + } + <-time.After(300 * time.Millisecond) + }) + failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { + if job.Type == model.ActionCreateView { + counter-- + } + }) + var eg errgroup.Group + for i := 0; i < 5; i++ { + eg.Go(func() error { + newTk := testkit.NewTestKit(t, store) + _, err := newTk.Exec("use test") + if err != nil { + return err + } + _, err = newTk.Exec("create or replace view v as select * from t;") + return err + }) + } + err := eg.Wait() + require.NoError(t, err) + require.NoError(t, counterErr) +} + func TestCreateDropCreateTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 861a083969adf..083c71a81f692 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -511,6 +511,7 @@ func (s *jobScheduler) delivery2Worker(wk *worker, pool *workerPool, job *model. s.wg.Run(func() { metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc() defer func() { + failpoint.InjectCall("afterDelivery2Worker", job) s.runningJobs.remove(job) asyncNotify(s.ddlJobNotifyCh) metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Dec() diff --git a/pkg/ddl/placement_policy.go b/pkg/ddl/placement_policy.go index 28cc053238c70..799da52e5efb3 100644 --- a/pkg/ddl/placement_policy.go +++ b/pkg/ddl/placement_policy.go @@ -119,7 +119,7 @@ func getPlacementPolicyByName(d *ddlCtx, t *meta.Meta, policyName model.CIStr) ( } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { // Use cached policy. policy, ok := is.PolicyByName(policyName) if ok { @@ -346,7 +346,7 @@ func checkPlacementPolicyNotInUse(d *ddlCtx, t *meta.Meta, policy *model.PolicyI return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { err = CheckPlacementPolicyNotInUseFromInfoSchema(is, policy) } else { err = CheckPlacementPolicyNotInUseFromMeta(t, policy) diff --git a/pkg/ddl/schema.go b/pkg/ddl/schema.go index 3be32a14a26de..412169b08e11f 100644 --- a/pkg/ddl/schema.go +++ b/pkg/ddl/schema.go @@ -76,7 +76,7 @@ func checkSchemaNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, dbInfo *model return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkSchemaNotExistsFromInfoSchema(is, schemaID, dbInfo) } return checkSchemaNotExistsFromStore(t, schemaID, dbInfo) diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 7779b30b61ce6..2a03669f00da1 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -316,8 +316,11 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } tbInfo.State = model.StateNone - oldTableID, err := findOldTableID(d, t, schemaID, tbInfo.Name.L) - err = ignoreTableNotExistsErr(err) + oldTableID, err := findTableIDByName(d, t, schemaID, tbInfo.Name.L) + if infoschema.ErrTableNotExists.Equal(err) { + err = nil + } + failpoint.InjectCall("onDDLCreateView", job) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) { job.State = model.JobStateCancelled @@ -364,13 +367,6 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) } } -func ignoreTableNotExistsErr(err error) error { - if infoschema.ErrTableNotExists.Equal(err) { - return nil - } - return err -} - func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, job.SchemaID) if err != nil { @@ -1522,7 +1518,7 @@ func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName stri return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } @@ -1536,7 +1532,7 @@ func checkTableNotExistsByName(d *ddlCtx, t *meta.Meta, schemaID int64, schemaNa return err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return checkTableNotExistsFromInfoSchema(is, schemaID, tableName) } return t.CheckTableNameNotExists(t.TableNameKey(schemaName, tableName)) @@ -1610,14 +1606,14 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string return nil } -func findOldTableID(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) { +func findTableIDByName(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) (int64, error) { // Try to use memory schema info to check first. currVer, err := t.GetSchemaVersion() if err != nil { return 0, err } is := d.infoCache.GetLatest() - if is.SchemaMetaVersion() == currVer { + if is != nil && is.SchemaMetaVersion() == currVer { return findTableIDFromInfoSchema(is, schemaID, tableName) } From 6da24f0828b24acfde2c6ef0c4151685eab8235b Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 3 Jun 2024 15:32:58 +0800 Subject: [PATCH 6/7] address comments --- pkg/ddl/ddl_api_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/ddl/ddl_api_test.go b/pkg/ddl/ddl_api_test.go index 95615202f74c9..af431dc3d88c2 100644 --- a/pkg/ddl/ddl_api_test.go +++ b/pkg/ddl/ddl_api_test.go @@ -21,7 +21,6 @@ import ( "slices" "sync" "testing" - "time" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl" @@ -157,8 +156,6 @@ func TestCreateViewConcurrently(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - tk1 := testkit.NewTestKit(t, store) - tk1.MustExec("use test") tk.MustExec("create table t (a int);") tk.MustExec("create view v as select * from t;") @@ -172,7 +169,6 @@ func TestCreateViewConcurrently(t *testing.T) { counterErr = fmt.Errorf("create view job should not run concurrently") return } - <-time.After(300 * time.Millisecond) }) failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { if job.Type == model.ActionCreateView { From efe6b93c75db84863885fe8ae31ffb997b6a2705 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 3 Jun 2024 16:56:44 +0800 Subject: [PATCH 7/7] refine naming --- pkg/ddl/table.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index 2a03669f00da1..892a3130039cf 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -308,8 +308,8 @@ func onCreateView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) schemaID := job.SchemaID tbInfo := &model.TableInfo{} var orReplace bool - var placeholder any // oldTblInfoID - if err := job.DecodeArgs(tbInfo, &orReplace, &placeholder); err != nil { + var _placeholder int64 // oldTblInfoID + if err := job.DecodeArgs(tbInfo, &orReplace, &_placeholder); err != nil { // Invalid arguments, cancel this job. job.State = model.JobStateCancelled return ver, errors.Trace(err)