Skip to content

Commit

Permalink
ddl: process temp global index correctly (#56608)
Browse files Browse the repository at this point in the history
close #56535
  • Loading branch information
Defined2014 authored Oct 14, 2024
1 parent e017e1b commit d1c476a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 7 deletions.
41 changes: 36 additions & 5 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2206,11 +2206,19 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {

var err error
if tbl, ok := t.(table.PartitionedTable); ok {
var finish bool
var finish, ok bool
for !finish {
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
var p table.PhysicalTable
if tbl.Meta().ID == reorgInfo.PhysicalTableID {
p, ok = t.(table.PhysicalTable) // global index
if !ok {
return fmt.Errorf("unexpected error, can't cast %T to table.PhysicalTable", t)
}
} else {
p = tbl.GetPartition(reorgInfo.PhysicalTableID)
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
}
err = w.addPhysicalTableIndex(p, reorgInfo)
if err != nil {
Expand Down Expand Up @@ -2537,7 +2545,30 @@ func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysic
if len(pi.AddingDefinitions) == 0 {
// case 1
// Simply AddIndex, without any partitions added or dropped!
pid, err = findNextPartitionID(currPhysicalTableID, pi.Definitions)
if reorg.mergingTmpIdx && currPhysicalTableID == t.Meta().ID {
// If the current Physical id is the table id,
// 1. All indexes are global index, the next Physical id should be the first partition id.
// 2. Not all indexes are global index, return 0.
allGlobal := true
for _, element := range reorg.elements {
if !bytes.Equal(element.TypeKey, meta.IndexElementKey) {
allGlobal = false
break
}
idxInfo := model.FindIndexInfoByID(t.Meta().Indices, element.ID)
if !idxInfo.Global {
allGlobal = false
break
}
}
if allGlobal {
pid = 0
} else {
pid = pi.Definitions[0].ID
}
} else {
pid, err = findNextPartitionID(currPhysicalTableID, pi.Definitions)
}
} else {
// case 3 (or if not found AddingDefinitions; 4)
// check if recreating Global Index (during Reorg Partition)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 20,
shard_count = 21,
deps = [
"//pkg/config",
"//pkg/ddl/ingest/testutil",
Expand Down
46 changes: 46 additions & 0 deletions pkg/ddl/ingest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,49 @@ func TestAddIndexIngestPartitionCheckpoint(t *testing.T) {
require.Equal(t, 20, int(rowCnt.Load()))
tk.MustExec("admin check table t;")
}

func TestAddGlobalIndexInIngest(t *testing.T) {
store := testkit.CreateMockStore(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int) partition by hash(a) partitions 5")
tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3)")
var i atomic.Int32
i.Store(3)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/writeLocalExec", func(bool) {
tk2 := testkit.NewTestKit(t, store)
tmp := i.Add(1)
_, err := tk2.Exec(fmt.Sprintf("insert into test.t values (%d, %d)", tmp, tmp))
assert.Nil(t, err)
})
tk.MustExec("alter table t add index idx_1(b), add unique index idx_2(b) global")
rsGlobalIndex := tk.MustQuery("select * from t use index(idx_2)").Sort()
rsTable := tk.MustQuery("select * from t use index()").Sort()
rsNormalIndex := tk.MustQuery("select * from t use index(idx_1)").Sort()
num := len(rsGlobalIndex.Rows())
require.Greater(t, num, 3)
require.Equal(t, rsGlobalIndex.String(), rsTable.String())
require.Equal(t, rsGlobalIndex.String(), rsNormalIndex.String())

// for indexes have different columns
tk.MustExec("alter table t add index idx_3(a), add unique index idx_4(b) global")
rsGlobalIndex = tk.MustQuery("select * from t use index(idx_4)").Sort()
rsTable = tk.MustQuery("select * from t use index()").Sort()
rsNormalIndex = tk.MustQuery("select * from t use index(idx_3)").Sort()
require.Greater(t, len(rsGlobalIndex.Rows()), num)
require.Equal(t, rsGlobalIndex.String(), rsTable.String())
require.Equal(t, rsGlobalIndex.String(), rsNormalIndex.String())

// for all global indexes
tk.MustExec("alter table t add unique index idx_5(b) global, add unique index idx_6(b) global")
rsGlobalIndex1 := tk.MustQuery("select * from t use index(idx_6)").Sort()
rsTable = tk.MustQuery("select * from t use index()").Sort()
rsGlobalIndex2 := tk.MustQuery("select * from t use index(idx_5)").Sort()
require.Greater(t, len(rsGlobalIndex1.Rows()), len(rsGlobalIndex.Rows()))
require.Equal(t, rsGlobalIndex1.String(), rsTable.String())
require.Equal(t, rsGlobalIndex1.String(), rsGlobalIndex2.String())
}
12 changes: 12 additions & 0 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ddl

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand Down Expand Up @@ -882,6 +883,17 @@ func getReorgInfo(ctx *ReorgContext, jobCtx *jobContext, rh *reorgHandler, job *
tb = tbl.(table.PhysicalTable)
}
if mergingTmpIdx {
for _, element := range elements {
if !bytes.Equal(element.TypeKey, meta.IndexElementKey) {
continue
}
// If has a global index in elements, need start process at `tblInfo.ID`
// because there are some temporary global indexes prefixed with table ID.
idxInfo := model.FindIndexInfoByID(tblInfo.Indices, element.ID)
if idxInfo.Global {
pid = tblInfo.ID
}
}
firstElemTempID := tablecodec.TempIndexPrefix | elements[0].ID
lastElemTempID := tablecodec.TempIndexPrefix | elements[len(elements)-1].ID
start = tablecodec.EncodeIndexSeekKey(pid, firstElemTempID, nil)
Expand Down
9 changes: 8 additions & 1 deletion pkg/table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,14 @@ func (t *partitionedTable) locateHashPartition(ctx expression.EvalContext, partE

// GetPartition returns a Table, which is actually a partition.
func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
return t.getPartition(pid)
part := t.getPartition(pid)

// Explicitly check if the partition is nil, and return a nil interface if it is
if part == nil {
return nil // Return a truly nil interface instead of an interface holding a nil pointer
}

return part
}

// getPartition returns a Table, which is actually a partition.
Expand Down

0 comments on commit d1c476a

Please sign in to comment.