diff --git a/pkg/ddl/tests/metadatalock/BUILD.bazel b/pkg/ddl/tests/metadatalock/BUILD.bazel index 0314bcf91fd6d..97b67bede8bbb 100644 --- a/pkg/ddl/tests/metadatalock/BUILD.bazel +++ b/pkg/ddl/tests/metadatalock/BUILD.bazel @@ -8,12 +8,14 @@ go_test( "mdl_test.go", ], flaky = True, - shard_count = 36, + shard_count = 37, deps = [ "//pkg/config", "//pkg/ddl", "//pkg/ddl/ingest/testutil", + "//pkg/ddl/util/callback", "//pkg/errno", + "//pkg/parser/model", "//pkg/server", "//pkg/testkit", "//pkg/testkit/testsetup", diff --git a/pkg/ddl/tests/metadatalock/mdl_test.go b/pkg/ddl/tests/metadatalock/mdl_test.go index c1e1afaeff161..92ef31ee34725 100644 --- a/pkg/ddl/tests/metadatalock/mdl_test.go +++ b/pkg/ddl/tests/metadatalock/mdl_test.go @@ -21,8 +21,11 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl" ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil" + "github.com/pingcap/tidb/pkg/ddl/util/callback" mysql "github.com/pingcap/tidb/pkg/errno" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/server" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -997,6 +1000,86 @@ func TestMDLPreparePlanCacheExecute2(t *testing.T) { tk.MustExec("admin check table t") } +// TestMDLPreparePlanCacheExecuteInsert makes sure the insert statement handle the schema correctly in plan cache. +func TestMDLPreparePlanCacheExecuteInsert(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + defer ingesttestutil.InjectMockBackendMgr(t, store)() + + sv := server.CreateMockServer(t, store) + + sv.SetDomain(dom) + dom.InfoSyncer().SetSessionManager(sv) + defer sv.Close() + + conn1 := server.CreateMockConn(t, sv) + tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session) + conn2 := server.CreateMockConn(t, sv) + tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session) + tk.MustExec("use test") + tk.MustExec("set global tidb_enable_metadata_lock=1") + tk.MustExec("create table t(a int primary key, b int);") + tk.MustExec("create table t2(a int);") + tk.MustExec("insert into t values(1, 1), (2, 2), (3, 3), (4, 4);") + + tk.MustExec(`prepare insert_stmt from 'insert into t values (?, ?)'`) + tk.MustExec(`set @a=4, @b=4;`) + + ch := make(chan struct{}) + + first := true + callback := &callback.TestDDLCallback{Do: dom} + onJobUpdatedExported := func(job *model.Job) { + switch job.SchemaState { + case model.StateWriteReorganization: + tbl, _ := dom.InfoSchema().TableByID(job.TableID) + idx := tbl.Meta().FindIndexByName("idx") + switch idx.BackfillState { + case model.BackfillStateRunning: + if first { + // generate plan, cache it, and make some row change to make + // sure backfill state 'merging' is not skipped. + tk.MustExec(`begin`) + tk.MustExec(`delete from t where a = 4;`) + tk.MustExec(`execute insert_stmt using @a, @b;`) + tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0")) + tk.MustExec(`commit`) + + tk.MustExec("begin") + // Activate txn. + tk.MustExec("select * from t2") + first = false + return + } + } + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExported) + dom.DDL().SetHook(callback) + + ddl.MockDMLExecutionMerging = func() { + tk.MustExec(`delete from t where a = 4;`) + // we must generate a new plan here, because the schema has changed since + // the last plan was generated. + tk.MustExec(`execute insert_stmt using @a, @b;`) + tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0")) + tk.MustExec("commit") + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-ch + tkDDL.MustExec("alter table test.t add index idx(a);") + wg.Done() + }() + + ch <- struct{}{} + wg.Wait() + + tk.MustExec("admin check table t") +} + func TestMDLDisable2Enable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) sv := server.CreateMockServer(t, store) diff --git a/pkg/planner/core/plan_cache.go b/pkg/planner/core/plan_cache.go index f4ea3ebbb3994..04c725134fafa 100644 --- a/pkg/planner/core/plan_cache.go +++ b/pkg/planner/core/plan_cache.go @@ -17,6 +17,7 @@ package core import ( "bytes" "context" + "math" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/bindinfo" @@ -40,8 +41,10 @@ import ( "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/kvcache" + "github.com/pingcap/tidb/pkg/util/logutil" utilpc "github.com/pingcap/tidb/pkg/util/plancache" "github.com/pingcap/tidb/pkg/util/ranger" + "go.uber.org/zap" ) // PlanCacheKeyTestIssue43667 is only for test. @@ -112,23 +115,23 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep if err != nil { return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error()) } + // Table ID is changed, for example, drop & create table, truncate table. delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID) - stmt.tbls[i] = tblByName - stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision + tbl = tblByName } - newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx, stmt.dbName[i], stmt.tbls[i], is) + // newTbl is the 'should be used' table info for this execution. + newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx, stmt.dbName[i], tbl, is) if err != nil { + logutil.BgLogger().Warn("meet error during tryLockMDLAndUpdateSchemaIfNecessary", zap.String("table name", tbl.Meta().Name.String()), zap.Error(err)) + // Invalid the cache key related fields to avoid using plan cache. + stmt.RelateVersion[tbl.Meta().ID] = math.MaxUint64 schemaNotMatch = true continue } - // The revision of tbl and newTbl may not be the same. - // Example: - // The version of stmt.tbls[i] is taken from the prepare statement and is revision v1. - // When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1. - // The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update. - if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) { + if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision { schemaNotMatch = true } + // Update the cache key related fields. stmt.tbls[i] = newTbl stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision }