disttask: replace failed nodes with alive ones #45935

Merged · 12 commits · Sep 4, 2023
23 changes: 13 additions & 10 deletions Makefile
@@ -266,6 +266,9 @@ tools/bin/vfsgendev:
 tools/bin/gotestsum:
 	GOBIN=$(shell pwd)/tools/bin $(GO) install gotest.tools/gotestsum@v1.8.1
 
+tools/bin/mockgen:
+	GOBIN=$(shell pwd)/tools/bin $(GO) install go.uber.org/mock/mockgen@v0.2.0
+
 # Usage:
 #
 # $ make vectorized-bench VB_FILE=Time VB_FUNC=builtinCurrentDateSig
@@ -370,18 +373,18 @@ br_compatibility_test_prepare:
 br_compatibility_test:
 	@cd br && tests/run_compatible.sh run
 
-mock_s3iface:
-	@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go
+mock_s3iface: tools/bin/mockgen
+	tools/bin/mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go
 
 # mock interface for lightning and IMPORT INTO
-mock_lightning:
-	@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
-	@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
-	@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
-	@mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go
-
-gen_mock:
-	@mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go
+mock_lightning: tools/bin/mockgen
+	tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
+	tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
+	tools/bin/mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go
+	tools/bin/mockgen -package mock github.com/pingcap/tidb/br/pkg/utils TaskRegister > br/pkg/mock/task_register.go
+
+gen_mock: tools/bin/mockgen
+	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go
 
 # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
 # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
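Every mock target now depends on the pinned tools/bin/mockgen binary, so generated mocks are reproducible across machines and CI. As a rough illustration, here is a Go sketch of what the gen_mock recipe does (a hypothetical helper, not part of this PR; the binary path, flags, and interface list are copied from the Makefile above):

```go
package main

import (
	"log"
	"os"
	"os/exec"
)

// Mirrors `make gen_mock`: run the pinned mockgen binary and redirect its
// stdout (mockgen prints the generated source there) into the mock package.
func main() {
	out, err := os.Create("disttask/framework/mock/scheduler_mock.go")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	cmd := exec.Command("tools/bin/mockgen",
		"-package", "mock",
		"github.com/pingcap/tidb/disttask/framework/scheduler",
		"TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler")
	cmd.Stdout = out
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		log.Fatal(err)
	}
}
```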
3 changes: 2 additions & 1 deletion disttask/framework/BUILD.bazel
@@ -5,12 +5,13 @@ go_test(
     timeout = "short",
     srcs = [
         "framework_err_handling_test.go",
+        "framework_ha_test.go",
         "framework_rollback_test.go",
         "framework_test.go",
     ],
     flaky = True,
     race = "on",
-    shard_count = 14,
+    shard_count = 22,
     deps = [
         "//disttask/framework/dispatcher",
         "//disttask/framework/proto",
1 change: 1 addition & 0 deletions disttask/framework/dispatcher/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
         "//sessionctx/variable",
         "//util",
         "//util/disttask",
+        "//util/intest",
         "//util/logutil",
         "//util/syncutil",
         "@com_github_pingcap_errors//:errors",
106 changes: 97 additions & 9 deletions disttask/framework/dispatcher/dispatcher.go
@@ -17,6 +17,7 @@
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -27,6 +28,7 @@
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	disttaskutil "github.com/pingcap/tidb/util/disttask"
+	"github.com/pingcap/tidb/util/intest"
 	"github.com/pingcap/tidb/util/logutil"
 	"go.uber.org/zap"
 )
@@ -36,6 +38,8 @@
 	DefaultSubtaskConcurrency = 16
 	// MaxSubtaskConcurrency is the maximum concurrency for handling subtask.
 	MaxSubtaskConcurrency = 256
+	// DefaultLiveNodesCheckInterval is the tick interval of fetching all server infos from etcd.
+	DefaultLiveNodesCheckInterval = 2
 )
 
 var (
@@ -65,6 +69,17 @@
 	logCtx   context.Context
 	serverID string
 	impl     Dispatcher
+
+	// for HA
+	// liveNodes is refreshed with all live nodes every liveNodeFetchInterval ticks.
+	liveNodes             []*infosync.ServerInfo
+	liveNodeFetchInterval int
+	// liveNodeFetchTick is the tick counter.
+	liveNodeFetchTick int
+	// taskNodes stores the IDs of the current scheduler nodes.
+	taskNodes []string
+	// rand is used for random selection of replacement nodes.
+	rand *rand.Rand
 }

 // MockOwnerChange mock owner change in tests.
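The new fields implement a simple "do the expensive fetch only every N ticks" cadence: liveNodeFetchTick counts dispatch ticks and liveNodes is refreshed when the counter reaches liveNodeFetchInterval. A minimal standalone sketch of the pattern, with illustrative names rather than the PR's:

```go
package main

import "fmt"

// ticker mirrors the liveNodeFetchTick/liveNodeFetchInterval pair: the
// expensive refresh fires only on every interval-th call.
type ticker struct {
	tick, interval int
}

func (t *ticker) shouldFetch() bool {
	t.tick++
	if t.tick < t.interval {
		return false
	}
	t.tick = 0
	return true
}

func main() {
	tk := ticker{interval: 2} // DefaultLiveNodesCheckInterval
	for i := 1; i <= 4; i++ {
		fmt.Println(i, tk.shouldFetch()) // fetch fires on ticks 2 and 4
	}
}
```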
@@ -74,12 +89,17 @@
 	logPrefix := fmt.Sprintf("task_id: %d, task_type: %s, server_id: %s", task.ID, task.Type, serverID)
 	impl := GetTaskDispatcher(task.Type)
 	dsp := &dispatcher{
-		ctx:      ctx,
-		taskMgr:  taskMgr,
-		task:     task,
-		logCtx:   logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
-		serverID: serverID,
-		impl:     impl,
+		ctx:                   ctx,
+		taskMgr:               taskMgr,
+		task:                  task,
+		logCtx:                logutil.WithKeyValue(context.Background(), "dispatcher", logPrefix),
+		serverID:              serverID,
+		impl:                  impl,
+		liveNodes:             nil,
+		liveNodeFetchInterval: DefaultLiveNodesCheckInterval,
+		liveNodeFetchTick:     0,
+		taskNodes:             nil,
+		rand:                  rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
 	if dsp.impl == nil {
 		logutil.BgLogger().Warn("gen dispatcher impl failed, this type impl doesn't register")
@@ -215,12 +235,70 @@
 		logutil.Logger(d.logCtx).Info("previous stage finished, generate dist plan", zap.Int64("stage", d.task.Step))
 		return d.onNextStage()
 	}
+	// Check if any node is down.
+	if err := d.replaceDeadNodesIfAny(); err != nil {
+		return err
+	}
 	// Wait for all subtasks in this stage to finish.
 	d.impl.OnTick(d.ctx, d.task)
 	logutil.Logger(d.logCtx).Debug("on running state, this task keeps current state", zap.String("state", d.task.State))
 	return nil
 }

+func (d *dispatcher) replaceDeadNodesIfAny() error {
+	if len(d.taskNodes) == 0 {
+		return errors.Errorf("len(d.taskNodes) == 0, onNextStage is not invoked before onRunning")
+	}
+	d.liveNodeFetchTick++
+	if d.liveNodeFetchTick == d.liveNodeFetchInterval {
+		d.liveNodeFetchTick = 0
+		serverInfos, err := GenerateSchedulerNodes(d.ctx)
+		if err != nil {
+			return err
+		}

Check warning on line 258 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L257-L258

Added lines #L257 - L258 were not covered by tests
+		eligibleServerInfos, err := d.impl.GetEligibleInstances(d.ctx, d.task)
+		if err != nil {
+			return err
+		}

Check warning on line 262 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L261-L262

Added lines #L261 - L262 were not covered by tests
+		newInfos := serverInfos[:0]
+		for _, m := range serverInfos {
+			found := false
+			for _, n := range eligibleServerInfos {
+				if m.ID == n.ID {
+					found = true
+					break
+				}
+			}
+			if found {
+				newInfos = append(newInfos, m)
+			}
+		}
+		d.liveNodes = newInfos
+	}
+	if len(d.liveNodes) > 0 {
+		replaceNodes := make(map[string]string)
+		for _, nodeID := range d.taskNodes {
+			if ok := disttaskutil.MatchServerInfo(d.liveNodes, nodeID); !ok {
+				n := d.liveNodes[d.rand.Int()%len(d.liveNodes)] //nolint:gosec
+				replaceNodes[nodeID] = disttaskutil.GenerateExecID(n.IP, n.Port)
+			}
+		}
+		if err := d.taskMgr.UpdateFailedSchedulerIDs(d.task.ID, replaceNodes); err != nil {
+			return err
+		}

Check warning on line 288 in disttask/framework/dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

disttask/framework/dispatcher/dispatcher.go#L287-L288

Added lines #L287 - L288 were not covered by tests
+		// replace local cache.
+		for k, v := range replaceNodes {
+			for m, n := range d.taskNodes {
+				if n == k {
+					d.taskNodes[m] = v
+					break
+				}
+			}
+		}
+	}
+	return nil
+}

 func (d *dispatcher) updateTask(taskState string, newSubTasks []*proto.Subtask, retryTimes int) (err error) {
 	prevState := d.task.State
 	d.task.State = taskState
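The new function has two halves: every liveNodeFetchInterval ticks it refreshes liveNodes with the intersection of all live servers and the task's eligible instances, and on each tick it rebinds any taskNodes entry that no longer matches a live node to a randomly chosen live one. A self-contained sketch of that core replacement step, with the infosync/disttaskutil types stubbed out (serverInfo and execID are illustrative stand-ins, not the PR's types):

```go
package main

import (
	"fmt"
	"math/rand"
)

// serverInfo stands in for infosync.ServerInfo; execID mimics
// disttaskutil.GenerateExecID's "ip:port" encoding.
type serverInfo struct {
	IP   string
	Port uint
}

func execID(s serverInfo) string { return fmt.Sprintf("%s:%d", s.IP, s.Port) }

// replaceDeadNodes returns oldID -> newID pairs for every task node that is
// no longer alive, picking replacements at random like the code above.
func replaceDeadNodes(taskNodes []string, liveNodes []serverInfo, rnd *rand.Rand) map[string]string {
	replace := make(map[string]string)
	for _, nodeID := range taskNodes {
		alive := false
		for _, n := range liveNodes {
			if execID(n) == nodeID {
				alive = true
				break
			}
		}
		if !alive {
			replace[nodeID] = execID(liveNodes[rnd.Intn(len(liveNodes))])
		}
	}
	return replace
}

func main() {
	live := []serverInfo{{"10.0.0.1", 4000}, {"10.0.0.2", 4000}}
	task := []string{"10.0.0.1:4000", "10.0.0.9:4000"} // second node is gone
	rnd := rand.New(rand.NewSource(1))
	fmt.Println(replaceDeadNodes(task, live, rnd))
}
```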
@@ -331,6 +409,10 @@
 	if len(serverNodes) == 0 {
 		return errors.New("no available TiDB node to dispatch subtasks")
 	}
+	d.taskNodes = make([]string, len(serverNodes))
+	for i := range serverNodes {
+		d.taskNodes[i] = disttaskutil.GenerateExecID(serverNodes[i].IP, serverNodes[i].Port)
+	}
 	subTasks := make([]*proto.Subtask, 0, len(metas))
 	for i, meta := range metas {
 		// we assign the subtask to the instance in a round-robin way.
@@ -353,16 +435,22 @@
 }
 
 // GenerateSchedulerNodes generates the eligible TiDB nodes.
-func GenerateSchedulerNodes(ctx context.Context) ([]*infosync.ServerInfo, error) {
-	serverInfos, err := infosync.GetAllServerInfo(ctx)
+func GenerateSchedulerNodes(ctx context.Context) (serverNodes []*infosync.ServerInfo, err error) {
+	var serverInfos map[string]*infosync.ServerInfo
+	_, etcd := ctx.Value("etcd").(bool)
+	if intest.InTest && !etcd {
+		serverInfos = infosync.MockGlobalServerInfoManagerEntry.GetAllServerInfo()
+	} else {
+		serverInfos, err = infosync.GetAllServerInfo(ctx)
+	}
 	if err != nil {
 		return nil, err
 	}
 	if len(serverInfos) == 0 {
 		return nil, errors.New("not found instance")
 	}
 
-	serverNodes := make([]*infosync.ServerInfo, 0, len(serverInfos))
+	serverNodes = make([]*infosync.ServerInfo, 0, len(serverInfos))
 	for _, serverInfo := range serverInfos {
 		serverNodes = append(serverNodes, serverInfo)
 	}
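A design note on the test hook: GenerateSchedulerNodes picks the mock server-info source when intest.InTest is set and the context does not carry the "etcd" flag. The flag is stored under a plain string key, which context.WithValue accepts but which can collide across packages (staticcheck reports it as SA1029). A sketch of the typed-key alternative, purely illustrative since the PR uses the string key:

```go
package dispatcher // illustrative placement, not the PR's code

import "context"

// etcdKey is unexported, so no other package can construct a colliding key.
type etcdKey struct{}

// WithRealEtcd marks a context so test code still uses the real
// etcd-backed server-info source instead of the in-process mock.
func WithRealEtcd(ctx context.Context) context.Context {
	return context.WithValue(ctx, etcdKey{}, true)
}

func useRealEtcd(ctx context.Context) bool {
	v, _ := ctx.Value(etcdKey{}).(bool)
	return v
}
```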
28 changes: 8 additions & 20 deletions disttask/framework/dispatcher/dispatcher_test.go
@@ -106,7 +106,7 @@ func (*numberExampleDispatcher) IsRetryableErr(error) bool {
 }
 
 func MockDispatcherManager(t *testing.T, pool *pools.ResourcePool) (*dispatcher.Manager, *storage.TaskManager) {
-	ctx := context.Background()
+	ctx := context.WithValue(context.Background(), "etcd", true)
 	mgr := storage.NewTaskManager(util.WithInternalSourceType(ctx, "taskManager"), pool)
 	storage.SetTaskManager(mgr)
 	dsp, err := dispatcher.NewManager(util.WithInternalSourceType(ctx, "dispatcher"), mgr, "host:port")
@@ -220,31 +220,19 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc bool, isCancel bool) {
 	// 3s
 	cnt := 60
 	checkGetRunningTaskCnt := func(expected int) {
-		var retCnt int
-		for i := 0; i < cnt; i++ {
-			retCnt = dsp.GetRunningTaskCnt()
-			if retCnt == expected {
-				break
-			}
-			time.Sleep(time.Millisecond * 50)
-		}
-		require.Equal(t, retCnt, expected)
+		require.Eventually(t, func() bool {
+			return dsp.GetRunningTaskCnt() == expected
+		}, time.Second, 50*time.Millisecond)
 	}
 
 	checkTaskRunningCnt := func() []*proto.Task {
-		var retCnt int
 		var tasks []*proto.Task
-		var err error
-		for i := 0; i < cnt; i++ {
+		require.Eventually(t, func() bool {
+			var err error
 			tasks, err = mgr.GetGlobalTasksInStates(proto.TaskStateRunning)
 			require.NoError(t, err)
-			retCnt = len(tasks)
-			if retCnt == taskCnt {
-				break
-			}
-			time.Sleep(time.Millisecond * 50)
-		}
-		require.Equal(t, retCnt, taskCnt)
+			return len(tasks) == taskCnt
+		}, time.Second, 50*time.Millisecond)
 		return tasks
 	}

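The test refactor replaces the hand-rolled poll-and-sleep loops with testify's require.Eventually, which evaluates a condition at a fixed tick until it returns true or the deadline expires. A minimal self-contained example of the pattern (standalone, not from this PR):

```go
package example_test

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestEventuallyPattern(t *testing.T) {
	var done atomic.Bool
	go func() {
		time.Sleep(100 * time.Millisecond)
		done.Store(true)
	}()
	// Poll every 50ms; fail the test if the condition never holds within 1s.
	require.Eventually(t, func() bool { return done.Load() }, time.Second, 50*time.Millisecond)
}
```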