Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: modify admin executors to support partitioned table with global index #20642

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ type CleanupIndexExec struct {
table table.Table
physicalID int64

// There are 3 len of column used.
// len(columns) include index cols, handle cols and pid col.
// handleIdxColLen is len(index.Columns) + handle cols len
// len(e.index.Columns) include only index val col.
handleIdxColLen int
columns []*model.ColumnInfo
idxColFieldTypes []*types.FieldType
idxChunk *chunk.Chunk
Expand All @@ -544,12 +549,30 @@ func (e *CleanupIndexExec) getIdxColTypes() []*types.FieldType {
for _, col := range e.columns {
e.idxColFieldTypes = append(e.idxColFieldTypes, &col.FieldType)
}
e.handleIdxColLen = len(e.columns)
if e.index.Meta().Global {
e.handleIdxColLen--
}
return e.idxColFieldTypes
}

func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte, error) {
e.idxValues.Range(func(h kv.Handle, _ interface{}) bool {
e.batchKeys = append(e.batchKeys, e.table.RecordKey(h))
var getKey func(kv.Handle, interface{}) kv.Key
if e.index.Meta().Global {
getKey = func(h kv.Handle, val interface{}) kv.Key {
idxVals := val.([][]types.Datum)
// partition ID store in last column of first row.
pid := idxVals[0][len(e.index.Meta().Columns)].GetInt64()
key := tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(pid), h)
return key
}
} else {
getKey = func(h kv.Handle, _ interface{}) kv.Key {
return e.table.RecordKey(h)
}
}
e.idxValues.Range(func(h kv.Handle, val interface{}) bool {
e.batchKeys = append(e.batchKeys, getKey(h, val))
return true
})
values, err := txn.BatchGet(context.Background(), e.batchKeys)
Expand All @@ -570,7 +593,10 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri
if !ok {
return errors.Trace(errors.Errorf("batch keys are inconsistent with handles"))
}
for _, handleIdxVals := range handleIdxValsGroup.([][]types.Datum) {
for i, handleIdxVals := range handleIdxValsGroup.([][]types.Datum) {
if e.index.Meta().Global && i == 0 {
handleIdxVals = handleIdxVals[:len(e.index.Meta().Columns)]
}
if err := e.index.Delete(e.ctx.GetSessionVars().StmtCtx, txn, handleIdxVals, handle); err != nil {
return err
}
Expand All @@ -587,8 +613,8 @@ func (e *CleanupIndexExec) deleteDanglingIdx(txn kv.Transaction, values map[stri

func extractIdxVals(row chunk.Row, idxVals []types.Datum,
fieldTypes []*types.FieldType, idxValLen int) []types.Datum {
if cap(idxVals) < idxValLen {
idxVals = make([]types.Datum, idxValLen)
if cap(idxVals) < idxValLen+1 {
idxVals = make([]types.Datum, idxValLen, idxValLen+1)
} else {
idxVals = idxVals[:idxValLen]
}
Expand Down Expand Up @@ -617,9 +643,11 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
if e.idxChunk.NumRows() == 0 {
return nil
}
global := e.index.Meta().Global
iter := chunk.NewIterator4Chunk(e.idxChunk)
logutil.BgLogger().Info("dddddd", zap.Any("chk", e.idxChunk.ToString(e.idxColFieldTypes)))
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
handle, err := e.handleCols.BuildHandle(row)
handle, err := e.handleCols.BuildHandleFromPartitionedTableIndexRow(row, global)
if err != nil {
return err
}
Expand All @@ -630,7 +658,13 @@ func (e *CleanupIndexExec) fetchIndex(ctx context.Context, txn kv.Transaction) e
updatedIdxVals := append(existingIdxVals.([][]types.Datum), idxVals)
e.idxValues.Set(handle, updatedIdxVals)
} else {
e.idxValues.Set(handle, [][]types.Datum{idxVals})
newIdxVals := idxVals[:]
// If index is global, we store pid at end of first row of idx vals.
if global {
pid := handle.(kv.PartitionHandle).PartitionID
newIdxVals = append(newIdxVals, types.NewIntDatum(pid))
}
e.idxValues.Set(handle, [][]types.Datum{newIdxVals})
}
idxKey, _, err := e.index.GenIndexKey(sc, idxVals, handle, nil)
if err != nil {
Expand Down Expand Up @@ -659,7 +693,7 @@ func (e *CleanupIndexExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

var err error
if tbl, ok := e.table.(table.PartitionedTable); ok {
if tbl, ok := e.table.(table.PartitionedTable); ok && !e.index.Meta().Global {
pi := e.table.Meta().GetPartitionInfo()
for _, p := range pi.Definitions {
e.table = tbl.GetPartition(p.ID)
Expand Down
148 changes: 148 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,3 +1092,151 @@ func (s *testSuite5) TestAdminCheckWithSnapshot(c *C) {
tk.MustExec("admin check index admin_t_s a;")
tk.MustExec("drop table if exists admin_t_s")
}

func (s *globalIndexSuite) TestAdminCheckWithGlobalIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
check := func() {
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (5, 5), (10, 10), (11, 11), (NULL, NULL)")
tk.MustExec("admin check table admin_test")
tk.MustExec("admin check index admin_test c1")
tk.MustExec("admin check index admin_test c2")
}

// Test for hash partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1) partition by hash(c2) partitions 5;")
// c1 is global, c2 is local
tk.MustExec("create unique index c1 on admin_test(c1)")
tk.MustExec("create unique index c2 on admin_test(c2)")
check()

// Test for range partition table.
tk.MustExec("drop table if exists admin_test")
tk.MustExec(`create table admin_test (c1 int, c2 int, c3 int default 1) PARTITION BY RANGE ( c2 ) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (MAXVALUE))`)
// c1 is global, c2 is local
tk.MustExec("create unique index c1 on admin_test(c1)")
tk.MustExec("create unique index c2 on admin_test(c2)")
check()
}

func (s *globalIndexSuite) TestAdminRecoverIndexWithGlobalIndex(c *C) {
// Test no corruption case.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1) partition by hash(c2) partitions 5;")
// c1 is global, c2 is local
tk.MustExec("create unique index c1 on admin_test(c1)")
tk.MustExec("create unique index c2 on admin_test(c2)")
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (NULL, NULL)")

r := tk.MustQuery("admin recover index admin_test c1")
r.Check(testkit.Rows("0 3"))

r = tk.MustQuery("admin recover index admin_test c2")
r.Check(testkit.Rows("0 3"))

tk.MustExec("admin check index admin_test c1")
tk.MustExec("admin check index admin_test c2")

tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1) partition by hash(c2) partitions 5;")
tk.MustExec("create unique index c1 on admin_test(c1)")
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (3, 3), (10, 10), (20, 20)")

r = tk.MustQuery("admin recover index admin_test c1")
r.Check(testkit.Rows("0 5"))
tk.MustExec("admin check index admin_test c1")

// Make some corrupted index.
s.ctx = mock.NewContext()
s.ctx.Store = s.store
is := s.domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("admin_test")
tbl, err := is.TableByName(dbName, tblName)
c.Assert(err, IsNil)

tblInfo := tbl.Meta()
idxInfo := tblInfo.FindIndexByName("c1")
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
sc := s.ctx.GetSessionVars().StmtCtx
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = indexOpr.Delete(sc, txn, types.MakeDatums(1), kv.IntHandle(1))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
err = tk.ExecToErr("admin check table admin_test")
c.Assert(err, NotNil)
c.Assert(executor.ErrAdminCheckTable.Equal(err), IsTrue)
err = tk.ExecToErr("admin check index admin_test c1")
c.Assert(err, NotNil)

r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c1)")
r.Check(testkit.Rows("4"))

r = tk.MustQuery("admin recover index admin_test c1")
r.Check(testkit.Rows("1 5"))

r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(c1)")
r.Check(testkit.Rows("5"))
tk.MustExec("admin check index admin_test c1")
tk.MustExec("admin check table admin_test")
}

func (s *globalIndexSuite) TestAdminCheckIndexRangeWithGlobalIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`drop table if exists check_index_test`)
tk.MustExec("create table check_index_test (a int, b varchar(10), c int) partition by hash(c) partitions 5;")
tk.MustExec("create unique index a_b on check_index_test(a, b)")
tk.MustExec(`insert check_index_test values (3, "ab", 1), (2, "cd", 2), (1, "ef", 3), (-1, "hi", 4)`)
result := tk.MustQuery("admin check index check_index_test a_b (2, 4);")
result.Check(testkit.Rows("1 ef 3", "2 cd 2"))

result = tk.MustQuery("admin check index check_index_test a_b (3, 5);")
result.Check(testkit.Rows("-1 hi 4", "1 ef 3"))

tk.MustExec("use mysql")
result = tk.MustQuery("admin check index test.check_index_test a_b (2, 3), (4, 5);")
result.Check(testkit.Rows("-1 hi 4", "2 cd 2"))
}

func (s *globalIndexSuite) TestAdminChecksumWithGlobalIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
tk.MustExec("DROP TABLE IF EXISTS admin_checksum_partition_test;")
tk.MustExec("CREATE TABLE admin_checksum_partition_test (a INT, b INT, c INT) PARTITION BY HASH(a) PARTITIONS 4;")

// Mocktikv returns 1 for every table/index scan, then we will xor the checksums of a table.
// So if x requests are built, result will be (x%2, x, x)
// Here, 4 partitions + 1 table are scaned.
r := tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;")
r.Check(testkit.Rows("test admin_checksum_partition_test 1 5 5"))

tk.MustExec("create unique index b on admin_checksum_partition_test(b)")
// (4 partitions + 1 table) * (1 index scan + 1 table scan)
r = tk.MustQuery("ADMIN CHECKSUM TABLE admin_checksum_partition_test;")
r.Check(testkit.Rows("test admin_checksum_partition_test 0 10 10"))
}

func (s *globalIndexSuite) TestAdminCleanupIndexWithGlobalIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int) partition by hash(c1) partitions 5;")
// c1 is global, c2 is local
tk.MustExec("create unique index c2 on admin_test(c2)")
tk.MustExec("create unique index c3 on admin_test(c3)")
tk.MustExec("insert admin_test (c1, c2, c3) values (1, 1, 1), (2, 2, 2), (NULL, NULL, 3)")

r := tk.MustQuery("admin cleanup index admin_test c2")
r.Check(testkit.Rows("0"))
r = tk.MustQuery("admin cleanup index admin_test c3")
r.Check(testkit.Rows("0"))
}
23 changes: 20 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func buildIndexLookUpChecker(b *executorBuilder, p *plannercore.PhysicalIndexLoo
if !e.isCommonHandle() {
fullColLen += 1
}
if e.index.Global {
fullColLen += 1
}
e.dagPB.OutputOffsets = make([]uint32, fullColLen)
for i := 0; i < fullColLen; i++ {
e.dagPB.OutputOffsets[i] = uint32(i)
Expand Down Expand Up @@ -404,7 +407,7 @@ func buildIdxColsConcatHandleCols(tblInfo *model.TableInfo, indexInfo *model.Ind
pkCols = pkIdx.Columns
handleLen = len(pkIdx.Columns)
}
columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+handleLen)
columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+handleLen+1)
for _, idxCol := range indexInfo.Columns {
columns = append(columns, tblInfo.Columns[idxCol.Offset])
}
Expand All @@ -425,6 +428,21 @@ func buildIdxColsConcatHandleCols(tblInfo *model.TableInfo, indexInfo *model.Ind
return columns
}

func buildIdxColsConcatHandleColsWithPid(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) []*model.ColumnInfo {
columns := buildIdxColsConcatHandleCols(tblInfo, indexInfo)
if indexInfo.Global {
pidOffset := len(columns)
pidColsInfo := &model.ColumnInfo{
ID: model.ExtraPidColID,
Name: model.ExtraPartitionIdName,
Offset: pidOffset,
}
pidColsInfo.FieldType = *types.NewFieldType(mysql.TypeLonglong)
columns = append(columns, pidColsInfo)
}
return columns
}

func (b *executorBuilder) buildRecoverIndex(v *plannercore.RecoverIndex) Executor {
tblInfo := v.Table.TableInfo
t, err := b.is.TableByName(v.Table.Schema, tblInfo.Name)
Expand Down Expand Up @@ -493,14 +511,13 @@ func (b *executorBuilder) buildCleanupIndex(v *plannercore.CleanupIndex) Executo
break
}
}

if index == nil {
b.err = errors.Errorf("index `%v` is not found in table `%v`.", v.IndexName, v.Table.Name.O)
return nil
}
e := &CleanupIndexExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
columns: buildIdxColsConcatHandleCols(tblInfo, index.Meta()),
columns: buildIdxColsConcatHandleColsWithPid(tblInfo, index.Meta()),
index: index,
table: t,
physicalID: t.Meta().ID,
Expand Down
8 changes: 4 additions & 4 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,12 @@ func (e *IndexLookUpExecutor) getRetTpsByHandle() []*types.FieldType {
} else {
tps = []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
}
if e.index.Global {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
if e.checkIndexValue != nil {
tps = e.idxColTps
}
if e.index.Global {
tps = append(tps, types.NewFieldType(mysql.TypeLonglong))
}
return tps
}

Expand Down Expand Up @@ -798,7 +798,7 @@ func (w *indexWorker) extractTaskHandles(ctx context.Context, chk *chunk.Chunk,
}
if w.checkIndexValue != nil {
if retChk == nil {
retChk = chunk.NewChunkWithCapacity(w.idxColTps, w.batchSize)
retChk = chunk.NewChunkWithCapacity(w.idxLookup.getRetTpsByHandle(), w.batchSize)
}
retChk.Append(chk, 0, chk.NumRows())
}
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var _ = SerialSuites(&testSerialSuite1{&baseTestSuite{}})
var _ = SerialSuites(&testSlowQuery{&baseTestSuite{}})
var _ = Suite(&partitionTableSuite{&baseTestSuite{}})
var _ = SerialSuites(&tiflashTestSuite{})
var _ = SerialSuites(&globalIndexSuite{&baseTestSuite{}})
var _ = Suite(&globalIndexSuite{&baseTestSuite{}})
var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}})
var _ = SerialSuites(&testCoprCache{})

