
ddl: eliminate ingest step for add index with local engine #47982

Merged (8 commits, Oct 30, 2023)
Changes from 2 commits
pkg/ddl/BUILD.bazel (2 changes: 0 additions & 2 deletions)
@@ -16,7 +16,6 @@ go_library(
"backfilling_dispatcher.go",
"backfilling_dist_scheduler.go",
"backfilling_import_cloud.go",
"backfilling_import_local.go",
"backfilling_merge_sort.go",
"backfilling_operators.go",
"backfilling_proto.go",
@@ -130,7 +129,6 @@ go_library(
"//pkg/util/collate",
"//pkg/util/dbterror",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/disttask",
"//pkg/util/domainutil",
"//pkg/util/filter",
"//pkg/util/gcutil",
pkg/ddl/backfilling_dispatcher.go (61 changes: 4 additions & 57 deletions)
@@ -36,18 +36,15 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

// BackfillingDispatcherExt is an extension of litBackfillDispatcher, exported for test.
type BackfillingDispatcherExt struct {
-d                    *ddl
-previousSchedulerIDs []string
-GlobalSort           bool
+d          *ddl
+GlobalSort bool
}

// NewBackfillingDispatcherExt creates a new backfillingDispatcherExt, only used for test now.
@@ -135,11 +132,7 @@ func (dsp *BackfillingDispatcherExt) OnNextSubtasksBatch(
prevStep,
logger)
}
-// for partition table, no subtasks for write and ingest step.
-if tblInfo.Partition != nil {
-    return nil, nil
-}
-return generateIngestTaskPlan(ctx, dsp, taskHandle, gTask)
+return nil, nil
default:
return nil, nil
}
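
For context, here is a minimal, self-contained sketch (hypothetical types and step names, not TiDB's real API) of what OnNextSubtasksBatch now amounts to for the local sort path: subtask metas are produced for the read-index step only, and the former write-and-ingest step simply yields an empty batch, so the framework has nothing left to schedule.

package main

import (
	"encoding/json"
	"fmt"
)

// subtaskMeta is a hypothetical stand-in for ddl.BackfillSubTaskMeta.
type subtaskMeta struct {
	StartKey []byte `json:"start_key"`
	EndKey   []byte `json:"end_key"`
}

// nextSubtasksBatch sketches the post-change behaviour for the local sort
// path: only the read-index step produces subtask metas; any later step,
// including the former write-and-ingest step, yields an empty batch.
func nextSubtasksBatch(step string, ranges [][2][]byte) ([][]byte, error) {
	switch step {
	case "read-index":
		metas := make([][]byte, 0, len(ranges))
		for _, r := range ranges {
			b, err := json.Marshal(subtaskMeta{StartKey: r[0], EndKey: r[1]})
			if err != nil {
				return nil, err
			}
			metas = append(metas, b)
		}
		return metas, nil
	default:
		return nil, nil
	}
}

func main() {
	ranges := [][2][]byte{{[]byte("a"), []byte("m")}, {[]byte("m"), []byte("z")}}
	metas, _ := nextSubtasksBatch("read-index", ranges)
	fmt.Println(len(metas)) // 2
	metas, _ = nextSubtasksBatch("write-and-ingest", ranges)
	fmt.Println(len(metas)) // 0
}
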
@@ -163,7 +156,7 @@ func (dsp *BackfillingDispatcherExt) GetNextStep(task *proto.Task) proto.Step {
if dsp.GlobalSort {
return StepMergeSort
}
-return StepWriteAndIngest
+return proto.StepDone
case StepMergeSort:
return StepWriteAndIngest
case StepWriteAndIngest:
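
Read together with the hunk above, the step flow for a local-engine add-index task now ends right after the read step, and only global-sort tasks continue through merge-sort and write-and-ingest. Below is a rough, self-contained sketch of the resulting transitions; the constants are simplified stand-ins for proto.Step values, and since the case label feeding the GlobalSort branch is not visible in the hunk, the read-index naming here is an assumption.

package main

import "fmt"

// Step and the constants below are simplified stand-ins, not TiDB's real
// definitions.
type Step int

const (
	StepInit Step = iota
	StepReadIndex
	StepMergeSort
	StepWriteAndIngest
	StepDone
)

// nextStep sketches GetNextStep after this change: without global sort the
// task is done as soon as the read-index step finishes; only global-sort
// tasks continue through merge-sort and write-and-ingest.
func nextStep(cur Step, globalSort bool) Step {
	switch cur {
	case StepInit:
		return StepReadIndex
	case StepReadIndex:
		if globalSort {
			return StepMergeSort
		}
		return StepDone
	case StepMergeSort:
		return StepWriteAndIngest
	default:
		return StepDone
	}
}

func main() {
	fmt.Println(nextStep(StepReadIndex, false) == StepDone)     // true: local engine path ends here
	fmt.Println(nextStep(StepReadIndex, true) == StepMergeSort) // true: global sort continues
}
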
@@ -201,16 +194,6 @@ func (dsp *BackfillingDispatcherExt) GetEligibleInstances(ctx context.Context, _
if err != nil {
return nil, err
}
-if len(dsp.previousSchedulerIDs) > 0 {
-    // Only the nodes that executed step one can have step two.
-    involvedServerInfos := make([]*infosync.ServerInfo, 0, len(serverInfos))
-    for _, id := range dsp.previousSchedulerIDs {
-        if idx := disttaskutil.FindServerInfo(serverInfos, id); idx >= 0 {
-            involvedServerInfos = append(involvedServerInfos, serverInfos[idx])
-        }
-    }
-    return involvedServerInfos, nil
-}
return serverInfos, nil
}
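
The removed block is the counterpart of the deleted previousSchedulerIDs field: as its comment says, only the nodes that executed step one could run step two, because data sorted into a node's local engine could only be ingested from that same node. With the ingest step gone, every alive node is eligible again. Below is a small sketch of the kind of filtering being dropped; serverInfo is a hypothetical stand-in for infosync.ServerInfo.

package main

import "fmt"

// serverInfo stands in for infosync.ServerInfo in this sketch.
type serverInfo struct{ ID string }

// filterByPreviousSchedulers keeps only the nodes that executed the previous
// step, which is the constraint this PR removes: there is no longer a
// node-local ingest step that must run where the data was written.
func filterByPreviousSchedulers(all []serverInfo, previousIDs []string) []serverInfo {
	prev := make(map[string]struct{}, len(previousIDs))
	for _, id := range previousIDs {
		prev[id] = struct{}{}
	}
	kept := make([]serverInfo, 0, len(all))
	for _, s := range all {
		if _, ok := prev[s.ID]; ok {
			kept = append(kept, s)
		}
	}
	return kept
}

func main() {
	all := []serverInfo{{"tidb-0"}, {"tidb-1"}, {"tidb-2"}}
	fmt.Println(filterByPreviousSchedulers(all, []string{"tidb-2"})) // [{tidb-2}]
}
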

@@ -336,42 +319,6 @@ func generateNonPartitionPlan(d *ddl, tblInfo *model.TableInfo, job *model.Job)
return subTaskMetas, nil
}

-func generateIngestTaskPlan(
-    ctx context.Context,
-    h *BackfillingDispatcherExt,
-    taskHandle dispatcher.TaskHandle,
-    gTask *proto.Task,
-) ([][]byte, error) {
-    // We dispatch dummy subtasks because the rest data in local engine will be imported
-    // in the initialization of subtask executor.
-    var ingestSubtaskCnt int
-    if intest.InTest && taskHandle == nil {
-        serverNodes, err := dispatcher.GenerateSchedulerNodes(ctx)
-        if err != nil {
-            return nil, err
-        }
-        ingestSubtaskCnt = len(serverNodes)
-    } else {
-        schedulerIDs, err := taskHandle.GetPreviousSchedulerIDs(ctx, gTask.ID, gTask.Step)
-        if err != nil {
-            return nil, err
-        }
-        h.previousSchedulerIDs = schedulerIDs
-        ingestSubtaskCnt = len(schedulerIDs)
-    }
-
-    subTaskMetas := make([][]byte, 0, ingestSubtaskCnt)
-    dummyMeta := &BackfillSubTaskMeta{}
-    metaBytes, err := json.Marshal(dummyMeta)
-    if err != nil {
-        return nil, err
-    }
-    for i := 0; i < ingestSubtaskCnt; i++ {
-        subTaskMetas = append(subTaskMetas, metaBytes)
-    }
-    return subTaskMetas, nil
-}

func generateGlobalSortIngestPlan(
ctx context.Context,
taskHandle dispatcher.TaskHandle,
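
For context on the generateIngestTaskPlan function deleted above: the "dummy" subtasks it produced carried no payload at all, one empty meta per node that ran the previous step, because the real work (importing whatever was left in that node's local engine) happened when the subtask executor initialized. A rough sketch of that fan-out, using a hypothetical meta type rather than the real BackfillSubTaskMeta:

package main

import (
	"encoding/json"
	"fmt"
)

// backfillSubtaskMeta is a stand-in for ddl.BackfillSubTaskMeta; the dummy
// subtasks carried no real payload.
type backfillSubtaskMeta struct{}

// dummyIngestSubtasks builds one empty subtask meta per scheduler node, which
// is roughly what the deleted generateIngestTaskPlan did before this PR.
func dummyIngestSubtasks(schedulerIDs []string) ([][]byte, error) {
	meta, err := json.Marshal(&backfillSubtaskMeta{})
	if err != nil {
		return nil, err
	}
	metas := make([][]byte, 0, len(schedulerIDs))
	for range schedulerIDs {
		metas = append(metas, meta)
	}
	return metas, nil
}

func main() {
	metas, _ := dummyIngestSubtasks([]string{"tidb-0", "tidb-1"})
	fmt.Println(len(metas), string(metas[0])) // 2 {}
}
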
pkg/ddl/backfilling_dist_scheduler.go (2 changes: 1 addition & 1 deletion)
@@ -93,7 +93,7 @@ func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
if len(bgm.CloudStorageURI) > 0 {
return newCloudImportExecutor(&bgm.Job, jobMeta.ID, indexInfos[0], tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
}
-return newImportFromLocalStepExecutor(jobMeta.ID, indexInfos, tbl.(table.PhysicalTable), bc), nil
+return nil, errors.Errorf("local import does not have write & ingest step")
default:
return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
}
pkg/ddl/backfilling_import_local.go (81 changes: 0 additions & 81 deletions)

This file was deleted.

pkg/ddl/backfilling_operators.go (6 changes: 1 addition & 5 deletions)
@@ -734,13 +734,9 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
failpoint.Return(errors.New("mock flush error"))
})
-flushMode := ingest.FlushModeForceLocalAndCheckDiskQuota
-if s.tbl.GetPartitionedTable() != nil {
-    flushMode = ingest.FlushModeForceGlobal
-}
for _, index := range s.indexes {
idxInfo := index.Meta()
-_, _, err := s.backendCtx.Flush(idxInfo.ID, flushMode)
+_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceGlobal)
if err != nil {
if common.ErrFoundDuplicateKeys.Equal(err) {
err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
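
This flush change is what makes the separate ingest step unnecessary: the result sink now always flushes in force-global mode, so index data written during the read-index step's write pipeline is pushed out of the local engine right away instead of being left behind for a later write-and-ingest step (previously only partitioned tables forced a global flush, while other tables used the local mode with a disk-quota check). A simplified before/after sketch, with hypothetical constants standing in for the ingest.FlushMode* values:

package main

import "fmt"

// flushMode is a stand-in for the ingest.FlushMode* constants.
type flushMode int

const (
	flushModeForceLocalAndCheckDiskQuota flushMode = iota // keep data local, spill on quota
	flushModeForceGlobal                                  // push everything out of the local engine now
)

// chooseFlushModeBefore mirrors the deleted logic: only partitioned tables
// forced a global flush at the end of the write pipeline.
func chooseFlushModeBefore(partitioned bool) flushMode {
	if partitioned {
		return flushModeForceGlobal
	}
	return flushModeForceLocalAndCheckDiskQuota
}

// chooseFlushModeAfter mirrors the new logic: every table forces a global
// flush, so nothing is left for a separate ingest step.
func chooseFlushModeAfter(bool) flushMode {
	return flushModeForceGlobal
}

func main() {
	fmt.Println(chooseFlushModeBefore(false), chooseFlushModeAfter(false)) // 0 1
}
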