Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: eliminate ingest step for add index with local engine #47982

Merged
merged 8 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"backfilling_dispatcher.go",
"backfilling_dist_scheduler.go",
"backfilling_import_cloud.go",
"backfilling_import_local.go",
"backfilling_merge_sort.go",
"backfilling_operators.go",
"backfilling_proto.go",
Expand Down Expand Up @@ -130,7 +129,6 @@ go_library(
"//pkg/util/collate",
"//pkg/util/dbterror",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/disttask",
"//pkg/util/domainutil",
"//pkg/util/filter",
"//pkg/util/gcutil",
Expand Down
71 changes: 11 additions & 60 deletions pkg/ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,18 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

// BackfillingDispatcherExt is an extension of litBackfillDispatcher, exported for test.
type BackfillingDispatcherExt struct {
d *ddl
previousSchedulerIDs []string
GlobalSort bool
d *ddl
GlobalSort bool
}

// NewBackfillingDispatcherExt creates a new backfillingDispatcherExt, only used for test now.
Expand Down Expand Up @@ -135,11 +133,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
prevStep,
logger)
}
// for partition table, no subtasks for write and ingest step.
if tblInfo.Partition != nil {
return nil, nil
}
return generateIngestTaskPlan(ctx, dsp, taskHandle, gTask)
return nil, nil
default:
return nil, nil
}
Expand All @@ -163,7 +157,7 @@ func (dsp *BackfillingDispatcherExt) GetNextStep(task *proto.Task) proto.Step {
if dsp.GlobalSort {
return StepMergeSort
}
return StepWriteAndIngest
return proto.StepDone
case StepMergeSort:
return StepWriteAndIngest
case StepWriteAndIngest:
Expand Down Expand Up @@ -196,21 +190,11 @@ func (*BackfillingDispatcherExt) OnErrStage(_ context.Context, _ dispatcher.Task
}

// GetEligibleInstances implements dispatcher.Extension interface.
func (dsp *BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
func (*BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _ *proto.Task) ([]*infosync.ServerInfo, error) {
serverInfos, err := dispatcher.GenerateSchedulerNodes(ctx)
if err != nil {
return nil, err
}
if len(dsp.previousSchedulerIDs) > 0 {
// Only the nodes that executed step one can have step two.
involvedServerInfos := make([]*infosync.ServerInfo, 0, len(serverInfos))
for _, id := range dsp.previousSchedulerIDs {
if idx := disttaskutil.FindServerInfo(serverInfos, id); idx >= 0 {
involvedServerInfos = append(involvedServerInfos, serverInfos[idx])
}
}
return involvedServerInfos, nil
}
return serverInfos, nil
}

Expand Down Expand Up @@ -309,8 +293,11 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
return nil, err
}

subTaskMetas := make([][]byte, 0, 100)
regionBatch := 20
// Make subtask large enough to reduce the overhead of local/global flush.
quota := variable.DDLDiskQuota.Load()
tangenta marked this conversation as resolved.
Show resolved Hide resolved
regionBatch := int(int64(quota) / int64(config.SplitRegionSize))

subTaskMetas := make([][]byte, 0, 4)
sort.Slice(recordRegionMetas, func(i, j int) bool {
return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0
})
Expand All @@ -336,42 +323,6 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
return subTaskMetas, nil
}

