Skip to content

Commit

Permalink
use gpool
Browse files Browse the repository at this point in the history
  • Loading branch information
ywqzzy committed Aug 1, 2023
1 parent 52def17 commit 61a4fe6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
9 changes: 6 additions & 3 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/resourcemanager/pool/spool"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
disttaskutil "github.com/pingcap/tidb/util/disttask"
Expand Down Expand Up @@ -72,6 +73,7 @@ type TaskHandle interface {
// including submitting subtasks and updating the status of a task.
type dispatcher struct {
ctx context.Context
gPool *spool.Pool
taskMgr *storage.TaskManager
task *proto.Task
finishedCh chan *proto.Task
Expand All @@ -80,9 +82,10 @@ type dispatcher struct {
// MockOwnerChange mock owner change in tests.
var MockOwnerChange func()

func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *proto.Task, finishedCh chan *proto.Task) *dispatcher {
func newDispatcher(ctx context.Context, gPool *spool.Pool, taskMgr *storage.TaskManager, task *proto.Task, finishedCh chan *proto.Task) *dispatcher {
return &dispatcher{
ctx,
gPool,
taskMgr,
task,
finishedCh,
Expand All @@ -92,12 +95,12 @@ func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *prot
// ExecuteTask start to schedule a task
func (d *dispatcher) ExecuteTask() {
// Using the pool with block, so it wouldn't return an error.
go func() {
_ = d.gPool.Run(func() {
logutil.BgLogger().Info("execute one task", zap.Int64("task ID", d.task.ID),
zap.String("state", d.task.State), zap.Uint64("concurrency", d.task.Concurrency))
d.scheduleTask(d.task.ID)
d.finishedCh <- d.task
}()
})
}

// monitorTask checks whether the current step of one task is finished,
Expand Down
4 changes: 2 additions & 2 deletions disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ func (*Manager) checkConcurrencyOverflow(cnt int) bool {
}

func (dm *Manager) startDispatcher(task *proto.Task) {
dispatcher := newDispatcher(dm.ctx, dm.taskMgr, task, dm.finishedTaskCh)
dispatcher := newDispatcher(dm.ctx, dm.gPool, dm.taskMgr, task, dm.finishedTaskCh)
dm.setRunningTask(task, dispatcher)
}

// MockDispatcher mock one dispatcher for one task, only used for tests.
func (dm *Manager) MockDispatcher(task *proto.Task) *dispatcher {
return &dispatcher{dm.ctx, dm.taskMgr, task, nil}
return &dispatcher{dm.ctx, dm.gPool, dm.taskMgr, task, nil}
}

0 comments on commit 61a4fe6

Please sign in to comment.