Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dist/ddl: add subtask metrics #47175

Merged
merged 22 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ type Subtask struct {
Summary string
}

// IsFinished reports whether the subtask is finished, i.e. its State is one
// of succeed, reverted, canceled, failed or revert-failed.
func (t *Subtask) IsFinished() bool {
	switch t.State {
	case TaskStateSucceed,
		TaskStateReverted,
		TaskStateCanceled,
		TaskStateFailed,
		TaskStateRevertFailed:
		return true
	default:
		return false
	}
}

// NewSubtask creates a new subtask.
func NewSubtask(step int64, taskID int64, tp, schedulerID string, meta []byte) *Subtask {
return &Subtask{
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//domain/infosync",
"//metrics",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//util/logutil",
Expand Down
3 changes: 2 additions & 1 deletion disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type TaskTable interface {
GetGlobalTasksInStates(states ...interface{}) (task []*proto.Task, err error)
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)

GetSubtaskInStates(tidbID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
GetSubtasksInStates(tidbID string, taskID int64, step int64, states ...interface{}) ([]*proto.Subtask, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove it now

GetFirstSubtaskInStates(instanceID string, taskID int64, step int64, states ...interface{}) (*proto.Subtask, error)
StartManager(tidbID string, role string) error
StartSubtask(subtaskID int64) error
UpdateSubtaskStateAndError(subtaskID int64, state string, err error) error
Expand Down
73 changes: 53 additions & 20 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -165,14 +166,25 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
wg.Wait()
}()

subtasks, err := s.taskTable.GetSubtasksInStates(s.id, task.ID, task.Step, proto.TaskStatePending)
if err != nil {
s.onError(err)
return s.getError()
}
for _, subtask := range subtasks {
metrics.IncDistTaskSubTaskCnt(subtask)
metrics.StartDistTaskSubTask(subtask)
}
Comment on lines +169 to +177
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can move this code into dispatcher.go.
When dispatching the subtasks succeeds, update the metric.
Then we don't need to fetch the taskTable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my previous implementation, but it caused the metrics to be collected on a different instance, which made the Grafana display confusing.


for {
// check if any error occurs.
if err := s.getError(); err != nil {
break
}
subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending)

subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending)
if err != nil {
logutil.Logger(s.logCtx).Warn("GetSubtaskInStates meets error", zap.Error(err))
logutil.Logger(s.logCtx).Warn("GetFirstSubtaskInStates meets error", zap.Error(err))
continue
}
if subtask == nil {
Expand All @@ -187,11 +199,13 @@ func (s *BaseScheduler) run(ctx context.Context, task *proto.Task) error {
}
continue
}
s.startSubtask(subtask.ID)

s.startSubtask(subtask)
if err := s.getError(); err != nil {
logutil.Logger(s.logCtx).Warn("startSubtask meets error", zap.Error(err))
continue
}

failpoint.Inject("mockCleanScheduler", func() {
v, ok := testContexts.Load(s.id)
if ok {
Expand All @@ -216,9 +230,9 @@ func (s *BaseScheduler) runSubtask(ctx context.Context, scheduler execute.Subtas
if err != nil {
s.onError(err)
if errors.Cause(err) == context.Canceled {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, s.getError())
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, s.getError())
} else {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError())
s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError())
}
s.markErrorHandled()
return
Expand Down Expand Up @@ -293,16 +307,14 @@ func (s *BaseScheduler) onSubtaskFinished(ctx context.Context, scheduler execute
})
if err := s.getError(); err != nil {
if errors.Cause(err) == context.Canceled {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil)
} else {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateFailed, s.getError())
s.updateSubtaskStateAndError(subtask, proto.TaskStateFailed, s.getError())
}
s.markErrorHandled()
return
}
if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil {
s.onError(err)
}
s.finishSubtask(subtask)
failpoint.Inject("syncAfterSubtaskFinish", func() {
TestSyncChan <- struct{}{}
<-TestSyncChan
Expand All @@ -320,7 +332,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {

// We should cancel all subtasks before rolling back
for {
subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStatePending, proto.TaskStateRunning)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -330,7 +342,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
break
}

s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateCanceled, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateCanceled, nil)
if err = s.getError(); err != nil {
return err
}
Expand All @@ -341,7 +353,7 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
s.onError(err)
return s.getError()
}
subtask, err := s.taskTable.GetSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending)
subtask, err := s.taskTable.GetFirstSubtaskInStates(s.id, task.ID, task.Step, proto.TaskStateRevertPending)
if err != nil {
s.onError(err)
return s.getError()
Expand All @@ -350,17 +362,17 @@ func (s *BaseScheduler) Rollback(ctx context.Context, task *proto.Task) error {
logutil.BgLogger().Warn("scheduler rollback a step, but no subtask in revert_pending state", zap.Any("step", task.Step))
return nil
}
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateReverting, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateReverting, nil)
if err := s.getError(); err != nil {
return err
}

