Commit b1966e7

disttask: fix removing meta when met network partition for so long then recover from it (#48005) (#48024)

close #47954
ti-chi-bot authored Oct 27, 2023
1 parent 4cb200d commit b1966e7
Showing 7 changed files with 72 additions and 36 deletions.
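In outline, the fix works like this: the dispatcher deletes a node's row from mysql.dist_framework_meta when the node has been unreachable for long enough, so the scheduler manager can no longer write its row only once at startup; it must also re-insert the row periodically so that it reappears after the partition heals. Below is a minimal, self-contained Go sketch of that pattern; it is hypothetical and simplified (the metaStore interface, package name, and inlined constants stand in for the task table and tuning knobs in the real diff that follows):

package recoverysketch

import (
	"context"
	"log"
	"time"
)

// metaStore stands in for the framework's task table (TaskManager.StartManager
// in the real code); it upserts this node's dist_framework_meta row.
type metaStore interface {
	StartManager(id, role string) error
}

// initMeta retries the meta upsert a bounded number of times, mirroring the
// retrySQLTimes/retrySQLInterval logic in the diff.
func initMeta(store metaStore, id, role string) (err error) {
	const (
		retries  = 30
		interval = 500 * time.Millisecond
	)
	for i := 0; i < retries; i++ {
		if err = store.StartManager(id, role); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

// recoverMetaLoop re-runs initMeta on a slow ticker so a row deleted during a
// long network partition is re-inserted once connectivity returns.
func recoverMetaLoop(ctx context.Context, store metaStore, id, role string) {
	ticker := time.NewTicker(90 * time.Second) // recoverMetaInterval in the diff
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := initMeta(store, id, role); err != nil {
				log.Println("recover dist_framework_meta failed:", err)
			}
		}
	}
}

In the actual change, Start launches this loop alongside the existing fetch loops, and the same initMeta is reused for the initial registration.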
3 changes: 3 additions & 0 deletions pkg/disttask/framework/framework_test.go
@@ -525,13 +525,16 @@ func TestFrameworkSetLabel(t *testing.T) {
RegisterTaskMeta(t, ctrl, &m, &testDispatcherExt{})
distContext := testkit.NewDistExecutionContext(t, 3)
tk := testkit.NewTestKit(t, distContext.Store)

// 1. all "" role.
DispatchTaskAndCheckSuccess("😁", t, &m)

// 2. one "background" role.
tk.MustExec("set global tidb_service_scope=background")
tk.MustQuery("select @@global.tidb_service_scope").Check(testkit.Rows("background"))
tk.MustQuery("select @@tidb_service_scope").Check(testkit.Rows("background"))
DispatchTaskAndCheckSuccess("😊", t, &m)

// 3. 2 "background" role.
tk.MustExec("update mysql.dist_framework_meta set role = \"background\" where host = \":4001\"")
DispatchTaskAndCheckSuccess("😆", t, &m)
1 change: 1 addition & 0 deletions pkg/disttask/framework/scheduler/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
"//pkg/metrics",
"//pkg/resourcemanager/pool/spool",
"//pkg/resourcemanager/util",
"//pkg/util",
"//pkg/util/backoff",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
94 changes: 64 additions & 30 deletions pkg/disttask/framework/scheduler/manager.go
@@ -25,18 +25,21 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
)

var (
schedulerPoolSize int32 = 4
// same as dispatcher
checkTime = 300 * time.Millisecond
retrySQLTimes = 3
retrySQLInterval = 500 * time.Millisecond
checkTime = 300 * time.Millisecond
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond
)

// ManagerBuilder is used to build a Manager.
@@ -70,7 +73,7 @@ type Manager struct {
}
// id, it's the same as server id now, i.e. host:port.
id string
wg sync.WaitGroup
wg tidbutil.WaitGroupWrapper
ctx context.Context
cancel context.CancelFunc
logCtx context.Context
@@ -97,36 +100,33 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
return m, nil
}

// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
var err error
func (m *Manager) initMeta() (err error) {
for i := 0; i < retrySQLTimes; i++ {
err = m.taskTable.StartManager(m.id, config.GetGlobalConfig().Instance.TiDBServiceScope)
if err == nil {
break
}
if i%10 == 0 {
logutil.Logger(m.logCtx).Warn("start manager failed", zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
zap.Int("retry times", retrySQLTimes), zap.Error(err))
logutil.Logger(m.logCtx).Warn("start manager failed",
zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
zap.Int("retry times", i),
zap.Error(err))
}
time.Sleep(retrySQLInterval)
}
if err != nil {
return err
}

// Start starts the Manager.
func (m *Manager) Start() error {
logutil.Logger(m.logCtx).Debug("manager start")
if err := m.initMeta(); err != nil {
return err
}

m.wg.Add(1)
go func() {
defer m.wg.Done()
m.fetchAndHandleRunnableTasks(m.ctx)
}()

m.wg.Add(1)
go func() {
defer m.wg.Done()
m.fetchAndFastCancelTasks(m.ctx)
}()
m.wg.Run(m.fetchAndHandleRunnableTasksLoop)
m.wg.Run(m.fetchAndFastCancelTasksLoop)
m.wg.Run(m.recoverMetaLoop)
return nil
}
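(Design note: tidbutil.WaitGroupWrapper.Run wraps the Add(1)/go/defer-Done boilerplate that the deleted lines spelled out by hand; the underlying WaitGroup is presumably still waited on in Stop, so shutdown behavior is unchanged.)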

@@ -138,40 +138,43 @@ func (m *Manager) Stop() {
}

// fetchAndHandleRunnableTasks fetches the runnable tasks from the global task table and handles them.
func (m *Manager) fetchAndHandleRunnableTasks(ctx context.Context) {
func (m *Manager) fetchAndHandleRunnableTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "fetchAndHandleRunnableTasksLoop", m.fetchAndHandleRunnableTasksLoop, false)
ticker := time.NewTicker(checkTime)
for {
select {
case <-ctx.Done():
logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasks done")
case <-m.ctx.Done():
logutil.Logger(m.logCtx).Info("fetchAndHandleRunnableTasksLoop done")
return
case <-ticker.C:
tasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStateRunning, proto.TaskStateReverting)
if err != nil {
m.logErr(err)
continue
}
m.onRunnableTasks(ctx, tasks)
m.onRunnableTasks(m.ctx, tasks)
}
}
}

// fetchAndFastCancelTasks fetches the reverting/pausing tasks from the global task table and fast cancels them.
func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {
func (m *Manager) fetchAndFastCancelTasksLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "fetchAndFastCancelTasksLoop", m.fetchAndFastCancelTasksLoop, false)

ticker := time.NewTicker(checkTime)
for {
select {
case <-ctx.Done():
case <-m.ctx.Done():
m.cancelAllRunningTasks()
logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasks done")
logutil.Logger(m.logCtx).Info("fetchAndFastCancelTasksLoop done")
return
case <-ticker.C:
tasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStateReverting)
if err != nil {
m.logErr(err)
continue
}
m.onCanceledTasks(ctx, tasks)
m.onCanceledTasks(m.ctx, tasks)

// cancel pending/running subtasks, and mark them as paused.
pausingTasks, err := m.taskTable.GetGlobalTasksInStates(proto.TaskStatePausing)
Expand All @@ -189,6 +192,9 @@ func (m *Manager) fetchAndFastCancelTasks(ctx context.Context) {

// onRunnableTasks handles runnable tasks.
func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {
if len(tasks) == 0 {
return
}
tasks = m.filterAlreadyHandlingTasks(tasks)
for _, task := range tasks {
exist, err := m.taskTable.HasSubtasksInStates(m.id, task.ID, task.Step,
@@ -221,6 +227,9 @@ func (m *Manager) onRunnableTasks(ctx context.Context, tasks []*proto.Task) {

// onCanceledTasks cancels the running subtasks.
func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {
if len(tasks) == 0 {
return
}
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
@@ -234,6 +243,9 @@ func (m *Manager) onCanceledTasks(_ context.Context, tasks []*proto.Task) {

// onPausingTasks pauses/cancels the pending/running subtasks.
func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
if len(tasks) == 0 {
return nil
}
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
@@ -250,6 +262,28 @@ func (m *Manager) onPausingTasks(tasks []*proto.Task) error {
return nil
}

// recoverMetaLoop inits and recovers dist_framework_meta for the tidb node running the scheduler manager.
// This is necessary when the TiDB node experiences a prolonged network partition
// and the dispatcher deletes `dist_framework_meta`.
// When the TiDB node recovers from the network partition,
// we need to re-insert the metadata.
func (m *Manager) recoverMetaLoop() {
defer tidbutil.Recover(metrics.LabelDomain, "recoverMetaLoop", m.recoverMetaLoop, false)
ticker := time.NewTicker(recoverMetaInterval)
for {
select {
case <-m.ctx.Done():
logutil.Logger(m.logCtx).Info("recoverMetaLoop done")
return
case <-ticker.C:
if err := m.initMeta(); err != nil {
m.logErr(err)
continue
}
}
}
}

// cancelAllRunningTasks cancels all running tasks.
func (m *Manager) cancelAllRunningTasks() {
m.mu.RLock()
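A note on the new loop structure: each of the three loops defers tidbutil.Recover with the loop function itself as the callback, and tidbutil.Recover re-invokes that function after a recovered panic, so a panicking loop is logged under the given metrics label and restarted rather than silently killing its goroutine. The len(tasks) == 0 early returns in onRunnableTasks/onCanceledTasks/onPausingTasks skip the filtering and mutex work on the common idle path.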
1 change: 1 addition & 0 deletions pkg/disttask/framework/storage/table_test.go
@@ -417,6 +417,7 @@ func TestDistFrameworkMeta(t *testing.T) {

require.NoError(t, sm.StartManager(":4000", "background"))
require.NoError(t, sm.StartManager(":4001", ""))
require.NoError(t, sm.StartManager(":4002", ""))
require.NoError(t, sm.StartManager(":4002", "background"))

allNodes, err := sm.GetAllNodes()
3 changes: 1 addition & 2 deletions pkg/disttask/framework/storage/task_table.go
@@ -549,8 +549,7 @@ func (stm *TaskManager) StartSubtask(subtaskID int64) error {

// StartManager insert the manager information into dist_framework_meta.
func (stm *TaskManager) StartManager(tidbID string, role string) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, `insert into mysql.dist_framework_meta values(%?, %?, DEFAULT)
on duplicate key update role = %?`, tidbID, role, role)
_, err := stm.executeSQLWithNewSession(stm.ctx, `replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, tidbID, role)
return err
}
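Design note: REPLACE INTO deletes any existing row with the same key and inserts a fresh one, so unlike the previous INSERT ... ON DUPLICATE KEY UPDATE (which only touched role on conflict) it rewrites the whole row, resetting every column to the supplied values or their defaults each time the scheduler re-registers.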

2 changes: 1 addition & 1 deletion pkg/domain/domain.go
@@ -1480,7 +1480,7 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error {
func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storage.TaskManager, schedulerManager *scheduler.Manager, serverID string) {
err := schedulerManager.Start()
if err != nil {
logutil.BgLogger().Error("dist task scheduler manager failed", zap.Error(err))
logutil.BgLogger().Error("dist task scheduler manager start failed", zap.Error(err))
return
}
logutil.BgLogger().Info("dist task scheduler manager started")
4 changes: 1 addition & 3 deletions pkg/executor/set.go
@@ -166,9 +166,7 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
dom := domain.GetDomain(e.Ctx())
serverID := disttaskutil.GenerateSubtaskExecID(ctx, dom.DDL().GetID())
_, err = e.Ctx().(sqlexec.SQLExecutor).ExecuteInternal(ctx,
`update mysql.dist_framework_meta
set role = %?
where host = %?`, valStr, serverID)
`replace into mysql.dist_framework_meta values(%?, %?, DEFAULT)`, serverID, valStr)
}
return err
}
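The likely motivation for the same switch here: a plain UPDATE matches zero rows once the dispatcher has deleted this node's meta row, so SET GLOBAL tidb_service_scope could silently take no effect after a partition; REPLACE INTO recreates the row with the new role in either case.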
