Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: fix bug when owner change #46054

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//disttask/framework/proto",
"//disttask/framework/storage",
"//domain/infosync",
"//owner",
"//resourcemanager/pool/spool",
"//resourcemanager/util",
"//sessionctx",
Expand Down
16 changes: 11 additions & 5 deletions disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
disttaskutil "github.com/pingcap/tidb/util/disttask"
Expand Down Expand Up @@ -60,20 +61,22 @@
// Manage the lifetime of a task
// including submitting subtasks and updating the status of a task.
type dispatcher struct {
ctx context.Context
taskMgr *storage.TaskManager
task *proto.Task
logCtx context.Context
ctx context.Context
taskMgr *storage.TaskManager
ownerManager owner.Manager
task *proto.Task
logCtx context.Context
}

// MockOwnerChange mocks an owner change in tests.
var MockOwnerChange func()

func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, task *proto.Task) *dispatcher {
func newDispatcher(ctx context.Context, taskMgr *storage.TaskManager, ownerManager owner.Manager, task *proto.Task) *dispatcher {

Check warning on line 74 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L74

Added line #L74 was not covered by tests
logPrefix := fmt.Sprintf("task_id: %d, task_type: %s", task.ID, task.Type)
return &dispatcher{
ctx,
taskMgr,
ownerManager,

Check warning on line 79 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L79

Added line #L79 was not covered by tests
task,
logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
}
Expand Down Expand Up @@ -211,6 +214,9 @@
}

func (d *dispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
if !d.ownerManager.IsOwner() {
Copy link
Contributor

@lance6716 lance6716 Aug 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the owner is dropped just after this check? There is always an interval between the time of check and the time of use. Can we add a version to the transactional storage to detect it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

return errors.New("dispatcher is not owner anymore")
}

Check warning on line 219 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L217-L219

Added lines #L217 - L219 were not covered by tests
prevState := d.task.State
d.task.State = taskState
for i := 0; i < retryTimes; i++ {
Expand Down
22 changes: 13 additions & 9 deletions disttask/framework/dispatcher/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/storage"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/resourcemanager/pool/spool"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -70,12 +71,14 @@
// Dispatcher schedules and monitors tasks.
// The number of tasks being scheduled is limited by the size of gPool.
type Manager struct {
ctx context.Context
cancel context.CancelFunc
taskMgr *storage.TaskManager
wg tidbutil.WaitGroupWrapper
gPool *spool.Pool
inited bool
ctx context.Context
cancel context.CancelFunc
taskMgr *storage.TaskManager
wg tidbutil.WaitGroupWrapper
ownerManager owner.Manager

gPool *spool.Pool
inited bool

runningTasks struct {
syncutil.RWMutex
Expand All @@ -86,10 +89,11 @@
}

// NewManager creates a dispatcher Manager.
func NewManager(ctx context.Context, taskTable *storage.TaskManager) (*Manager, error) {
func NewManager(ctx context.Context, taskTable *storage.TaskManager, ownerManager owner.Manager) (*Manager, error) {
dispatcherManager := &Manager{
taskMgr: taskTable,
finishedTaskCh: make(chan *proto.Task, DefaultDispatchConcurrency),
ownerManager: ownerManager,
}
gPool, err := spool.NewPool("dispatch_pool", int32(DefaultDispatchConcurrency), util.DistTask, spool.WithBlocking(true))
if err != nil {
Expand Down Expand Up @@ -184,7 +188,7 @@
func (dm *Manager) startDispatcher(task *proto.Task) {
// The pool is used in blocking mode, so Run won't return an error.
_ = dm.gPool.Run(func() {
dispatcher := newDispatcher(dm.ctx, dm.taskMgr, task)
dispatcher := newDispatcher(dm.ctx, dm.taskMgr, dm.ownerManager, task)

Check warning on line 191 in disttask/framework/dispatcher/dispatcher_manager.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher_manager.go#L191

Added line #L191 was not covered by tests
dm.setRunningTask(task, dispatcher)
dispatcher.executeTask()
dm.delRunningTask(task.ID)
Expand All @@ -193,5 +197,5 @@

// MockDispatcher mocks one dispatcher for one task; it is only used in tests.
func (dm *Manager) MockDispatcher(task *proto.Task) *dispatcher {
return newDispatcher(dm.ctx, dm.taskMgr, task)
return newDispatcher(dm.ctx, dm.taskMgr, dm.ownerManager, task)

Check warning on line 200 in disttask/framework/dispatcher/dispatcher_manager.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher_manager.go#L200

Added line #L200 was not covered by tests
}
2 changes: 1 addition & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,7 @@ func (do *Domain) distTaskFrameworkLoop(ctx context.Context, taskManager *storag
return
}
var err error
dispatcherManager, err = dispatcher.NewManager(ctx, taskManager)
dispatcherManager, err = dispatcher.NewManager(ctx, taskManager, do.ddl.OwnerManager())
if err != nil {
logutil.BgLogger().Error("failed to create a disttask dispatcher", zap.Error(err))
return
Expand Down