diff --git a/pkg/disttask/framework/framework_test.go b/pkg/disttask/framework/framework_test.go
index 286b2c86ee1a7..c59a9a4fcd570 100644
--- a/pkg/disttask/framework/framework_test.go
+++ b/pkg/disttask/framework/framework_test.go
@@ -525,13 +525,16 @@ func TestFrameworkSetLabel(t *testing.T) {
     RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
     distContext := testkit.NewDistExecutionContext(t, 3)
     tk := testkit.NewTestKit(t, distContext.Store)
+    // 1. all "" role.
     DispatchTaskAndCheckSuccess("😁", t, &m)
 
+    // 2. one "background" role.
     tk.MustExec("set global tidb_service_scope=background")
     tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
     tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
     DispatchTaskAndCheckSuccess("😊", t, &m)
 
+    // 3. 2 "background" role.
     tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
     DispatchTaskAndCheckSuccess("😆", t, &m)
 
diff --git a/pkg/disttask/framework/scheduler/BUILD.bazel b/pkg/disttask/framework/scheduler/BUILD.bazel
index b72f51527a99e..7d4d274cf0b80 100644
--- a/pkg/disttask/framework/scheduler/BUILD.bazel
+++ b/pkg/disttask/framework/scheduler/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
         "//pkg/metrics",
         "//pkg/resourcemanager/pool/spool",
         "//pkg/resourcemanager/util",
+        "//pkg/util",
         "//pkg/util/backoff",
         "//pkg/util/logutil",
         "@com_github_pingcap_errors//:errors",
diff --git a/pkg/disttask/framework/scheduler/manager.go b/pkg/disttask/framework/scheduler/manager.go
index d917d2cdac826..95c9aa99c9bba 100644
--- a/pkg/disttask/framework/scheduler/manager.go
+++ b/pkg/disttask/framework/scheduler/manager.go
@@ -25,8 +25,10 @@ import (
     "github.com/pingcap/tidb/pkg/config"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
     "github.com/pingcap/tidb/pkg/domain/infosync"
+    "github.com/pingcap/tidb/pkg/metrics"
     "github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
     "github.com/pingcap/tidb/pkg/resourcemanager/util"
+    tidbutil "github.com/pingcap/tidb/pkg/util"
     "github.com/pingcap/tidb/pkg/util/logutil"
     "go.uber.org/zap"
 )
@@ -34,9 +36,10 @@ import (
 var (
     schedulerPoolSize int32 = 4
     // same as dispatcher
-    checkTime        = 300 * time.Millisecond
-    retrySQLTimes    = 3
-    retrySQLInterval = 500 * time.Millisecond
+    checkTime           = 300 * time.Millisecond
+    recoverMetaInterval = 90 * time.Second
+    retrySQLTimes       = 30
+    retrySQLInterval    = 500 * time.Millisecond
 )
 
 // ManagerBuilder is used to build a Manager.
@@ -70,7 +73,7 @@ type Manager struct {
     }
     // id, it's the same as server id now, i.e. host:port.
     id     string
-    wg     sync.WaitGroup
+    wg     tidbutil.WaitGroupWrapper
     ctx    context.Context
     cancel context.CancelFunc
     logCtx context.Context
@@ -97,36 +100,33 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
     return m, nil
 }
 
-// Start starts the Manager.
-func (m *Manager) Start() error {
-    logutil.Logger(m.logCtx).Debug("manager start")
-    var err error
+func (m *Manager) initMeta() (err error) {
     for i := 0; i < retrySQLTimes; i++ {
         err = m.taskTable.StartManager(m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
         if err == nil {
             break
         }
         if i%10 == 0 {
-            logutil.Logger(m.logCtx).Warn("start manager failed", zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
-                zap.Int("retry times", retrySQLTimes), zap.Error(err))
+            logutil.Logger(m.logCtx).Warn("start manager failed",
+                zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
+                zap.Int("retry times", i),
+                zap.Error(err))
         }
         time.Sleep(retrySQLInterval)
     }
-    if err != nil {
+    return err
+}
+
+// Start starts the Manager.
+func (m *Manager) Start() error {
+    logutil.Logger(m.logCtx).Debug("manager start")
+    if err := m.initMeta(); err != nil {
         return err
     }
-    m.wg.Add(1)
-    go func() {
-        defer m.wg.Done()
-        m.fetchAndHandleRunnableTasks(m.ctx)
-    }()
-
-    m.wg.Add(1)
-    go func() {
-        defer m.wg.Done()
-        m.fetchAndFastCancelTasks(m.ctx)
-    }()
+    m.wg.Run(m.fetchAndHandleRunnableTasksLoop)
+    m.wg.Run(m.fetchAndFastCancelTasksLoop)
+    m.wg.Run(m.recoverMetaLoop)
     return nil
 }
 
@@ -138,12 +138,13 @@ func (m *Manager) Stop() {
 }
 
 // fetchAndHandleRunnableTasks fetches the runnable tasks from the global task table and handles them.
-func (m *Manager) fetchAndHandleRunnableTasks(ctx context.Context) {
+func (m *Manager) fetchAndHandleRunnableTasksLoop() {
+    defer tidbutil.Recover(metrics.LabelDomain, "fetchAndHandleRunnableTasksLoop", m.fetchAndHandleRunnableTasksLoop, false)
     ticker := time.NewTicker(checkTime)
     for {
         select {
-        case <-ctx.Done():
-            logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasks done")
+        case <-m.ctx.Done():
+            logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasksLoop done")
             return
         case <-ticker.C:
             tasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStateRunning, proto.TaskStateReverting)
@@ -151,19 +152,21 @@ func (m *Manager) fetchAndHandleRunnableTasks(ctx context.Context) {
                 m.logErr(err)
                 continue
             }
-            m.onRunnableTasks(ctx, tasks)
+            m.onRunnableTasks(m.ctx, tasks)
         }
     }
 }
 
 // fetchAndFastCancelTasks fetches the reverting/pausing tasks from the global task table and fast cancels them.
-func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
+func (m *Manager) fetchAndFastCancelTasksLoop() {
+    defer tidbutil.Recover(metrics.LabelDomain, "fetchAndFastCancelTasksLoop", m.fetchAndFastCancelTasksLoop, false)
+
     ticker := time.NewTicker(checkTime)
     for {
         select {
-        case <-ctx.Done():
+        case <-m.ctx.Done():
             m.cancelAllRunningTasks()
-            logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasks done")
+            logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasksLoop done")
             return
         case <-ticker.C:
             tasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStateReverting)
@@ -171,7 +174,7 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
                 m.logErr(err)
                 continue
             }
-            m.onCanceledTasks(ctx, tasks)
+            m.onCanceledTasks(m.ctx, tasks)
 
             // cancel pending/running subtasks, and mark them as paused.
             pausingTasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStatePausing)
@@ -189,6 +192,9 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
 
 // onRunnableTasks handles runnable tasks.
 func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
+    if len(tasks) == 0 {
+        return
+    }
     tasks = m.filterAlreadyHandlingTasks(tasks)
     for _, task := range tasks {
         exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step,
@@ -221,6 +227,9 @@ func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
 
 // onCanceledTasks cancels the running subtasks.
 func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
+    if len(tasks) == 0 {
+        return
+    }
     m.mu.RLock()
     defer m.mu.RUnlock()
     for _, task := range tasks {
@@ -234,6 +243,9 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
 
 // onPausingTasks pauses/cancels the pending/running subtasks.
 func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
+    if len(tasks) == 0 {
+        return nil
+    }
     m.mu.RLock()
     defer m.mu.RUnlock()
     for _, task := range tasks {
@@ -250,6 +262,28 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
     return nil
 }
 
+// recoverMetaLoop inits and recovers dist_framework_meta for the tidb node running the scheduler manager.
+// This is necessary when the TiDB node experiences a prolonged network partition
+// and the dispatcher deletes `dist_framework_meta`.
+// When the TiDB node recovers from the network partition,
+// we need to re-insert the metadata.
+func (m *Manager) recoverMetaLoop() {
+    defer tidbutil.Recover(metrics.LabelDomain, "recoverMetaLoop", m.recoverMetaLoop, false)
+    ticker := time.NewTicker(recoverMetaInterval)
+    for {
+        select {
+        case <-m.ctx.Done():
+            logutil.Logger(m.logCtx).Info("recoverMetaLoop done")
+            return
+        case <-ticker.C:
+            if err := m.initMeta(); err != nil {
+                m.logErr(err)
+                continue
+            }
+        }
+    }
+}
+
 // cancelAllRunningTasks cancels all running tasks.
 func (m *Manager) cancelAllRunningTasks() {
     m.mu.RLock()
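A note on the manager.go changes above, for readers less familiar with the helpers they pull in: Start no longer hand-rolls wg.Add / go / defer wg.Done for each background goroutine. tidbutil.WaitGroupWrapper.Run wraps that bookkeeping, and each loop defers tidbutil.Recover so a panic is logged and the loop re-entered instead of taking down the scheduler manager (the trailing false argument asks Recover not to terminate the process). What follows is a minimal, self-contained Go sketch of that idiom; waitGroupWrapper and recoverAndRestart are simplified stand-ins I wrote for illustration, not the real pkg/util helpers, which additionally record metrics and log through logutil.

    package main

    import (
        "log"
        "sync"
        "time"
    )

    // waitGroupWrapper is a simplified stand-in for tidbutil.WaitGroupWrapper:
    // Run registers the goroutine with the WaitGroup so a Stop/Wait caller can
    // block until every loop has exited.
    type waitGroupWrapper struct{ sync.WaitGroup }

    func (w *waitGroupWrapper) Run(exec func()) {
        w.Add(1)
        go func() {
            defer w.Done()
            exec()
        }()
    }

    // recoverAndRestart is a simplified stand-in for tidbutil.Recover: on panic it
    // logs the failure and re-enters the loop on the same goroutine, so the
    // WaitGroup accounting done by Run stays correct.
    func recoverAndRestart(name string, loop func()) {
        if r := recover(); r != nil {
            log.Printf("%s panicked: %v, restarting", name, r)
            loop()
        }
    }

    func main() {
        var wg waitGroupWrapper
        done := make(chan struct{})

        var loop func()
        loop = func() {
            defer recoverAndRestart("demoLoop", loop)
            ticker := time.NewTicker(50 * time.Millisecond)
            defer ticker.Stop()
            for {
                select {
                case <-done:
                    return
                case <-ticker.C:
                    // periodic work, e.g. re-registering dist_framework_meta
                }
            }
        }

        wg.Run(loop)
        time.Sleep(200 * time.Millisecond)
        close(done) // mirrors m.cancel() in Manager.Stop
        wg.Wait()
    }

Because Recover re-invokes the loop on the same goroutine, a restarted loop is still covered by the original Run registration, which is what lets Stop keep a single wg.Wait.
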
diff --git a/pkg/disttask/framework/storage/table_test.go b/pkg/disttask/framework/storage/table_test.go
index b711066211471..ae88f887660fd 100644
--- a/pkg/disttask/framework/storage/table_test.go
+++ b/pkg/disttask/framework/storage/table_test.go
@@ -417,6 +417,7 @@ func TestDistFrameworkMeta(t *testing.T) {
 
     require.NoError(t, sm.StartManager(":4000", "background"))
     require.NoError(t, sm.StartManager(":4001", ""))
+    require.NoError(t, sm.StartManager(":4002", ""))
     require.NoError(t, sm.StartManager(":4002", "background"))
 
     allNodes, err := sm.GetAllNodes()
diff --git a/pkg/disttask/framework/storage/task_table.go b/pkg/disttask/framework/storage/task_table.go
index 66736d7e6f3bf..73189b563eb1d 100644
--- a/pkg/disttask/framework/storage/task_table.go
+++ b/pkg/disttask/framework/storage/task_table.go
@@ -549,8 +549,7 @@ func (stm *TaskManager) StartSubtask(subtaskID int64) error {
 }
 
 // StartManager insert the manager information into dist_framework_meta.
 func (stm *TaskManager) StartManager(tidbID string, role string) error {
-    _, err := stm.executeSQLWithNewSession(stm.ctx, `insert into mysql.dist_framework_meta values(%?, %?, DEFAULT)
-        on duplicate key update role = %?`, tidbID, role, role)
+    _, err := stm.executeSQLWithNewSession(stm.ctx, `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, tidbID, role)
     return err
 }
diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go
index 46e777b6f941d..89be424eefef1 100644
--- a/pkg/domain/domain.go
+++ b/pkg/domain/domain.go
@@ -1480,7 +1480,7 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error {
 func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storage.TaskManager, schedulerManager *scheduler.Manager, serverID string) {
     err := schedulerManager.Start()
     if err != nil {
-        logutil.BgLogger().Error("dist task scheduler manager failed", zap.Error(err))
+        logutil.BgLogger().Error("dist task scheduler manager start failed", zap.Error(err))
         return
     }
     logutil.BgLogger().Info("dist task scheduler manager started")
diff --git a/pkg/executor/set.go b/pkg/executor/set.go
index aa6cfaf796d3d..27288b32f6c63 100644
--- a/pkg/executor/set.go
+++ b/pkg/executor/set.go
@@ -166,9 +166,7 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
         dom := domain.GetDomain(e.Ctx())
         serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
         _, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx,
-            `update mysql.dist_framework_meta
-            set role = %?
-            where host = %?`, valStr, serverID)
+            `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, serverID, valStr)
     }
     return err
 }
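One more note, on the replace into switch in task_table.go and set.go: unlike the old insert ... on duplicate key update role = %?, REPLACE deletes any conflicting row and writes a fresh one, so the latest registration wins wholesale and unlisted columns fall back to their defaults. That is the behaviour recoverMetaLoop relies on to recreate a dist_framework_meta row the dispatcher has deleted, and it is why TestDistFrameworkMeta can now call StartManager twice for ":4002" and expect the second role to stick. A hypothetical testkit-style check of the same semantics, not part of this patch and assuming the three-column layout implied by values(%?, %?, DEFAULT), might look like this inside a test such as TestFrameworkSetLabel:

    // Register the same host twice; the second REPLACE overwrites the first row entirely.
    tk.MustExec(`replace into mysql.dist_framework_meta values(":4002", "", DEFAULT)`)
    tk.MustExec(`replace into mysql.dist_framework_meta values(":4002", "background", DEFAULT)`)
    // Exactly one row remains for ":4002", carrying the most recent role.
    tk.MustQuery(`select role from mysql.dist_framework_meta where host = ":4002"`).Check(testkit.Rows("background"))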