Expand Down
6 changes: 6 additions & 0 deletions executor/partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,10 @@ partition p2 values less than (10))`)
tk.MustExec("alter table p add unique idx(id)")
tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)")
tk.MustQuery("select * from p use index (idx)").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9"))

tk.MustExec("drop table if exists p")
tk.MustExec("create table p (id int, c int) partition by hash(c) partitions 5")
tk.MustExec("alter table p add unique idx(id)")
tk.MustExec("insert into p values (1,3), (3,4), (5,6), (7,9)")
tk.MustQuery("select * from p use index (idx) order by id").Check(testkit.Rows("1 3", "3 4", "5 6", "7 9"))
}
22 changes: 0 additions & 22 deletions kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,25 +445,3 @@ func NewPartitionHandle(pid int64, h Handle) PartitionHandle {
PartitionID: pid,
}
}

// Equal implements the Handle interface.
func (ph PartitionHandle) Equal(h Handle) bool {
if ph2, ok := h.(PartitionHandle); ok {
return ph.PartitionID == ph2.PartitionID && ph.Handle.Equal(ph2.Handle)
}
return false
}

// Compare implements the Handle interface.
func (ph PartitionHandle) Compare(h Handle) int {
if ph2, ok := h.(PartitionHandle); ok {
if ph.PartitionID < ph2.PartitionID {
return -1
}
if ph.PartitionID > ph2.PartitionID {
return 1
}
return ph.Handle.Compare(ph2.Handle)
}
panic("PartitonHandle compares to non-parition Handle")
}
Loading