Skip to content

Commit

Permalink
disttask: do not retry meta initialization if context done (#49881)
Browse files Browse the repository at this point in the history
close #49753
  • Loading branch information
tangenta authored Dec 28, 2023
1 parent ab1f518 commit 3f4db1a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
3 changes: 3 additions & 0 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ func (s *BaseScheduler) updateTask(taskState proto.TaskState, newSubTasks []*pro
if err == nil || !retryable {
break
}
if err1 := s.ctx.Err(); err1 != nil {
return err1
}
if i%10 == 0 {
logutil.Logger(s.logCtx).Warn("updateTask first failed", zap.Stringer("from", prevState), zap.Stringer("to", s.Task.State),
zap.Int("retry times", i), zap.Error(err))
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (m *Manager) initMeta() (err error) {
if err == nil {
break
}
if err1 := m.ctx.Err(); err1 != nil {
return err1
}
if i%10 == 0 {
logutil.Logger(m.logCtx).Warn("start manager failed",
zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope),
Expand Down
15 changes: 9 additions & 6 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type Domain struct {
// TODO: use Run for each process in future pr
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
cancelFns []context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
Expand Down Expand Up @@ -1036,8 +1036,8 @@ func (do *Domain) Close() {
}

do.slowQuery.Close()
if do.cancel != nil {
do.cancel()
for _, f := range do.cancelFns {
f()
}
do.wg.Wait()
do.sysSessionPool.Close()
Expand Down Expand Up @@ -1164,7 +1164,7 @@ func (do *Domain) Init(
}
sysCtxPool := pools.NewResourcePool(sysFac, 128, 128, resourceIdleTimeout)
ctx, cancelFunc := context.WithCancel(context.Background())
do.cancel = cancelFunc
do.cancelFns = append(do.cancelFns, cancelFunc)
var callback ddl.Callback
newCallbackFunc, err := ddl.GetCustomizedHook("default_hook")
if err != nil {
Expand Down Expand Up @@ -1458,7 +1458,8 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro
}

// InitDistTaskLoop initializes the distributed task framework.
func (do *Domain) InitDistTaskLoop(ctx context.Context) error {
func (do *Domain) InitDistTaskLoop() error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDistTask)
failpoint.Inject("MockDisableDistTask", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil)
Expand All @@ -1478,7 +1479,9 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error {
errMsg := fmt.Sprintf("TiDB node ID( = %s ) not found in available TiDB nodes list", do.ddl.GetID())
return errors.New(errMsg)
}
executorManager, err := taskexecutor.NewManagerBuilder().BuildManager(ctx, serverID, taskManager)
managerCtx, cancel := context.WithCancel(ctx)
do.cancelFns = append(do.cancelFns, cancel)
executorManager, err := taskexecutor.NewManagerBuilder().BuildManager(managerCtx, serverID, taskManager)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3493,7 +3493,7 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto
dom.Close()
return nil, errors.New("Fail to load or parse sql file")
}
err = dom.InitDistTaskLoop(ctx)
err = dom.InitDistTaskLoop()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 3f4db1a

Please sign in to comment.