tangenta marked this conversation as resolved.
Show resolved Hide resolved
func generateIngestTaskPlan(
ctx context.Context,
h *BackfillingDispatcherExt,
taskHandle dispatcher.TaskHandle,
gTask *proto.Task,
) ([][]byte, error) {
// We dispatch dummy subtasks because the rest data in local engine will be imported
// in the initialization of subtask executor.
var ingestSubtaskCnt int
if intest.InTest && taskHandle == nil {
serverNodes, err := dispatcher.GenerateSchedulerNodes(ctx)
if err != nil {
return nil, err
}
ingestSubtaskCnt = len(serverNodes)
} else {
schedulerIDs, err := taskHandle.GetPreviousSchedulerIDs(ctx, gTask.ID, gTask.Step)
if err != nil {
return nil, err
}
h.previousSchedulerIDs = schedulerIDs
ingestSubtaskCnt = len(schedulerIDs)
}

subTaskMetas := make([][]byte, 0, ingestSubtaskCnt)
dummyMeta := &BackfillSubTaskMeta{}
metaBytes, err := json.Marshal(dummyMeta)
if err != nil {
return nil, err
}
for i := 0; i < ingestSubtaskCnt; i++ {
subTaskMetas = append(subTaskMetas, metaBytes)
}
return subTaskMetas, nil
}

func generateGlobalSortIngestPlan(
ctx context.Context,
taskHandle dispatcher.TaskHandle,
Expand Down
13 changes: 1 addition & 12 deletions pkg/ddl/backfilling_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
// 1.2 test partition table OnNextSubtasksBatch after StepReadIndex
gTask.State = proto.TaskStateRunning
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, ddl.StepWriteAndIngest, gTask.Step)
// for partition table, we will not generate subtask for StepWriteAndIngest.
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
require.Len(t, metas, 0)
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, proto.StepDone, gTask.Step)
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
Expand Down Expand Up @@ -123,11 +117,6 @@ func TestBackfillingDispatcherLocalMode(t *testing.T) {
// 2.2.2 StepReadIndex
gTask.State = proto.TaskStateRunning
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, ddl.StepWriteAndIngest, gTask.Step)
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
require.Equal(t, 1, len(metas))
gTask.Step = dsp.GetNextStep(gTask)
require.Equal(t, proto.StepDone, gTask.Step)
metas, err = dsp.OnNextSubtasksBatch(context.Background(), nil, gTask, gTask.Step)
require.NoError(t, err)
Expand Down Expand Up @@ -271,7 +260,7 @@ func TestGetNextStep(t *testing.T) {
ext := &ddl.BackfillingDispatcherExt{}

// 1. local mode
for _, nextStep := range []proto.Step{ddl.StepReadIndex, ddl.StepWriteAndIngest} {
for _, nextStep := range []proto.Step{ddl.StepReadIndex, proto.StepDone} {
require.Equal(t, nextStep, ext.GetNextStep(task))
task.Step = nextStep
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
if len(bgm.CloudStorageURI) > 0 {
return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
}
return newImportFromLocalStepExecutor(jobMeta.ID, indexInfos, tbl.(table.PhysicalTable), bc), nil
return nil, errors.Errorf("local import does not have write & ingest step")
default:
return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
}
Expand Down
81 changes: 0 additions & 81 deletions pkg/ddl/backfilling_import_local.go

This file was deleted.

6 changes: 1 addition & 5 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,13 +734,9 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
failpoint.Return(errors.New("mock flush error"))
})
flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota
if s.tbl.GetPartitionedTable() != nil {
flushMode = ingest.FlushModeForceGlobal
}
for _, index := range s.indexes {
idxInfo := index.Meta()
_, _, err := s.backendCtx.Flush(idxInfo.ID, flushMode)
_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import "github.com/pingcap/tidb/pkg/disttask/framework/proto"
// the initial step is StepInit(-1)
// steps are processed in the following order:
// - local sort:
// StepInit -> StepReadIndex -> StepWriteAndIngest -> StepDone
// StepInit -> StepReadIndex -> StepDone
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// StepInit -> StepReadIndex -> StepDone
// StepInit -> StepReadIndexAndIngest -> StepDone

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is not suitable for global sort:

// - global sort:
// StepInit -> StepReadIndexAndIngest -> StepMergeSort -> StepWriteAndIngest -> StepDone

// - global sort:
// StepInit -> StepReadIndex -> StepMergeSort -> StepWriteAndIngest -> StepDone
const (
Expand Down