From d48cbd3e07f6fe02f87ab3cee85258445c3adc9e Mon Sep 17 00:00:00 2001 From: ldeng Date: Thu, 22 Oct 2020 14:11:24 +0800 Subject: [PATCH 1/2] executor: support admin operations in partitioned table with global index --- executor/admin.go | 49 ++++++++-- executor/admin_test.go | 148 +++++++++++++++++++++++++++++++ executor/builder.go | 23 ++++- executor/distsql.go | 8 +- executor/executor_test.go | 2 +- executor/partition_table_test.go | 6 ++ kv/key.go | 22 ----- planner/core/handle_cols.go | 64 +++++++++++++ planner/core/planbuilder.go | 15 ++-- session/session.go | 4 + 10 files changed, 293 insertions(+), 48 deletions(-) diff --git a/executor/admin.go b/executor/admin.go index 6eaa8e363fee3..bb73f84bbfd79 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -523,6 +523,11 @@ type CleanupIndexExec struct { table table.Table physicalID int64 + // There are 3 len of column used. + // len(columns) include index cols, handle cols and pid col. + // handleIdxColLen is len(index.Columns) + handle cols len + // len(e.index.Columns) include only index val col. + handleIdxColLen int columns []*model.ColumnInfo idxColFieldTypes []*types.FieldType idxChunk *chunk.Chunk @@ -544,12 +549,30 @@ func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType { for _, col := range e.columns { e.idxColFieldTypes = append(e.idxColFieldTypes, &col.FieldType) } + e.handleIdxColLen = len(e.columns) + if e.index.Meta().Global { + e.handleIdxColLen-- + } return e.idxColFieldTypes } func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte, error) { - e.idxValues.Range(func(h kv.Handle, _ interface{}) bool { - e.batchKeys = append(e.batchKeys, e.table.RecordKey(h)) + var getKey func(kv.Handle, interface{}) kv.Key + if e.index.Meta().Global { + getKey = func(h kv.Handle, val interface{}) kv.Key { + idxVals := val.([][]types.Datum) + // partition ID store in last column of first row. + pid := idxVals[0][len(e.index.Meta().Columns)].GetInt64() + key := tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(pid), h) + return key + } + } else { + getKey = func(h kv.Handle, _ interface{}) kv.Key { + return e.table.RecordKey(h) + } + } + e.idxValues.Range(func(h kv.Handle, val interface{}) bool { + e.batchKeys = append(e.batchKeys, getKey(h, val)) return true }) values, err := txn.BatchGet(context.Background(), e.batchKeys) @@ -566,11 +589,12 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri if err != nil { return err } - handleIdxValsGroup, ok := e.idxValues.Get(handle) + fullIdxValsGroup, ok := e.idxValues.Get(handle) if !ok { return errors.Trace(errors.Errorf("batch keys are inconsistent with handles")) } - for _, handleIdxVals := range handleIdxValsGroup.([][]types.Datum) { + for _, fullIdxVals := range fullIdxValsGroup.([][]types.Datum) { + handleIdxVals := fullIdxVals[:e.handleIdxColLen] // Exclude pid column. if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, handleIdxVals, handle); err != nil { return err } @@ -587,8 +611,8 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri func extractIdxVals(row chunk.Row, idxVals []types.Datum, fieldTypes []*types.FieldType, idxValLen int) []types.Datum { - if cap(idxVals) < idxValLen { - idxVals = make([]types.Datum, idxValLen) + if cap(idxVals) < idxValLen+1 { + idxVals = make([]types.Datum, idxValLen, idxValLen+1) } else { idxVals = idxVals[:idxValLen] } @@ -617,9 +641,10 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e if e.idxChunk.NumRows() == 0 { return nil } + global := e.index.Meta().Global iter := chunk.NewIterator4Chunk(e.idxChunk) for row := iter.Begin(); row != iter.End(); row = iter.Next() { - handle, err := e.handleCols.BuildHandle(row) + handle, err := e.handleCols.BuildHandleFromPartitionedTableIndexRow(row, global) if err != nil { return err } @@ -630,7 +655,13 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e updatedIdxVals := append(existingIdxVals.([][]types.Datum), idxVals) e.idxValues.Set(handle, updatedIdxVals) } else { - e.idxValues.Set(handle, [][]types.Datum{idxVals}) + newIdxVals := idxVals[:] + // If index is global, we store pid at end of first row of idx vals. + if global { + pid := handle.(kv.PartitionHandle).PartitionID + newIdxVals = append(newIdxVals, types.NewIntDatum(pid)) + } + e.idxValues.Set(handle, [][]types.Datum{newIdxVals}) } idxKey, _, err := e.index.GenIndexKey(sc, idxVals, handle, nil) if err != nil { @@ -659,7 +690,7 @@ func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.Chunk) error { } var err error - if tbl, ok := e.table.(table.PartitionedTable); ok { + if tbl, ok := e.table.(table.PartitionedTable); ok && !e.index.Meta().Global { pi := e.table.Meta().GetPartitionInfo() for _, p := range pi.Definitions { e.table = tbl.GetPartition(p.ID) diff --git a/executor/admin_test.go b/executor/admin_test.go index 01a4556f46000..6db45e7e7c54a 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -1092,3 +1092,151 @@ func (s *testSuite5) TestAdminCheckWithSnapshot(c *C) { tk.MustExec("admin check index admin_t_s a;") tk.MustExec("drop table if exists admin_t_s") } + +func (s *globalIndexSuite) TestAdminCheckWithGlobalIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + check := func() { + tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (5, 5), (10, 10), (11, 11), (NULL, NULL)") + tk.MustExec("admin check table admin_test") + tk.MustExec("admin check index admin_test c1") + tk.MustExec("admin check index admin_test c2") + } + + // Test for hash partition table. + tk.MustExec("drop table if exists admin_test") + tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1) partition by hash(c2) partitions 5;") + // c1 is global, c2 is local + tk.MustExec("create unique index c1 on admin_test(c1)") + tk.MustExec("create unique index c2 on admin_test(c2)") + check() + + // Test for range partition table. + tk.MustExec("drop table if exists admin_test") + tk.MustExec(`create table admin_test (c1 int, c2 int, c3 int default 1) PARTITION BY RANGE ( c2 ) ( + PARTITION p0 VALUES LESS THAN (5), + PARTITION p1 VALUES LESS THAN (10), + PARTITION p2 VALUES LESS THAN (MAXVALUE))`) + // c1 is global, c2 is local + tk.MustExec("create unique index c1 on admin_test(c1)") + tk.MustExec("create unique index c2 on admin_test(c2)") + check() +} + +func (s *globalIndexSuite) TestAdminRecoverIndexWithGlobalIndex(c *C) { + // Test no corruption case. + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists admin_test") + tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1) partition by hash(c2) partitions 5;") + // c1 is global, c2 is local + tk.MustExec("create unique index c1 on admin_test(c1)") + tk.MustExec("create unique index c2 on admin_test(c2)") + tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (NULL, NULL)") + + r := tk.MustQuery("admin recover index admin_test c1") + r.Check(testkit.Rows("0 3")) + + r = tk.MustQuery("admin recover index admin_test c2") + r.Check(testkit.Rows("0 3")) + + tk.MustExec("admin check index admin_test c1") + tk.MustExec("admin check index admin_test c2") + + tk.MustExec("drop table if exists admin_test") + tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1) partition by hash(c2) partitions 5;") + tk.MustExec("create unique index c1 on admin_test(c1)") + tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (3, 3), (10, 10), (20, 20)") + + r = tk.MustQuery("admin recover index admin_test c1") + r.Check(testkit.Rows("0 5")) + tk.MustExec("admin check index admin_test c1") + + // Make some corrupted index. + s.ctx = mock.NewContext() + s.ctx.Store = s.store + is := s.domain.InfoSchema() + dbName := model.NewCIStr("test") + tblName := model.NewCIStr("admin_test") + tbl, err := is.TableByName(dbName, tblName) + c.Assert(err, IsNil) + + tblInfo := tbl.Meta() + idxInfo := tblInfo.FindIndexByName("c1") + indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo) + sc := s.ctx.GetSessionVars().StmtCtx + txn, err := s.store.Begin() + c.Assert(err, IsNil) + err = indexOpr.Delete(sc, txn, types.MakeDatums(1), kv.IntHandle(1)) + c.Assert(err, IsNil) + err = txn.Commit(context.Background()) + c.Assert(err, IsNil) + err = tk.ExecToErr("admin check table admin_test") + c.Assert(err, NotNil) + c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue) + err = tk.ExecToErr("admin check index admin_test c1") + c.Assert(err, NotNil) + + r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c1)") + r.Check(testkit.Rows("4")) + + r = tk.MustQuery("admin recover index admin_test c1") + r.Check(testkit.Rows("1 5")) + + r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c1)") + r.Check(testkit.Rows("5")) + tk.MustExec("admin check index admin_test c1") + tk.MustExec("admin check table admin_test") +} + +func (s *globalIndexSuite) TestAdminCheckIndexRangeWithGlobalIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec(`drop table if exists check_index_test`) + tk.MustExec("create table check_index_test (a int, b varchar(10), c int) partition by hash(c) partitions 5;") + tk.MustExec("create unique index a_b on check_index_test(a, b)") + tk.MustExec(`insert check_index_test values (3, "ab", 1), (2, "cd", 2), (1, "ef", 3), (-1, "hi", 4)`) + result := tk.MustQuery("admin check index check_index_test a_b (2, 4);") + result.Check(testkit.Rows("1 ef 3", "2 cd 2")) + + result = tk.MustQuery("admin check index check_index_test a_b (3, 5);") + result.Check(testkit.Rows("-1 hi 4", "1 ef 3")) + + tk.MustExec("use mysql") + result = tk.MustQuery("admin check index test.check_index_test a_b (2, 3), (4, 5);") + result.Check(testkit.Rows("-1 hi 4", "2 cd 2")) +} + +func (s *globalIndexSuite) TestAdminChecksumWithGlobalIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test;") + tk.MustExec("DROP TABLE IF EXISTS admin_checksum_partition_test;") + tk.MustExec("CREATE TABLE admin_checksum_partition_test (a INT, b INT, c INT) PARTITION BY HASH(a) PARTITIONS 4;") + + // Mocktikv returns 1 for every table/index scan, then we will xor the checksums of a table. + // So if x requests are built, result will be (x%2, x, x) + // Here, 4 partitions + 1 table are scaned. + r := tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;") + r.Check(testkit.Rows("test admin_checksum_partition_test 1 5 5")) + + tk.MustExec("create unique index b on admin_checksum_partition_test(b)") + // (4 partitions + 1 table) * (1 index scan + 1 table scan) + r = tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;") + r.Check(testkit.Rows("test admin_checksum_partition_test 0 10 10")) +} + +func (s *globalIndexSuite) TestAdminCleanupIndexWithGlobalIndex(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists admin_test") + tk.MustExec("create table admin_test (c1 int, c2 int, c3 int) partition by hash(c1) partitions 5;") + // c1 is global, c2 is local + tk.MustExec("create unique index c2 on admin_test(c2)") + tk.MustExec("create unique index c3 on admin_test(c3)") + tk.MustExec("insert admin_test (c1, c2, c3) values (1, 1, 1), (2, 2, 2), (NULL, NULL, 3)") + + r := tk.MustQuery("admin cleanup index admin_test c2") + r.Check(testkit.Rows("0")) + r = tk.MustQuery("admin cleanup index admin_test c3") + r.Check(testkit.Rows("0")) +} diff --git a/executor/builder.go b/executor/builder.go index 10e0ef23ccaa8..e3a44f54070cc 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -337,6 +337,9 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo if !e.isCommonHandle() { fullColLen += 1 } + if e.index.Global { + fullColLen += 1 + } e.dagPB.OutputOffsets = make([]uint32, fullColLen) for i := 0; i < fullColLen; i++ { e.dagPB.OutputOffsets[i] = uint32(i) @@ -404,7 +407,7 @@ func buildIdxColsConcatHandleCols(tblInfo *model.TableInfo, indexInfo *model.Ind pkCols = pkIdx.Columns handleLen = len(pkIdx.Columns) } - columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+handleLen) + columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+handleLen+1) for _, idxCol := range indexInfo.Columns { columns = append(columns, tblInfo.Columns[idxCol.Offset]) } @@ -425,6 +428,21 @@ func buildIdxColsConcatHandleCols(tblInfo *model.TableInfo, indexInfo *model.Ind return columns } +func buildIdxColsConcatHandleColsWithPid(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) []*model.ColumnInfo { + columns := buildIdxColsConcatHandleCols(tblInfo, indexInfo) + if indexInfo.Global { + pidOffset := len(columns) + pidColsInfo := &model.ColumnInfo{ + ID: model.ExtraPidColID, + Name: model.ExtraPartitionIdName, + Offset: pidOffset, + } + pidColsInfo.FieldType = *types.NewFieldType(mysql.TypeLonglong) + columns = append(columns, pidColsInfo) + } + return columns +} + func (b *executorBuilder) buildRecoverIndex(v *plannercore.RecoverIndex) Executor { tblInfo := v.Table.TableInfo t, err := b.is.TableByName(v.Table.Schema, tblInfo.Name) @@ -493,14 +511,13 @@ func (b *executorBuilder) buildCleanupIndex(v *plannercore.CleanupIndex) Executo break } } - if index == nil { b.err = errors.Errorf("index `%v` is not found in table `%v`.", v.IndexName, v.Table.Name.O) return nil } e := &CleanupIndexExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), - columns: buildIdxColsConcatHandleCols(tblInfo, index.Meta()), + columns: buildIdxColsConcatHandleColsWithPid(tblInfo, index.Meta()), index: index, table: t, physicalID: t.Meta().ID, diff --git a/executor/distsql.go b/executor/distsql.go index 850cc53e3e8ce..95935b450c651 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -488,12 +488,12 @@ func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType { } else { tps = []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} } - if e.index.Global { - tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) - } if e.checkIndexValue != nil { tps = e.idxColTps } + if e.index.Global { + tps = append(tps, types.NewFieldType(mysql.TypeLonglong)) + } return tps } @@ -798,7 +798,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk, } if w.checkIndexValue != nil { if retChk == nil { - retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize) + retChk = chunk.NewChunkWithCapacity(w.idxLookup.getRetTpsByHandle(), w.batchSize) } retChk.Append(chk, 0, chk.NumRows()) } diff --git a/executor/executor_test.go b/executor/executor_test.go index afd2fad4d1df6..78f3483bb99cd 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -130,7 +130,7 @@ var _ = SerialSuites(&testSerialSuite1{&baseTestSuite{}}) var _ = SerialSuites(&testSlowQuery{&baseTestSuite{}}) var _ = Suite(&partitionTableSuite{&baseTestSuite{}}) var _ = SerialSuites(&tiflashTestSuite{}) -var _ = SerialSuites(&globalIndexSuite{&baseTestSuite{}}) +var _ = Suite(&globalIndexSuite{&baseTestSuite{}}) var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}}) var _ = SerialSuites(&testCoprCache{}) diff --git a/executor/partition_table_test.go b/executor/partition_table_test.go index e358ee49a55e3..c1926815f3c7b 100644 --- a/executor/partition_table_test.go +++ b/executor/partition_table_test.go @@ -181,4 +181,10 @@ partition p2 values less than (10))`) tk.MustExec("alter table p add unique idx(id)") tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)") tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9")) + + tk.MustExec("drop table if exists p") + tk.MustExec("create table p (id int, c int) partition by hash(c) partitions 5") + tk.MustExec("alter table p add unique idx(id)") + tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)") + tk.MustQuery("select * from p use index (idx) order by id").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9")) } diff --git a/kv/key.go b/kv/key.go index 53eacf05a8a3d..5888c13cb83d4 100644 --- a/kv/key.go +++ b/kv/key.go @@ -445,25 +445,3 @@ func NewPartitionHandle(pid int64, h Handle) PartitionHandle { PartitionID: pid, } } - -// Equal implements the Handle interface. -func (ph PartitionHandle) Equal(h Handle) bool { - if ph2, ok := h.(PartitionHandle); ok { - return ph.PartitionID == ph2.PartitionID && ph.Handle.Equal(ph2.Handle) - } - return false -} - -// Compare implements the Handle interface. -func (ph PartitionHandle) Compare(h Handle) int { - if ph2, ok := h.(PartitionHandle); ok { - if ph.PartitionID < ph2.PartitionID { - return -1 - } - if ph.PartitionID > ph2.PartitionID { - return 1 - } - return ph.Handle.Compare(ph2.Handle) - } - panic("PartitonHandle compares to non-parition Handle") -} diff --git a/planner/core/handle_cols.go b/planner/core/handle_cols.go index b66aa964d5e51..9f79ae01cd39a 100644 --- a/planner/core/handle_cols.go +++ b/planner/core/handle_cols.go @@ -35,6 +35,10 @@ type HandleCols interface { BuildHandleByDatums(row []types.Datum) (kv.Handle, error) // BuildHandleFromIndexRow builds a Handle from index row data. BuildHandleFromIndexRow(row chunk.Row) (kv.Handle, error) + // BuildHandleFromPartitionedTableIndexRow builds a Handle from index row data of partitioned table. + // If global is true, partition ID will be decoded from the last column, and return PartitionHandle, + // otherwise return value is the same as BuildHandleFromIndexRow. + BuildHandleFromPartitionedTableIndexRow(row chunk.Row, global bool) (kv.Handle, error) // ResolveIndices resolves handle column indices. ResolveIndices(schema *expression.Schema) (HandleCols, error) // IsInt returns if the HandleCols is a single tnt column. @@ -49,6 +53,9 @@ type HandleCols interface { Compare(a, b []types.Datum) (int, error) // GetFieldTypes return field types of columns GetFieldsTypes() []*types.FieldType + // GetFieldsTypesOfPartitionedTableIndex return field types of columns, + // if global is true, partition ID column will append to the end. + GetFieldsTypesOfPartitionedTableIndex(global bool) []*types.FieldType } // CommonHandleCols implements the kv.HandleCols interface. @@ -86,6 +93,27 @@ func (cb *CommonHandleCols) BuildHandleFromIndexRow(row chunk.Row) (kv.Handle, e return cb.buildHandleByDatumsBuffer(datumBuf) } +// BuildHandleFromPartitionedTableIndexRow implements the kv.HandleCols interface. +func (cb *CommonHandleCols) BuildHandleFromPartitionedTableIndexRow(row chunk.Row, global bool) (kv.Handle, error) { + handleColsOffset := row.Len() - cb.NumCols() + if global { + handleColsOffset-- + } + datumBuf := make([]types.Datum, 0, 4) + for i := 0; i < cb.NumCols(); i++ { + datumBuf = append(datumBuf, row.GetDatum(handleColsOffset+i, cb.columns[i].RetType)) + } + h, err := cb.buildHandleByDatumsBuffer(datumBuf) + if err != nil { + return h, err + } + if global { + pid := row.GetInt64(row.Len() - 1) + h = kv.NewPartitionHandle(pid, h) + } + return h, nil +} + // BuildHandleByDatums implements the kv.HandleCols interface. func (cb *CommonHandleCols) BuildHandleByDatums(row []types.Datum) (kv.Handle, error) { datumBuf := make([]types.Datum, 0, 4) @@ -167,6 +195,18 @@ func (cb *CommonHandleCols) GetFieldsTypes() []*types.FieldType { return fieldTps } +// GetFieldsTypesOfPartitionedTableIndex implements the kv.HandleCols interface. +func (cb *CommonHandleCols) GetFieldsTypesOfPartitionedTableIndex(global bool) []*types.FieldType { + fieldTps := make([]*types.FieldType, 0, len(cb.columns)+1) + for _, col := range cb.columns { + fieldTps = append(fieldTps, col.RetType) + } + if global { + fieldTps = append(fieldTps, types.NewFieldType(mysql.TypeLonglong)) + } + return fieldTps +} + // NewCommonHandleCols creates a new CommonHandleCols. func NewCommonHandleCols(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, tableColumns []*expression.Column) *CommonHandleCols { @@ -197,6 +237,21 @@ func (ib *IntHandleCols) BuildHandleFromIndexRow(row chunk.Row) (kv.Handle, erro return kv.IntHandle(row.GetInt64(row.Len() - 1)), nil } +// BuildHandleFromPartitionedTableIndexRow implements the kv.HandleCols interface. +func (ib *IntHandleCols) BuildHandleFromPartitionedTableIndexRow(row chunk.Row, global bool) (kv.Handle, error) { + handleColsOffset := row.Len() - 1 + if global { + handleColsOffset-- + } + var h kv.Handle + h = kv.IntHandle(row.GetInt64(handleColsOffset)) + if global { + pid := row.GetInt64(row.Len() - 1) + h = kv.NewPartitionHandle(pid, h) + } + return h, nil +} + // BuildHandleByDatums implements the kv.HandleCols interface. func (ib *IntHandleCols) BuildHandleByDatums(row []types.Datum) (kv.Handle, error) { return kv.IntHandle(row[ib.col.Index].GetInt64()), nil @@ -252,6 +307,15 @@ func (ib *IntHandleCols) GetFieldsTypes() []*types.FieldType { return []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} } +// GetFieldsTypesOfPartitionedTableIndex implements the kv.HandleCols interface. +func (ib *IntHandleCols) GetFieldsTypesOfPartitionedTableIndex(global bool) []*types.FieldType { + fieldTps := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)} + if global { + fieldTps = append(fieldTps, types.NewFieldType(mysql.TypeLonglong)) + } + return fieldTps +} + // NewIntHandleCols creates a new IntHandleCols. func NewIntHandleCols(col *expression.Column) HandleCols { return &IntHandleCols{col: col} diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 498c25e3cd59a..4a3952550ac5d 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -41,7 +41,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/types/parser_driver" + driver "github.com/pingcap/tidb/types/parser_driver" util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -1272,12 +1272,9 @@ func getIndexColsSchema(tblInfo *model.TableInfo, idx *model.IndexInfo, allColSc } func getPhysicalID(t table.Table) (physicalID int64, isPartition bool) { - tblInfo := t.Meta() - if tblInfo.GetPartitionInfo() != nil { - pid := t.(table.PhysicalTable).GetPhysicalID() - return pid, true - } - return tblInfo.ID, false + // In global index, pid is equal to tid in PartionedTable. + pid := t.(table.PhysicalTable).GetPhysicalID() + return pid, pid != t.Meta().ID } func tryGetPkExtraColumn(sv *variable.SessionVars, tblInfo *model.TableInfo) (*model.ColumnInfo, *expression.Column, bool) { @@ -1330,7 +1327,7 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam } indexInfos = append(indexInfos, idxInfo) // For partition tables. - if pi := tbl.Meta().GetPartitionInfo(); pi != nil { + if pi := tbl.Meta().GetPartitionInfo(); pi != nil && !idxInfo.Global { for _, def := range pi.Definitions { t := tbl.(table.PartitionedTable).GetPartition(def.ID) reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, t, idxInfo) @@ -1341,7 +1338,7 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam } continue } - // For non-partition tables. + // For non-partition tables or global index. reader, err := b.buildPhysicalIndexLookUpReader(ctx, dbName, tbl, idxInfo) if err != nil { return nil, nil, err diff --git a/session/session.go b/session/session.go index 74424b170e8bd..78354d66f116f 100644 --- a/session/session.go +++ b/session/session.go @@ -858,6 +858,10 @@ func (s *session) ExecRestrictedSQLWithSnapshot(sql string) ([]chunk.Row, []*ast se.sessionVars.OptimizerUseInvisibleIndexes = true defer func() { se.sessionVars.OptimizerUseInvisibleIndexes = false }() } + if s.sessionVars.UseDynamicPartitionPrune() { + se.sessionVars.PartitionPruneMode.Store(string(variable.DynamicOnly)) + defer func() { se.sessionVars.PartitionPruneMode.Store(string(variable.StaticOnly)) }() + } return execRestrictedSQL(ctx, se, sql) } From b7bdbbbc33f6f381e26cc6d1fd4b6ab917cf050b Mon Sep 17 00:00:00 2001 From: ldeng Date: Thu, 22 Oct 2020 15:09:20 +0800 Subject: [PATCH 2/2] fix bug --- executor/admin.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/executor/admin.go b/executor/admin.go index bb73f84bbfd79..6da14602f4a55 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -589,12 +589,14 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri if err != nil { return err } - fullIdxValsGroup, ok := e.idxValues.Get(handle) + handleIdxValsGroup, ok := e.idxValues.Get(handle) if !ok { return errors.Trace(errors.Errorf("batch keys are inconsistent with handles")) } - for _, fullIdxVals := range fullIdxValsGroup.([][]types.Datum) { - handleIdxVals := fullIdxVals[:e.handleIdxColLen] // Exclude pid column. + for i, handleIdxVals := range handleIdxValsGroup.([][]types.Datum) { + if e.index.Meta().Global && i == 0 { + handleIdxVals = handleIdxVals[:len(e.index.Meta().Columns)] + } if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, handleIdxVals, handle); err != nil { return err } @@ -643,6 +645,7 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e } global := e.index.Meta().Global iter := chunk.NewIterator4Chunk(e.idxChunk) + logutil.BgLogger().Info("dddddd", zap.Any("chk", e.idxChunk.ToString(e.idxColFieldTypes))) for row := iter.Begin(); row != iter.End(); row = iter.Next() { handle, err := e.handleCols.BuildHandleFromPartitionedTableIndexRow(row, global) if err != nil {