From b34b940fffaefb0e4de767c119ae77aaee02c1f8 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 26 Sep 2023 18:18:39 +0800 Subject: [PATCH 01/27] fix --- disttask/framework/scheduler/scheduler.go | 93 +++++++++++++++---- .../framework/scheduler/scheduler_test.go | 62 ++++++++----- 2 files changed, 115 insertions(+), 40 deletions(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 5cec67917e43b..47a67e54289d8 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -22,10 +22,14 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/disttask/framework/dispatcher" + "github.com/pingcap/tidb/disttask/framework/handle" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/scheduler/execute" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/util/backoff" + "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -107,7 +111,7 @@ func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { if r := recover(); r != nil { logutil.Logger(ctx).Error("BaseScheduler panicked", zap.Any("recover", r), zap.Stack("stack")) err4Panic := errors.Errorf("%v", r) - err1 := s.taskTable.UpdateErrorToSubtask(s.id, task.ID, err4Panic) + err1 := s.updateErrorToSubtask(ctx, task.ID, err4Panic) if err == nil { err = err1 } @@ -117,7 +121,7 @@ func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { if s.mu.handled { return err } - return s.taskTable.UpdateErrorToSubtask(s.id, task.ID, err) + return s.updateErrorToSubtask(ctx, task.ID, err) } func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { @@ -127,6 +131,7 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { } runCtx, runCancel := context.WithCancel(ctx) defer runCancel() + s.registerCancelFunc(runCancel) s.resetError() logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency)) @@ -190,6 +195,7 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { s.startSubtask(subtask.ID) if err := s.getError(); err != nil { logutil.Logger(s.logCtx).Warn("startSubtask meets error", zap.Error(err)) + s.resetError() continue } failpoint.Inject("mockCleanScheduler", func() { @@ -215,14 +221,13 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, scheduler execute.Subtas }) if err != nil { s.onError(err) - if errors.Cause(err) == context.Canceled { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, s.getError()) - } else { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError()) - } - s.markErrorHandled() + } + + finished := s.markTaskCancelOrFailed(subtask) + if finished { return } + if ctx.Err() != nil { s.onError(ctx.Err()) return @@ -291,18 +296,19 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, scheduler execute s.onError(context.Canceled) } }) - if err := s.getError(); err != nil { - if errors.Cause(err) == context.Canceled { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil) - } else { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError()) - } - s.markErrorHandled() + + finished := s.markTaskCancelOrFailed(subtask) + if finished { return } - if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil { - s.onError(err) + + s.finishSubtask(ctx, subtask) + + finished = s.markTaskCancelOrFailed(subtask) + if finished { + return } + failpoint.Inject("syncAfterSubtaskFinish", func() { TestSyncChan <- struct{}{} <-TestSyncChan @@ -433,6 +439,8 @@ func (s *BaseScheduler) markErrorHandled() { s.mu.handled = true } +// func (s *BaseScheduler) + func (s *BaseScheduler) getError() error { s.mu.RLock() defer s.mu.RUnlock() @@ -454,8 +462,57 @@ func (s *BaseScheduler) startSubtask(id int64) { } func (s *BaseScheduler) updateSubtaskStateAndError(subtaskID int64, state string, subTaskErr error) { - err := s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr) + // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes + logger := logutil.Logger(s.logCtx) + backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) + retryTimes := dispatcher.RetrySQLTimes + if intest.InTest { + retryTimes = 2 + } + ctx := context.Background() + err := handle.RunWithRetry(ctx, retryTimes, backoffer, logger, + func(ctx context.Context) (bool, error) { + return true, s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr) + }, + ) + if err != nil { + s.onError(err) + } +} + +func (s *BaseScheduler) finishSubtask(ctx context.Context, subtask *proto.Subtask) { + logger := logutil.Logger(s.logCtx) + backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) + err := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, + func(ctx context.Context) (bool, error) { + return true, s.taskTable.FinishSubtask(subtask.ID, subtask.Meta) + }, + ) if err != nil { s.onError(err) } } + +func (s *BaseScheduler) markTaskCancelOrFailed(subtask *proto.Subtask) bool { + if err := s.getError(); err != nil { + if errors.Cause(err) == context.Canceled { + s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil) + } else { + s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError()) + } + s.markErrorHandled() + return true + } + return false +} + +func (s *BaseScheduler) updateErrorToSubtask(ctx context.Context, taskID int64, err error) error { + logger := logutil.Logger(s.logCtx) + backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) + err1 := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, + func(ctx context.Context) (bool, error) { + return true, s.taskTable.UpdateErrorToSubtask(s.id, taskID, err) + }, + ) + return err1 +} diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 05a675aca4ef4..1f3d66e7c4983 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -45,43 +45,38 @@ func TestSchedulerRun(t *testing.T) { // 1. no scheduler constructor schedulerRegisterErr := errors.Errorf("constructor of scheduler for key not found") - mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, schedulerRegisterErr) + mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, schedulerRegisterErr).Times(2) scheduler := NewBaseScheduler(ctx, "id", 1, mockSubtaskTable) scheduler.Extension = mockExtension - // UpdateErrorToSubtask won't return such errors, but since the error is not handled, - // it's saved by UpdateErrorToSubtask. - // here we use this to check the returned error of s.run. - forwardErrFn := func(_ string, _ int64, err error) error { - return err - } - mockSubtaskTable.EXPECT().UpdateErrorToSubtask(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(forwardErrFn).AnyTimes() - err := scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp}) + err := scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp}) require.EqualError(t, err, schedulerRegisterErr.Error()) + mockSubtaskTable.EXPECT().UpdateErrorToSubtask(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp}) + require.NoError(t, err) + // 2. init subtask exec env failed mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes() - // 2. init subtask exec env failed initErr := errors.New("init error") mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(initErr) - err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp}) + err = scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp}) require.EqualError(t, err, initErr.Error()) var taskID int64 = 1 var concurrency uint64 = 10 - // 5. run subtask failed + // 3. run subtask failed runSubtaskErr := errors.New("run subtask error") mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetGlobalTaskByID(gomock.Any()).Return(&proto.Task{ID: taskID, Step: proto.StepTwo}, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) + err = scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) require.EqualError(t, err, runSubtaskErr.Error()) - // 6. run subtask success + // 4. run subtask success mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) @@ -89,12 +84,12 @@ func TestSchedulerRun(t *testing.T) { mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().FinishSubtask(int64(1), gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetGlobalTaskByID(gomock.Any()).Return(&proto.Task{ID: taskID, Step: proto.StepTwo}, nil) + mockSubtaskTable.EXPECT().GetGlobalTaskByID(gomock.Any()).Return(nil, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) require.NoError(t, err) - // 7. run subtask one by one + // 5. run subtask one by one mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(int64(1)).Return(nil) @@ -110,9 +105,10 @@ func TestSchedulerRun(t *testing.T) { mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + mockSubtaskTable.EXPECT().GetGlobalTaskByID(gomock.Any()).Return(nil, nil) err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) require.NoError(t, err) - // 8. cancel + // 6. cancel mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) @@ -124,7 +120,7 @@ func TestSchedulerRun(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) + err = scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) require.EqualError(t, err, context.Canceled.Error()) }() time.Sleep(time.Second) @@ -251,32 +247,54 @@ func TestScheduler(t *testing.T) { mockSubtaskTable := mock.NewMockTaskTable(ctrl) mockSubtaskExecutor := mockexecute.NewMockSubtaskExecutor(ctrl) mockExtension := mock.NewMockExtension(ctrl) + mockSubtaskTable.EXPECT().IsSchedulerCanceled(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(mockSubtaskExecutor, nil).AnyTimes() scheduler := NewBaseScheduler(ctx, "id", 1, mockSubtaskTable) scheduler.Extension = mockExtension - // run failed + // 1. run failed. runSubtaskErr := errors.New("run subtask error") mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", proto.StepOne, taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) + // mock sql failed + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(errors.New("update error")) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err := scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) + err := scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) require.EqualError(t, err, runSubtaskErr.Error()) - // rollback success + // 2. rollback success. mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(&proto.Subtask{ID: 1, Type: tp}, nil) + // mock sql failed + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(errors.New("update error")) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(nil) mockSubtaskExecutor.EXPECT().Rollback(gomock.Any()).Return(nil) + // mock sql failed + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverted, nil).Return(errors.New("update error")) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverted, nil).Return(nil) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.NoError(t, err) + + // 3. GetSubtaskInStates fail in scheduler.run + mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) + mockSubtaskTable.EXPECT().GetSubtaskInStates("id", proto.StepOne, taskID, + []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, errors.New("mock get fail")) + mockSubtaskTable.EXPECT().GetSubtaskInStates("id", proto.StepOne, taskID, + []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) + mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) + mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) + // mock sql failed + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(errors.New("update error")) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil) + mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + err = scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) + require.EqualError(t, err, runSubtaskErr.Error()) } From ecedd735152834197eb59cf2e5e67ae93ea1444e Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 10:57:15 +0800 Subject: [PATCH 02/27] fix --- disttask/framework/scheduler/scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 9365aacc04517..83df53564d0a6 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -181,7 +181,7 @@ func TestSchedulerRollback(t *testing.T) { []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(&proto.Subtask{ID: 1}, nil) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(updateSubtaskErr) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(updateSubtaskErr).Times(2) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.EqualError(t, err, updateSubtaskErr.Error()) @@ -263,7 +263,7 @@ func TestScheduler(t *testing.T) { runSubtaskErr := errors.New("run subtask error") mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, - []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 1, Type: tp, Step: proto.StepOne}}, nil) + []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 1, Type: tp, Step: proto.StepOne}}, nil).AnyTimes() mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", proto.StepOne, taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) From 79416cc5455cfc17343d50bf11cb12f1bb7c42f1 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 11:36:06 +0800 Subject: [PATCH 03/27] fix test --- disttask/framework/scheduler/scheduler_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index acd988383c55d..598e138572f8c 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -104,8 +104,8 @@ func TestSchedulerRun(t *testing.T) { mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().FinishSubtask(int64(1), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 2, Type: tp, Step: proto.StepOne}}, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(nil, nil) mockSubtaskTable.EXPECT().GetGlobalTaskByID(gomock.Any()).Return(&proto.Task{ID: taskID, Step: proto.StepTwo}, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = scheduler.Run(runCtx, task) @@ -252,7 +252,7 @@ func TestSchedulerRollback(t *testing.T) { unfinishedNormalSubtaskStates...).Return(nil, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, unfinishedRevertSubtaskStates...).Return(&proto.Subtask{ID: 1, State: proto.TaskStateRevertPending}, nil) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(updateSubtaskErr) + mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(updateSubtaskErr).Times(2) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.EqualError(t, err, updateSubtaskErr.Error()) @@ -373,10 +373,13 @@ func TestScheduler(t *testing.T) { // 3. GetSubtaskInStates fail in scheduler.run mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ + ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}}, nil) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", proto.StepOne, taskID, - []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, errors.New("mock get fail")) + unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, errors.New("mock get fail")) mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", proto.StepOne, taskID, - []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) + unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) // mock sql failed From 7edbcedbad0c292e814e72a81c574033c5837049 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 11:58:14 +0800 Subject: [PATCH 04/27] fix --- disttask/framework/scheduler/BUILD.bazel | 3 ++ disttask/framework/scheduler/scheduler.go | 30 +++++++++++-------- .../framework/scheduler/scheduler_test.go | 22 -------------- 3 files changed, 21 insertions(+), 34 deletions(-) diff --git a/disttask/framework/scheduler/BUILD.bazel b/disttask/framework/scheduler/BUILD.bazel index b068ea7bb840b..ad38f0a1406f6 100644 --- a/disttask/framework/scheduler/BUILD.bazel +++ b/disttask/framework/scheduler/BUILD.bazel @@ -12,6 +12,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//config", + "//disttask/framework/dispatcher", + "//disttask/framework/handle", "//disttask/framework/proto", "//disttask/framework/scheduler/execute", "//disttask/framework/storage", @@ -19,6 +21,7 @@ go_library( "//metrics", "//resourcemanager/pool/spool", "//resourcemanager/util", + "//util/backoff", "//util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 798de7a174c75..6aae8e389d574 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/backoff" - "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -217,7 +216,7 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { } } else { // subtask.State == proto.TaskStatePending - s.startSubtaskAndUpdateState(subtask) + s.startSubtaskAndUpdateState(ctx, subtask) if err := s.getError(); err != nil { logutil.Logger(s.logCtx).Warn("startSubtaskAndUpdateState meets error", zap.Error(err)) continue @@ -486,13 +485,10 @@ func (s *BaseScheduler) resetError() { s.mu.handled = false } -func (s *BaseScheduler) startSubtaskAndUpdateState(subtask *proto.Subtask) { +func (s *BaseScheduler) startSubtaskAndUpdateState(ctx context.Context, subtask *proto.Subtask) { metrics.DecDistTaskSubTaskCnt(subtask) metrics.EndDistTaskSubTask(subtask) - err := s.taskTable.StartSubtask(subtask.ID) - if err != nil { - s.onError(err) - } + s.startSubtask(ctx, subtask.ID) subtask.State = proto.TaskStateRunning metrics.IncDistTaskSubTaskCnt(subtask) metrics.StartDistTaskSubTask(subtask) @@ -502,12 +498,8 @@ func (s *BaseScheduler) updateSubtaskStateAndErrorImpl(subtaskID int64, state st // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes logger := logutil.Logger(s.logCtx) backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) - retryTimes := dispatcher.RetrySQLTimes - if intest.InTest { - retryTimes = 2 - } ctx := context.Background() - err := handle.RunWithRetry(ctx, retryTimes, backoffer, logger, + err := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, func(ctx context.Context) (bool, error) { return true, s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr) }, @@ -517,6 +509,20 @@ func (s *BaseScheduler) updateSubtaskStateAndErrorImpl(subtaskID int64, state st } } +func (s *BaseScheduler) startSubtask(ctx context.Context, subtaskID int64) { + // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes + logger := logutil.Logger(s.logCtx) + backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) + err := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, + func(ctx context.Context) (bool, error) { + return true, s.taskTable.StartSubtask(subtaskID) + }, + ) + if err != nil { + s.onError(err) + } +} + func (s *BaseScheduler) finishSubtask(ctx context.Context, subtask *proto.Subtask) error { logger := logutil.Logger(s.logCtx) backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 598e138572f8c..06edf638945e5 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -351,8 +351,6 @@ func TestScheduler(t *testing.T) { ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) - // mock sql failed - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(errors.New("update error")) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err := scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) @@ -365,27 +363,7 @@ func TestScheduler(t *testing.T) { unfinishedRevertSubtaskStates...).Return(&proto.Subtask{ID: 1, Type: tp, State: proto.TaskStateRevertPending}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(nil) mockSubtaskExecutor.EXPECT().Rollback(gomock.Any()).Return(nil) - // mock sql failed - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverted, nil).Return(errors.New("update error")) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverted, nil).Return(nil) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.NoError(t, err) - - // 3. GetSubtaskInStates fail in scheduler.run - mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ - ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}}, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", proto.StepOne, taskID, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, errors.New("mock get fail")) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", proto.StepOne, taskID, - unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) - mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) - mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) - // mock sql failed - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(errors.New("update error")) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil) - mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) - err = scheduler.run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) - require.EqualError(t, err, runSubtaskErr.Error()) } From f2648eb09edfb7e01ad556412b541d79f8dc2749 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 12:00:37 +0800 Subject: [PATCH 05/27] refine --- disttask/framework/scheduler/scheduler.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 6aae8e389d574..eaf53232c037c 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -129,7 +129,6 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { } runCtx, runCancel := context.WithCancel(ctx) defer runCancel() - s.registerCancelFunc(runCancel) s.resetError() logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency)) @@ -470,8 +469,6 @@ func (s *BaseScheduler) markErrorHandled() { s.mu.handled = true } -// func (s *BaseScheduler) - func (s *BaseScheduler) getError() error { s.mu.RLock() defer s.mu.RUnlock() From 224c1db687ec0e65d21606afa11822ac61d4fc5b Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 14:44:51 +0800 Subject: [PATCH 06/27] fix --- disttask/framework/scheduler/manager.go | 2 +- disttask/framework/scheduler/scheduler.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 9dd809a52f35b..2addbb71490c2 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -378,6 +378,6 @@ func (m *Manager) onError(err error) { if err == nil { return } - + err = errors.Trace(err) logutil.Logger(m.logCtx).Error("task manager error", zap.Error(err)) } diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index eaf53232c037c..7a263bc276001 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -520,7 +520,7 @@ func (s *BaseScheduler) startSubtask(ctx context.Context, subtaskID int64) { } } -func (s *BaseScheduler) finishSubtask(ctx context.Context, subtask *proto.Subtask) error { +func (s *BaseScheduler) finishSubtask(ctx context.Context, subtask *proto.Subtask) { logger := logutil.Logger(s.logCtx) backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval) err := handle.RunWithRetry(ctx, dispatcher.RetrySQLTimes, backoffer, logger, @@ -528,7 +528,9 @@ func (s *BaseScheduler) finishSubtask(ctx context.Context, subtask *proto.Subtas return true, s.taskTable.FinishSubtask(subtask.ID, subtask.Meta) }, ) - return err + if err != nil { + s.onError(err) + } } func (s *BaseScheduler) updateSubtaskStateAndError(subtask *proto.Subtask, state string, subTaskErr error) { From be2eb7a9a4b1842c24db9556811d729513b255d1 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 17:05:21 +0800 Subject: [PATCH 07/27] add cancel cause --- disttask/framework/scheduler/manager.go | 3 +- disttask/framework/scheduler/scheduler.go | 47 +++++++++++-------- .../framework/scheduler/scheduler_test.go | 43 ++++++++--------- 3 files changed, 48 insertions(+), 45 deletions(-) diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 2addbb71490c2..6d00908363c00 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -32,8 +32,7 @@ import ( ) var ( - schedulerPoolSize int32 = 4 - subtaskExecutorPoolSize int32 = 10 + schedulerPoolSize int32 = 4 // same as dispatcher checkTime = 300 * time.Millisecond retrySQLTimes = 3 diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 7a263bc276001..3bd6cab8e6b3a 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -38,8 +38,17 @@ const ( DefaultCheckSubtaskCanceledInterval = 2 * time.Second ) -// TestSyncChan is used to sync the test. -var TestSyncChan = make(chan struct{}) +var ( + // CancelSubtaskErr is the cancel cause when cancelling subtasks. + CancelSubtaskErr = errors.New("cancel subtasks") + // FinishSubtaskErr is the cancel cause when scheduler successfully processed subtasks. + FinishSubtaskErr = errors.New("finish subtasks") + // FinishRollbackErr is the cancel cause when scheduler rollback successfully. + FinishRollbackErr = errors.New("finish rollback") + + // TestSyncChan is used to sync the test. + TestSyncChan = make(chan struct{}) +) // BaseScheduler is the base implementation of Scheduler. type BaseScheduler struct { @@ -56,7 +65,7 @@ type BaseScheduler struct { // handled indicates whether the error has been updated to one of the subtask. handled bool // runtimeCancel is used to cancel the Run/Rollback when error occurs. - runtimeCancel context.CancelFunc + runtimeCancel context.CancelCauseFunc } } @@ -71,7 +80,7 @@ func NewBaseScheduler(_ context.Context, id string, taskID int64, taskTable Task return schedulerImpl } -func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup, cancelFn context.CancelFunc) { +func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup, cancelFn context.CancelCauseFunc) { wg.Add(1) go func() { defer wg.Done() @@ -90,7 +99,7 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup if canceled { logutil.Logger(s.logCtx).Info("scheduler canceled") if cancelFn != nil { - cancelFn() + cancelFn(CancelSubtaskErr) } } } @@ -127,8 +136,8 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { s.onError(ctx.Err()) return s.getError() } - runCtx, runCancel := context.WithCancel(ctx) - defer runCancel() + runCtx, runCancel := context.WithCancelCause(ctx) + defer runCancel(FinishSubtaskErr) s.registerCancelFunc(runCancel) s.resetError() logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency)) @@ -240,14 +249,14 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.Subtask err := executor.RunSubtask(ctx, subtask) failpoint.Inject("MockRunSubtaskCancel", func(val failpoint.Value) { if val.(bool) { - err = context.Canceled + err = CancelSubtaskErr } }) if err != nil { s.onError(err) } - finished := s.markTaskCancelOrFailed(subtask) + finished := s.markTaskCancelOrFailed(ctx, subtask) if finished { return } @@ -317,18 +326,18 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, executor execute. } failpoint.Inject("MockSubtaskFinishedCancel", func(val failpoint.Value) { if val.(bool) { - s.onError(context.Canceled) + s.onError(CancelSubtaskErr) } }) - finished := s.markTaskCancelOrFailed(subtask) + finished := s.markTaskCancelOrFailed(ctx, subtask) if finished { return } s.finishSubtaskAndUpdateState(ctx, subtask) - finished = s.markTaskCancelOrFailed(subtask) + finished = s.markTaskCancelOrFailed(ctx, subtask) if finished { return } @@ -341,8 +350,8 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, executor execute. // Rollback rollbacks the scheduler task. func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { - rollbackCtx, rollbackCancel := context.WithCancel(ctx) - defer rollbackCancel() + rollbackCtx, rollbackCancel := context.WithCancelCause(ctx) + defer rollbackCancel(FinishRollbackErr) s.registerCancelFunc(rollbackCancel) s.resetError() @@ -438,7 +447,7 @@ func runSummaryCollectLoop( return nil, func() {}, nil } -func (s *BaseScheduler) registerCancelFunc(cancel context.CancelFunc) { +func (s *BaseScheduler) registerCancelFunc(cancel context.CancelCauseFunc) { s.mu.Lock() defer s.mu.Unlock() s.mu.runtimeCancel = cancel @@ -459,7 +468,7 @@ func (s *BaseScheduler) onError(err error) { } if s.mu.runtimeCancel != nil { - s.mu.runtimeCancel() + s.mu.runtimeCancel(err) } } @@ -552,11 +561,11 @@ func (s *BaseScheduler) finishSubtaskAndUpdateState(ctx context.Context, subtask metrics.IncDistTaskSubTaskCnt(subtask) } -func (s *BaseScheduler) markTaskCancelOrFailed(subtask *proto.Subtask) bool { +func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { - if errors.Cause(err) == context.Canceled { + if ctx.Err() != nil && context.Cause(ctx).Error() == "cancel subtasks" { s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) - } else { + } else if errors.Cause(err) != context.Canceled { s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError()) } s.markErrorHandled() diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 06edf638945e5..17ecaba22e579 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -16,9 +16,7 @@ package scheduler import ( "context" - "sync" "testing" - "time" "github.com/pingcap/tidb/disttask/framework/mock" mockexecute "github.com/pingcap/tidb/disttask/framework/mock/execute" @@ -188,20 +186,27 @@ func TestSchedulerRun(t *testing.T) { unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) - mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(context.Canceled) + mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(CancelSubtaskErr) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateCanceled, gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + err = scheduler.Run(runCtx, task) + require.EqualError(t, err, CancelSubtaskErr.Error()) + + // 7. RunSubtask return context.Canceled + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return([]*proto.Subtask{{ + ID: 2, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}}, nil) + mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, + unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ + ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil) + mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) + mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(context.Canceled) + mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) + err = scheduler.Run(runCtx, task) + require.EqualError(t, err, context.Canceled.Error()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - err = scheduler.Run(runCtx, task) - require.EqualError(t, err, context.Canceled.Error()) - }() - time.Sleep(time.Second) runCancel() - wg.Wait() } func TestSchedulerRollback(t *testing.T) { @@ -246,17 +251,7 @@ func TestSchedulerRollback(t *testing.T) { err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.NoError(t, err) - // 4. update subtask error - updateSubtaskErr := errors.New("update subtask error") - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, - unfinishedNormalSubtaskStates...).Return(nil, nil) - mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, - unfinishedRevertSubtaskStates...).Return(&proto.Subtask{ID: 1, State: proto.TaskStateRevertPending}, nil) - mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(updateSubtaskErr).Times(2) - err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) - require.EqualError(t, err, updateSubtaskErr.Error()) - - // 5. rollback failed + // 4. rollback failed rollbackErr := errors.New("rollback error") mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(nil, nil) @@ -268,7 +263,7 @@ func TestSchedulerRollback(t *testing.T) { err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.EqualError(t, err, rollbackErr.Error()) - // 6. rollback success + // 5. rollback success mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ID: 1}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(int64(1), proto.TaskStateCanceled, nil).Return(nil) From 2c38a4f90647ae6a15afaa8a3c46cbfb38d0e5e6 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 27 Sep 2023 17:10:40 +0800 Subject: [PATCH 08/27] add cancel cause --- disttask/framework/scheduler/scheduler.go | 22 +++++++++---------- .../framework/scheduler/scheduler_test.go | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 3bd6cab8e6b3a..cd88915db718f 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -39,12 +39,12 @@ const ( ) var ( - // CancelSubtaskErr is the cancel cause when cancelling subtasks. - CancelSubtaskErr = errors.New("cancel subtasks") - // FinishSubtaskErr is the cancel cause when scheduler successfully processed subtasks. - FinishSubtaskErr = errors.New("finish subtasks") - // FinishRollbackErr is the cancel cause when scheduler rollback successfully. - FinishRollbackErr = errors.New("finish rollback") + // ErrCancelSubtask is the cancel cause when cancelling subtasks. + ErrCancelSubtask = errors.New("cancel subtasks") + // ErrFinishSubtask is the cancel cause when scheduler successfully processed subtasks. + ErrFinishSubtask = errors.New("finish subtasks") + // ErrFinishRollback is the cancel cause when scheduler rollback successfully. + ErrFinishRollback = errors.New("finish rollback") // TestSyncChan is used to sync the test. TestSyncChan = make(chan struct{}) @@ -99,7 +99,7 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup if canceled { logutil.Logger(s.logCtx).Info("scheduler canceled") if cancelFn != nil { - cancelFn(CancelSubtaskErr) + cancelFn(ErrCancelSubtask) } } } @@ -137,7 +137,7 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { return s.getError() } runCtx, runCancel := context.WithCancelCause(ctx) - defer runCancel(FinishSubtaskErr) + defer runCancel(ErrFinishSubtask) s.registerCancelFunc(runCancel) s.resetError() logutil.Logger(s.logCtx).Info("scheduler run a step", zap.Any("step", task.Step), zap.Any("concurrency", task.Concurrency)) @@ -249,7 +249,7 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.Subtask err := executor.RunSubtask(ctx, subtask) failpoint.Inject("MockRunSubtaskCancel", func(val failpoint.Value) { if val.(bool) { - err = CancelSubtaskErr + err = ErrCancelSubtask } }) if err != nil { @@ -326,7 +326,7 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, executor execute. } failpoint.Inject("MockSubtaskFinishedCancel", func(val failpoint.Value) { if val.(bool) { - s.onError(CancelSubtaskErr) + s.onError(ErrCancelSubtask) } }) @@ -351,7 +351,7 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, executor execute. // Rollback rollbacks the scheduler task. func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { rollbackCtx, rollbackCancel := context.WithCancelCause(ctx) - defer rollbackCancel(FinishRollbackErr) + defer rollbackCancel(ErrFinishRollback) s.registerCancelFunc(rollbackCancel) s.resetError() diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 17ecaba22e579..0f3707fee0618 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -186,11 +186,11 @@ func TestSchedulerRun(t *testing.T) { unfinishedNormalSubtaskStates...).Return(&proto.Subtask{ ID: 1, Type: tp, Step: proto.StepOne, State: proto.TaskStatePending}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) - mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(CancelSubtaskErr) + mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(ErrCancelSubtask) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateCanceled, gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = scheduler.Run(runCtx, task) - require.EqualError(t, err, CancelSubtaskErr.Error()) + require.EqualError(t, err, ErrCancelSubtask.Error()) // 7. RunSubtask return context.Canceled mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, From f64af122ec170297f6141c89b253b50781672085 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Sun, 8 Oct 2023 10:43:38 +0800 Subject: [PATCH 09/27] retryable error --- disttask/framework/dispatcher/dispatcher.go | 6 +-- disttask/framework/scheduler/scheduler.go | 43 +++++++++++++++++++++ 2 files changed, 46 insertions(+), 3 deletions(-) diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index c87bd0a92a4cd..4580db6be05e9 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -407,7 +407,7 @@ func (d *BaseDispatcher) replaceDeadNodesIfAny() error { func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) { prevState := d.Task.State d.Task.State = taskState - if !VerifyTaskStateTransform(prevState, taskState) { + if !d.VerifyTaskStateTransform(prevState, taskState) { return errors.Errorf("invalid task state transform, from %s to %s", prevState, taskState) } @@ -697,7 +697,7 @@ func (d *BaseDispatcher) WithNewTxn(ctx context.Context, fn func(se sessionctx.C } // VerifyTaskStateTransform verifies whether the task state transform is valid. -func VerifyTaskStateTransform(from, to string) bool { +func (d *BaseDispatcher) VerifyTaskStateTransform(from, to string) bool { rules := map[string][]string{ proto.TaskStatePending: { proto.TaskStateRunning, @@ -739,7 +739,7 @@ func VerifyTaskStateTransform(from, to string) bool { proto.TaskStateRevertPending: {}, proto.TaskStateReverted: {}, } - logutil.BgLogger().Info("task state transform", zap.String("from", from), zap.String("to", to)) + logutil.Logger(d.logCtx).Info("task state transform", zap.String("from", from), zap.String("to", to)) if from == to { return true diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index cd88915db718f..6c70f290ddad5 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -16,9 +16,11 @@ package scheduler import ( "context" + "strings" "sync" "time" + "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/disttask/framework/dispatcher" @@ -27,6 +29,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/scheduler/execute" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/backoff" "github.com/pingcap/tidb/util/logutil" @@ -561,10 +564,50 @@ func (s *BaseScheduler) finishSubtaskAndUpdateState(ctx context.Context, subtask metrics.IncDistTaskSubTaskCnt(subtask) } +var ( + // Retryable1105Msgs list the error messages of some retryable error with `1105` code (`ErrUnknown`). + Retryable1105Msgs = []string{ + "Information schema is out of date", + "Information schema is changed", + } +) + +func IsRetryableError(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*mysql.MySQLError) + if !ok { + return false + } + + switch mysqlErr.Number { + case errno.ErrLockDeadlock: // https://dev.mysql.com/doc/refman/5.7/en/innodb-deadlocks.html + return true // retryable error in MySQL + case errno.ErrPDServerTimeout, + errno.ErrTiKVServerBusy, + errno.ErrResolveLockTimeout, + errno.ErrInfoSchemaExpired, + errno.ErrInfoSchemaChanged, + errno.ErrWriteConflictInTiDB, + errno.ErrTxnRetryable, + errno.ErrWriteConflict, + errno.ErrColumnInChange, + errno.ErrRegionUnavailable: + return true // retryable error in TiDB + case errno.ErrUnknown: // the old version of TiDB uses `1105` frequently, this should be compatible. + for _, msg := range Retryable1105Msgs { + if strings.Contains(mysqlErr.Message, msg) { + return true + } + } + } + + return false +} func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { if ctx.Err() != nil && context.Cause(ctx).Error() == "cancel subtasks" { s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) + } else if IsRetryableError(err) { } else if errors.Cause(err) != context.Canceled { s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError()) } From 17a72b900e6eccfec91958d5e61a1611fa9f4b7f Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Mon, 9 Oct 2023 20:44:40 +0800 Subject: [PATCH 10/27] log refine --- disttask/framework/scheduler/scheduler.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 6c70f290ddad5..28c2f07698f43 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -608,8 +608,10 @@ func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *pro if ctx.Err() != nil && context.Cause(ctx).Error() == "cancel subtasks" { s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) } else if IsRetryableError(err) { + logutil.Logger(s.logCtx).Warn("met retryable error", zap.Error(err)) } else if errors.Cause(err) != context.Canceled { - s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError()) + logutil.Logger(s.logCtx).Warn("subtask failed", zap.Error(err)) + s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, err) } s.markErrorHandled() return true From 513b7fd239781160b9e1a9735e2ac074e74d1ade Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 05:14:21 +0800 Subject: [PATCH 11/27] fix gracefully shutdown --- disttask/framework/dispatcher/dispatcher.go | 2 +- disttask/framework/framework_test.go | 6 ++---- disttask/framework/scheduler/BUILD.bazel | 2 ++ disttask/framework/scheduler/manager.go | 16 ++++++++-------- disttask/framework/scheduler/manager_test.go | 8 ++++---- disttask/framework/scheduler/scheduler.go | 19 ++++++++++++++++++- .../addindextest/add_index_test.go | 13 +++++++++---- 7 files changed, 44 insertions(+), 22 deletions(-) diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index 4580db6be05e9..b06655181f80f 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -238,7 +238,7 @@ func (d *BaseDispatcher) onCancelling() error { // handle task in pausing state, cancel all running subtasks. func (d *BaseDispatcher) onPausing() error { logutil.Logger(d.logCtx).Info("on pausing state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step)) - cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStateRunning, proto.TaskStatePending) // ywq todo remove + cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStateRunning, proto.TaskStatePending) if err != nil { logutil.Logger(d.logCtx).Warn("check task failed", zap.Error(err)) return err diff --git a/disttask/framework/framework_test.go b/disttask/framework/framework_test.go index 39349b3d950c4..bea0170ec4b19 100644 --- a/disttask/framework/framework_test.go +++ b/disttask/framework/framework_test.go @@ -633,8 +633,7 @@ func TestFrameworkSubtaskFinishedCancel(t *testing.T) { defer ctrl.Finish() RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{}) distContext := testkit.NewDistExecutionContext(t, 3) - err := failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel", "1*return(true)") - require.NoError(t, err) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel", "1*return(true)")) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel")) }() @@ -649,8 +648,7 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) { defer ctrl.Finish() RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{}) distContext := testkit.NewDistExecutionContext(t, 3) - err := failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel", "1*return(true)") - require.NoError(t, err) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel", "1*return(true)")) DispatchTaskAndCheckState("key1", t, &m, proto.TaskStateReverted) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel")) distContext.Close() diff --git a/disttask/framework/scheduler/BUILD.bazel b/disttask/framework/scheduler/BUILD.bazel index ad38f0a1406f6..b8d22a21be36f 100644 --- a/disttask/framework/scheduler/BUILD.bazel +++ b/disttask/framework/scheduler/BUILD.bazel @@ -18,11 +18,13 @@ go_library( "//disttask/framework/scheduler/execute", "//disttask/framework/storage", "//domain/infosync", + "//errno", "//metrics", "//resourcemanager/pool/spool", "//resourcemanager/util", "//util/backoff", "//util/logutil", + "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 6d00908363c00..7aaabe5413df8 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -66,7 +66,7 @@ type Manager struct { sync.RWMutex // taskID -> cancelFunc. // cancelFunc is used to fast cancel the scheduler.Run. - handlingTasks map[int64]context.CancelFunc + handlingTasks map[int64]context.CancelCauseFunc } // id, it's the same as server id now, i.e. host:port. id string @@ -86,7 +86,7 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable newPool: b.newPool, } m.ctx, m.cancel = context.WithCancel(ctx) - m.mu.handlingTasks = make(map[int64]context.CancelFunc) + m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc) schedulerPool, err := m.newPool("scheduler_pool", schedulerPoolSize, util.DistTask) if err != nil { @@ -226,7 +226,7 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) { for _, task := range tasks { logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID)) if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil { - cancel() + cancel(ErrCancelSubtask) } } } @@ -238,7 +238,7 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error { for _, task := range tasks { logutil.Logger(m.logCtx).Info("onPausingTasks", zap.Any("task_id", task.ID)) if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil { - cancel() + cancel(ErrCancelSubtask) } if err := m.taskTable.PauseSubtasks(m.id, task.ID); err != nil { return err @@ -254,7 +254,7 @@ func (m *Manager) cancelAllRunningTasks() { for id, cancel := range m.mu.handlingTasks { logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Int64("task-id", id)) if cancel != nil { - cancel() + cancel(ErrCancelSubtask) } } } @@ -336,10 +336,10 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { } switch task.State { case proto.TaskStateRunning: - runCtx, runCancel := context.WithCancel(ctx) + runCtx, runCancel := context.WithCancelCause(ctx) m.registerCancelFunc(task.ID, runCancel) err = scheduler.Run(runCtx, task) - runCancel() + runCancel(ErrCancelSubtask) case proto.TaskStatePausing: err = scheduler.Pause(ctx, task) case proto.TaskStateReverting: @@ -359,7 +359,7 @@ func (m *Manager) addHandlingTask(id int64) { } // registerCancelFunc registers a cancel function for a task. -func (m *Manager) registerCancelFunc(id int64, cancel context.CancelFunc) { +func (m *Manager) registerCancelFunc(id int64, cancel context.CancelCauseFunc) { m.mu.Lock() defer m.mu.Unlock() m.mu.handlingTasks[id] = cancel diff --git a/disttask/framework/scheduler/manager_test.go b/disttask/framework/scheduler/manager_test.go index 7873aa59beaf2..323b1a01aded4 100644 --- a/disttask/framework/scheduler/manager_test.go +++ b/disttask/framework/scheduler/manager_test.go @@ -72,16 +72,16 @@ func TestManageTask(t *testing.T) { newTasks = m.filterAlreadyHandlingTasks(tasks) require.Equal(t, []*proto.Task{{ID: 1}}, newTasks) - ctx1, cancel1 := context.WithCancel(context.Background()) + ctx1, cancel1 := context.WithCancelCause(context.Background()) m.registerCancelFunc(2, cancel1) m.cancelAllRunningTasks() require.Equal(t, context.Canceled, ctx1.Err()) // test cancel. m.addHandlingTask(1) - ctx2, cancel2 := context.WithCancel(context.Background()) + ctx2, cancel2 := context.WithCancelCause(context.Background()) m.registerCancelFunc(1, cancel2) - ctx3, cancel3 := context.WithCancel(context.Background()) + ctx3, cancel3 := context.WithCancelCause(context.Background()) m.registerCancelFunc(2, cancel3) m.onCanceledTasks(context.Background(), []*proto.Task{{ID: 1}}) require.Equal(t, context.Canceled, ctx2.Err()) @@ -89,7 +89,7 @@ func TestManageTask(t *testing.T) { // test pause. m.addHandlingTask(3) - ctx4, cancel4 := context.WithCancel(context.Background()) + ctx4, cancel4 := context.WithCancelCause(context.Background()) m.registerCancelFunc(1, cancel4) mockTaskTable.EXPECT().PauseSubtasks("test", int64(1)).Return(nil) m.onPausingTasks([]*proto.Task{{ID: 1}}) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 28c2f07698f43..5d3dcf6281c93 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -131,6 +131,9 @@ func (s *BaseScheduler) Run(ctx context.Context, task *proto.Task) (err error) { if s.mu.handled { return err } + if err == nil { + return nil + } return s.updateErrorToSubtask(ctx, task.ID, err) } @@ -255,6 +258,13 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.Subtask err = ErrCancelSubtask } }) + + failpoint.Inject("MockRunSubtaskContextCanceled", func(val failpoint.Value) { + if val.(bool) { + err = context.Canceled + } + }) + if err != nil { s.onError(err) } @@ -266,7 +276,10 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.Subtask if ctx.Err() != nil { s.onError(ctx.Err()) - return + finished := s.markTaskCancelOrFailed(ctx, subtask) + if finished { + return + } } failpoint.Inject("mockTiDBDown", func(val failpoint.Value) { logutil.Logger(s.logCtx).Info("trigger mockTiDBDown") @@ -572,6 +585,7 @@ var ( } ) +// IsRetryableError checks if error can retry. func IsRetryableError(err error) bool { err = errors.Cause(err) // check the original error mysqlErr, ok := err.(*mysql.MySQLError) @@ -606,12 +620,15 @@ func IsRetryableError(err error) bool { func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { if ctx.Err() != nil && context.Cause(ctx).Error() == "cancel subtasks" { + logutil.Logger(s.logCtx).Warn("subtask canceled", zap.Error(err)) s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) } else if IsRetryableError(err) { logutil.Logger(s.logCtx).Warn("met retryable error", zap.Error(err)) } else if errors.Cause(err) != context.Canceled { logutil.Logger(s.logCtx).Warn("subtask failed", zap.Error(err)) s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, err) + } else { + logutil.Logger(s.logCtx).Warn("met context canceled for gracefully shutdown", zap.Error(err)) } s.markErrorHandled() return true diff --git a/tests/realtikvtest/addindextest/add_index_test.go b/tests/realtikvtest/addindextest/add_index_test.go index d0b7fe44537b9..1aaaddb2fd71c 100644 --- a/tests/realtikvtest/addindextest/add_index_test.go +++ b/tests/realtikvtest/addindextest/add_index_test.go @@ -149,8 +149,8 @@ func TestAddIndexDistBasic(t *testing.T) { tk.MustExec("insert into t values (), (), (), (), (), ()") tk.MustExec("insert into t values (), (), (), (), (), ()") tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;") - tk.MustExec("alter table t add index idx(a);") - tk.MustExec("admin check index t idx;") + //tk.MustExec("alter table t add index idx(a);") + //tk.MustExec("admin check index t idx;") tk.MustExec("create table t1(a bigint auto_random primary key);") tk.MustExec("insert into t1 values (), (), (), (), (), ()") @@ -158,8 +158,13 @@ func TestAddIndexDistBasic(t *testing.T) { tk.MustExec("insert into t1 values (), (), (), (), (), ()") tk.MustExec("insert into t1 values (), (), (), (), (), ()") tk.MustExec("split table t1 between (3) and (8646911284551352360) regions 50;") - tk.MustExec("alter table t1 add index idx(a);") - tk.MustExec("admin check index t1 idx;") + //tk.MustExec("alter table t1 add index idx(a);") + //tk.MustExec("admin check index t1 idx;") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskContextCanceled", "1*return(true)")) + tk.MustExec("alter table t1 add index idx1(a);") + tk.MustExec("admin check index t1 idx1;") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskContextCanceled")) tk.MustExec(`set global tidb_enable_dist_task=0;`) } From 38edc95f75e55b253c91f25e4390bac65953b781 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 06:46:43 +0800 Subject: [PATCH 12/27] refine --- tests/realtikvtest/addindextest/add_index_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/realtikvtest/addindextest/add_index_test.go b/tests/realtikvtest/addindextest/add_index_test.go index 1aaaddb2fd71c..1e87dff26ea45 100644 --- a/tests/realtikvtest/addindextest/add_index_test.go +++ b/tests/realtikvtest/addindextest/add_index_test.go @@ -149,8 +149,8 @@ func TestAddIndexDistBasic(t *testing.T) { tk.MustExec("insert into t values (), (), (), (), (), ()") tk.MustExec("insert into t values (), (), (), (), (), ()") tk.MustExec("split table t between (3) and (8646911284551352360) regions 50;") - //tk.MustExec("alter table t add index idx(a);") - //tk.MustExec("admin check index t idx;") + tk.MustExec("alter table t add index idx(a);") + tk.MustExec("admin check index t idx;") tk.MustExec("create table t1(a bigint auto_random primary key);") tk.MustExec("insert into t1 values (), (), (), (), (), ()") @@ -158,8 +158,8 @@ func TestAddIndexDistBasic(t *testing.T) { tk.MustExec("insert into t1 values (), (), (), (), (), ()") tk.MustExec("insert into t1 values (), (), (), (), (), ()") tk.MustExec("split table t1 between (3) and (8646911284551352360) regions 50;") - //tk.MustExec("alter table t1 add index idx(a);") - //tk.MustExec("admin check index t1 idx;") + tk.MustExec("alter table t1 add index idx(a);") + tk.MustExec("admin check index t1 idx;") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskContextCanceled", "1*return(true)")) tk.MustExec("alter table t1 add index idx1(a);") From 8942719ae12a123799627bb46f889a979396de57 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 07:14:27 +0800 Subject: [PATCH 13/27] fix --- disttask/framework/dispatcher/dispatcher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index b06655181f80f..be43037e05f1b 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -407,7 +407,7 @@ func (d *BaseDispatcher) replaceDeadNodesIfAny() error { func (d *BaseDispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) { prevState := d.Task.State d.Task.State = taskState - if !d.VerifyTaskStateTransform(prevState, taskState) { + if !VerifyTaskStateTransform(prevState, taskState) { return errors.Errorf("invalid task state transform, from %s to %s", prevState, taskState) } @@ -697,7 +697,7 @@ func (d *BaseDispatcher) WithNewTxn(ctx context.Context, fn func(se sessionctx.C } // VerifyTaskStateTransform verifies whether the task state transform is valid. -func (d *BaseDispatcher) VerifyTaskStateTransform(from, to string) bool { +func VerifyTaskStateTransform(from, to string) bool { rules := map[string][]string{ proto.TaskStatePending: { proto.TaskStateRunning, @@ -739,7 +739,7 @@ func (d *BaseDispatcher) VerifyTaskStateTransform(from, to string) bool { proto.TaskStateRevertPending: {}, proto.TaskStateReverted: {}, } - logutil.Logger(d.logCtx).Info("task state transform", zap.String("from", from), zap.String("to", to)) + logutil.BgLogger().Info("task state transform", zap.String("from", from), zap.String("to", to)) if from == to { return true From 24b2366a77dca0a1a0ae62d0230fd2f2c82fef25 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 13:53:11 +0800 Subject: [PATCH 14/27] fix --- br/pkg/lightning/common/retry.go | 6 +++ disttask/framework/scheduler/scheduler.go | 50 +++-------------------- 2 files changed, 12 insertions(+), 44 deletions(-) diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go index c3bb979a9bd32..43ec145026e9d 100644 --- a/br/pkg/lightning/common/retry.go +++ b/br/pkg/lightning/common/retry.go @@ -28,6 +28,8 @@ import ( "github.com/pingcap/errors" tmysql "github.com/pingcap/tidb/errno" drivererr "github.com/pingcap/tidb/store/driver/error" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -40,12 +42,15 @@ var retryableErrorMsgList = []string{ // this error happens on when distsql.Checksum calls TiKV // see https://github.com/pingcap/tidb/blob/2c3d4f1ae418881a95686e8b93d4237f2e76eec6/store/copr/coprocessor.go#L941 "coprocessor task terminated due to exceeding the deadline", + // ywq todo check + "PD server timeout", } func isRetryableFromErrorMessage(err error) bool { msg := err.Error() msgLower := strings.ToLower(msg) for _, errStr := range retryableErrorMsgList { + logutil.BgLogger().Info("ywq test error error msg", zap.String("errStr", errStr)) if strings.Contains(msgLower, errStr) { return true } @@ -108,6 +113,7 @@ func isSingleRetryableError(err error) bool { } return false case *mysql.MySQLError: + logutil.BgLogger().Info("ywq test error", zap.Uint16("num", nerr.Number)) switch nerr.Number { // ErrLockDeadlock can retry to commit while meet deadlock case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflict, tmysql.ErrWriteConflictInTiDB, diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 5d3dcf6281c93..82c0b5f26f4be 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -16,20 +16,18 @@ package scheduler import ( "context" - "strings" "sync" "time" - "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/disttask/framework/dispatcher" "github.com/pingcap/tidb/disttask/framework/handle" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/scheduler/execute" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" - "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/backoff" "github.com/pingcap/tidb/util/logutil" @@ -186,6 +184,10 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { proto.TaskStatePending, proto.TaskStateRunning) if err != nil { s.onError(err) + if common.IsRetryableError(err) { + logutil.Logger(s.logCtx).Warn("met retryable error", zap.Error(err)) + return nil + } return s.getError() } for _, subtask := range subtasks { @@ -577,52 +579,12 @@ func (s *BaseScheduler) finishSubtaskAndUpdateState(ctx context.Context, subtask metrics.IncDistTaskSubTaskCnt(subtask) } -var ( - // Retryable1105Msgs list the error messages of some retryable error with `1105` code (`ErrUnknown`). - Retryable1105Msgs = []string{ - "Information schema is out of date", - "Information schema is changed", - } -) - -// IsRetryableError checks if error can retry. -func IsRetryableError(err error) bool { - err = errors.Cause(err) // check the original error - mysqlErr, ok := err.(*mysql.MySQLError) - if !ok { - return false - } - - switch mysqlErr.Number { - case errno.ErrLockDeadlock: // https://dev.mysql.com/doc/refman/5.7/en/innodb-deadlocks.html - return true // retryable error in MySQL - case errno.ErrPDServerTimeout, - errno.ErrTiKVServerBusy, - errno.ErrResolveLockTimeout, - errno.ErrInfoSchemaExpired, - errno.ErrInfoSchemaChanged, - errno.ErrWriteConflictInTiDB, - errno.ErrTxnRetryable, - errno.ErrWriteConflict, - errno.ErrColumnInChange, - errno.ErrRegionUnavailable: - return true // retryable error in TiDB - case errno.ErrUnknown: // the old version of TiDB uses `1105` frequently, this should be compatible. - for _, msg := range Retryable1105Msgs { - if strings.Contains(mysqlErr.Message, msg) { - return true - } - } - } - - return false -} func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { if ctx.Err() != nil && context.Cause(ctx).Error() == "cancel subtasks" { logutil.Logger(s.logCtx).Warn("subtask canceled", zap.Error(err)) s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) - } else if IsRetryableError(err) { + } else if common.IsRetryableError(err) { logutil.Logger(s.logCtx).Warn("met retryable error", zap.Error(err)) } else if errors.Cause(err) != context.Canceled { logutil.Logger(s.logCtx).Warn("subtask failed", zap.Error(err)) From 6b84bf369721f49cbc6afa3cf4adaff5987d4aa8 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 14:13:35 +0800 Subject: [PATCH 15/27] fix --- br/pkg/lightning/common/BUILD.bazel | 1 + disttask/framework/scheduler/BUILD.bazel | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 46b877b62f15d..943a6fc5b6e68 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -39,6 +39,7 @@ go_library( "//util", "//util/codec", "//util/format", + "//util/logutil", "@com_github_cockroachdb_pebble//:pebble", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", diff --git a/disttask/framework/scheduler/BUILD.bazel b/disttask/framework/scheduler/BUILD.bazel index b8d22a21be36f..c92d7043ae39e 100644 --- a/disttask/framework/scheduler/BUILD.bazel +++ b/disttask/framework/scheduler/BUILD.bazel @@ -11,6 +11,7 @@ go_library( importpath = "github.com/pingcap/tidb/disttask/framework/scheduler", visibility = ["//visibility:public"], deps = [ + "//br/pkg/lightning/common", "//config", "//disttask/framework/dispatcher", "//disttask/framework/handle", @@ -18,13 +19,11 @@ go_library( "//disttask/framework/scheduler/execute", "//disttask/framework/storage", "//domain/infosync", - "//errno", "//metrics", "//resourcemanager/pool/spool", "//resourcemanager/util", "//util/backoff", "//util/logutil", - "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_zap//:zap", From 1a14ed1be5c44b303ebe9eae466077f3dd3cf272 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 14:46:45 +0800 Subject: [PATCH 16/27] rm log --- br/pkg/lightning/common/retry.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go index 43ec145026e9d..f1005122c4581 100644 --- a/br/pkg/lightning/common/retry.go +++ b/br/pkg/lightning/common/retry.go @@ -28,8 +28,6 @@ import ( "github.com/pingcap/errors" tmysql "github.com/pingcap/tidb/errno" drivererr "github.com/pingcap/tidb/store/driver/error" - "github.com/pingcap/tidb/util/logutil" - "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -42,7 +40,6 @@ var retryableErrorMsgList = []string{ // this error happens on when distsql.Checksum calls TiKV // see https://github.com/pingcap/tidb/blob/2c3d4f1ae418881a95686e8b93d4237f2e76eec6/store/copr/coprocessor.go#L941 "coprocessor task terminated due to exceeding the deadline", - // ywq todo check "PD server timeout", } @@ -50,7 +47,6 @@ func isRetryableFromErrorMessage(err error) bool { msg := err.Error() msgLower := strings.ToLower(msg) for _, errStr := range retryableErrorMsgList { - logutil.BgLogger().Info("ywq test error error msg", zap.String("errStr", errStr)) if strings.Contains(msgLower, errStr) { return true } @@ -113,7 +109,6 @@ func isSingleRetryableError(err error) bool { } return false case *mysql.MySQLError: - logutil.BgLogger().Info("ywq test error", zap.Uint16("num", nerr.Number)) switch nerr.Number { // ErrLockDeadlock can retry to commit while meet deadlock case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflict, tmysql.ErrWriteConflictInTiDB, From 2b7dfd996ef1821115a6e3b4cf079f1199eb6884 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 15:06:42 +0800 Subject: [PATCH 17/27] fix build --- br/pkg/lightning/common/BUILD.bazel | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/common/BUILD.bazel b/br/pkg/lightning/common/BUILD.bazel index 943a6fc5b6e68..46b877b62f15d 100644 --- a/br/pkg/lightning/common/BUILD.bazel +++ b/br/pkg/lightning/common/BUILD.bazel @@ -39,7 +39,6 @@ go_library( "//util", "//util/codec", "//util/format", - "//util/logutil", "@com_github_cockroachdb_pebble//:pebble", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_pingcap_errors//:errors", From dfb71a3a35063838e926a90bc524068653422544 Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Tue, 10 Oct 2023 16:22:39 +0800 Subject: [PATCH 18/27] Update disttask/framework/scheduler/scheduler.go Co-authored-by: D3Hunter --- disttask/framework/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 82c0b5f26f4be..126985bfcffb1 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -581,7 +581,7 @@ func (s *BaseScheduler) finishSubtaskAndUpdateState(ctx context.Context, subtask func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { - if ctx.Err() != nil && context.Cause(ctx).Error() == "cancel subtasks" { + if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask { logutil.Logger(s.logCtx).Warn("subtask canceled", zap.Error(err)) s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) } else if common.IsRetryableError(err) { From cdb2d2181a9b9c009d81ae935fc4d2b4ba9d5411 Mon Sep 17 00:00:00 2001 From: EasonBall <592838129@qq.com> Date: Tue, 10 Oct 2023 16:22:56 +0800 Subject: [PATCH 19/27] Update disttask/framework/scheduler/scheduler.go Co-authored-by: D3Hunter --- disttask/framework/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 126985bfcffb1..77c4ad23481c5 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -590,7 +590,7 @@ func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *pro logutil.Logger(s.logCtx).Warn("subtask failed", zap.Error(err)) s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, err) } else { - logutil.Logger(s.logCtx).Warn("met context canceled for gracefully shutdown", zap.Error(err)) + logutil.Logger(s.logCtx).Info("met context canceled for gracefully shutdown", zap.Error(err)) } s.markErrorHandled() return true From 3931c39a2b85cebdfc1df8b10516d6a97c8b51fe Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 16:57:23 +0800 Subject: [PATCH 20/27] fix --- disttask/framework/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 82c0b5f26f4be..57d217ca8551f 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -100,7 +100,7 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup if canceled { logutil.Logger(s.logCtx).Info("scheduler canceled") if cancelFn != nil { - cancelFn(ErrCancelSubtask) + cancelFn(nil) } } } From 0968b8226b8e55f7d1c2d04867727f4a86c503d9 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 17:13:26 +0800 Subject: [PATCH 21/27] address comments --- disttask/framework/scheduler/manager.go | 11 ++++++++--- disttask/framework/scheduler/scheduler.go | 9 ++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 3336bca2869d0..7c6113120c7b3 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -226,6 +226,7 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) { for _, task := range tasks { logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID)) if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil { + // subtask needs to change its state to canceled. cancel(ErrCancelSubtask) } } @@ -238,7 +239,9 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error { for _, task := range tasks { logutil.Logger(m.logCtx).Info("onPausingTasks", zap.Any("task_id", task.ID)) if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil { - cancel(ErrCancelSubtask) + // Pause all running subtasks, don't mark subtasks as canceled. + // Should not change the subtask's state. + cancel(nil) } if err := m.taskTable.PauseSubtasks(m.id, task.ID); err != nil { return err @@ -254,7 +257,9 @@ func (m *Manager) cancelAllRunningTasks() { for id, cancel := range m.mu.handlingTasks { logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Int64("task-id", id)) if cancel != nil { - cancel(ErrCancelSubtask) + // tidb shutdown, don't mark subtask as canceled. + // Should not change the subtask's state. + cancel(nil) } } } @@ -340,7 +345,7 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { runCtx, runCancel := context.WithCancelCause(ctx) m.registerCancelFunc(task.ID, runCancel) err = scheduler.Run(runCtx, task) - runCancel(ErrCancelSubtask) + runCancel(nil) case proto.TaskStatePausing: err = scheduler.Pause(ctx, task) case proto.TaskStateReverting: diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index c09e4543a2ad6..487d960d75f7a 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -100,6 +100,8 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup if canceled { logutil.Logger(s.logCtx).Info("scheduler canceled") if cancelFn != nil { + // subtask transfered to other tidb, don't mark subtask as canceled. + // Should not change the subtask's state. cancelFn(nil) } } @@ -276,13 +278,6 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.Subtask return } - if ctx.Err() != nil { - s.onError(ctx.Err()) - finished := s.markTaskCancelOrFailed(ctx, subtask) - if finished { - return - } - } failpoint.Inject("mockTiDBDown", func(val failpoint.Value) { logutil.Logger(s.logCtx).Info("trigger mockTiDBDown") if s.id == val.(string) || s.id == ":4001" || s.id == ":4002" { From 01550ba2074722ed9ea6fc243dc8433bb4070d68 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 17:16:13 +0800 Subject: [PATCH 22/27] check nil --- disttask/framework/scheduler/manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 7c6113120c7b3..261930bf8fa77 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -326,6 +326,9 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) { m.logErr(err) return } + if task == nil { + return + } if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting { logutil.Logger(m.logCtx).Info("onRunnableTask exit", zap.Int64("task-id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State)) From e730ac71ae4b0ef309ecdea0103482ec189aafb3 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 17:20:45 +0800 Subject: [PATCH 23/27] fix spell --- disttask/framework/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 487d960d75f7a..df3bfb21e70de 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -100,7 +100,7 @@ func (s *BaseScheduler) startCancelCheck(ctx context.Context, wg *sync.WaitGroup if canceled { logutil.Logger(s.logCtx).Info("scheduler canceled") if cancelFn != nil { - // subtask transfered to other tidb, don't mark subtask as canceled. + // subtask transferred to other tidb, don't mark subtask as canceled. // Should not change the subtask's state. cancelFn(nil) } From e188b34cdc2c422ed529c528490a37e6212019e5 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Tue, 10 Oct 2023 17:24:36 +0800 Subject: [PATCH 24/27] rm --- br/pkg/lightning/common/retry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/lightning/common/retry.go b/br/pkg/lightning/common/retry.go index f1005122c4581..c3bb979a9bd32 100644 --- a/br/pkg/lightning/common/retry.go +++ b/br/pkg/lightning/common/retry.go @@ -40,7 +40,6 @@ var retryableErrorMsgList = []string{ // this error happens on when distsql.Checksum calls TiKV // see https://github.com/pingcap/tidb/blob/2c3d4f1ae418881a95686e8b93d4237f2e76eec6/store/copr/coprocessor.go#L941 "coprocessor task terminated due to exceeding the deadline", - "PD server timeout", } func isRetryableFromErrorMessage(err error) bool { From 8f2d96aa78c8730ef4026ce4bb75f288861f5cfa Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Wed, 11 Oct 2023 17:29:55 +0800 Subject: [PATCH 25/27] fix comments --- disttask/framework/scheduler/manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/disttask/framework/scheduler/manager.go b/disttask/framework/scheduler/manager.go index 261930bf8fa77..ff7682c99790c 100644 --- a/disttask/framework/scheduler/manager.go +++ b/disttask/framework/scheduler/manager.go @@ -64,8 +64,8 @@ type Manager struct { schedulerPool Pool mu struct { sync.RWMutex - // taskID -> cancelFunc. - // cancelFunc is used to fast cancel the scheduler.Run. + // taskID -> CancelCauseFunc. + // CancelCauseFunc is used to fast cancel the scheduler.Run. handlingTasks map[int64]context.CancelCauseFunc } // id, it's the same as server id now, i.e. host:port. From befaba521be6588314268a7e2382d7880166923e Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 12 Oct 2023 12:31:31 +0800 Subject: [PATCH 26/27] fix --- disttask/framework/scheduler/scheduler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index df3bfb21e70de..319de43db5ec6 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -273,7 +273,7 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, executor execute.Subtask s.onError(err) } - finished := s.markTaskCancelOrFailed(ctx, subtask) + finished := s.markSubTaskCanceledOrFailed(ctx, subtask) if finished { return } @@ -343,14 +343,14 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, executor execute. } }) - finished := s.markTaskCancelOrFailed(ctx, subtask) + finished := s.markSubTaskCanceledOrFailed(ctx, subtask) if finished { return } s.finishSubtaskAndUpdateState(ctx, subtask) - finished = s.markTaskCancelOrFailed(ctx, subtask) + finished = s.markSubTaskCanceledOrFailed(ctx, subtask) if finished { return } @@ -574,7 +574,7 @@ func (s *BaseScheduler) finishSubtaskAndUpdateState(ctx context.Context, subtask metrics.IncDistTaskSubTaskCnt(subtask) } -func (s *BaseScheduler) markTaskCancelOrFailed(ctx context.Context, subtask *proto.Subtask) bool { +func (s *BaseScheduler) markSubTaskCanceledOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask { logutil.Logger(s.logCtx).Warn("subtask canceled", zap.Error(err)) From 6ee2cac4e98e5600dba655c329122a1fbb50b263 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Thu, 12 Oct 2023 12:33:45 +0800 Subject: [PATCH 27/27] add comments --- disttask/framework/scheduler/scheduler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 319de43db5ec6..d6f2074119a53 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -574,6 +574,10 @@ func (s *BaseScheduler) finishSubtaskAndUpdateState(ctx context.Context, subtask metrics.IncDistTaskSubTaskCnt(subtask) } +// markSubTaskCanceledOrFailed check the error type and decide the subtasks' state. +// 1. Only cancel subtasks when meet ErrCancelSubtask. +// 2. Only fail subtasks when meet non retryable error. +// 3. When meet other errors, don't change subtasks' state. func (s *BaseScheduler) markSubTaskCanceledOrFailed(ctx context.Context, subtask *proto.Subtask) bool { if err := s.getError(); err != nil { if ctx.Err() != nil && context.Cause(ctx) == ErrCancelSubtask {