Skip to content

Commit

Permalink
ddl: correct the remain ranges check for adding index (#41460) (#41494)
Browse files Browse the repository at this point in the history
close #41459
  • Loading branch information
ti-chi-bot authored Feb 16, 2023
1 parent 4958321 commit 8a40c4d
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 8 deletions.
27 changes: 19 additions & 8 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,12 @@ type backfillScheduler struct {
copReqSenderPool *copReqSenderPool // for add index in ingest way.
}

const backfillTaskChanSize = 1024
var backfillTaskChanSize = 1024

// SetBackfillTaskChanSizeForTest is only used for test.
func SetBackfillTaskChanSizeForTest(n int) {
backfillTaskChanSize = n
}

func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
Expand Down Expand Up @@ -1056,8 +1061,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
if err != nil {
return errors.Trace(err)
}
scheduler.setMaxWorkerSize(len(kvRanges))
if len(kvRanges) == 0 {
break
}

scheduler.setMaxWorkerSize(len(kvRanges))
err = scheduler.adjustWorkerSize()
if err != nil {
return errors.Trace(err)
Expand All @@ -1080,14 +1088,17 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
if err != nil {
return errors.Trace(err)
}

if len(remains) == 0 {
if ingestBeCtx != nil {
ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID)
}
if len(remains) > 0 {
startKey = remains[0].StartKey
} else {
startKey = kvRanges[len(kvRanges)-1].EndKey
}
if startKey.Cmp(endKey) >= 0 {
break
}
startKey = remains[0].StartKey
}
if ingestBeCtx != nil {
ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID)
}
return nil
}
Expand Down
23 changes: 23 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,29 @@ func TestAddIndexIngestCancel(t *testing.T) {
require.Empty(t, ingest.LitBackCtxMgr.Keys())
}

func TestAddIndexSplitTableRanges(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec("create table t (a int primary key, b int);")
for i := 0; i < 8; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}
tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1"))

ddl.SetBackfillTaskChanSizeForTest(4)
tk.MustExec("alter table t add index idx(b);")
tk.MustExec("admin check table t;")
ddl.SetBackfillTaskChanSizeForTest(7)
tk.MustExec("alter table t add index idx_2(b);")
tk.MustExec("admin check table t;")
ddl.SetBackfillTaskChanSizeForTest(1024)
}

type testCallback struct {
ddl.Callback
OnJobRunBeforeExported func(job *model.Job)
Expand Down

0 comments on commit 8a40c4d

Please sign in to comment.