diff --git a/ddl/backfilling.go b/ddl/backfilling.go index b547937660131..1c8fe03833f10 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -825,7 +825,12 @@ type backfillScheduler struct { copReqSenderPool *copReqSenderPool // for add index in ingest way. } -const backfillTaskChanSize = 1024 +var backfillTaskChanSize = 1024 + +// SetBackfillTaskChanSizeForTest is only used for test. +func SetBackfillTaskChanSizeForTest(n int) { + backfillTaskChanSize = n +} func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool, tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column, @@ -1071,8 +1076,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } - scheduler.setMaxWorkerSize(len(kvRanges)) + if len(kvRanges) == 0 { + break + } + scheduler.setMaxWorkerSize(len(kvRanges)) err = scheduler.adjustWorkerSize() if err != nil { return errors.Trace(err) @@ -1095,14 +1103,17 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic if err != nil { return errors.Trace(err) } - - if len(remains) == 0 { - if ingestBeCtx != nil { - ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) - } + if len(remains) > 0 { + startKey = remains[0].StartKey + } else { + startKey = kvRanges[len(kvRanges)-1].EndKey + } + if startKey.Cmp(endKey) >= 0 { break } - startKey = remains[0].StartKey + } + if ingestBeCtx != nil { + ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID) } return nil } diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index e5c147d3337a3..69f644ba36c3d 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -444,6 +444,29 @@ func TestAddIndexIngestCancel(t *testing.T) { require.Empty(t, ingest.LitBackCtxMgr.Keys()) } +func TestAddIndexSplitTableRanges(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (a int primary key, b int);") + for i := 0; i < 8; i++ { + tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000)) + } + tk.MustQuery("split table t between (0) and (80000) regions 7;").Check(testkit.Rows("6 1")) + + ddl.SetBackfillTaskChanSizeForTest(4) + tk.MustExec("alter table t add index idx(b);") + tk.MustExec("admin check table t;") + ddl.SetBackfillTaskChanSizeForTest(7) + tk.MustExec("alter table t add index idx_2(b);") + tk.MustExec("admin check table t;") + ddl.SetBackfillTaskChanSizeForTest(1024) +} + type testCallback struct { ddl.Callback OnJobRunBeforeExported func(job *model.Job)