err = executor.Rollback(rollbackCtx)
if err != nil {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateRevertFailed, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateRevertFailed, nil)
s.onError(err)
} else {
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateReverted, nil)
s.updateSubtaskStateAndError(subtask, proto.TaskStateReverted, nil)
}
return s.getError()
}
Expand Down Expand Up @@ -446,16 +458,37 @@ func (s *BaseScheduler) resetError() {
s.mu.handled = false
}

func (s *BaseScheduler) startSubtask(id int64) {
err := s.taskTable.StartSubtask(id)
func (s *BaseScheduler) startSubtask(subtask *proto.Subtask) {
metrics.DecDistTaskSubTaskCnt(subtask)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why dec first, then inc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dec pre-state subtask, then inc new-state subtask

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dec pre-state subtask, then inc new-state subtask

IMHO, the method name is confusing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dec pre-state subtask, then inc new-state subtask

IMHO, the method name is confusing.

Indeed, you have a point. I think the confusion comes from this function implicitly changing the state of the subtask halfway through. How about doing it this way?

func (s *BaseScheduler) startSubtaskAndUpdateState(subtask *proto.Subtask) {
    ....
}

metrics.EndDistTaskSubTask(subtask)
err := s.taskTable.StartSubtask(subtask.ID)
if err != nil {
s.onError(err)
}
subtask.State = proto.TaskStateRunning
metrics.IncDistTaskSubTaskCnt(subtask)
metrics.StartDistTaskSubTask(subtask)
}

// updateSubtaskStateAndError writes the given state and error for the subtask
// through s.taskTable and reports a failed write via s.onError.
// NOTE(review): this span is a rendered diff. The first signature/call pair
// below is the pre-change version that took a subtask ID; the second pair is
// the version that takes the *proto.Subtask itself.
func (s *BaseScheduler) updateSubtaskStateAndError(subtaskID int64, state string, subTaskErr error) {
err := s.taskTable.UpdateSubtaskStateAndError(subtaskID, state, subTaskErr)
func (s *BaseScheduler) updateSubtaskStateAndError(subtask *proto.Subtask, state string, subTaskErr error) {
// subtask.State still holds the previous state here; presumably the metrics
// helpers label by that state, so this releases the old-state metrics — confirm
// against the metrics package.
metrics.DecDistTaskSubTaskCnt(subtask)
metrics.EndDistTaskSubTask(subtask)
err := s.taskTable.UpdateSubtaskStateAndError(subtask.ID, state, subTaskErr)
if err != nil {
s.onError(err)
}
// NOTE(review): the in-memory state and the metrics below advance even when
// the table update above failed — confirm this is intended.
subtask.State = state
metrics.IncDistTaskSubTaskCnt(subtask)
metrics.StartDistTaskSubTask(subtask)
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
}

// finishSubtask calls s.taskTable.FinishSubtask with the subtask's ID and Meta,
// reports a failure via s.onError, and then sets subtask.State to
// proto.TaskStateSucceed, with metrics calls before and after the change.
func (s *BaseScheduler) finishSubtask(subtask *proto.Subtask) {
// subtask.State still holds the previous state here; presumably the metrics
// helpers label by that state — confirm against the metrics package.
metrics.DecDistTaskSubTaskCnt(subtask)
metrics.EndDistTaskSubTask(subtask)
if err := s.taskTable.FinishSubtask(subtask.ID, subtask.Meta); err != nil {
s.onError(err)
}
// NOTE(review): the state is set to succeed and the metrics advance even when
// FinishSubtask returned an error above — confirm this is intended.
subtask.State = proto.TaskStateSucceed
metrics.IncDistTaskSubTaskCnt(subtask)
metrics.StartDistTaskSubTask(subtask)
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
}
Loading