Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: implement slot manager in dispatcher, and check resources before starting the dispatcher #49195

Merged
merged 15 commits
Dec 11, 2023
8 changes: 7 additions & 1 deletion pkg/disttask/framework/dispatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"dispatcher.go",
"dispatcher_manager.go",
"interface.go",
"slots.go",
"state_transform.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/dispatcher",
Expand All @@ -19,6 +20,7 @@ go_library(
"//pkg/resourcemanager/util",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/cpu",
"//pkg/util/disttask",
"//pkg/util/intest",
"//pkg/util/logutil",
Expand All @@ -37,20 +39,24 @@ go_test(
"dispatcher_test.go",
"main_test.go",
"rebalance_test.go",
"slots_test.go",
],
embed = [":dispatcher"],
flaky = True,
race = "off",
shard_count = 19,
shard_count = 22,
deps = [
"//pkg/disttask/framework/dispatcher/mock",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/disttask",
"//pkg/util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
25 changes: 13 additions & 12 deletions pkg/disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (*BaseDispatcher) Init() error {
// ExecuteTask implements the Dispatcher interface.
func (d *BaseDispatcher) ExecuteTask() {
logutil.Logger(d.logCtx).Info("execute one task",
zap.Stringer("state", d.Task.State), zap.Uint64("concurrency", d.Task.Concurrency))
zap.Stringer("state", d.Task.State), zap.Int("concurrency", d.Task.Concurrency))
d.scheduleTask()
}

Expand Down Expand Up @@ -570,7 +570,7 @@ func (d *BaseDispatcher) onErrHandlingStage(receiveErrs []error) error {
subTasks = make([]*proto.Subtask, 0, len(instanceIDs))
for _, id := range instanceIDs {
// reverting subtasks belong to the same step as current active step.
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, int(d.Task.Concurrency), []byte("{}")))
subTasks = append(subTasks, proto.NewSubtask(d.Task.Step, d.Task.ID, d.Task.Type, id, d.Task.Concurrency, []byte("{}")))
}
}
return d.updateTask(proto.TaskStateReverting, subTasks, RetrySQLTimes)
Expand Down Expand Up @@ -686,7 +686,11 @@ func (d *BaseDispatcher) dispatchSubTask(
subtaskStep proto.Step,
metas [][]byte,
serverNodes []*infosync.ServerInfo) error {
logutil.Logger(d.logCtx).Info("dispatch subtasks", zap.Stringer("state", d.Task.State), zap.Int64("step", int64(d.Task.Step)), zap.Uint64("concurrency", d.Task.Concurrency), zap.Int("subtasks", len(metas)))
logutil.Logger(d.logCtx).Info("dispatch subtasks",
zap.Stringer("state", d.Task.State),
zap.Int64("step", int64(d.Task.Step)),
zap.Int("concurrency", d.Task.Concurrency),
zap.Int("subtasks", len(metas)))
d.TaskNodes = make([]string, len(serverNodes))
for i := range serverNodes {
d.TaskNodes[i] = disttaskutil.GenerateExecID(serverNodes[i].IP, serverNodes[i].Port)
Expand All @@ -698,7 +702,7 @@ func (d *BaseDispatcher) dispatchSubTask(
pos := i % len(serverNodes)
instanceID := disttaskutil.GenerateExecID(serverNodes[pos].IP, serverNodes[pos].Port)
logutil.Logger(d.logCtx).Debug("create subtasks", zap.String("instanceID", instanceID))
subTasks = append(subTasks, proto.NewSubtask(subtaskStep, d.Task.ID, d.Task.Type, instanceID, int(d.Task.Concurrency), meta))
subTasks = append(subTasks, proto.NewSubtask(subtaskStep, d.Task.ID, d.Task.Type, instanceID, d.Task.Concurrency, meta))
}
failpoint.Inject("cancelBeforeUpdateTask", func() {
_ = d.updateTask(proto.TaskStateCancelling, subTasks, RetrySQLTimes)
Expand Down Expand Up @@ -750,22 +754,19 @@ func GenerateTaskExecutorNodes(ctx context.Context) (serverNodes []*infosync.Ser
}

func (d *BaseDispatcher) filterByRole(infos []*infosync.ServerInfo) ([]*infosync.ServerInfo, error) {
nodes, err := d.taskMgr.GetNodesByRole(d.ctx, "background")
nodes, err := d.taskMgr.GetManagedNodes(d.ctx)
if err != nil {
return nil, err
}

if len(nodes) == 0 {
nodes, err = d.taskMgr.GetNodesByRole(d.ctx, "")
}

if err != nil {
return nil, err
nodeMap := make(map[string]struct{}, len(nodes))
for _, node := range nodes {
nodeMap[node] = struct{}{}
}

res := make([]*infosync.ServerInfo, 0, len(nodes))
for _, info := range infos {
_, ok := nodes[disttaskutil.GenerateExecID(info.IP, info.Port)]
_, ok := nodeMap[disttaskutil.GenerateExecID(info.IP, info.Port)]
if ok {
res = append(res, info)
}
Expand Down
Loading