diff --git a/disttask/framework/dispatcher/dispatcher.go b/disttask/framework/dispatcher/dispatcher.go index bac36953d1b2d..92eabbed05b55 100644 --- a/disttask/framework/dispatcher/dispatcher.go +++ b/disttask/framework/dispatcher/dispatcher.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/resourcemanager/pool/spool" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" disttaskutil "github.com/pingcap/tidb/util/disttask" @@ -72,6 +73,7 @@ type TaskHandle interface { // including submitting subtasks and updating the status of a task. type dispatcher struct { ctx context.Context + gPool *spool.Pool taskMgr *storage.TaskManager task *proto.Task finishedCh chan *proto.Task @@ -80,9 +82,10 @@ type dispatcher struct { // MockOwnerChange mock owner change in tests. var MockOwnerChange func() -func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *proto.Task, finishedCh chan *proto.Task) *dispatcher { +func newDispatcher(ctx context.Context, gPool *spool.Pool, taskMgr *storage.TaskManager, task *proto.Task, finishedCh chan *proto.Task) *dispatcher { return &dispatcher{ ctx, + gPool, taskMgr, task, finishedCh, @@ -92,12 +95,12 @@ func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *prot // ExecuteTask start to schedule a task func (d *dispatcher) ExecuteTask() { // Using the pool with block, so it wouldn't return an error. - go func() { + _ = d.gPool.Run(func() { logutil.BgLogger().Info("execute one task", zap.Int64("task ID", d.task.ID), zap.String("state", d.task.State), zap.Uint64("concurrency", d.task.Concurrency)) d.scheduleTask(d.task.ID) d.finishedCh <- d.task - }() + }) } // monitorTask checks whether the current step of one task is finished, diff --git a/disttask/framework/dispatcher/dispatcher_manager.go b/disttask/framework/dispatcher/dispatcher_manager.go index dd3b4914aaee8..b3acf61ee1819 100644 --- a/disttask/framework/dispatcher/dispatcher_manager.go +++ b/disttask/framework/dispatcher/dispatcher_manager.go @@ -185,11 +185,11 @@ func (*Manager) checkConcurrencyOverflow(cnt int) bool { } func (dm *Manager) startDispatcher(task *proto.Task) { - dispatcher := newDispatcher(dm.ctx, dm.taskMgr, task, dm.finishedTaskCh) + dispatcher := newDispatcher(dm.ctx, dm.gPool, dm.taskMgr, task, dm.finishedTaskCh) dm.setRunningTask(task, dispatcher) } // MockDispatcher mock one dispatcher for one task, only used for tests. func (dm *Manager) MockDispatcher(task *proto.Task) *dispatcher { - return &dispatcher{dm.ctx, dm.taskMgr, task, nil} + return &dispatcher{dm.ctx, dm.gPool, dm.taskMgr, task, nil} }