From 91a9d3065ae985b9ec9fb8c5167e650749dfaf23 Mon Sep 17 00:00:00 2001 From: Arenatlx Date: Fri, 4 Dec 2020 15:29:49 +0800 Subject: [PATCH] ddl: make reorg session aware of the new row format (#21412) --- ddl/backfilling.go | 3 +++ ddl/column_type_change_test.go | 28 ++++++++++++++++++++++++++++ ddl/util/util.go | 4 +++- sessionctx/variable/session.go | 2 ++ sessionctx/variable/tidb_vars.go | 1 + sessionctx/variable/varsutil.go | 10 ++++++++++ 6 files changed, 47 insertions(+), 1 deletion(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index ab7922b4aaeba..22e9dd56924e3 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -569,6 +569,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err)) } workerCnt = variable.GetDDLReorgWorkerCounter() + rowFormat := variable.GetDDLReorgRowFormat() // If only have 1 range, we can only start 1 worker. if len(kvRanges) < int(workerCnt) { workerCnt = int32(len(kvRanges)) @@ -577,6 +578,8 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba for i := len(backfillWorkers); i < int(workerCnt); i++ { sessCtx := newContext(reorgInfo.d.store) sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true + // Set the row encode format version. + sessCtx.GetSessionVars().RowEncoder.Enable = rowFormat != variable.DefTiDBRowFormatV1 // Simulate the sql mode environment in the worker sessionCtx. sqlMode := reorgInfo.ReorgMeta.SQLMode sessCtx.GetSessionVars().SQLMode = sqlMode diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 41b268a971d3f..146197fe383c7 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -27,9 +27,12 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/testkit" ) @@ -1541,3 +1544,28 @@ func (s *testColumnTypeChangeSuite) TestColumnTypeChangeFromJsonToOthers(c *C) { tk.MustExec("alter table t modify str year") tk.MustQuery("select * from t").Check(testkit.Rows("0 0 0 2001 0 2020 1991 2009 2020")) } + +// TestRowFormat is used to close issue #21391, the encoded row in column type change should be aware of the new row format. +func (s *testColumnTypeChangeSuite) TestRowFormat(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + // Enable column change variable. + tk.Se.GetSessionVars().EnableChangeColumnType = true + defer func() { + tk.Se.GetSessionVars().EnableChangeColumnType = false + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key, v varchar(10))") + tk.MustExec("insert into t values (1, \"123\");") + tk.MustExec("alter table t modify column v varchar(5);") + + tbl := testGetTableByName(c, tk.Se, "test", "t") + encodedKey := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1)) + + h := helper.NewHelper(s.store.(tikv.Storage)) + data, err := h.GetMvccByEncodedKey(encodedKey) + c.Assert(err, IsNil) + // The new format will start with CodecVer = 128 (0x80). + c.Assert(data.Info.Writes[0].ShortValue, DeepEquals, []byte{0x80, 0x0, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33}) + tk.MustExec("drop table if exists t") +} diff --git a/ddl/util/util.go b/ddl/util/util.go index 69a6d3ced56f7..295376945f7c6 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -154,7 +154,9 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old // LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables. func LoadDDLReorgVars(ctx sessionctx.Context) error { - return LoadGlobalVars(ctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize}) + // close issue #21391 + // variable.TiDBRowFormatVersion is used to encode the new row for column type change. + return LoadGlobalVars(ctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize, variable.TiDBRowFormatVersion}) } // LoadDDLVars loads ddl variable from mysql.global_variables. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1fa3b5758a3af..29204aae4252a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1681,6 +1681,8 @@ func SetLocalSystemVar(name string, val string) { SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) case TiDBDDLErrorCountLimit: SetDDLErrorCountLimit(tidbOptInt64(val, DefTiDBDDLErrorCountLimit)) + case TiDBRowFormatVersion: + SetDDLReorgRowFormat(tidbOptInt64(val, DefTiDBRowFormatV2)) } } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 82c2f68e5741c..7115c4e74f7e8 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -611,6 +611,7 @@ var ( maxDDLReorgWorkerCount int32 = 128 ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit + ddlReorgRowFormat int64 = DefTiDBRowFormatV2 maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount // Export for testing. MaxDDLReorgBatchSize int32 = 10240 diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index ee72c7f285419..721985eaa617a 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -72,6 +72,16 @@ func GetDDLErrorCountLimit() int64 { return atomic.LoadInt64(&ddlErrorCountlimit) } +// SetDDLReorgRowFormat sets ddlReorgRowFormat version. +func SetDDLReorgRowFormat(format int64) { + atomic.StoreInt64(&ddlReorgRowFormat, format) +} + +// GetDDLReorgRowFormat gets ddlReorgRowFormat version. +func GetDDLReorgRowFormat() int64 { + return atomic.LoadInt64(&ddlReorgRowFormat) +} + // SetMaxDeltaSchemaCount sets maxDeltaSchemaCount size. func SetMaxDeltaSchemaCount(cnt int64) { atomic.StoreInt64(&maxDeltaSchemaCount, cnt)