diff --git a/disttask/framework/mock/scheduler_mock.go b/disttask/framework/mock/scheduler_mock.go index f60f20209fc8f..032633b71d2f6 100644 --- a/disttask/framework/mock/scheduler_mock.go +++ b/disttask/framework/mock/scheduler_mock.go @@ -50,6 +50,26 @@ func (mr *MockTaskTableMockRecorder) FinishSubtask(arg0, arg1 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinishSubtask", reflect.TypeOf((*MockTaskTable)(nil).FinishSubtask), arg0, arg1) } +// GetFirstSubtaskInStates mocks base method. +func (m *MockTaskTable) GetFirstSubtaskInStates(arg0 string, arg1, arg2 int64, arg3 ...interface{}) (*proto.Subtask, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetFirstSubtaskInStates", varargs...) + ret0, _ := ret[0].(*proto.Subtask) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetFirstSubtaskInStates indicates an expected call of GetFirstSubtaskInStates. +func (mr *MockTaskTableMockRecorder) GetFirstSubtaskInStates(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstSubtaskInStates", reflect.TypeOf((*MockTaskTable)(nil).GetFirstSubtaskInStates), varargs...) +} + // GetGlobalTaskByID mocks base method. func (m *MockTaskTable) GetGlobalTaskByID(arg0 int64) (*proto.Task, error) { m.ctrl.T.Helper() @@ -84,24 +104,24 @@ func (mr *MockTaskTableMockRecorder) GetGlobalTasksInStates(arg0 ...interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGlobalTasksInStates", reflect.TypeOf((*MockTaskTable)(nil).GetGlobalTasksInStates), arg0...) } -// GetSubtaskInStates mocks base method. -func (m *MockTaskTable) GetSubtaskInStates(arg0 string, arg1, arg2 int64, arg3 ...interface{}) (*proto.Subtask, error) { +// GetSubtasksInStates mocks base method. +func (m *MockTaskTable) GetSubtasksInStates(arg0 string, arg1, arg2 int64, arg3 ...interface{}) ([]*proto.Subtask, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1, arg2} for _, a := range arg3 { varargs = append(varargs, a) } - ret := m.ctrl.Call(m, "GetSubtaskInStates", varargs...) - ret0, _ := ret[0].(*proto.Subtask) + ret := m.ctrl.Call(m, "GetSubtasksInStates", varargs...) + ret0, _ := ret[0].([]*proto.Subtask) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetSubtaskInStates indicates an expected call of GetSubtaskInStates. -func (mr *MockTaskTableMockRecorder) GetSubtaskInStates(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { +// GetSubtasksInStates indicates an expected call of GetSubtasksInStates. +func (mr *MockTaskTableMockRecorder) GetSubtasksInStates(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubtaskInStates", reflect.TypeOf((*MockTaskTable)(nil).GetSubtaskInStates), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubtasksInStates", reflect.TypeOf((*MockTaskTable)(nil).GetSubtasksInStates), varargs...) } // HasSubtasksInStates mocks base method. diff --git a/disttask/framework/proto/task.go b/disttask/framework/proto/task.go index 2403f76304586..9de22f1fe8dae 100644 --- a/disttask/framework/proto/task.go +++ b/disttask/framework/proto/task.go @@ -104,6 +104,12 @@ type Subtask struct { Summary string } +// IsFinished checks if the subtask is finished. +func (t *Subtask) IsFinished() bool { + return t.State == TaskStateSucceed || t.State == TaskStateReverted || t.State == TaskStateCanceled || + t.State == TaskStateFailed || t.State == TaskStateRevertFailed +} + // NewSubtask create a new subtask. func NewSubtask(step int64, taskID int64, tp, schedulerID string, meta []byte) *Subtask { return &Subtask{ diff --git a/disttask/framework/scheduler/BUILD.bazel b/disttask/framework/scheduler/BUILD.bazel index 31ec6b859267b..b068ea7bb840b 100644 --- a/disttask/framework/scheduler/BUILD.bazel +++ b/disttask/framework/scheduler/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//disttask/framework/scheduler/execute", "//disttask/framework/storage", "//domain/infosync", + "//metrics", "//resourcemanager/pool/spool", "//resourcemanager/util", "//util/logutil", diff --git a/disttask/framework/scheduler/interface.go b/disttask/framework/scheduler/interface.go index b25ea724d8781..73cb54f1acc11 100644 --- a/disttask/framework/scheduler/interface.go +++ b/disttask/framework/scheduler/interface.go @@ -26,7 +26,8 @@ type TaskTable interface { GetGlobalTasksInStates(states ...interface{}) (task []*proto.Task, err error) GetGlobalTaskByID(taskID int64) (task *proto.Task, err error) - GetSubtaskInStates(tidbID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error) + GetSubtasksInStates(tidbID string, taskID int64, step int64, states ...interface{}) ([]*proto.Subtask, error) + GetFirstSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error) StartManager(tidbID string, role string) error StartSubtask(subtaskID int64) error UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error diff --git a/disttask/framework/scheduler/scheduler.go b/disttask/framework/scheduler/scheduler.go index 5cec67917e43b..a2bd6745090a1 100644 --- a/disttask/framework/scheduler/scheduler.go +++ b/disttask/framework/scheduler/scheduler.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/scheduler/execute" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -165,14 +166,25 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { wg.Wait() }() + subtasks, err := s.taskTable.GetSubtasksInStates(s.id, task.ID, task.Step, proto.TaskStatePending) + if err != nil { + s.onError(err) + return s.getError() + } + for _, subtask := range subtasks { + metrics.IncDistTaskSubTaskCnt(subtask) + metrics.StartDistTaskSubTask(subtask) + } + for { // check if any error occurs. if err := s.getError(); err != nil { break } - subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending) + + subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending) if err != nil { - logutil.Logger(s.logCtx).Warn("GetSubtaskInStates meets error", zap.Error(err)) + logutil.Logger(s.logCtx).Warn("GetFirstSubtaskInStates meets error", zap.Error(err)) continue } if subtask == nil { @@ -187,11 +199,13 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error { } continue } - s.startSubtask(subtask.ID) + + s.startSubtaskAndUpdateState(subtask) if err := s.getError(); err != nil { - logutil.Logger(s.logCtx).Warn("startSubtask meets error", zap.Error(err)) + logutil.Logger(s.logCtx).Warn("startSubtaskAndUpdateState meets error", zap.Error(err)) continue } + failpoint.Inject("mockCleanScheduler", func() { v, ok := testContexts.Load(s.id) if ok { @@ -216,9 +230,9 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, scheduler execute.Subtas if err != nil { s.onError(err) if errors.Cause(err) == context.Canceled { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, s.getError()) + s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, s.getError()) } else { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError()) + s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError()) } s.markErrorHandled() return @@ -293,16 +307,14 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, scheduler execute }) if err := s.getError(); err != nil { if errors.Cause(err) == context.Canceled { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil) + s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) } else { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError()) + s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError()) } s.markErrorHandled() return } - if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil { - s.onError(err) - } + s.finishSubtaskAndUpdateState(subtask) failpoint.Inject("syncAfterSubtaskFinish", func() { TestSyncChan <- struct{}{} <-TestSyncChan @@ -320,7 +332,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { // We should cancel all subtasks before rolling back for { - subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning) + subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning) if err != nil { s.onError(err) return s.getError() @@ -330,7 +342,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { break } - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil) + s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil) if err = s.getError(); err != nil { return err } @@ -341,7 +353,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { s.onError(err) return s.getError() } - subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending) + subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending) if err != nil { s.onError(err) return s.getError() @@ -350,17 +362,17 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error { logutil.BgLogger().Warn("scheduler rollback a step, but no subtask in revert_pending state", zap.Any("step", task.Step)) return nil } - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateReverting, nil) + s.updateSubtaskStateAndError(subtask, proto.TaskStateReverting, nil) if err := s.getError(); err != nil { return err } err = executor.Rollback(rollbackCtx) if err != nil { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateRevertFailed, nil) + s.updateSubtaskStateAndError(subtask, proto.TaskStateRevertFailed, nil) s.onError(err) } else { - s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateReverted, nil) + s.updateSubtaskStateAndError(subtask, proto.TaskStateReverted, nil) } return s.getError() } @@ -446,16 +458,38 @@ func (s *BaseScheduler) resetError() { s.mu.handled = false } -func (s *BaseScheduler) startSubtask(id int64) { - err := s.taskTable.StartSubtask(id) +func (s *BaseScheduler) startSubtaskAndUpdateState(subtask *proto.Subtask) { + metrics.DecDistTaskSubTaskCnt(subtask) + metrics.EndDistTaskSubTask(subtask) + err := s.taskTable.StartSubtask(subtask.ID) if err != nil { s.onError(err) } + subtask.State = proto.TaskStateRunning + metrics.IncDistTaskSubTaskCnt(subtask) + metrics.StartDistTaskSubTask(subtask) } -func (s *BaseScheduler) updateSubtaskStateAndError(subtaskID int64, state string, subTaskErr error) { - err := s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr) +func (s *BaseScheduler) updateSubtaskStateAndError(subtask *proto.Subtask, state string, subTaskErr error) { + metrics.DecDistTaskSubTaskCnt(subtask) + metrics.EndDistTaskSubTask(subtask) + err := s.taskTable.UpdateSubtaskStateAndError(subtask.ID, state, subTaskErr) if err != nil { s.onError(err) } + subtask.State = state + metrics.IncDistTaskSubTaskCnt(subtask) + if !subtask.IsFinished() { + metrics.StartDistTaskSubTask(subtask) + } +} + +func (s *BaseScheduler) finishSubtaskAndUpdateState(subtask *proto.Subtask) { + metrics.DecDistTaskSubTaskCnt(subtask) + metrics.EndDistTaskSubTask(subtask) + if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil { + s.onError(err) + } + subtask.State = proto.TaskStateSucceed + metrics.IncDistTaskSubTaskCnt(subtask) } diff --git a/disttask/framework/scheduler/scheduler_test.go b/disttask/framework/scheduler/scheduler_test.go index 05a675aca4ef4..724eb6508d4c2 100644 --- a/disttask/framework/scheduler/scheduler_test.go +++ b/disttask/framework/scheduler/scheduler_test.go @@ -72,7 +72,8 @@ func TestSchedulerRun(t *testing.T) { // 5. run subtask failed runSubtaskErr := errors.New("run subtask error") mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 1, Type: tp, Step: proto.StepOne}}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateFailed, gomock.Any()).Return(nil) @@ -83,12 +84,14 @@ func TestSchedulerRun(t *testing.T) { // 6. run subtask success mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 1, Type: tp, Step: proto.StepOne}}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().FinishSubtask(int64(1), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 2, Type: tp, Step: proto.StepOne}}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) mockSubtaskTable.EXPECT().GetGlobalTaskByID(gomock.Any()).Return(&proto.Task{ID: taskID, Step: proto.StepTwo}, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) @@ -96,25 +99,26 @@ func TestSchedulerRun(t *testing.T) { // 7. run subtask one by one mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 1, Type: tp, Step: proto.StepOne}, {ID: 2, Type: tp, Step: proto.StepOne}}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(int64(1)).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().FinishSubtask(int64(1), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 2, Type: tp, Step: proto.StepOne}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 2, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(int64(2)).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskExecutor.EXPECT().OnFinished(gomock.Any(), gomock.Any()).Return(nil) mockSubtaskTable.EXPECT().FinishSubtask(int64(2), gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(nil, nil) mockSubtaskExecutor.EXPECT().Cleanup(gomock.Any()).Return(nil) err = scheduler.Run(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID, Concurrency: concurrency}) require.NoError(t, err) // 8. cancel mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(context.Canceled) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateCanceled, gomock.Any()).Return(nil) @@ -149,7 +153,7 @@ func TestSchedulerRollback(t *testing.T) { mockExtension.EXPECT().GetSubtaskExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, schedulerRegisterErr) scheduler := NewBaseScheduler(ctx, "id", 1, mockSubtaskTable) scheduler.Extension = mockExtension - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", int64(1), proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", int64(1), proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) err := scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, ID: 1, Type: tp}) require.EqualError(t, err, schedulerRegisterErr.Error()) @@ -159,26 +163,26 @@ func TestSchedulerRollback(t *testing.T) { // 2. get subtask failed getSubtaskErr := errors.New("get subtask error") var taskID int64 = 1 - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(nil, getSubtaskErr) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.EqualError(t, err, getSubtaskErr.Error()) // 3. no subtask - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(nil, nil) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) require.NoError(t, err) // 4. update subtask error updateSubtaskErr := errors.New("update subtask error") - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(&proto.Subtask{ID: 1}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(updateSubtaskErr) err = scheduler.Rollback(runCtx, &proto.Task{Step: proto.StepOne, Type: tp, ID: taskID}) @@ -186,9 +190,9 @@ func TestSchedulerRollback(t *testing.T) { // 5. rollback failed rollbackErr := errors.New("rollback error") - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(&proto.Subtask{ID: 1}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(nil) mockSubtaskExecutor.EXPECT().Rollback(gomock.Any()).Return(rollbackErr) @@ -197,15 +201,15 @@ func TestSchedulerRollback(t *testing.T) { require.EqualError(t, err, rollbackErr.Error()) // 6. rollback success - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(&proto.Subtask{ID: 1}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(int64(1), proto.TaskStateCanceled, nil).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(&proto.Subtask{ID: 2}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(int64(2), proto.TaskStateCanceled, nil).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(&proto.Subtask{ID: 3}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(int64(3), proto.TaskStateReverting, nil).Return(nil) mockSubtaskExecutor.EXPECT().Rollback(gomock.Any()).Return(nil) @@ -260,7 +264,9 @@ func TestScheduler(t *testing.T) { // run failed runSubtaskErr := errors.New("run subtask error") mockSubtaskExecutor.EXPECT().Init(gomock.Any()).Return(nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", proto.StepOne, taskID, + mockSubtaskTable.EXPECT().GetSubtasksInStates("id", taskID, proto.StepOne, + []interface{}{proto.TaskStatePending}).Return([]*proto.Subtask{{ID: 1, Type: tp, Step: proto.StepOne}}, nil) + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", proto.StepOne, taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 1, Type: tp, Step: proto.StepOne}, nil) mockSubtaskTable.EXPECT().StartSubtask(taskID).Return(nil) mockSubtaskExecutor.EXPECT().RunSubtask(gomock.Any(), gomock.Any()).Return(runSubtaskErr) @@ -270,9 +276,9 @@ func TestScheduler(t *testing.T) { require.EqualError(t, err, runSubtaskErr.Error()) // rollback success - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStatePending, proto.TaskStateRunning}).Return(nil, nil) - mockSubtaskTable.EXPECT().GetSubtaskInStates("id", taskID, proto.StepOne, + mockSubtaskTable.EXPECT().GetFirstSubtaskInStates("id", taskID, proto.StepOne, []interface{}{proto.TaskStateRevertPending}).Return(&proto.Subtask{ID: 1, Type: tp}, nil) mockSubtaskTable.EXPECT().UpdateSubtaskStateAndError(taskID, proto.TaskStateReverting, nil).Return(nil) mockSubtaskExecutor.EXPECT().Rollback(gomock.Any()).Return(nil) diff --git a/disttask/framework/storage/table_test.go b/disttask/framework/storage/table_test.go index 09f4d14f3ff43..c6214e451cf43 100644 --- a/disttask/framework/storage/table_test.go +++ b/disttask/framework/storage/table_test.go @@ -132,11 +132,11 @@ func TestSubTaskTable(t *testing.T) { err := sm.AddNewSubTask(1, proto.StepInit, "tidb1", []byte("test"), proto.TaskTypeExample, false) require.NoError(t, err) - nilTask, err := sm.GetSubtaskInStates("tidb2", 1, proto.StepInit, proto.TaskStatePending) + nilTask, err := sm.GetFirstSubtaskInStates("tidb2", 1, proto.StepInit, proto.TaskStatePending) require.NoError(t, err) require.Nil(t, nilTask) - subtask, err := sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending) + subtask, err := sm.GetFirstSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending) require.NoError(t, err) require.Equal(t, proto.TaskTypeExample, subtask.Type) require.Equal(t, int64(1), subtask.TaskID) @@ -146,7 +146,7 @@ func TestSubTaskTable(t *testing.T) { require.Zero(t, subtask.StartTime) require.Zero(t, subtask.UpdateTime) - subtask2, err := sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending, proto.TaskStateReverted) + subtask2, err := sm.GetFirstSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending, proto.TaskStateReverted) require.NoError(t, err) require.Equal(t, subtask, subtask2) @@ -175,11 +175,11 @@ func TestSubTaskTable(t *testing.T) { time.Sleep(time.Second) require.NoError(t, sm.StartSubtask(1)) - subtask, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending) + subtask, err = sm.GetFirstSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStatePending) require.NoError(t, err) require.Nil(t, subtask) - subtask, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateRunning) + subtask, err = sm.GetFirstSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateRunning) require.NoError(t, err) require.Equal(t, proto.TaskTypeExample, subtask.Type) require.Equal(t, int64(1), subtask.TaskID) @@ -192,7 +192,7 @@ func TestSubTaskTable(t *testing.T) { // check update time after state change to cancel time.Sleep(time.Second) require.NoError(t, sm.UpdateSubtaskStateAndError(1, proto.TaskStateCancelling, nil)) - subtask2, err = sm.GetSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateCancelling) + subtask2, err = sm.GetFirstSubtaskInStates("tidb1", 1, proto.StepInit, proto.TaskStateCancelling) require.NoError(t, err) require.Equal(t, proto.TaskStateCancelling, subtask2.State) require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime) @@ -243,7 +243,7 @@ func TestSubTaskTable(t *testing.T) { err = sm.AddNewSubTask(3, proto.StepInit, "for_test", []byte("test"), proto.TaskTypeExample, false) require.NoError(t, err) require.NoError(t, sm.UpdateErrorToSubtask("for_test", 3, errors.New("fail"))) - subtask, err = sm.GetSubtaskInStates("for_test", 3, proto.StepInit, proto.TaskStateFailed) + subtask, err = sm.GetFirstSubtaskInStates("for_test", 3, proto.StepInit, proto.TaskStateFailed) require.NoError(t, err) require.Equal(t, proto.TaskStateFailed, subtask.State) require.Greater(t, subtask.StartTime, ts) @@ -252,16 +252,16 @@ func TestSubTaskTable(t *testing.T) { // test FinishSubtask do update update time err = sm.AddNewSubTask(4, proto.StepInit, "for_test1", []byte("test"), proto.TaskTypeExample, false) require.NoError(t, err) - subtask, err = sm.GetSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStatePending) + subtask, err = sm.GetFirstSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStatePending) require.NoError(t, err) require.NoError(t, sm.StartSubtask(subtask.ID)) - subtask, err = sm.GetSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStateRunning) + subtask, err = sm.GetFirstSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStateRunning) require.NoError(t, err) require.Greater(t, subtask.StartTime, ts) require.Greater(t, subtask.UpdateTime, ts) time.Sleep(time.Second) require.NoError(t, sm.FinishSubtask(subtask.ID, []byte{})) - subtask2, err = sm.GetSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStateSucceed) + subtask2, err = sm.GetFirstSubtaskInStates("for_test1", 4, proto.StepInit, proto.TaskStateSucceed) require.NoError(t, err) require.Equal(t, subtask2.StartTime, subtask.StartTime) require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime) @@ -332,13 +332,13 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) { require.NoError(t, err) require.Equal(t, proto.TaskStateRunning, task.State) - subtask1, err := sm.GetSubtaskInStates("instance1", 1, proto.StepInit, proto.TaskStatePending) + subtask1, err := sm.GetFirstSubtaskInStates("instance1", 1, proto.StepInit, proto.TaskStatePending) require.NoError(t, err) require.Equal(t, int64(1), subtask1.ID) require.Equal(t, proto.TaskTypeExample, subtask1.Type) require.Equal(t, []byte("m1"), subtask1.Meta) - subtask2, err := sm.GetSubtaskInStates("instance2", 1, proto.StepInit, proto.TaskStatePending) + subtask2, err := sm.GetFirstSubtaskInStates("instance2", 1, proto.StepInit, proto.TaskStatePending) require.NoError(t, err) require.Equal(t, int64(2), subtask2.ID) require.Equal(t, proto.TaskTypeExample, subtask2.Type) @@ -373,13 +373,13 @@ func TestBothGlobalAndSubTaskTable(t *testing.T) { require.NoError(t, err) require.Equal(t, proto.TaskStateReverting, task.State) - subtask1, err = sm.GetSubtaskInStates("instance3", 1, proto.StepInit, proto.TaskStateRevertPending) + subtask1, err = sm.GetFirstSubtaskInStates("instance3", 1, proto.StepInit, proto.TaskStateRevertPending) require.NoError(t, err) require.Equal(t, int64(3), subtask1.ID) require.Equal(t, proto.TaskTypeExample, subtask1.Type) require.Equal(t, []byte("m3"), subtask1.Meta) - subtask2, err = sm.GetSubtaskInStates("instance4", 1, proto.StepInit, proto.TaskStateRevertPending) + subtask2, err = sm.GetFirstSubtaskInStates("instance4", 1, proto.StepInit, proto.TaskStateRevertPending) require.NoError(t, err) require.Equal(t, int64(4), subtask2.ID) require.Equal(t, proto.TaskTypeExample, subtask2.Type) diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go index 3e6ff66229a21..6bc1b9a47dabe 100644 --- a/disttask/framework/storage/task_table.go +++ b/disttask/framework/storage/task_table.go @@ -324,8 +324,8 @@ func (stm *TaskManager) AddNewSubTask(globalTaskID int64, step int64, designated return nil } -// GetSubtaskInStates gets the subtask in the states. -func (stm *TaskManager) GetSubtaskInStates(tidbID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error) { +// GetSubtasksInStates gets all subtasks by given states. +func (stm *TaskManager) GetSubtasksInStates(tidbID string, taskID int64, step int64, states ...interface{}) ([]*proto.Subtask, error) { args := []interface{}{tidbID, taskID, step} args = append(args, states...) rs, err := stm.executeSQLWithNewSession(stm.ctx, `select * from mysql.tidb_background_subtask @@ -334,6 +334,25 @@ func (stm *TaskManager) GetSubtaskInStates(tidbID string, taskID int64, step int if err != nil { return nil, err } + + subtasks := make([]*proto.Subtask, len(rs)) + for i, row := range rs { + subtasks[i] = row2SubTask(row) + } + return subtasks, nil +} + +// GetFirstSubtaskInStates gets the first subtask by given states. +func (stm *TaskManager) GetFirstSubtaskInStates(tidbID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error) { + args := []interface{}{tidbID, taskID, step} + args = append(args, states...) + rs, err := stm.executeSQLWithNewSession(stm.ctx, `select * from mysql.tidb_background_subtask + where exec_id = %? and task_key = %? and step = %? + and state in (`+strings.Repeat("%?,", len(states)-1)+"%?) limit 1", args...) + if err != nil { + return nil, err + } + if len(rs) == 0 { return nil, nil } diff --git a/metrics/BUILD.bazel b/metrics/BUILD.bazel index 80c3de8b6e599..43202ffe47865 100644 --- a/metrics/BUILD.bazel +++ b/metrics/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "bindinfo.go", "ddl.go", "distsql.go", + "disttask.go", "domain.go", "executor.go", "gc_worker.go", @@ -28,6 +29,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//br/pkg/lightning/metric", + "//disttask/framework/proto", "//parser/terror", "//timer/metrics", "//util/logutil", diff --git a/metrics/disttask.go b/metrics/disttask.go new file mode 100644 index 0000000000000..c9d1450b8ad47 --- /dev/null +++ b/metrics/disttask.go @@ -0,0 +1,93 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "strconv" + + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + lblTaskStatus = "status" + lblTaskType = "task_type" + lblTaskID = "task_id" + lblSubTaskID = "subtask_id" +) + +var ( + // DistTaskSubTaskCntGauge is the gauge of dist task subtask count. + DistTaskSubTaskCntGauge *prometheus.GaugeVec + // DistTaskSubTaskStartTimeGauge is the gauge of dist task subtask start time. + DistTaskSubTaskStartTimeGauge *prometheus.GaugeVec +) + +// InitDistTaskMetrics initializes disttask metrics. +func InitDistTaskMetrics() { + DistTaskSubTaskCntGauge = NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "disttask", + Name: "subtask_cnt", + Help: "Gauge of subtask count.", + }, []string{lblTaskType, lblTaskID, lblTaskStatus}) + + DistTaskSubTaskStartTimeGauge = NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "disttask", + Name: "subtask_start_time", + Help: "Gauge of subtask start time.", + }, []string{lblTaskType, lblTaskID, lblTaskStatus, lblSubTaskID}) +} + +// IncDistTaskSubTaskCnt increases the count of dist task subtask. +func IncDistTaskSubTaskCnt(subtask *proto.Subtask) { + DistTaskSubTaskCntGauge.WithLabelValues( + subtask.Type, + strconv.Itoa(int(subtask.TaskID)), + subtask.State, + ).Inc() +} + +// DecDistTaskSubTaskCnt decreases the count of dist task subtask. +func DecDistTaskSubTaskCnt(subtask *proto.Subtask) { + DistTaskSubTaskCntGauge.WithLabelValues( + subtask.Type, + strconv.Itoa(int(subtask.TaskID)), + subtask.State, + ).Dec() +} + +// StartDistTaskSubTask sets the start time of dist task subtask. +func StartDistTaskSubTask(subtask *proto.Subtask) { + DistTaskSubTaskStartTimeGauge.WithLabelValues( + subtask.Type, + strconv.Itoa(int(subtask.TaskID)), + subtask.State, + strconv.Itoa(int(subtask.ID)), + ).SetToCurrentTime() +} + +// EndDistTaskSubTask deletes the start time of dist task subtask. +func EndDistTaskSubTask(subtask *proto.Subtask) { + DistTaskSubTaskStartTimeGauge.DeleteLabelValues( + subtask.Type, + strconv.Itoa(int(subtask.TaskID)), + subtask.State, + strconv.Itoa(int(subtask.ID)), + ) +} diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index 9eddcc30dbd09..06ade9cc86cb2 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -13748,6 +13748,466 @@ "title": "DDL", "type": "row" }, + { + "collapsed": true, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 10 + }, + "id": 153, + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 20 + }, + "hiddenSeries": false, + "id": 317, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending|running|revert_pending|reverting|paused\"}) by (task_id)", + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Running Dist Task Subtasks", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 20 + }, + "hiddenSeries": false, + "id": 318, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"succeed|failed|canceled|reverted|revert_failed\"}) by (task_id)", + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Finished Dist Task Subtasks", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 28 + }, + "hiddenSeries": false, + "id": 327, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum by (task_id) (increase(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"succeed\"}[1m]))", + "interval": "", + "legendFormat": "task_id_{{task_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Subtasks Complete Speed", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 28 + }, + "hiddenSeries": false, + "id": 324, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "increase(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"succeed\"}[1m])", + "interval": "", + "legendFormat": "{{instance}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Subtask Complete Speed in Each TiDB", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "mappings": [], + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 36 + }, + "id": 23763571993, + "options": { + "reduceOptions": { + "values": false, + "calcs": [ + "lastNotNull" + ], + "fields": "" + }, + "text": {}, + "pieType": "pie", + "displayLabels": [], + "legend": { + "displayMode": "list", + "placement": "right", + "values": [] + } + }, + "pluginVersion": "7.5.11", + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending|running|reverting|revert_pending|paused\"}) by (instance)", + "interval": "", + "legendFormat": "", + "queryType": "randomWalk", + "refId": "A" + } + ], + "title": "Distributed Task Running Subtask Distribution on TiDB Nodes", + "type": "piechart" + } + ], + "repeat": null, + "title": "Dist Execute Framework", + "type": "row" + }, { "collapsed": true, "datasource": null, diff --git a/metrics/grafana/tidb_runtime.json b/metrics/grafana/tidb_runtime.json index ce2a88509f931..ab97eaa879118 100644 --- a/metrics/grafana/tidb_runtime.json +++ b/metrics/grafana/tidb_runtime.json @@ -1443,7 +1443,7 @@ "refId": "A", "step": 10 }, - { + { "expr": "histogram_quantile(0.9999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", "format": "time_series", "intervalFactor": 1, @@ -1451,7 +1451,7 @@ "refId": "B", "step": 10 }, - { + { "expr": "histogram_quantile(0.99999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", "format": "time_series", "intervalFactor": 1, @@ -1459,7 +1459,7 @@ "refId": "C", "step": 10 }, - { + { "expr": "histogram_quantile(0.999999, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", "format": "time_series", "intervalFactor": 1, @@ -1467,7 +1467,7 @@ "refId": "D", "step": 10 }, - { + { "expr": "histogram_quantile(1, sum(rate(go_sched_latencies_seconds_bucket{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[30s])) by (le, instance))", "format": "time_series", "intervalFactor": 1, @@ -1517,6 +1517,314 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 50 + }, + "hiddenSeries": false, + "id": 31, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "time()-tidb_disttask_subtask_start_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"pending\"}", + "interval": "", + "legendFormat": "{{status}}_subtask_id_{{subtask_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Distributed Task SubTask Pending Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }, + "scopedVars": { + "instance": { + "text": "127.0.0.1:10080", + "value": "127.0.0.1:10080", + "selected": false + } + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 43 + }, + "hiddenSeries": false, + "id": 32, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "tidb_disttask_subtask_cnt{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"pending\"}", + "interval": "", + "legendFormat": "pending_subtasks", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Distributed Task Pending SubTask Cnt", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }, + "scopedVars": { + "instance": { + "text": "127.0.0.1:10080", + "value": "127.0.0.1:10080", + "selected": false + } + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 50 + }, + "hiddenSeries": false, + "id": 33, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "time()-tidb_disttask_subtask_start_time{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=\"running\"}", + "interval": "", + "legendFormat": "{{status}}_subtask_id_{{subtask_id}}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Distributed Task SubTask Running Duration", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }, + "scopedVars": { + "instance": { + "text": "127.0.0.1:10080", + "value": "127.0.0.1:10080", + "selected": false + } + } } ], "repeat": "instance", diff --git a/metrics/metrics.go b/metrics/metrics.go index 5957b13f358e5..1a438827c09f5 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -92,6 +92,7 @@ func InitMetrics() { InitTelemetryMetrics() InitTopSQLMetrics() InitTTLMetrics() + InitDistTaskMetrics() timermetrics.InitTimerMetrics() PanicCounter = NewCounterVec( @@ -264,6 +265,9 @@ func RegisterMetrics() { prometheus.MustRegister(PlanReplayerTaskCounter) prometheus.MustRegister(PlanReplayerRegisterTaskGauge) + prometheus.MustRegister(DistTaskSubTaskCntGauge) + prometheus.MustRegister(DistTaskSubTaskStartTimeGauge) + tikvmetrics.InitMetrics(TiDB, TiKVClient) tikvmetrics.RegisterMetrics() tikvmetrics.TiKVPanicCounter = PanicCounter // reset tidb metrics for tikv metrics