Skip to content

Commit

Permalink
ddl, disttask: collect summary for each subtask (#50838)
Browse files Browse the repository at this point in the history
ref #48795
  • Loading branch information
tangenta authored Feb 1, 2024
1 parent 134d2bf commit cac449b
Show file tree
Hide file tree
Showing 19 changed files with 135 additions and 203 deletions.
8 changes: 4 additions & 4 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type BackfillSubTaskMeta struct {

// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl,
bc ingest.BackendCtx, stage proto.Step, summary *execute.Summary) (execute.StepExecutor, error) {
bc ingest.BackendCtx, stage proto.Step) (execute.StepExecutor, error) {
bgm := &BackfillTaskMeta{}
err := json.Unmarshal(taskMeta, bgm)
if err != nil {
Expand Down Expand Up @@ -90,7 +90,7 @@ func NewBackfillSubtaskExecutor(taskMeta []byte, d *ddl,
d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexExecutor(
d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil
d, &bgm.Job, indexInfos, tbl.(table.PhysicalTable), jc, bc, bgm.CloudStorageURI), nil
case proto.BackfillStepMergeSort:
return newMergeSortExecutor(jobMeta.ID, len(indexInfos), tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
case proto.BackfillStepWriteAndIngest:
Expand Down Expand Up @@ -167,10 +167,10 @@ func decodeIndexUniqueness(job *model.Job) (bool, error) {
return unique[0], nil
}

func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, summary *execute.Summary, _ *proto.StepResource) (execute.StepExecutor, error) {
func (s *backfillDistExecutor) GetStepExecutor(task *proto.Task, _ *proto.StepResource) (execute.StepExecutor, error) {
switch task.Step {
case proto.BackfillStepReadIndex, proto.BackfillStepMergeSort, proto.BackfillStepWriteAndIngest:
return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step, summary)
return NewBackfillSubtaskExecutor(task.Meta, s.d, s.backendCtx, task.Step)
default:
return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

type cloudImportExecutor struct {
taskexecutor.EmptyStepExecutor
job *model.Job
jobID int64
index *model.IndexInfo
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/backfilling_merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand All @@ -34,6 +35,7 @@ import (
)

type mergeSortExecutor struct {
taskexecutor.EmptyStepExecutor
jobID int64
idxNum int
ptbl table.PhysicalTable
Expand Down
20 changes: 12 additions & 8 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type readIndexExecutor struct {

cloudStorageURI string

bc ingest.BackendCtx
summary *execute.Summary
bc ingest.BackendCtx
curRowCount *atomic.Int64

subtaskSummary sync.Map // subtaskID => readIndexSummary
}
Expand All @@ -67,7 +67,6 @@ func newReadIndexExecutor(
ptbl table.PhysicalTable,
jc *JobContext,
bc ingest.BackendCtx,
summary *execute.Summary,
cloudStorageURI string,
) *readIndexExecutor {
return &readIndexExecutor{
Expand All @@ -77,8 +76,8 @@ func newReadIndexExecutor(
ptbl: ptbl,
jc: jc,
bc: bc,
summary: summary,
cloudStorageURI: cloudStorageURI,
curRowCount: &atomic.Int64{},
}
}

Expand Down Expand Up @@ -117,13 +116,13 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta

opCtx := NewOperatorCtx(ctx)
defer opCtx.Cancel()
totalRowCount := &atomic.Int64{}
r.curRowCount.Store(0)

var pipe *operator.AsyncPipeline
if len(r.cloudStorageURI) > 0 {
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, totalRowCount)
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sessCtx, tbl, startKey, endKey, r.curRowCount)
} else {
pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, totalRowCount)
pipe, err = r.buildLocalStorePipeline(opCtx, sessCtx, tbl, startKey, endKey, r.curRowCount)
}
if err != nil {
return err
Expand All @@ -142,10 +141,15 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
}

r.bc.ResetWorkers(r.job.ID)
r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load())
return nil
}

func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
return &execute.SubtaskSummary{
RowCount: r.curRowCount.Load(),
}
}

func (*readIndexExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("read index executor cleanup subtask exec env",
zap.String("category", "ddl"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
taskexecutor.RegisterTaskType(proto.Backfill,
func(ctx context.Context, id string, task *proto.Task, taskTable taskexecutor.TaskTable) taskexecutor.TaskExecutor {
return newBackfillDistExecutor(ctx, id, task, taskTable, d)
}, taskexecutor.WithSummary,
},
)

scheduler.RegisterSchedulerFactory(proto.Backfill,
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/mock/execute/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/taskexecutor/execute",
"@org_uber_go_mock//gomock",
],
)
15 changes: 15 additions & 0 deletions pkg/disttask/framework/mock/execute/execute_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/disttask/framework/mock/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 2 additions & 10 deletions pkg/disttask/framework/taskexecutor/execute/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "execute",
srcs = [
"interface.go",
"summary.go",
],
srcs = ["interface.go"],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/util/logutil",
"@org_uber_go_zap//:zap",
],
deps = ["//pkg/disttask/framework/proto"],
)
9 changes: 9 additions & 0 deletions pkg/disttask/framework/taskexecutor/execute/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ type StepExecutor interface {
Init(context.Context) error
// RunSubtask is used to run the subtask.
RunSubtask(ctx context.Context, subtask *proto.Subtask) error

// RealtimeSummary returns the realtime summary of the running subtask by this executor.
RealtimeSummary() *SubtaskSummary

// OnFinished is used to handle the subtask when it is finished.
// The subtask meta can be updated in place.
OnFinished(ctx context.Context, subtask *proto.Subtask) error
// Cleanup is used to clean up the environment for the subtask executor.
Cleanup(context.Context) error
}

// SubtaskSummary contains the summary of a subtask.
type SubtaskSummary struct {
RowCount int64
}
93 changes: 0 additions & 93 deletions pkg/disttask/framework/taskexecutor/execute/summary.go

This file was deleted.

7 changes: 6 additions & 1 deletion pkg/disttask/framework/taskexecutor/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type Extension interface {
// Note:
// 1. summary is the summary manager of all subtask of the same type now.
// 2. should not retry the error from it.
GetStepExecutor(task *proto.Task, summary *execute.Summary, resource *proto.StepResource) (execute.StepExecutor, error)
GetStepExecutor(task *proto.Task, resource *proto.StepResource) (execute.StepExecutor, error)
// IsRetryableError returns whether the error is transient.
// When error is transient, the framework won't mark subtasks as failed,
// then the TaskExecutor can load the subtask again and redo it.
Expand All @@ -137,6 +137,11 @@ func (*EmptyStepExecutor) RunSubtask(context.Context, *proto.Subtask) error {
return nil
}

// RealtimeSummary implements the StepExecutor interface.
func (*EmptyStepExecutor) RealtimeSummary() *execute.SubtaskSummary {
return nil
}

// Cleanup implements the StepExecutor interface.
func (*EmptyStepExecutor) Cleanup(context.Context) error {
return nil
Expand Down
9 changes: 0 additions & 9 deletions pkg/disttask/framework/taskexecutor/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@ import (
"context"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
)

type taskTypeOptions struct {
// Summary is the summary of all tasks of the task type.
// TODO: better have a summary per task/subtask.
Summary *execute.Summary
}

// TaskTypeOption is the option of TaskType.
Expand Down Expand Up @@ -58,8 +54,3 @@ func ClearTaskExecutors() {
taskTypes = make(map[proto.TaskType]taskTypeOptions)
taskExecutorFactories = make(map[proto.TaskType]taskExecutorFactoryFn)
}

// WithSummary is the option of TaskExecutor to set the summary.
var WithSummary TaskTypeOption = func(opts *taskTypeOptions) {
opts.Summary = execute.NewSummary()
}
Loading

0 comments on commit cac449b

Please sign in to comment.