Skip to content

Commit

Permalink
disttask: refine scheduler error handling (#47313)
Browse files Browse the repository at this point in the history
ref #46258
  • Loading branch information
ywqzzy authored Oct 12, 2023
1 parent 0fd232f commit 0c34940
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 104 deletions.
6 changes: 2 additions & 4 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ func TestFrameworkSubtaskFinishedCancel(t *testing.T) {
defer ctrl.Finish()
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
err := failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel", "1*return(true)")
require.NoError(t, err)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel", "1*return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockSubtaskFinishedCancel"))
}()
Expand All @@ -649,8 +648,7 @@ func TestFrameworkRunSubtaskCancel(t *testing.T) {
defer ctrl.Finish()
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
err := failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel", "1*return(true)")
require.NoError(t, err)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel", "1*return(true)"))
DispatchTaskAndCheckState("key1", t, &m, proto.TaskStateReverted)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/framework/scheduler/MockRunSubtaskCancel"))
distContext.Close()
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ go_library(
importpath = "github.com/pingcap/tidb/disttask/framework/scheduler",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/common",
"//config",
"//disttask/framework/dispatcher",
"//disttask/framework/handle",
"//disttask/framework/proto",
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//domain/infosync",
"//metrics",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//util/backoff",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
31 changes: 19 additions & 12 deletions disttask/framework/scheduler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ import (
)

var (
schedulerPoolSize int32 = 4
subtaskExecutorPoolSize int32 = 10
schedulerPoolSize int32 = 4
// same as dispatcher
checkTime = 300 * time.Millisecond
retrySQLTimes = 3
Expand Down Expand Up @@ -65,9 +64,9 @@ type Manager struct {
schedulerPool Pool
mu struct {
sync.RWMutex
// taskID -> cancelFunc.
// cancelFunc is used to fast cancel the scheduler.Run.
handlingTasks map[int64]context.CancelFunc
// taskID -> CancelCauseFunc.
// CancelCauseFunc is used to fast cancel the scheduler.Run.
handlingTasks map[int64]context.CancelCauseFunc
}
// id, it's the same as server id now, i.e. host:port.
id string
Expand All @@ -87,7 +86,7 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
newPool: b.newPool,
}
m.ctx, m.cancel = context.WithCancel(ctx)
m.mu.handlingTasks = make(map[int64]context.CancelFunc)
m.mu.handlingTasks = make(map[int64]context.CancelCauseFunc)

schedulerPool, err := m.newPool("scheduler_pool", schedulerPoolSize, util.DistTask)
if err != nil {
Expand Down Expand Up @@ -227,7 +226,8 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onCanceledTasks", zap.Int64("task-id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
cancel()
// subtask needs to change its state to canceled.
cancel(ErrCancelSubtask)
}
}
}
Expand All @@ -239,7 +239,9 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("onPausingTasks", zap.Any("task_id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
cancel()
// Pause all running subtasks, don't mark subtasks as canceled.
// Should not change the subtask's state.
cancel(nil)
}
if err := m.taskTable.PauseSubtasks(m.id, task.ID); err != nil {
return err
Expand All @@ -255,7 +257,9 @@ func (m *Manager) cancelAllRunningTasks() {
for id, cancel := range m.mu.handlingTasks {
logutil.Logger(m.logCtx).Info("cancelAllRunningTasks", zap.Int64("task-id", id))
if cancel != nil {
cancel()
// tidb shutdown, don't mark subtask as canceled.
// Should not change the subtask's state.
cancel(nil)
}
}
}
Expand Down Expand Up @@ -322,6 +326,9 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
m.logErr(err)
return
}
if task == nil {
return
}
if task.State != proto.TaskStateRunning && task.State != proto.TaskStateReverting {
logutil.Logger(m.logCtx).Info("onRunnableTask exit",
zap.Int64("task-id", task.ID), zap.Int64("step", task.Step), zap.String("state", task.State))
Expand All @@ -338,10 +345,10 @@ func (m *Manager) onRunnableTask(ctx context.Context, task *proto.Task) {
}
switch task.State {
case proto.TaskStateRunning:
runCtx, runCancel := context.WithCancel(ctx)
runCtx, runCancel := context.WithCancelCause(ctx)
m.registerCancelFunc(task.ID, runCancel)
err = scheduler.Run(runCtx, task)
runCancel()
runCancel(nil)
case proto.TaskStatePausing:
err = scheduler.Pause(ctx, task)
case proto.TaskStateReverting:
Expand All @@ -361,7 +368,7 @@ func (m *Manager) addHandlingTask(id int64) {
}

// registerCancelFunc registers a cancel function for a task.
func (m *Manager) registerCancelFunc(id int64, cancel context.CancelFunc) {
func (m *Manager) registerCancelFunc(id int64, cancel context.CancelCauseFunc) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.handlingTasks[id] = cancel
Expand Down
8 changes: 4 additions & 4 deletions disttask/framework/scheduler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ func TestManageTask(t *testing.T) {
newTasks = m.filterAlreadyHandlingTasks(tasks)
require.Equal(t, []*proto.Task{{ID: 1}}, newTasks)

ctx1, cancel1 := context.WithCancel(context.Background())
ctx1, cancel1 := context.WithCancelCause(context.Background())
m.registerCancelFunc(2, cancel1)
m.cancelAllRunningTasks()
require.Equal(t, context.Canceled, ctx1.Err())

// test cancel.
m.addHandlingTask(1)
ctx2, cancel2 := context.WithCancel(context.Background())
ctx2, cancel2 := context.WithCancelCause(context.Background())
m.registerCancelFunc(1, cancel2)
ctx3, cancel3 := context.WithCancel(context.Background())
ctx3, cancel3 := context.WithCancelCause(context.Background())
m.registerCancelFunc(2, cancel3)
m.onCanceledTasks(context.Background(), []*proto.Task{{ID: 1}})
require.Equal(t, context.Canceled, ctx2.Err())
require.NoError(t, ctx3.Err())

// test pause.
m.addHandlingTask(3)
ctx4, cancel4 := context.WithCancel(context.Background())
ctx4, cancel4 := context.WithCancelCause(context.Background())
m.registerCancelFunc(1, cancel4)
mockTaskTable.EXPECT().PauseSubtasks("test", int64(1)).Return(nil)
m.onPausingTasks([]*proto.Task{{ID: 1}})
Expand Down
Loading

0 comments on commit 0c34940

Please sign in to comment.