Skip to content

Commit

Permalink
disttask: impl slot manager in dispatcher, and check resource before …
Browse files Browse the repository at this point in the history
…start dispatcher (#49195)

close #49100
  • Loading branch information
D3Hunter authored Dec 11, 2023
1 parent 9bf043b commit a7260ff
Show file tree
Hide file tree
Showing 30 changed files with 1,115 additions and 267 deletions.
8 changes: 7 additions & 1 deletion pkg/disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"dispatcher.go",
"dispatcher_manager.go",
"interface.go",
"slots.go",
"state_transform.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher",
Expand All @@ -19,6 +20,7 @@ go_library(
"//pkg/resourcemanager/util",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/cpu",
"//pkg/util/disttask",
"//pkg/util/intest",
"//pkg/util/logutil",
Expand All @@ -37,20 +39,24 @@ go_test(
"dispatcher_test.go",
"main_test.go",
"rebalance_test.go",
"slots_test.go",
],
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 19,
shard_count = 22,
deps = [
"//pkg/disttask/framework/dispatcher/mock",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/disttask",
"//pkg/util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
25 changes: 13 additions & 12 deletions pkg/disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (*BaseDispatcher) Init() error {
// ExecuteTask implements the Dispatcher interface.
func (d *BaseDispatcher) ExecuteTask() {
logutil.Logger(d.logCtx).Info("execute one task",
zap.Stringer("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency))
zap.Stringer("state", d.Task.State), zap.Int("concurrency", d.Task.Concurrency))
d.scheduleTask()
}

Expand Down Expand Up @@ -570,7 +570,7 @@ func (d *BaseDispatcher) onErrHandlingStage(receiveErrs []error) error {
subTasks = make([]*proto.Subtask, 0, len(instanceIDs))
for _, id := range instanceIDs {
// reverting subtasks belong to the same step as current active step.
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, int(d.Task.Concurrency), []byte("{}")))
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, d.Task.Concurrency, []byte("{}")))
}
}
return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
Expand Down Expand Up @@ -686,7 +686,11 @@ func (d *BaseDispatcher) dispatchSubTask(
subtaskStep proto.Step,
metas [][]byte,
serverNodes []*infosync.ServerInfo) error {
logutil.Logger(d.logCtx).Info("dispatch subtasks", zap.Stringer("state", d.Task.State), zap.Int64("step", int64(d.Task.Step)), zap.Uint64("concurrency", d.Task.Concurrency), zap.Int("subtasks", len(metas)))
logutil.Logger(d.logCtx).Info("dispatch subtasks",
zap.Stringer("state", d.Task.State),
zap.Int64("step", int64(d.Task.Step)),
zap.Int("concurrency", d.Task.Concurrency),
zap.Int("subtasks", len(metas)))
d.TaskNodes = make([]string, len(serverNodes))
for i := range serverNodes {
d.TaskNodes[i] = disttaskutil.GenerateExecID(serverNodes[i].IP, serverNodes[i].Port)
Expand All @@ -698,7 +702,7 @@ func (d *BaseDispatcher) dispatchSubTask(
pos := i % len(serverNodes)
instanceID := disttaskutil.GenerateExecID(serverNodes[pos].IP, serverNodes[pos].Port)
logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(subtaskStep, d.Task.ID, d.Task.Type, instanceID, int(d.Task.Concurrency), meta))
subTasks = append(subTasks, proto.NewSubtask(subtaskStep, d.Task.ID, d.Task.Type, instanceID, d.Task.Concurrency, meta))
}
failpoint.Inject("cancelBeforeUpdateTask", func() {
_ = d.updateTask(proto.TaskStateCancelling, subTasks, RetrySQLTimes)
Expand Down Expand Up @@ -750,22 +754,19 @@ func GenerateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.Ser
}

func (d *BaseDispatcher) filterByRole(infos []*infosync.ServerInfo) ([]*infosync.ServerInfo, error) {
nodes, err := d.taskMgr.GetNodesByRole(d.ctx, "background")
nodes, err := d.taskMgr.GetManagedNodes(d.ctx)
if err != nil {
return nil, err
}

if len(nodes) == 0 {
nodes, err = d.taskMgr.GetNodesByRole(d.ctx, "")
}

if err != nil {
return nil, err
nodeMap := make(map[string]struct{}, len(nodes))
for _, node := range nodes {
nodeMap[node] = struct{}{}
}

res := make([]*infosync.ServerInfo, 0, len(nodes))
for _, info := range infos {
_, ok := nodes[disttaskutil.GenerateExecID(info.IP, info.Port)]
_, ok := nodeMap[disttaskutil.GenerateExecID(info.IP, info.Port)]
if ok {
res = append(res, info)
}
Expand Down
Loading

0 comments on commit a7260ff

Please sign in to comment.