From 7703e4122c71d247e38c8dbce968366f690885b0 Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Mon, 14 Aug 2023 15:12:45 +0800 Subject: [PATCH] fix --- disttask/framework/dispatcher/BUILD.bazel | 1 + disttask/framework/dispatcher/dispatcher.go | 16 +++++++++----- .../dispatcher/dispatcher_manager.go | 22 +++++++++++-------- domain/domain.go | 2 +- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/disttask/framework/dispatcher/BUILD.bazel b/disttask/framework/dispatcher/BUILD.bazel index d9a6a23857790..d58c399dd2f2c 100644 --- a/disttask/framework/dispatcher/BUILD.bazel +++ b/disttask/framework/dispatcher/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//disttask/framework/proto", "//disttask/framework/storage", "//domain/infosync", + "//owner", "//resourcemanager/pool/spool", "//resourcemanager/util", "//sessionctx", diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index 5eb8daa6eb7cf..819d2fe8514d3 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" disttaskutil "github.com/pingcap/tidb/util/disttask" @@ -60,20 +61,22 @@ type TaskHandle interface { // Manage the lifetime of a task // including submitting subtasks and updating the status of a task. type dispatcher struct { - ctx context.Context - taskMgr *storage.TaskManager - task *proto.Task - logCtx context.Context + ctx context.Context + taskMgr *storage.TaskManager + ownerManager owner.Manager + task *proto.Task + logCtx context.Context } // MockOwnerChange mock owner change in tests. var MockOwnerChange func() -func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *proto.Task) *dispatcher { +func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, ownerManager owner.Manager, task *proto.Task) *dispatcher { logPrefix := fmt.Sprintf("task_id: %d, task_type: %s", task.ID, task.Type) return &dispatcher{ ctx, taskMgr, + ownerManager, task, logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix), } @@ -211,6 +214,9 @@ func (d *dispatcher) handleRunning() error { } func (d *dispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) { + if !d.ownerManager.IsOwner() { + return errors.New("dispatcher is not owner anymore") + } prevState := d.task.State d.task.State = taskState for i := 0; i < retryTimes; i++ { diff --git a/disttask/framework/dispatcher/dispatcher_manager.go b/disttask/framework/dispatcher/dispatcher_manager.go index df22c0bf263f0..2db385e2f090b 100644 --- a/disttask/framework/dispatcher/dispatcher_manager.go +++ b/disttask/framework/dispatcher/dispatcher_manager.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/resourcemanager/pool/spool" "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" @@ -70,12 +71,14 @@ func (dm *Manager) clearRunningTasks() { // Dispatcher schedule and monitor tasks. // The scheduling task number is limited by size of gPool. type Manager struct { - ctx context.Context - cancel context.CancelFunc - taskMgr *storage.TaskManager - wg tidbutil.WaitGroupWrapper - gPool *spool.Pool - inited bool + ctx context.Context + cancel context.CancelFunc + taskMgr *storage.TaskManager + wg tidbutil.WaitGroupWrapper + ownerManager owner.Manager + + gPool *spool.Pool + inited bool runningTasks struct { syncutil.RWMutex @@ -86,10 +89,11 @@ type Manager struct { } // NewManager creates a dispatcher struct. -func NewManager(ctx context.Context, taskTable *storage.TaskManager) (*Manager, error) { +func NewManager(ctx context.Context, taskTable *storage.TaskManager, ownerManager owner.Manager) (*Manager, error) { dispatcherManager := &Manager{ taskMgr: taskTable, finishedTaskCh: make(chan *proto.Task, DefaultDispatchConcurrency), + ownerManager: ownerManager, } gPool, err := spool.NewPool("dispatch_pool", int32(DefaultDispatchConcurrency), util.DistTask, spool.WithBlocking(true)) if err != nil { @@ -184,7 +188,7 @@ func (*Manager) checkConcurrencyOverflow(cnt int) bool { func (dm *Manager) startDispatcher(task *proto.Task) { // Using the pool with block, so it wouldn't return an error. _ = dm.gPool.Run(func() { - dispatcher := newDispatcher(dm.ctx, dm.taskMgr, task) + dispatcher := newDispatcher(dm.ctx, dm.taskMgr, dm.ownerManager, task) dm.setRunningTask(task, dispatcher) dispatcher.executeTask() dm.delRunningTask(task.ID) @@ -193,5 +197,5 @@ func (dm *Manager) startDispatcher(task *proto.Task) { // MockDispatcher mock one dispatcher for one task, only used for tests. func (dm *Manager) MockDispatcher(task *proto.Task) *dispatcher { - return newDispatcher(dm.ctx, dm.taskMgr, task) + return newDispatcher(dm.ctx, dm.taskMgr, dm.ownerManager, task) } diff --git a/domain/domain.go b/domain/domain.go index 6bb63b69278c9..dde80bb085b4e 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1472,7 +1472,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag return } var err error - dispatcherManager, err = dispatcher.NewManager(ctx, taskManager) + dispatcherManager, err = dispatcher.NewManager(ctx, taskManager, do.ddl.OwnerManager()) if err != nil { logutil.BgLogger().Error("failed to create a disttask dispatcher", zap.Error(err)) return