Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask, ddl: move backend ctx register code to scheduler #46952

Merged
merged 10 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ go_library(
srcs = [
"backfilling.go",
"backfilling_dispatcher.go",
"backfilling_import_cloud.go",
"backfilling_import_local.go",
"backfilling_operator.go",
"backfilling_read_index.go",
"backfilling_scheduler.go",
"callback.go",
"cluster.go",
Expand Down Expand Up @@ -47,9 +50,6 @@ go_library(
"schema.go",
"sequence.go",
"split_region.go",
"stage_ingest_index.go",
"stage_merge_sort.go",
"stage_read_index.go",
"stage_scheduler.go",
"stat.go",
"table.go",
Expand Down
19 changes: 9 additions & 10 deletions ddl/stage_merge_sort.go → ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ import (
"go.uber.org/zap"
)

type mergeSortStage struct {
type cloudImportExecutor struct {
jobID int64
index *model.IndexInfo
ptbl table.PhysicalTable
bc ingest.BackendCtx
cloudStoreURI string
}

func newMergeSortStage(
func newCloudImportExecutor(
jobID int64,
index *model.IndexInfo,
ptbl table.PhysicalTable,
bc ingest.BackendCtx,
cloudStoreURI string,
) (*mergeSortStage, error) {
return &mergeSortStage{
) (*cloudImportExecutor, error) {
return &cloudImportExecutor{
jobID: jobID,
index: index,
ptbl: ptbl,
Expand All @@ -53,12 +53,12 @@ func newMergeSortStage(
}, nil
}

func (*mergeSortStage) Init(ctx context.Context) error {
func (*cloudImportExecutor) Init(ctx context.Context) error {
logutil.Logger(ctx).Info("merge sort stage init subtask exec env")
return nil
}

func (m *mergeSortStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
logutil.Logger(ctx).Info("merge sort stage split subtask")

sm := &BackfillSubTaskMeta{}
Expand Down Expand Up @@ -94,18 +94,17 @@ func (m *mergeSortStage) RunSubtask(ctx context.Context, subtask *proto.Subtask)
return err
}

func (m *mergeSortStage) Cleanup(ctx context.Context) error {
func (*cloudImportExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("merge sort stage clean up subtask env")
ingest.LitBackCtxMgr.Unregister(m.jobID)
return nil
}

func (*mergeSortStage) OnFinished(ctx context.Context, _ *proto.Subtask) error {
func (*cloudImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error {
logutil.Logger(ctx).Info("merge sort stage finish subtask")
return nil
}

func (*mergeSortStage) Rollback(ctx context.Context) error {
func (*cloudImportExecutor) Rollback(ctx context.Context) error {
logutil.Logger(ctx).Info("merge sort stage rollback subtask")
return nil
}
20 changes: 9 additions & 11 deletions ddl/stage_ingest_index.go → ddl/backfilling_import_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@ import (
"go.uber.org/zap"
)

type ingestIndexStage struct {
type localImportExecutor struct {
jobID int64
index *model.IndexInfo
ptbl table.PhysicalTable
bc ingest.BackendCtx
}

func newIngestIndexStage(
func newImportFromLocalStepExecutor(
jobID int64,
index *model.IndexInfo,
ptbl table.PhysicalTable,
bc ingest.BackendCtx,
) *ingestIndexStage {
return &ingestIndexStage{
) *localImportExecutor {
return &localImportExecutor{
jobID: jobID,
index: index,
ptbl: ptbl,
bc: bc,
}
}

func (i *ingestIndexStage) Init(ctx context.Context) error {
func (i *localImportExecutor) Init(ctx context.Context) error {
logutil.Logger(ctx).Info("ingest index stage init subtask exec env")
_, _, err := i.bc.Flush(i.index.ID, ingest.FlushModeForceGlobal)
if err != nil {
Expand All @@ -61,24 +61,22 @@ func (i *ingestIndexStage) Init(ctx context.Context) error {
return err
}

func (*ingestIndexStage) RunSubtask(ctx context.Context, _ *proto.Subtask) error {
func (*localImportExecutor) RunSubtask(ctx context.Context, _ *proto.Subtask) error {
logutil.Logger(ctx).Info("ingest index stage split subtask")
return nil
}

func (i *ingestIndexStage) Cleanup(ctx context.Context) error {
func (*localImportExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("ingest index stage cleanup subtask exec env")
ingest.LitBackCtxMgr.Unregister(i.jobID)
return nil
}

func (*ingestIndexStage) OnFinished(ctx context.Context, _ *proto.Subtask) error {
func (*localImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error {
logutil.Logger(ctx).Info("ingest index stage finish subtask")
return nil
}

func (i *ingestIndexStage) Rollback(ctx context.Context) error {
func (*localImportExecutor) Rollback(ctx context.Context) error {
logutil.Logger(ctx).Info("ingest index stage rollback backfill add index task")
ingest.LitBackCtxMgr.Unregister(i.jobID)
return nil
}
28 changes: 12 additions & 16 deletions ddl/stage_read_index.go → ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"go.uber.org/zap"
)

type readIndexStage struct {
type readIndexExecutor struct {
d *ddl
job *model.Job
index *model.IndexInfo
Expand All @@ -61,7 +61,7 @@ type readIndexSummary struct {
mu sync.Mutex
}

func newReadIndexStage(
func newReadIndexExecutor(
d *ddl,
job *model.Job,
index *model.IndexInfo,
Expand All @@ -70,8 +70,8 @@ func newReadIndexStage(
bc ingest.BackendCtx,
summary *execute.Summary,
cloudStorageURI string,
) *readIndexStage {
return &readIndexStage{
) *readIndexExecutor {
return &readIndexExecutor{
d: d,
job: job,
index: index,
Expand All @@ -83,13 +83,13 @@ func newReadIndexStage(
}
}

func (*readIndexStage) Init(_ context.Context) error {
func (*readIndexExecutor) Init(_ context.Context) error {
logutil.BgLogger().Info("read index stage init subtask exec env",
zap.String("category", "ddl"))
return nil
}

func (r *readIndexStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
logutil.BgLogger().Info("read index stage run subtask",
zap.String("category", "ddl"))

Expand Down Expand Up @@ -146,19 +146,16 @@ func (r *readIndexStage) RunSubtask(ctx context.Context, subtask *proto.Subtask)
return nil
}

func (r *readIndexStage) Cleanup(ctx context.Context) error {
func (*readIndexExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("read index stage cleanup subtask exec env",
zap.String("category", "ddl"))
if _, ok := r.ptbl.(table.PartitionedTable); ok {
ingest.LitBackCtxMgr.Unregister(r.job.ID)
}
return nil
}

// MockDMLExecutionAddIndexSubTaskFinish is used to mock DML execution during distributed add index.
var MockDMLExecutionAddIndexSubTaskFinish func()

func (r *readIndexStage) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
func (r *readIndexExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error {
failpoint.Inject("mockDMLExecutionAddIndexSubTaskFinish", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecutionAddIndexSubTaskFinish != nil {
Expand Down Expand Up @@ -194,14 +191,13 @@ func (r *readIndexStage) OnFinished(ctx context.Context, subtask *proto.Subtask)
return nil
}

func (r *readIndexStage) Rollback(ctx context.Context) error {
func (r *readIndexExecutor) Rollback(ctx context.Context) error {
logutil.Logger(ctx).Info("read index stage rollback backfill add index task",
zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID))
ingest.LitBackCtxMgr.Unregister(r.job.ID)
return nil
}

func (r *readIndexStage) getTableStartEndKey(sm *BackfillSubTaskMeta) (
func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) (
start, end kv.Key, tbl table.PhysicalTable, err error) {
currentVer, err1 := getValidCurrentVersion(r.d.store)
if err1 != nil {
Expand All @@ -224,7 +220,7 @@ func (r *readIndexStage) getTableStartEndKey(sm *BackfillSubTaskMeta) (
return start, end, tbl, nil
}

func (r *readIndexStage) buildLocalStorePipeline(
func (r *readIndexExecutor) buildLocalStorePipeline(
opCtx *OperatorCtx,
d *ddl,
sessCtx sessionctx.Context,
Expand All @@ -244,7 +240,7 @@ func (r *readIndexStage) buildLocalStorePipeline(
opCtx, d.store, d.sessPool, r.bc, ei, sessCtx, tbl, r.index, start, end, totalRowCount, counter)
}

func (r *readIndexStage) buildExternalStorePipeline(
func (r *readIndexExecutor) buildExternalStorePipeline(
opCtx *OperatorCtx,
d *ddl,
subtaskID int64,
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
}

scheduler.RegisterTaskType(BackfillTaskType,
func(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable) scheduler.Scheduler {
return newBackfillDistScheduler(ctx, id, taskID, taskTable, d)
func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler {
return newBackfillDistScheduler(ctx, id, task, taskTable, d)
}, scheduler.WithSummary,
)

Expand Down
3 changes: 1 addition & 2 deletions ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, cfg

// Unregister removes a backend context from the backend context manager.
func (m *litBackendCtxMgr) Unregister(jobID int64) {
bc, exist := m.SyncMap.Load(jobID)
bc, exist := m.SyncMap.Delete(jobID)
if !exist {
return
}
Expand All @@ -170,7 +170,6 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) {
bc.checkpointMgr.Close()
}
m.memRoot.Release(StructSizeBackendCtx)
m.Delete(jobID)
m.memRoot.ReleaseWithTag(EncodeBackendTag(jobID))
logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("job ID", jobID),
zap.Int64("current memory usage", m.memRoot.CurrentUsage()),
Expand Down
68 changes: 53 additions & 15 deletions ddl/stage_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ type BackfillSubTaskMeta struct {
TotalKVSize uint64 `json:"total_kv_size"`
}

// NewBackfillSchedulerHandle creates a new backfill scheduler.
func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl,
stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) {
// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) {
bgm := &BackfillGlobalMeta{}
err := json.Unmarshal(taskMeta, bgm)
if err != nil {
Expand All @@ -73,23 +73,18 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl,
return nil, errors.New("index info not found")
}

bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli, jobMeta.ReorgMeta.ResourceGroupName)
if err != nil {
return nil, errors.Trace(err)
}

switch stage {
case proto.StepInit:
jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
return newReadIndexStage(
return newReadIndexExecutor(
d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil
case proto.StepOne:
if len(bgm.CloudStorageURI) > 0 {
return newMergeSortStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
return newCloudImportExecutor(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
}
return newIngestIndexStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc), nil
return newImportFromLocalStepExecutor(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc), nil
default:
return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
}
Expand All @@ -100,23 +95,66 @@ const BackfillTaskType = "backfill"

type backfillDistScheduler struct {
*scheduler.BaseScheduler
d *ddl
d *ddl
task *proto.Task
taskTable scheduler.TaskTable
backendCtx ingest.BackendCtx
jobID int64
}

func newBackfillDistScheduler(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler {
func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler {
s := &backfillDistScheduler{
BaseScheduler: scheduler.NewBaseScheduler(ctx, id, taskID, taskTable),
BaseScheduler: scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable),
d: d,
task: task,
taskTable: taskTable,
}
s.BaseScheduler.Extension = s
return s
}

func (s *backfillDistScheduler) Init(ctx context.Context) error {
err := s.BaseScheduler.Init(ctx)
if err != nil {
return err
}
d := s.d

bgm := &BackfillGlobalMeta{}
err = json.Unmarshal(s.task.Meta, bgm)
if err != nil {
return errors.Trace(err)
}
job := &bgm.Job
_, tbl, err := d.getTableByTxn(d.store, job.SchemaID, job.TableID)
if err != nil {
return errors.Trace(err)
}
idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleID)
if idx == nil {
return errors.Trace(errors.New("index info not found"))
}
bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName)
if err != nil {
return errors.Trace(err)
}
s.backendCtx = bc
s.jobID = job.ID
return nil
}

func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
switch task.Step {
case proto.StepInit, proto.StepOne:
return NewBackfillSchedulerHandle(ctx, task.Meta, s.d, task.Step, summary)
return NewBackfillSubtaskExecutor(ctx, task.Meta, s.d, s.backendCtx, task.Step, summary)
default:
return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
}
}

func (s *backfillDistScheduler) Close() {
if s.backendCtx != nil {
ingest.LitBackCtxMgr.Unregister(s.jobID)
}
Comment on lines +156 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call Close of base scheduler

s.BaseScheduler.Close()
}
4 changes: 2 additions & 2 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func registerTaskMetaInner(t *testing.T, taskType string, mockExtension schedule
return baseDispatcher
})
scheduler.RegisterTaskType(taskType,
func(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable) scheduler.Scheduler {
s := scheduler.NewBaseScheduler(ctx, id, taskID, taskTable)
func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler {
s := scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable)
s.Extension = mockExtension
return s
},
Expand Down
Loading