Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: refine scheduler error handling #47313

Merged
merged 35 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b34b940
fix
ywqzzy Sep 26, 2023
74c2dfe
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Sep 27, 2023
ecedd73
fix
ywqzzy Sep 27, 2023
8e01c51
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Sep 27, 2023
79416cc
fix test
ywqzzy Sep 27, 2023
7edbced
fix
ywqzzy Sep 27, 2023
f2648eb
refine
ywqzzy Sep 27, 2023
224c1db
fix
ywqzzy Sep 27, 2023
be2eb7a
add cancel cause
ywqzzy Sep 27, 2023
2c38a4f
add cancel cause
ywqzzy Sep 27, 2023
f64af12
retryable error
ywqzzy Oct 8, 2023
6d89e08
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Oct 9, 2023
17a72b9
log refine
ywqzzy Oct 9, 2023
513b7fd
fix gracefully shutdown
ywqzzy Oct 9, 2023
e380b17
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Oct 9, 2023
38edc95
refine
ywqzzy Oct 9, 2023
8942719
fix
ywqzzy Oct 9, 2023
24b2366
fix
ywqzzy Oct 10, 2023
6b84bf3
fix
ywqzzy Oct 10, 2023
1a14ed1
rm log
ywqzzy Oct 10, 2023
2b7dfd9
fix build
ywqzzy Oct 10, 2023
dfb71a3
Update disttask/framework/scheduler/scheduler.go
ywqzzy Oct 10, 2023
cdb2d21
Update disttask/framework/scheduler/scheduler.go
ywqzzy Oct 10, 2023
7c7dd4c
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Oct 10, 2023
3931c39
fix
ywqzzy Oct 10, 2023
652e1e7
Merge branch 'use_backoffer' of https://github.com/ywqzzy/tidb into u…
ywqzzy Oct 10, 2023
0968b82
address comments
ywqzzy Oct 10, 2023
01550ba
check nil
ywqzzy Oct 10, 2023
e730ac7
fix spell
ywqzzy Oct 10, 2023
e188b34
rm
ywqzzy Oct 10, 2023
1a6783a
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Oct 11, 2023
8f2d96a
fix comments
ywqzzy Oct 11, 2023
befaba5
fix
ywqzzy Oct 12, 2023
6ee2cac
add comments
ywqzzy Oct 12, 2023
4d5501a
Merge branch 'master' of https://github.com/pingcap/tidb into use_bac…
ywqzzy Oct 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (d *BaseDispatcher) onCancelling() error {
// handle task in pausing state, cancel all running subtasks.
func (d *BaseDispatcher) onPausing() error {
logutil.Logger(d.logCtx).Info("on pausing state", zap.String("state", d.Task.State), zap.Int64("stage", d.Task.Step))
cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStateRunning, proto.TaskStatePending) // ywq todo remove
cnt, err := d.taskMgr.GetSubtaskInStatesCnt(d.Task.ID, proto.TaskStateRunning, proto.TaskStatePending)
if err != nil {
logutil.Logger(d.logCtx).Warn("check task failed", zap.Error(err))
return err
Expand Down
6 changes: 2 additions & 4 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
defer ctrl.Finish()
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
err := failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel", "1*return(true)")
require.NoError(t, err)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel"))
}()
Expand All @@ -649,8 +648,7 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) {
defer ctrl.Finish()
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
err := failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel", "1*return(true)")
require.NoError(t, err)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel", "1*return(true)"))
DispatchTaskAndCheckState("key1", t, &m, proto.TaskStateReverted)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel"))
distContext.Close()
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ go_library(
importpath = "github.com/pingcap/tidb/disttask/framework/scheduler",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//config",
"//disttask/framework/dispatcher",
"//disttask/framework/handle",
"//disttask/framework/proto",
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//domain/infosync",
"//metrics",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//util/backoff",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
27 changes: 17 additions & 10 deletions disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ import (
)

var (
schedulerPoolSize int32 = 4
subtaskExecutorPoolSize int32 = 10
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`subtaskExecutorPoolSize` is unused, so it is removed.

schedulerPoolSize int32 = 4
// same as dispatcher
checkTime = 300 * time.Millisecond
retrySQLTimes = 3
Expand Down Expand Up @@ -67,7 +66,7 @@ type Manager struct {
sync.RWMutex
// taskID -> cancelFunc.
// cancelFunc is used to fast cancel the scheduler.Run.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the comment out of date?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will fix it

handlingTasks map[int64]context.CancelFunc
handlingTasks map[int64]context.CancelCauseFunc
}
// id, it's the same as server id now, i.e. host:port.
id string
Expand All @@ -87,7 +86,7 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
newPool: b.newPool,
}
m.ctx, m.cancel = context.WithCancel(ctx)
m.mu.handlingTasks = make(map[int64]context.CancelFunc)
m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc)

schedulerPool, err := m.newPool("scheduler_pool", schedulerPoolSize, util.DistTask)
if err != nil {
Expand Down Expand Up @@ -227,7 +226,8 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
cancel()
// subtask needs to change its state to canceled.
cancel(ErrCancelSubtask)
}
}
}
Expand All @@ -239,7 +239,9 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onPausingTasks", zap.Any("task_id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
cancel()
// Pause all running subtasks, don't mark subtasks as canceled.
// Should not change the subtask's state.
cancel(nil)
}
if err := m.taskTable.PauseSubtasks(m.id, task.ID); err != nil {
return err
Expand All @@ -255,7 +257,9 @@ func (m *Manager) cancelAllRunningTasks() {
for id, cancel := range m.mu.handlingTasks {
logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Int64("task-id", id))
if cancel != nil {
cancel()
// tidb shutdown, don't mark subtask as canceled.
// Should not change the subtask's state.
cancel(nil)
}
}
}
Expand Down Expand Up @@ -322,6 +326,9 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
m.logErr(err)
return
}
if task == nil {
return
}
if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting {
logutil.Logger(m.logCtx).Info("onRunnableTask exit",
zap.Int64("task-id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State))
Expand All @@ -338,10 +345,10 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
}
switch task.State {
case proto.TaskStateRunning:
runCtx, runCancel := context.WithCancel(ctx)
runCtx, runCancel := context.WithCancelCause(ctx)
m.registerCancelFunc(task.ID, runCancel)
err = scheduler.Run(runCtx, task)
runCancel()
runCancel(nil)
case proto.TaskStatePausing:
err = scheduler.Pause(ctx, task)
case proto.TaskStateReverting:
Expand All @@ -361,7 +368,7 @@ func (m *Manager) addHandlingTask(id int64) {
}

// registerCancelFunc registers a cancel function for a task.
func (m *Manager) registerCancelFunc(id int64, cancel context.CancelFunc) {
func (m *Manager) registerCancelFunc(id int64, cancel context.CancelCauseFunc) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.handlingTasks[id] = cancel
Expand Down
8 changes: 4 additions & 4 deletions disttask/framework/scheduler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ func TestManageTask(t *testing.T) {
newTasks = m.filterAlreadyHandlingTasks(tasks)
require.Equal(t, []*proto.Task{{ID: 1}}, newTasks)

ctx1, cancel1 := context.WithCancel(context.Background())
ctx1, cancel1 := context.WithCancelCause(context.Background())
m.registerCancelFunc(2, cancel1)
m.cancelAllRunningTasks()
require.Equal(t, context.Canceled, ctx1.Err())

// test cancel.
m.addHandlingTask(1)
ctx2, cancel2 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancelCause(context.Background())
m.registerCancelFunc(1, cancel2)
ctx3, cancel3 := context.WithCancel(context.Background())
ctx3, cancel3 := context.WithCancelCause(context.Background())
m.registerCancelFunc(2, cancel3)
m.onCanceledTasks(context.Background(), []*proto.Task{{ID: 1}})
require.Equal(t, context.Canceled, ctx2.Err())
require.NoError(t, ctx3.Err())

// test pause.
m.addHandlingTask(3)
ctx4, cancel4 := context.WithCancel(context.Background())
ctx4, cancel4 := context.WithCancelCause(context.Background())
m.registerCancelFunc(1, cancel4)
mockTaskTable.EXPECT().PauseSubtasks("test", int64(1)).Return(nil)
m.onPausingTasks([]*proto.Task{{ID: 1}})
Expand Down
Loading