From c3fd9b664359fa2109d1387ff8238d17bdb090db Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Sep 2023 18:17:46 +0800 Subject: [PATCH 01/10] disttask, ddl: move backend ctx register code to scheduler --- ddl/ddl.go | 4 +- ddl/ingest/backend_mgr.go | 2 +- ddl/stage_ingest_index.go | 2 - ddl/stage_merge_sort.go | 1 - ddl/stage_read_index.go | 4 -- ddl/stage_scheduler.go | 58 +++++++++++++++---- disttask/framework/framework_test.go | 4 +- disttask/framework/mock/scheduler_mock.go | 12 ++++ disttask/framework/scheduler/interface.go | 1 + disttask/framework/scheduler/manager.go | 19 +++--- disttask/framework/scheduler/manager_test.go | 7 ++- disttask/framework/scheduler/register.go | 3 +- disttask/framework/scheduler/register_test.go | 3 +- disttask/framework/scheduler/scheduler.go | 4 ++ disttask/importinto/scheduler.go | 4 +- util/generic/sync_map.go | 9 +++ 16 files changed, 100 insertions(+), 37 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 8abbe3c8a8d53..0b0e9619be899 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -673,8 +673,8 @@ func newDDL(ctx context.Context, options ...Option) *ddl { } scheduler.RegisterTaskType(BackfillTaskType, - func(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable) scheduler.Scheduler { - return newBackfillDistScheduler(ctx, id, taskID, taskTable, d) + func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler { + return newBackfillDistScheduler(ctx, id, task, taskTable, d) }, scheduler.WithSummary, ) diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index d1e9f907e460c..bd1abb5724f35 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -164,13 +164,13 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) { if !exist { return } + m.Delete(jobID) bc.unregisterAll(jobID) bc.backend.Close() if bc.checkpointMgr != nil { bc.checkpointMgr.Close() } m.memRoot.Release(StructSizeBackendCtx) - m.Delete(jobID) m.memRoot.ReleaseWithTag(EncodeBackendTag(jobID)) logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("job ID", jobID), zap.Int64("current memory usage", m.memRoot.CurrentUsage()), diff --git a/ddl/stage_ingest_index.go b/ddl/stage_ingest_index.go index 13fd8eb3e9612..b73d740d03072 100644 --- a/ddl/stage_ingest_index.go +++ b/ddl/stage_ingest_index.go @@ -68,7 +68,6 @@ func (*ingestIndexStage) RunSubtask(ctx context.Context, _ *proto.Subtask) error func (i *ingestIndexStage) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("ingest index stage cleanup subtask exec env") - ingest.LitBackCtxMgr.Unregister(i.jobID) return nil } @@ -79,6 +78,5 @@ func (*ingestIndexStage) OnFinished(ctx context.Context, _ *proto.Subtask) error func (i *ingestIndexStage) Rollback(ctx context.Context) error { logutil.Logger(ctx).Info("ingest index stage rollback backfill add index task") - ingest.LitBackCtxMgr.Unregister(i.jobID) return nil } diff --git a/ddl/stage_merge_sort.go b/ddl/stage_merge_sort.go index 46b17193f780c..cd4748b45b8b9 100644 --- a/ddl/stage_merge_sort.go +++ b/ddl/stage_merge_sort.go @@ -96,7 +96,6 @@ func (m *mergeSortStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) func (m *mergeSortStage) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("merge sort stage clean up subtask env") - ingest.LitBackCtxMgr.Unregister(m.jobID) return nil } diff --git a/ddl/stage_read_index.go b/ddl/stage_read_index.go index 5032a916ab2ac..c322589e16a37 100644 --- a/ddl/stage_read_index.go +++ b/ddl/stage_read_index.go @@ 
-149,9 +149,6 @@ func (r *readIndexStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) func (r *readIndexStage) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("read index stage cleanup subtask exec env", zap.String("category", "ddl")) - if _, ok := r.ptbl.(table.PartitionedTable); ok { - ingest.LitBackCtxMgr.Unregister(r.job.ID) - } return nil } @@ -197,7 +194,6 @@ func (r *readIndexStage) OnFinished(ctx context.Context, subtask *proto.Subtask) func (r *readIndexStage) Rollback(ctx context.Context) error { logutil.Logger(ctx).Info("read index stage rollback backfill add index task", zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID)) - ingest.LitBackCtxMgr.Unregister(r.job.ID) return nil } diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index f720a1a06cdb3..eec56df20b292 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -54,7 +54,7 @@ type BackfillSubTaskMeta struct { // NewBackfillSchedulerHandle creates a new backfill scheduler. func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, - stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) { + bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) { bgm := &BackfillGlobalMeta{} err := json.Unmarshal(taskMeta, bgm) if err != nil { @@ -73,11 +73,6 @@ func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, return nil, errors.New("index info not found") } - bc, err := ingest.LitBackCtxMgr.Register(ctx, indexInfo.Unique, jobMeta.ID, d.etcdCli, jobMeta.ReorgMeta.ResourceGroupName) - if err != nil { - return nil, errors.Trace(err) - } - switch stage { case proto.StepInit: jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta) @@ -100,23 +95,66 @@ const BackfillTaskType = "backfill" type backfillDistScheduler struct { *scheduler.BaseScheduler - d *ddl + d *ddl + backendCtx ingest.BackendCtx + jobID int64 } -func newBackfillDistScheduler(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler { +func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable, d *ddl) scheduler.Scheduler { s := &backfillDistScheduler{ - BaseScheduler: scheduler.NewBaseScheduler(ctx, id, taskID, taskTable), + BaseScheduler: scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable), d: d, } s.BaseScheduler.Extension = s + + wrapErr := func(err error) scheduler.Scheduler { + s.BaseScheduler.Extension = &failedExtension{err: err} + return s + } + + bgm := &BackfillGlobalMeta{} + err := json.Unmarshal(task.Meta, bgm) + if err != nil { + return wrapErr(err) + } + job := &bgm.Job + _, tbl, err := d.getTableByTxn(d.store, job.SchemaID, job.TableID) + if err != nil { + return wrapErr(err) + } + idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleID) + if idx == nil { + return wrapErr(errors.Trace(errors.New("index info not found"))) + } + bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName) + if err != nil { + return wrapErr(errors.Trace(err)) + } + s.backendCtx = bc + s.jobID = job.ID return s } func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) { switch task.Step { case proto.StepInit, proto.StepOne: - return NewBackfillSchedulerHandle(ctx, task.Meta, s.d, task.Step, summary) + return NewBackfillSchedulerHandle(ctx, task.Meta, s.d, s.backendCtx, task.Step, 
summary) default: return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID) } } + +func (s *backfillDistScheduler) Close() { + if s.backendCtx != nil { + ingest.LitBackCtxMgr.Unregister(s.jobID) + } +} + +type failedExtension struct { + err error +} + +func (e *failedExtension) GetSubtaskExecutor(_ context.Context, _ *proto.Task, _ *execute.Summary) ( + execute.SubtaskExecutor, error) { + return nil, e.err +} diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go index 78fc7bfe7f70d..5b8cbc862b945 100644 --- a/disttask/framework/framework_test.go +++ b/disttask/framework/framework_test.go @@ -141,8 +141,8 @@ func registerTaskMetaInner(t *testing.T, taskType string, mockExtension schedule return baseDispatcher }) scheduler.RegisterTaskType(taskType, - func(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable) scheduler.Scheduler { - s := scheduler.NewBaseScheduler(ctx, id, taskID, taskTable) + func(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler { + s := scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable) s.Extension = mockExtension return s }, diff --git a/disttask/framework/mock/scheduler_mock.go b/disttask/framework/mock/scheduler_mock.go index 20287b77ce705..b2e5355770512 100644 --- a/disttask/framework/mock/scheduler_mock.go +++ b/disttask/framework/mock/scheduler_mock.go @@ -281,6 +281,18 @@ func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder { return m.recorder } +// Close mocks base method. +func (m *MockScheduler) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockSchedulerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockScheduler)(nil).Close)) +} + // Rollback mocks base method. func (m *MockScheduler) Rollback(arg0 context.Context, arg1 *proto.Task) error { m.ctrl.T.Helper() diff --git a/disttask/framework/scheduler/interface.go b/disttask/framework/scheduler/interface.go index 5a46c7a23b8a9..2c2686522263f 100644 --- a/disttask/framework/scheduler/interface.go +++ b/disttask/framework/scheduler/interface.go @@ -49,6 +49,7 @@ type Pool interface { type Scheduler interface { Run(context.Context, *proto.Task) error Rollback(context.Context, *proto.Task) error + Close() } // Extension extends the scheduler. diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 96d675f8d06b8..4889461137ac0 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -192,9 +192,8 @@ func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) { } logutil.Logger(m.logCtx).Info("detect new subtask", zap.Any("task_id", task.ID)) m.addHandlingTask(task.ID) - t := task err = m.schedulerPool.Run(func() { - m.onRunnableTask(ctx, t.ID, t.Type) + m.onRunnableTask(ctx, task) m.removeHandlingTask(task.ID) }) // pool closed. @@ -254,15 +253,16 @@ type TestContext struct { var testContexts sync.Map // onRunnableTask handles a runnable task. 
-func (m *Manager) onRunnableTask(ctx context.Context, taskID int64, taskType string) { - logutil.Logger(m.logCtx).Info("onRunnableTask", zap.Any("task_id", taskID), zap.Any("type", taskType)) +func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { + logutil.Logger(m.logCtx).Info("onRunnableTask", zap.Int64("task_id", task.ID), zap.String("type", task.Type)) // runCtx only used in scheduler.Run, cancel in m.fetchAndFastCancelTasks. - factory := getSchedulerFactory(taskType) + factory := getSchedulerFactory(task.Type) if factory == nil { - m.onError(errors.Errorf("task type %s not found", taskType)) + m.onError(errors.Errorf("task type %s not found", task.Type)) return } - scheduler := factory(ctx, m.id, taskID, m.taskTable) + scheduler := factory(ctx, m.id, task, m.taskTable) + // defer scheduler.Close() for { select { case <-ctx.Done(): @@ -280,13 +280,14 @@ func (m *Manager) onRunnableTask(ctx context.Context, taskID int64, taskType str } }() }) - task, err := m.taskTable.GetGlobalTaskByID(taskID) + task, err := m.taskTable.GetGlobalTaskByID(task.ID) if err != nil { m.onError(err) return } if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting { - logutil.Logger(m.logCtx).Info("onRunnableTask exit", zap.Any("task_id", taskID), zap.Int64("step", task.Step), zap.Any("state", task.State)) + logutil.Logger(m.logCtx).Info("onRunnableTask exit", + zap.Int64("task_id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State)) return } if exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRevertPending); err != nil { diff --git a/disttask/framework/scheduler/manager_test.go b/disttask/framework/scheduler/manager_test.go index dd8b383464e9e..0181f942bf27c 100644 --- a/disttask/framework/scheduler/manager_test.go +++ b/disttask/framework/scheduler/manager_test.go @@ -101,7 +101,7 @@ func TestOnRunnableTasks(t *testing.T) { m.onRunnableTasks(context.Background(), nil) RegisterTaskType("type", - func(ctx context.Context, id string, taskID int64, taskTable TaskTable) Scheduler { + func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) Scheduler { return mockInternalScheduler }) @@ -164,7 +164,7 @@ func TestManager(t *testing.T) { return mockPool, nil }) RegisterTaskType("type", - func(ctx context.Context, id string, taskID int64, taskTable TaskTable) Scheduler { + func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) Scheduler { return mockInternalScheduler }) id := "test" @@ -188,9 +188,11 @@ func TestManager(t *testing.T) { []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(true, nil) mockInternalScheduler.EXPECT().Run(gomock.Any(), task1).Return(nil) + mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID1, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(false, nil).AnyTimes() + // mockInternalScheduler.EXPECT().Close() // task2 mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). @@ -204,6 +206,7 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). 
Return(false, nil).AnyTimes() + // mockInternalScheduler.EXPECT().Close() // for scheduler pool mockPool.EXPECT().ReleaseAndWait().Do(func() { wg.Wait() diff --git a/disttask/framework/scheduler/register.go b/disttask/framework/scheduler/register.go index 250d33fe0d9cb..9d55455f94a2b 100644 --- a/disttask/framework/scheduler/register.go +++ b/disttask/framework/scheduler/register.go @@ -17,6 +17,7 @@ package scheduler import ( "context" + "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/scheduler/execute" ) @@ -35,7 +36,7 @@ var ( taskSchedulerFactories = make(map[string]schedulerFactoryFn) ) -type schedulerFactoryFn func(ctx context.Context, id string, taskID int64, taskTable TaskTable) Scheduler +type schedulerFactoryFn func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) Scheduler // RegisterTaskType registers the task type. func RegisterTaskType(taskType string, factory schedulerFactoryFn, opts ...TaskTypeOption) { diff --git a/disttask/framework/scheduler/register_test.go b/disttask/framework/scheduler/register_test.go index b612129757eee..5111a936a7a76 100644 --- a/disttask/framework/scheduler/register_test.go +++ b/disttask/framework/scheduler/register_test.go @@ -18,13 +18,14 @@ import ( "context" "testing" + "github.com/pingcap/tidb/disttask/framework/proto" "github.com/stretchr/testify/require" ) func TestRegisterTaskType(t *testing.T) { // other case might add task types, so we need to clear it first ClearSchedulers() - factoryFn := func(ctx context.Context, id string, taskID int64, taskTable TaskTable) Scheduler { + factoryFn := func(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) Scheduler { return nil } RegisterTaskType("test1", factoryFn) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 0ee1f421abaef..19227ec28adf9 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -350,6 +350,10 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { return s.getError() } +// Close closes the scheduler when all the subtasks are complete. +func (s *BaseScheduler) Close() { +} + func runSummaryCollectLoop( ctx context.Context, task *proto.Task, diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index f64fb638e2281..ef65e1524fe4c 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -249,9 +249,9 @@ type importScheduler struct { *scheduler.BaseScheduler } -func newImportScheduler(ctx context.Context, id string, taskID int64, taskTable scheduler.TaskTable) scheduler.Scheduler { +func newImportScheduler(ctx context.Context, id string, task *proto.Task, taskTable scheduler.TaskTable) scheduler.Scheduler { s := &importScheduler{ - BaseScheduler: scheduler.NewBaseScheduler(ctx, id, taskID, taskTable), + BaseScheduler: scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable), } s.BaseScheduler.Extension = s return s diff --git a/util/generic/sync_map.go b/util/generic/sync_map.go index 0a3d0f734d76f..1b64cf7ff3b9e 100644 --- a/util/generic/sync_map.go +++ b/util/generic/sync_map.go @@ -44,6 +44,15 @@ func (m *SyncMap[K, V]) Load(key K) (V, bool) { return val, exist } +// LoadAndDelete +func (m *SyncMap[K, V]) LoadAndDelete(key K) (V, bool) { + m.mu.Lock() + val, exist := m.item[key] + delete(m.item, key) + m.mu.RUnlock() + return val, exist +} + // Delete deletes a key value. 
func (m *SyncMap[K, V]) Delete(key K) { m.mu.Lock() From b4d9580af929849edf2a1ffe3ed5c23e65312470 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Sep 2023 18:23:00 +0800 Subject: [PATCH 02/10] fix TestManager --- disttask/framework/scheduler/manager.go | 7 ++++--- disttask/framework/scheduler/manager_test.go | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 4889461137ac0..2d530a51bfc1c 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -192,9 +192,10 @@ func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) { } logutil.Logger(m.logCtx).Info("detect new subtask", zap.Any("task_id", task.ID)) m.addHandlingTask(task.ID) + t := task err = m.schedulerPool.Run(func() { - m.onRunnableTask(ctx, task) - m.removeHandlingTask(task.ID) + m.onRunnableTask(ctx, t) + m.removeHandlingTask(t.ID) }) // pool closed. if err != nil { @@ -262,7 +263,7 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { return } scheduler := factory(ctx, m.id, task, m.taskTable) - // defer scheduler.Close() + defer scheduler.Close() for { select { case <-ctx.Done(): diff --git a/disttask/framework/scheduler/manager_test.go b/disttask/framework/scheduler/manager_test.go index 0181f942bf27c..04bff5e8b82b5 100644 --- a/disttask/framework/scheduler/manager_test.go +++ b/disttask/framework/scheduler/manager_test.go @@ -192,7 +192,7 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID1, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(false, nil).AnyTimes() - // mockInternalScheduler.EXPECT().Close() + mockInternalScheduler.EXPECT().Close() // task2 mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). @@ -206,7 +206,7 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(false, nil).AnyTimes() - // mockInternalScheduler.EXPECT().Close() + mockInternalScheduler.EXPECT().Close() // for scheduler pool mockPool.EXPECT().ReleaseAndWait().Do(func() { wg.Wait() From 8629b43ca0af9ba0638d3760d7c83b99aea8b25e Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Sep 2023 18:29:43 +0800 Subject: [PATCH 03/10] fix concurrent delete --- ddl/ingest/backend_mgr.go | 3 +-- util/generic/sync_map.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index bd1abb5724f35..d70875a56bb12 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -160,11 +160,10 @@ func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, cfg // Unregister removes a backend context from the backend context manager. 
func (m *litBackendCtxMgr) Unregister(jobID int64) { - bc, exist := m.SyncMap.Load(jobID) + bc, exist := m.SyncMap.LoadAndDelete(jobID) if !exist { return } - m.Delete(jobID) bc.unregisterAll(jobID) bc.backend.Close() if bc.checkpointMgr != nil { diff --git a/util/generic/sync_map.go b/util/generic/sync_map.go index 1b64cf7ff3b9e..116d892d909c5 100644 --- a/util/generic/sync_map.go +++ b/util/generic/sync_map.go @@ -49,7 +49,7 @@ func (m *SyncMap[K, V]) LoadAndDelete(key K) (V, bool) { m.mu.Lock() val, exist := m.item[key] delete(m.item, key) - m.mu.RUnlock() + m.mu.Unlock() return val, exist } From 70c1e47f00b987a3896fa56a9320e7642e500548 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Sep 2023 18:49:35 +0800 Subject: [PATCH 04/10] fix linter --- ddl/stage_ingest_index.go | 4 ++-- ddl/stage_merge_sort.go | 2 +- ddl/stage_read_index.go | 2 +- ddl/stage_scheduler.go | 2 +- disttask/framework/scheduler/scheduler.go | 2 +- util/generic/sync_map.go | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ddl/stage_ingest_index.go b/ddl/stage_ingest_index.go index b73d740d03072..50a81fcec510d 100644 --- a/ddl/stage_ingest_index.go +++ b/ddl/stage_ingest_index.go @@ -66,7 +66,7 @@ func (*ingestIndexStage) RunSubtask(ctx context.Context, _ *proto.Subtask) error return nil } -func (i *ingestIndexStage) Cleanup(ctx context.Context) error { +func (*ingestIndexStage) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("ingest index stage cleanup subtask exec env") return nil } @@ -76,7 +76,7 @@ func (*ingestIndexStage) OnFinished(ctx context.Context, _ *proto.Subtask) error return nil } -func (i *ingestIndexStage) Rollback(ctx context.Context) error { +func (*ingestIndexStage) Rollback(ctx context.Context) error { logutil.Logger(ctx).Info("ingest index stage rollback backfill add index task") return nil } diff --git a/ddl/stage_merge_sort.go b/ddl/stage_merge_sort.go index cd4748b45b8b9..8f3d6c99cbca6 100644 --- a/ddl/stage_merge_sort.go +++ b/ddl/stage_merge_sort.go @@ -94,7 +94,7 @@ func (m *mergeSortStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) return err } -func (m *mergeSortStage) Cleanup(ctx context.Context) error { +func (*mergeSortStage) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("merge sort stage clean up subtask env") return nil } diff --git a/ddl/stage_read_index.go b/ddl/stage_read_index.go index c322589e16a37..1c7326a154ebd 100644 --- a/ddl/stage_read_index.go +++ b/ddl/stage_read_index.go @@ -146,7 +146,7 @@ func (r *readIndexStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) return nil } -func (r *readIndexStage) Cleanup(ctx context.Context) error { +func (*readIndexStage) Cleanup(ctx context.Context) error { logutil.Logger(ctx).Info("read index stage cleanup subtask exec env", zap.String("category", "ddl")) return nil diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index eec56df20b292..d2525a73b0e06 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -53,7 +53,7 @@ type BackfillSubTaskMeta struct { } // NewBackfillSchedulerHandle creates a new backfill scheduler. 
-func NewBackfillSchedulerHandle(ctx context.Context, taskMeta []byte, d *ddl, +func NewBackfillSchedulerHandle(_ context.Context, taskMeta []byte, d *ddl, bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) { bgm := &BackfillGlobalMeta{} err := json.Unmarshal(taskMeta, bgm) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 19227ec28adf9..ac40b6522bd8b 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -351,7 +351,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { } // Close closes the scheduler when all the subtasks are complete. -func (s *BaseScheduler) Close() { +func (*BaseScheduler) Close() { } func runSummaryCollectLoop( diff --git a/util/generic/sync_map.go b/util/generic/sync_map.go index 116d892d909c5..1690d2bdebd90 100644 --- a/util/generic/sync_map.go +++ b/util/generic/sync_map.go @@ -44,7 +44,7 @@ func (m *SyncMap[K, V]) Load(key K) (V, bool) { return val, exist } -// LoadAndDelete +// LoadAndDelete loads and deletes a key value atomically. func (m *SyncMap[K, V]) LoadAndDelete(key K) (V, bool) { m.mu.Lock() val, exist := m.item[key] From 964ed52c330adc357492a858244fa946b5a01413 Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Sep 2023 20:52:46 +0800 Subject: [PATCH 05/10] rename stuff --- ddl/stage_ingest_index.go | 82 ------------ ddl/stage_merge_sort.go | 110 --------------- ddl/stage_read_index.go | 274 -------------------------------------- ddl/stage_scheduler.go | 12 +- 4 files changed, 6 insertions(+), 472 deletions(-) delete mode 100644 ddl/stage_ingest_index.go delete mode 100644 ddl/stage_merge_sort.go delete mode 100644 ddl/stage_read_index.go diff --git a/ddl/stage_ingest_index.go b/ddl/stage_ingest_index.go deleted file mode 100644 index 50a81fcec510d..0000000000000 --- a/ddl/stage_ingest_index.go +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package ddl - -import ( - "context" - - "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/ddl/ingest" - "github.com/pingcap/tidb/disttask/framework/proto" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" -) - -type ingestIndexStage struct { - jobID int64 - index *model.IndexInfo - ptbl table.PhysicalTable - bc ingest.BackendCtx -} - -func newIngestIndexStage( - jobID int64, - index *model.IndexInfo, - ptbl table.PhysicalTable, - bc ingest.BackendCtx, -) *ingestIndexStage { - return &ingestIndexStage{ - jobID: jobID, - index: index, - ptbl: ptbl, - bc: bc, - } -} - -func (i *ingestIndexStage) Init(ctx context.Context) error { - logutil.Logger(ctx).Info("ingest index stage init subtask exec env") - _, _, err := i.bc.Flush(i.index.ID, ingest.FlushModeForceGlobal) - if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - err = convertToKeyExistsErr(err, i.index, i.ptbl.Meta()) - return err - } - logutil.Logger(ctx).Error("flush error", zap.Error(err)) - return err - } - return err -} - -func (*ingestIndexStage) RunSubtask(ctx context.Context, _ *proto.Subtask) error { - logutil.Logger(ctx).Info("ingest index stage split subtask") - return nil -} - -func (*ingestIndexStage) Cleanup(ctx context.Context) error { - logutil.Logger(ctx).Info("ingest index stage cleanup subtask exec env") - return nil -} - -func (*ingestIndexStage) OnFinished(ctx context.Context, _ *proto.Subtask) error { - logutil.Logger(ctx).Info("ingest index stage finish subtask") - return nil -} - -func (*ingestIndexStage) Rollback(ctx context.Context) error { - logutil.Logger(ctx).Info("ingest index stage rollback backfill add index task") - return nil -} diff --git a/ddl/stage_merge_sort.go b/ddl/stage_merge_sort.go deleted file mode 100644 index 8f3d6c99cbca6..0000000000000 --- a/ddl/stage_merge_sort.go +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package ddl - -import ( - "context" - "encoding/json" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/br/pkg/lightning/backend" - "github.com/pingcap/tidb/br/pkg/lightning/config" - "github.com/pingcap/tidb/ddl/ingest" - "github.com/pingcap/tidb/disttask/framework/proto" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" -) - -type mergeSortStage struct { - jobID int64 - index *model.IndexInfo - ptbl table.PhysicalTable - bc ingest.BackendCtx - cloudStoreURI string -} - -func newMergeSortStage( - jobID int64, - index *model.IndexInfo, - ptbl table.PhysicalTable, - bc ingest.BackendCtx, - cloudStoreURI string, -) (*mergeSortStage, error) { - return &mergeSortStage{ - jobID: jobID, - index: index, - ptbl: ptbl, - bc: bc, - cloudStoreURI: cloudStoreURI, - }, nil -} - -func (*mergeSortStage) Init(ctx context.Context) error { - logutil.Logger(ctx).Info("merge sort stage init subtask exec env") - return nil -} - -func (m *mergeSortStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) error { - logutil.Logger(ctx).Info("merge sort stage split subtask") - - sm := &BackfillSubTaskMeta{} - err := json.Unmarshal(subtask.Meta, sm) - if err != nil { - logutil.BgLogger().Error("unmarshal error", - zap.String("category", "ddl"), - zap.Error(err)) - return err - } - - local := m.bc.GetLocalBackend() - if local == nil { - return errors.Errorf("local backend not found") - } - _, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID)) - err = local.CloseEngine(ctx, &backend.EngineConfig{ - External: &backend.ExternalEngineConfig{ - StorageURI: m.cloudStoreURI, - DataFiles: sm.DataFiles, - StatFiles: sm.StatFiles, - MinKey: sm.MinKey, - MaxKey: sm.MaxKey, - SplitKeys: sm.RangeSplitKeys, - TotalFileSize: int64(sm.TotalKVSize), - TotalKVCount: 0, - }, - }, engineUUID) - if err != nil { - return err - } - err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) - return err -} - -func (*mergeSortStage) Cleanup(ctx context.Context) error { - logutil.Logger(ctx).Info("merge sort stage clean up subtask env") - return nil -} - -func (*mergeSortStage) OnFinished(ctx context.Context, _ *proto.Subtask) error { - logutil.Logger(ctx).Info("merge sort stage finish subtask") - return nil -} - -func (*mergeSortStage) Rollback(ctx context.Context) error { - logutil.Logger(ctx).Info("merge sort stage rollback subtask") - return nil -} diff --git a/ddl/stage_read_index.go b/ddl/stage_read_index.go deleted file mode 100644 index 1c7326a154ebd..0000000000000 --- a/ddl/stage_read_index.go +++ /dev/null @@ -1,274 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package ddl - -import ( - "context" - "encoding/hex" - "encoding/json" - "sync" - "sync/atomic" - - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/br/pkg/lightning/backend/external" - "github.com/pingcap/tidb/ddl/ingest" - "github.com/pingcap/tidb/disttask/framework/proto" - "github.com/pingcap/tidb/disttask/framework/scheduler/execute" - "github.com/pingcap/tidb/disttask/operator" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/table" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" -) - -type readIndexStage struct { - d *ddl - job *model.Job - index *model.IndexInfo - ptbl table.PhysicalTable - jc *JobContext - - cloudStorageURI string - - bc ingest.BackendCtx - summary *execute.Summary - - subtaskSummary sync.Map // subtaskID => readIndexSummary -} - -type readIndexSummary struct { - minKey []byte - maxKey []byte - totalSize uint64 - dataFiles []string - statFiles []string - mu sync.Mutex -} - -func newReadIndexStage( - d *ddl, - job *model.Job, - index *model.IndexInfo, - ptbl table.PhysicalTable, - jc *JobContext, - bc ingest.BackendCtx, - summary *execute.Summary, - cloudStorageURI string, -) *readIndexStage { - return &readIndexStage{ - d: d, - job: job, - index: index, - ptbl: ptbl, - jc: jc, - bc: bc, - summary: summary, - cloudStorageURI: cloudStorageURI, - } -} - -func (*readIndexStage) Init(_ context.Context) error { - logutil.BgLogger().Info("read index stage init subtask exec env", - zap.String("category", "ddl")) - return nil -} - -func (r *readIndexStage) RunSubtask(ctx context.Context, subtask *proto.Subtask) error { - logutil.BgLogger().Info("read index stage run subtask", - zap.String("category", "ddl")) - - r.subtaskSummary.Store(subtask.ID, &readIndexSummary{}) - - d := r.d - sm := &BackfillSubTaskMeta{} - err := json.Unmarshal(subtask.Meta, sm) - if err != nil { - logutil.BgLogger().Error("unmarshal error", - zap.String("category", "ddl"), - zap.Error(err)) - return err - } - - startKey, endKey, tbl, err := r.getTableStartEndKey(sm) - if err != nil { - return err - } - - sessCtx, err := newSessCtx( - d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName) - if err != nil { - return err - } - - opCtx := NewOperatorCtx(ctx) - defer opCtx.Cancel() - totalRowCount := &atomic.Int64{} - - var pipe *operator.AsyncPipeline - if len(r.cloudStorageURI) > 0 { - pipe, err = r.buildExternalStorePipeline(opCtx, d, subtask.ID, sessCtx, tbl, startKey, endKey, totalRowCount) - } else { - pipe, err = r.buildLocalStorePipeline(opCtx, d, sessCtx, tbl, startKey, endKey, totalRowCount) - } - if err != nil { - return err - } - - err = pipe.Execute() - if err != nil { - return err - } - err = pipe.Close() - if opCtx.OperatorErr() != nil { - return opCtx.OperatorErr() - } - if err != nil { - return err - } - - r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load()) - return nil -} - -func (*readIndexStage) Cleanup(ctx context.Context) error { - logutil.Logger(ctx).Info("read index stage cleanup subtask exec env", - zap.String("category", "ddl")) - return nil -} - -// MockDMLExecutionAddIndexSubTaskFinish is used to mock DML execution during distributed add index. 
-var MockDMLExecutionAddIndexSubTaskFinish func() - -func (r *readIndexStage) OnFinished(ctx context.Context, subtask *proto.Subtask) error { - failpoint.Inject("mockDMLExecutionAddIndexSubTaskFinish", func(val failpoint.Value) { - //nolint:forcetypeassert - if val.(bool) && MockDMLExecutionAddIndexSubTaskFinish != nil { - MockDMLExecutionAddIndexSubTaskFinish() - } - }) - if len(r.cloudStorageURI) == 0 { - return nil - } - // Rewrite the subtask meta to record statistics. - var subtaskMeta BackfillSubTaskMeta - err := json.Unmarshal(subtask.Meta, &subtaskMeta) - if err != nil { - return err - } - sum, _ := r.subtaskSummary.LoadAndDelete(subtask.ID) - s := sum.(*readIndexSummary) - subtaskMeta.MinKey = s.minKey - subtaskMeta.MaxKey = s.maxKey - subtaskMeta.TotalKVSize = s.totalSize - subtaskMeta.DataFiles = s.dataFiles - subtaskMeta.StatFiles = s.statFiles - logutil.Logger(ctx).Info("get key boundary on subtask finished", - zap.String("min", hex.EncodeToString(s.minKey)), - zap.String("max", hex.EncodeToString(s.maxKey)), - zap.Int("fileCount", len(s.dataFiles)), - zap.Uint64("totalSize", s.totalSize)) - meta, err := json.Marshal(subtaskMeta) - if err != nil { - return err - } - subtask.Meta = meta - return nil -} - -func (r *readIndexStage) Rollback(ctx context.Context) error { - logutil.Logger(ctx).Info("read index stage rollback backfill add index task", - zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID)) - return nil -} - -func (r *readIndexStage) getTableStartEndKey(sm *BackfillSubTaskMeta) ( - start, end kv.Key, tbl table.PhysicalTable, err error) { - currentVer, err1 := getValidCurrentVersion(r.d.store) - if err1 != nil { - return nil, nil, nil, errors.Trace(err1) - } - if parTbl, ok := r.ptbl.(table.PartitionedTable); ok { - pid := sm.PhysicalTableID - start, end, err = getTableRange(r.jc, r.d.ddlCtx, parTbl.GetPartition(pid), currentVer.Ver, r.job.Priority) - if err != nil { - logutil.BgLogger().Error("get table range error", - zap.String("category", "ddl"), - zap.Error(err)) - return nil, nil, nil, err - } - tbl = parTbl.GetPartition(pid) - } else { - start, end = sm.StartKey, sm.EndKey - tbl = r.ptbl - } - return start, end, tbl, nil -} - -func (r *readIndexStage) buildLocalStorePipeline( - opCtx *OperatorCtx, - d *ddl, - sessCtx sessionctx.Context, - tbl table.PhysicalTable, - start, end kv.Key, - totalRowCount *atomic.Int64, -) (*operator.AsyncPipeline, error) { - ei, err := r.bc.Register(r.job.ID, r.index.ID, r.job.SchemaName, r.job.TableName) - if err != nil { - logutil.Logger(opCtx).Warn("cannot register new engine", zap.Error(err), - zap.Int64("job ID", r.job.ID), zap.Int64("index ID", r.index.ID)) - return nil, err - } - counter := metrics.BackfillTotalCounter.WithLabelValues( - metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) - return NewAddIndexIngestPipeline( - opCtx, d.store, d.sessPool, r.bc, ei, sessCtx, tbl, r.index, start, end, totalRowCount, counter) -} - -func (r *readIndexStage) buildExternalStorePipeline( - opCtx *OperatorCtx, - d *ddl, - subtaskID int64, - sessCtx sessionctx.Context, - tbl table.PhysicalTable, - start, end kv.Key, - totalRowCount *atomic.Int64, -) (*operator.AsyncPipeline, error) { - onClose := func(summary *external.WriterSummary) { - sum, _ := r.subtaskSummary.Load(subtaskID) - s := sum.(*readIndexSummary) - s.mu.Lock() - if len(s.minKey) == 0 || summary.Min.Cmp(s.minKey) < 0 { - s.minKey = summary.Min.Clone() - } - if len(s.maxKey) == 0 || summary.Max.Cmp(s.maxKey) > 0 { - s.maxKey = 
summary.Max.Clone()
-		}
-		s.totalSize += summary.TotalSize
-		for _, f := range summary.MultipleFilesStats {
-			for _, filename := range f.Filenames {
-				s.dataFiles = append(s.dataFiles, filename[0])
-				s.statFiles = append(s.statFiles, filename[1])
-			}
-		}
-		s.mu.Unlock()
-	}
-	return NewWriteIndexToExternalStoragePipeline(
-		opCtx, d.store, r.cloudStorageURI, r.d.sessPool, sessCtx, r.job.ID, subtaskID,
-		tbl, r.index, start, end, totalRowCount, onClose)
-}
diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go
index d2525a73b0e06..aacc1646d84fe 100644
--- a/ddl/stage_scheduler.go
+++ b/ddl/stage_scheduler.go
@@ -52,8 +52,8 @@ type BackfillSubTaskMeta struct {
 	TotalKVSize uint64 `json:"total_kv_size"`
 }
 
-// NewBackfillSchedulerHandle creates a new backfill scheduler.
-func NewBackfillSchedulerHandle(_ context.Context, taskMeta []byte, d *ddl,
+// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
+func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl,
 	bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) {
 	bgm := &BackfillGlobalMeta{}
 	err := json.Unmarshal(taskMeta, bgm)
@@ -78,13 +78,13 @@ func NewBackfillSchedulerHandle(_ context.Context, taskMeta []byte, d *ddl,
 		jc := d.jobContext(jobMeta.ID, jobMeta.ReorgMeta)
 		d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query)
 		d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type)
-		return newReadIndexStage(
+		return newReadIndexExecutor(
 			d, &bgm.Job, indexInfo, tbl.(table.PhysicalTable), jc, bc, summary, bgm.CloudStorageURI), nil
 	case proto.StepOne:
 		if len(bgm.CloudStorageURI) > 0 {
-			return newMergeSortStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
+			return newCloudImportExecutor(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc, bgm.CloudStorageURI)
 		}
-		return newIngestIndexStage(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc), nil
+		return newImportFromLocalStepExecutor(jobMeta.ID, indexInfo, tbl.(table.PhysicalTable), bc), nil
 	default:
 		return nil, errors.Errorf("unknown step %d for job %d", stage, jobMeta.ID)
 	}
@@ -138,7 +138,7 @@ func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task,
 func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
 	switch task.Step {
 	case proto.StepInit, proto.StepOne:
-		return NewBackfillSchedulerHandle(ctx, task.Meta, s.d, s.backendCtx, task.Step, summary)
+		return NewBackfillSubtaskExecutor(ctx, task.Meta, s.d, s.backendCtx, task.Step, summary)
 	default:
 		return nil, errors.Errorf("unknown backfill step %d for task %d", task.Step, task.ID)
 	}

From 7848b5d7a9d40761da2e919e25b74dba09231f88 Mon Sep 17 00:00:00 2001
From: tangenta
Date: Wed, 13 Sep 2023 20:53:44 +0800
Subject: [PATCH 06/10] rename stuff

---
 ddl/backfilling_import_cloud.go | 110 +++++++++++++
 ddl/backfilling_import_local.go |  82 ++++++++++
 ddl/backfilling_read_index.go   | 274 ++++++++++++++++++++++++++++++++
 3 files changed, 466 insertions(+)
 create mode 100644 ddl/backfilling_import_cloud.go
 create mode 100644 ddl/backfilling_import_local.go
 create mode 100644 ddl/backfilling_read_index.go

diff --git a/ddl/backfilling_import_cloud.go b/ddl/backfilling_import_cloud.go
new file mode 100644
index 0000000000000..693af13232f10
--- /dev/null
+++ b/ddl/backfilling_import_cloud.go
@@ -0,0 +1,110 @@
+// Copyright 2023 PingCAP, Inc.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + "encoding/json" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/lightning/backend" + "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/ddl/ingest" + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type cloudImportExecutor struct { + jobID int64 + index *model.IndexInfo + ptbl table.PhysicalTable + bc ingest.BackendCtx + cloudStoreURI string +} + +func newCloudImportExecutor( + jobID int64, + index *model.IndexInfo, + ptbl table.PhysicalTable, + bc ingest.BackendCtx, + cloudStoreURI string, +) (*cloudImportExecutor, error) { + return &cloudImportExecutor{ + jobID: jobID, + index: index, + ptbl: ptbl, + bc: bc, + cloudStoreURI: cloudStoreURI, + }, nil +} + +func (*cloudImportExecutor) Init(ctx context.Context) error { + logutil.Logger(ctx).Info("merge sort stage init subtask exec env") + return nil +} + +func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error { + logutil.Logger(ctx).Info("merge sort stage split subtask") + + sm := &BackfillSubTaskMeta{} + err := json.Unmarshal(subtask.Meta, sm) + if err != nil { + logutil.BgLogger().Error("unmarshal error", + zap.String("category", "ddl"), + zap.Error(err)) + return err + } + + local := m.bc.GetLocalBackend() + if local == nil { + return errors.Errorf("local backend not found") + } + _, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, int32(m.index.ID)) + err = local.CloseEngine(ctx, &backend.EngineConfig{ + External: &backend.ExternalEngineConfig{ + StorageURI: m.cloudStoreURI, + DataFiles: sm.DataFiles, + StatFiles: sm.StatFiles, + MinKey: sm.MinKey, + MaxKey: sm.MaxKey, + SplitKeys: sm.RangeSplitKeys, + TotalFileSize: int64(sm.TotalKVSize), + TotalKVCount: 0, + }, + }, engineUUID) + if err != nil { + return err + } + err = local.ImportEngine(ctx, engineUUID, int64(config.SplitRegionSize), int64(config.SplitRegionKeys)) + return err +} + +func (*cloudImportExecutor) Cleanup(ctx context.Context) error { + logutil.Logger(ctx).Info("merge sort stage clean up subtask env") + return nil +} + +func (*cloudImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error { + logutil.Logger(ctx).Info("merge sort stage finish subtask") + return nil +} + +func (*cloudImportExecutor) Rollback(ctx context.Context) error { + logutil.Logger(ctx).Info("merge sort stage rollback subtask") + return nil +} diff --git a/ddl/backfilling_import_local.go b/ddl/backfilling_import_local.go new file mode 100644 index 0000000000000..40f5b8a8a6b3f --- /dev/null +++ b/ddl/backfilling_import_local.go @@ -0,0 +1,82 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import ( + "context" + + "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/pingcap/tidb/ddl/ingest" + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type localImportExecutor struct { + jobID int64 + index *model.IndexInfo + ptbl table.PhysicalTable + bc ingest.BackendCtx +} + +func newImportFromLocalStepExecutor( + jobID int64, + index *model.IndexInfo, + ptbl table.PhysicalTable, + bc ingest.BackendCtx, +) *localImportExecutor { + return &localImportExecutor{ + jobID: jobID, + index: index, + ptbl: ptbl, + bc: bc, + } +} + +func (i *localImportExecutor) Init(ctx context.Context) error { + logutil.Logger(ctx).Info("ingest index stage init subtask exec env") + _, _, err := i.bc.Flush(i.index.ID, ingest.FlushModeForceGlobal) + if err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = convertToKeyExistsErr(err, i.index, i.ptbl.Meta()) + return err + } + logutil.Logger(ctx).Error("flush error", zap.Error(err)) + return err + } + return err +} + +func (*localImportExecutor) RunSubtask(ctx context.Context, _ *proto.Subtask) error { + logutil.Logger(ctx).Info("ingest index stage split subtask") + return nil +} + +func (*localImportExecutor) Cleanup(ctx context.Context) error { + logutil.Logger(ctx).Info("ingest index stage cleanup subtask exec env") + return nil +} + +func (*localImportExecutor) OnFinished(ctx context.Context, _ *proto.Subtask) error { + logutil.Logger(ctx).Info("ingest index stage finish subtask") + return nil +} + +func (*localImportExecutor) Rollback(ctx context.Context) error { + logutil.Logger(ctx).Info("ingest index stage rollback backfill add index task") + return nil +} diff --git a/ddl/backfilling_read_index.go b/ddl/backfilling_read_index.go new file mode 100644 index 0000000000000..1d86f9a83f737 --- /dev/null +++ b/ddl/backfilling_read_index.go @@ -0,0 +1,274 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package ddl + +import ( + "context" + "encoding/hex" + "encoding/json" + "sync" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" + "github.com/pingcap/tidb/ddl/ingest" + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/disttask/framework/scheduler/execute" + "github.com/pingcap/tidb/disttask/operator" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type readIndexExecutor struct { + d *ddl + job *model.Job + index *model.IndexInfo + ptbl table.PhysicalTable + jc *JobContext + + cloudStorageURI string + + bc ingest.BackendCtx + summary *execute.Summary + + subtaskSummary sync.Map // subtaskID => readIndexSummary +} + +type readIndexSummary struct { + minKey []byte + maxKey []byte + totalSize uint64 + dataFiles []string + statFiles []string + mu sync.Mutex +} + +func newReadIndexExecutor( + d *ddl, + job *model.Job, + index *model.IndexInfo, + ptbl table.PhysicalTable, + jc *JobContext, + bc ingest.BackendCtx, + summary *execute.Summary, + cloudStorageURI string, +) *readIndexExecutor { + return &readIndexExecutor{ + d: d, + job: job, + index: index, + ptbl: ptbl, + jc: jc, + bc: bc, + summary: summary, + cloudStorageURI: cloudStorageURI, + } +} + +func (*readIndexExecutor) Init(_ context.Context) error { + logutil.BgLogger().Info("read index stage init subtask exec env", + zap.String("category", "ddl")) + return nil +} + +func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error { + logutil.BgLogger().Info("read index stage run subtask", + zap.String("category", "ddl")) + + r.subtaskSummary.Store(subtask.ID, &readIndexSummary{}) + + d := r.d + sm := &BackfillSubTaskMeta{} + err := json.Unmarshal(subtask.Meta, sm) + if err != nil { + logutil.BgLogger().Error("unmarshal error", + zap.String("category", "ddl"), + zap.Error(err)) + return err + } + + startKey, endKey, tbl, err := r.getTableStartEndKey(sm) + if err != nil { + return err + } + + sessCtx, err := newSessCtx( + d.store, r.job.ReorgMeta.SQLMode, r.job.ReorgMeta.Location, r.job.ReorgMeta.ResourceGroupName) + if err != nil { + return err + } + + opCtx := NewOperatorCtx(ctx) + defer opCtx.Cancel() + totalRowCount := &atomic.Int64{} + + var pipe *operator.AsyncPipeline + if len(r.cloudStorageURI) > 0 { + pipe, err = r.buildExternalStorePipeline(opCtx, d, subtask.ID, sessCtx, tbl, startKey, endKey, totalRowCount) + } else { + pipe, err = r.buildLocalStorePipeline(opCtx, d, sessCtx, tbl, startKey, endKey, totalRowCount) + } + if err != nil { + return err + } + + err = pipe.Execute() + if err != nil { + return err + } + err = pipe.Close() + if opCtx.OperatorErr() != nil { + return opCtx.OperatorErr() + } + if err != nil { + return err + } + + r.summary.UpdateRowCount(subtask.ID, totalRowCount.Load()) + return nil +} + +func (*readIndexExecutor) Cleanup(ctx context.Context) error { + logutil.Logger(ctx).Info("read index stage cleanup subtask exec env", + zap.String("category", "ddl")) + return nil +} + +// MockDMLExecutionAddIndexSubTaskFinish is used to mock DML execution during distributed add index. 
+var MockDMLExecutionAddIndexSubTaskFinish func() + +func (r *readIndexExecutor) OnFinished(ctx context.Context, subtask *proto.Subtask) error { + failpoint.Inject("mockDMLExecutionAddIndexSubTaskFinish", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecutionAddIndexSubTaskFinish != nil { + MockDMLExecutionAddIndexSubTaskFinish() + } + }) + if len(r.cloudStorageURI) == 0 { + return nil + } + // Rewrite the subtask meta to record statistics. + var subtaskMeta BackfillSubTaskMeta + err := json.Unmarshal(subtask.Meta, &subtaskMeta) + if err != nil { + return err + } + sum, _ := r.subtaskSummary.LoadAndDelete(subtask.ID) + s := sum.(*readIndexSummary) + subtaskMeta.MinKey = s.minKey + subtaskMeta.MaxKey = s.maxKey + subtaskMeta.TotalKVSize = s.totalSize + subtaskMeta.DataFiles = s.dataFiles + subtaskMeta.StatFiles = s.statFiles + logutil.Logger(ctx).Info("get key boundary on subtask finished", + zap.String("min", hex.EncodeToString(s.minKey)), + zap.String("max", hex.EncodeToString(s.maxKey)), + zap.Int("fileCount", len(s.dataFiles)), + zap.Uint64("totalSize", s.totalSize)) + meta, err := json.Marshal(subtaskMeta) + if err != nil { + return err + } + subtask.Meta = meta + return nil +} + +func (r *readIndexExecutor) Rollback(ctx context.Context) error { + logutil.Logger(ctx).Info("read index stage rollback backfill add index task", + zap.String("category", "ddl"), zap.Int64("jobID", r.job.ID)) + return nil +} + +func (r *readIndexExecutor) getTableStartEndKey(sm *BackfillSubTaskMeta) ( + start, end kv.Key, tbl table.PhysicalTable, err error) { + currentVer, err1 := getValidCurrentVersion(r.d.store) + if err1 != nil { + return nil, nil, nil, errors.Trace(err1) + } + if parTbl, ok := r.ptbl.(table.PartitionedTable); ok { + pid := sm.PhysicalTableID + start, end, err = getTableRange(r.jc, r.d.ddlCtx, parTbl.GetPartition(pid), currentVer.Ver, r.job.Priority) + if err != nil { + logutil.BgLogger().Error("get table range error", + zap.String("category", "ddl"), + zap.Error(err)) + return nil, nil, nil, err + } + tbl = parTbl.GetPartition(pid) + } else { + start, end = sm.StartKey, sm.EndKey + tbl = r.ptbl + } + return start, end, tbl, nil +} + +func (r *readIndexExecutor) buildLocalStorePipeline( + opCtx *OperatorCtx, + d *ddl, + sessCtx sessionctx.Context, + tbl table.PhysicalTable, + start, end kv.Key, + totalRowCount *atomic.Int64, +) (*operator.AsyncPipeline, error) { + ei, err := r.bc.Register(r.job.ID, r.index.ID, r.job.SchemaName, r.job.TableName) + if err != nil { + logutil.Logger(opCtx).Warn("cannot register new engine", zap.Error(err), + zap.Int64("job ID", r.job.ID), zap.Int64("index ID", r.index.ID)) + return nil, err + } + counter := metrics.BackfillTotalCounter.WithLabelValues( + metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) + return NewAddIndexIngestPipeline( + opCtx, d.store, d.sessPool, r.bc, ei, sessCtx, tbl, r.index, start, end, totalRowCount, counter) +} + +func (r *readIndexExecutor) buildExternalStorePipeline( + opCtx *OperatorCtx, + d *ddl, + subtaskID int64, + sessCtx sessionctx.Context, + tbl table.PhysicalTable, + start, end kv.Key, + totalRowCount *atomic.Int64, +) (*operator.AsyncPipeline, error) { + onClose := func(summary *external.WriterSummary) { + sum, _ := r.subtaskSummary.Load(subtaskID) + s := sum.(*readIndexSummary) + s.mu.Lock() + if len(s.minKey) == 0 || summary.Min.Cmp(s.minKey) < 0 { + s.minKey = summary.Min.Clone() + } + if len(s.maxKey) == 0 || summary.Max.Cmp(s.maxKey) > 0 { + s.maxKey 
= summary.Max.Clone() + } + s.totalSize += summary.TotalSize + for _, f := range summary.MultipleFilesStats { + for _, filename := range f.Filenames { + s.dataFiles = append(s.dataFiles, filename[0]) + s.statFiles = append(s.statFiles, filename[1]) + } + } + s.mu.Unlock() + } + return NewWriteIndexToExternalStoragePipeline( + opCtx, d.store, r.cloudStorageURI, r.d.sessPool, sessCtx, r.job.ID, subtaskID, + tbl, r.index, start, end, totalRowCount, onClose) +} From cf932e39a9799a2a56c1ee323441d19c305e665d Mon Sep 17 00:00:00 2001 From: tangenta Date: Wed, 13 Sep 2023 20:57:04 +0800 Subject: [PATCH 07/10] fix TestOnRunnableTasks --- disttask/framework/scheduler/manager_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/disttask/framework/scheduler/manager_test.go b/disttask/framework/scheduler/manager_test.go index 04bff5e8b82b5..5a472037a7b50 100644 --- a/disttask/framework/scheduler/manager_test.go +++ b/disttask/framework/scheduler/manager_test.go @@ -109,6 +109,7 @@ func TestOnRunnableTasks(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(false, errors.New("get subtask failed")) + mockInternalScheduler.EXPECT().Close() m.onRunnableTasks(context.Background(), []*proto.Task{task}) // no subtask From b498374131f948511b4c49116333a7d26b38b180 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 14 Sep 2023 12:02:34 +0800 Subject: [PATCH 08/10] address comments --- ddl/BUILD.bazel | 6 ++-- ddl/ingest/backend_mgr.go | 2 +- ddl/stage_scheduler.go | 35 +++++++++----------- disttask/framework/mock/scheduler_mock.go | 14 ++++++++ disttask/framework/scheduler/interface.go | 3 +- disttask/framework/scheduler/manager.go | 5 +++ disttask/framework/scheduler/manager_test.go | 3 ++ disttask/framework/scheduler/scheduler.go | 4 +++ util/generic/sync_map.go | 18 ++++------ 9 files changed, 55 insertions(+), 35 deletions(-) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 5325601fee7ec..f7fda21cb115e 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -13,7 +13,10 @@ go_library( srcs = [ "backfilling.go", "backfilling_dispatcher.go", + "backfilling_import_cloud.go", + "backfilling_import_local.go", "backfilling_operator.go", + "backfilling_read_index.go", "backfilling_scheduler.go", "callback.go", "cluster.go", @@ -47,9 +50,6 @@ go_library( "schema.go", "sequence.go", "split_region.go", - "stage_ingest_index.go", - "stage_merge_sort.go", - "stage_read_index.go", "stage_scheduler.go", "stat.go", "table.go", diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index d70875a56bb12..f63e7e91dcee9 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -160,7 +160,7 @@ func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, cfg // Unregister removes a backend context from the backend context manager. 
func (m *litBackendCtxMgr) Unregister(jobID int64) { - bc, exist := m.SyncMap.LoadAndDelete(jobID) + bc, exist := m.SyncMap.Delete(jobID) if !exist { return } diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index aacc1646d84fe..b7599f0babfa7 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -96,6 +96,8 @@ const BackfillTaskType = "backfill" type backfillDistScheduler struct { *scheduler.BaseScheduler d *ddl + task *proto.Task + taskTable scheduler.TaskTable backendCtx ingest.BackendCtx jobID int64 } @@ -104,35 +106,38 @@ func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, s := &backfillDistScheduler{ BaseScheduler: scheduler.NewBaseScheduler(ctx, id, task.ID, taskTable), d: d, + task: task, + taskTable: taskTable, } s.BaseScheduler.Extension = s + return s +} - wrapErr := func(err error) scheduler.Scheduler { - s.BaseScheduler.Extension = &failedExtension{err: err} - return s - } +func (s *backfillDistScheduler) Init(ctx context.Context) error { + s.BaseScheduler.Init(ctx) + d := s.d bgm := &BackfillGlobalMeta{} - err := json.Unmarshal(task.Meta, bgm) + err := json.Unmarshal(s.task.Meta, bgm) if err != nil { - return wrapErr(err) + return errors.Trace(err) } job := &bgm.Job _, tbl, err := d.getTableByTxn(d.store, job.SchemaID, job.TableID) if err != nil { - return wrapErr(err) + return errors.Trace(err) } idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleID) if idx == nil { - return wrapErr(errors.Trace(errors.New("index info not found"))) + return errors.Trace(errors.New("index info not found")) } bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, job.ReorgMeta.ResourceGroupName) if err != nil { - return wrapErr(errors.Trace(err)) + return errors.Trace(err) } s.backendCtx = bc s.jobID = job.ID - return s + return nil } func (s *backfillDistScheduler) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) { @@ -148,13 +153,5 @@ func (s *backfillDistScheduler) Close() { if s.backendCtx != nil { ingest.LitBackCtxMgr.Unregister(s.jobID) } -} - -type failedExtension struct { - err error -} - -func (e *failedExtension) GetSubtaskExecutor(_ context.Context, _ *proto.Task, _ *execute.Summary) ( - execute.SubtaskExecutor, error) { - return nil, e.err + s.BaseScheduler.Close() } diff --git a/disttask/framework/mock/scheduler_mock.go b/disttask/framework/mock/scheduler_mock.go index b2e5355770512..e4b99663e46d4 100644 --- a/disttask/framework/mock/scheduler_mock.go +++ b/disttask/framework/mock/scheduler_mock.go @@ -293,6 +293,20 @@ func (mr *MockSchedulerMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockScheduler)(nil).Close)) } +// Init mocks base method. +func (m *MockScheduler) Init(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockSchedulerMockRecorder) Init(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockScheduler)(nil).Init), arg0) +} + // Rollback mocks base method. 
func (m *MockScheduler) Rollback(arg0 context.Context, arg1 *proto.Task) error { m.ctrl.T.Helper() diff --git a/disttask/framework/scheduler/interface.go b/disttask/framework/scheduler/interface.go index 2c2686522263f..fe20abea7a69e 100644 --- a/disttask/framework/scheduler/interface.go +++ b/disttask/framework/scheduler/interface.go @@ -45,8 +45,9 @@ type Pool interface { } // Scheduler is the subtask scheduler for a task. -// each task type should implement this interface. +// Each task type should implement this interface. type Scheduler interface { + Init(context.Context) error Run(context.Context, *proto.Task) error Rollback(context.Context, *proto.Task) error Close() diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 2d530a51bfc1c..edf9c47b53c26 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -263,6 +263,11 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { return } scheduler := factory(ctx, m.id, task, m.taskTable) + err := scheduler.Init(ctx) + if err != nil { + m.onError(err) + return + } defer scheduler.Close() for { select { diff --git a/disttask/framework/scheduler/manager_test.go b/disttask/framework/scheduler/manager_test.go index 5a472037a7b50..de0dfa341f00a 100644 --- a/disttask/framework/scheduler/manager_test.go +++ b/disttask/framework/scheduler/manager_test.go @@ -106,6 +106,7 @@ func TestOnRunnableTasks(t *testing.T) { }) // get subtask failed + mockInternalScheduler.EXPECT().Init(gomock.Any()).Return(nil) mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(false, errors.New("get subtask failed")) @@ -178,6 +179,7 @@ func TestManager(t *testing.T) { Return([]*proto.Task{task1, task2}, nil).AnyTimes() mockTaskTable.EXPECT().GetGlobalTasksInStates(proto.TaskStateReverting). Return([]*proto.Task{task2}, nil).AnyTimes() + mockInternalScheduler.EXPECT().Init(gomock.Any()).Return(nil) // task1 mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID1, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). @@ -203,6 +205,7 @@ func TestManager(t *testing.T) { mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). Return(true, nil) + mockInternalScheduler.EXPECT().Init(gomock.Any()).Return(nil) mockInternalScheduler.EXPECT().Rollback(gomock.Any(), task2).Return(nil) mockTaskTable.EXPECT().HasSubtasksInStates(id, taskID2, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRevertPending}). diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index ac40b6522bd8b..da44ea8ccb4d3 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -96,6 +96,10 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup }() } +func (s *BaseScheduler) Init(_ context.Context) error { + return nil +} + // Run runs the scheduler task. 
func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { defer func() { diff --git a/util/generic/sync_map.go b/util/generic/sync_map.go index 1690d2bdebd90..26ca29de21c50 100644 --- a/util/generic/sync_map.go +++ b/util/generic/sync_map.go @@ -44,22 +44,18 @@ func (m *SyncMap[K, V]) Load(key K) (V, bool) { return val, exist } -// LoadAndDelete loads and deletes a key value atomically. -func (m *SyncMap[K, V]) LoadAndDelete(key K) (V, bool) { +// Delete deletes the value for a key, returning the previous value if any. +// The exist result reports whether the key was present. +func (m *SyncMap[K, V]) Delete(key K) (val V, exist bool) { m.mu.Lock() - val, exist := m.item[key] - delete(m.item, key) + val, exist = m.item[key] + if exist { + delete(m.item, key) + } m.mu.Unlock() return val, exist } -// Delete deletes a key value. -func (m *SyncMap[K, V]) Delete(key K) { - m.mu.Lock() - delete(m.item, key) - m.mu.Unlock() -} - // Keys returns all the keys in the map. func (m *SyncMap[K, V]) Keys() []K { ret := make([]K, 0, len(m.item)) From 1785de57f50d10526fc62af8e1b4fe92f90a059e Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 14 Sep 2023 13:42:59 +0800 Subject: [PATCH 09/10] fix linter --- ddl/stage_scheduler.go | 7 +++++-- disttask/framework/scheduler/scheduler.go | 3 ++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index b7599f0babfa7..b46c4ae0521f9 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -114,11 +114,14 @@ func newBackfillDistScheduler(ctx context.Context, id string, task *proto.Task, } func (s *backfillDistScheduler) Init(ctx context.Context) error { - s.BaseScheduler.Init(ctx) + err := s.BaseScheduler.Init(ctx) + if err != nil { + return err + } d := s.d bgm := &BackfillGlobalMeta{} - err := json.Unmarshal(s.task.Meta, bgm) + err = json.Unmarshal(s.task.Meta, bgm) if err != nil { return errors.Trace(err) } diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index da44ea8ccb4d3..96a41e6d350b7 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -96,7 +96,8 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup }() } -func (s *BaseScheduler) Init(_ context.Context) error { +// Init implements the Scheduler interface. +func (*BaseScheduler) Init(_ context.Context) error { return nil } From 88b9d5f326c81d35ea12806d3f8305a0ef32cdc6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 14 Sep 2023 13:52:41 +0800 Subject: [PATCH 10/10] fix typo --- ddl/stage_scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddl/stage_scheduler.go b/ddl/stage_scheduler.go index b46c4ae0521f9..833750e6ef50a 100644 --- a/ddl/stage_scheduler.go +++ b/ddl/stage_scheduler.go @@ -52,7 +52,7 @@ type BackfillSubTaskMeta struct { TotalKVSize uint64 `json:"total_kv_size"` } -// NewBackfillSubtaskExecutor creates a new backfill backfill subtask executor. +// NewBackfillSubtaskExecutor creates a new backfill subtask executor. func NewBackfillSubtaskExecutor(_ context.Context, taskMeta []byte, d *ddl, bc ingest.BackendCtx, stage int64, summary *execute.Summary) (execute.SubtaskExecutor, error) { bgm := &BackfillGlobalMeta{}
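
Note on the SyncMap change in PATCH 08/10: the series folds LoadAndDelete into Delete, so Delete now returns the previous value together with a presence flag. The sketch below is illustrative only and is not part of any patch in the series; the syncMap type is a simplified stand-in re-declared here so the example is self-contained, mirroring util/generic/sync_map.go as shown in the diff.

package main

import (
	"fmt"
	"sync"
)

// syncMap is a minimal stand-in for util/generic.SyncMap, reproduced only to
// keep this example runnable on its own.
type syncMap[K comparable, V any] struct {
	mu   sync.Mutex
	item map[K]V
}

func newSyncMap[K comparable, V any]() *syncMap[K, V] {
	return &syncMap[K, V]{item: make(map[K]V)}
}

func (m *syncMap[K, V]) Store(key K, val V) {
	m.mu.Lock()
	m.item[key] = val
	m.mu.Unlock()
}

// Delete removes key and returns the previous value plus whether the key was
// present, matching the semantics this series gives SyncMap.Delete.
func (m *syncMap[K, V]) Delete(key K) (val V, exist bool) {
	m.mu.Lock()
	val, exist = m.item[key]
	if exist {
		delete(m.item, key)
	}
	m.mu.Unlock()
	return val, exist
}

func main() {
	m := newSyncMap[int64, string]()
	m.Store(1, "backend context for job 1")

	// A caller in the style of litBackendCtxMgr.Unregister can now delete the
	// entry and inspect the old value in one locked step.
	if bc, exist := m.Delete(1); exist {
		fmt.Println("released:", bc)
	}
	if _, exist := m.Delete(1); !exist {
		fmt.Println("second delete is a no-op")
	}
}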
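
The other structural change in PATCH 08/10 and 09/10 is the new Init step on the Scheduler interface: the expensive setup moves out of newBackfillDistScheduler into Init (for backfill, registering the ingest backend context), the manager calls Init before running or rolling back a task, and Close releases what Init acquired. Below is a minimal, runnable sketch of that lifecycle; task, schedulerIface, demoScheduler, and runTask are simplified stand-ins invented for the example and do not exist in the codebase.

package main

import (
	"context"
	"errors"
	"fmt"
)

// task and schedulerIface are simplified stand-ins for proto.Task and the
// disttask scheduler.Scheduler interface; Rollback is omitted for brevity.
type task struct{ ID int64 }

type schedulerIface interface {
	Init(context.Context) error
	Run(context.Context, *task) error
	Close()
}

type demoScheduler struct{ ready bool }

// Init carries the setup that used to live in the constructor, so a failure
// surfaces as a returned error instead of requiring a constructor that can fail.
func (s *demoScheduler) Init(_ context.Context) error {
	s.ready = true
	return nil
}

func (s *demoScheduler) Run(_ context.Context, t *task) error {
	if !s.ready {
		return errors.New("scheduler not initialized")
	}
	fmt.Println("running task", t.ID)
	return nil
}

// Close releases whatever Init acquired before tearing the scheduler down.
func (s *demoScheduler) Close() { fmt.Println("closed") }

// runTask mirrors the manager's onRunnableTask flow after this series:
// construct, Init, defer Close, then run.
func runTask(ctx context.Context, t *task, s schedulerIface) error {
	if err := s.Init(ctx); err != nil {
		return err
	}
	defer s.Close()
	return s.Run(ctx, t)
}

func main() {
	_ = runTask(context.Background(), &task{ID: 1}, &demoScheduler{})
}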