From 3f4db1afa8f55a7e0fdf1b54f1e55b78e9432bf2 Mon Sep 17 00:00:00 2001 From: tangenta Date: Thu, 28 Dec 2023 23:36:28 +0800 Subject: [PATCH] disttask: do not retry meta initialization if context done (#49881) close pingcap/tidb#49753 --- pkg/disttask/framework/scheduler/scheduler.go | 3 +++ pkg/disttask/framework/taskexecutor/manager.go | 3 +++ pkg/domain/domain.go | 15 +++++++++------ pkg/session/session.go | 2 +- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index 0e40199d9730b..c901d6cca5a52 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -491,6 +491,9 @@ func (s *BaseScheduler) updateTask(taskState proto.TaskState, newSubTasks []*pro if err == nil || !retryable { break } + if err1 := s.ctx.Err(); err1 != nil { + return err1 + } if i%10 == 0 { logutil.Logger(s.logCtx).Warn("updateTask first failed", zap.Stringer("from", prevState), zap.Stringer("to", s.Task.State), zap.Int("retry times", i), zap.Error(err)) diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 825b2b9d0e4d6..84635d1608a28 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -118,6 +118,9 @@ func (m *Manager) initMeta() (err error) { if err == nil { break } + if err1 := m.ctx.Err(); err1 != nil { + return err1 + } if i%10 == 0 { logutil.Logger(m.logCtx).Warn("start manager failed", zap.String("scope", config.GetGlobalConfig().Instance.TiDBServiceScope), diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 7194e71b1f33b..87bc2160c6cb9 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -159,7 +159,7 @@ type Domain struct { // TODO: use Run for each process in future pr wg *util.WaitGroupEnhancedWrapper statsUpdating atomicutil.Int32 - cancel context.CancelFunc + cancelFns []context.CancelFunc indexUsageSyncLease time.Duration dumpFileGcChecker *dumpFileGcChecker planReplayerHandle *planReplayerHandle @@ -1036,8 +1036,8 @@ func (do *Domain) Close() { } do.slowQuery.Close() - if do.cancel != nil { - do.cancel() + for _, f := range do.cancelFns { + f() } do.wg.Wait() do.sysSessionPool.Close() @@ -1164,7 +1164,7 @@ func (do *Domain) Init( } sysCtxPool := pools.NewResourcePool(sysFac, 128, 128, resourceIdleTimeout) ctx, cancelFunc := context.WithCancel(context.Background()) - do.cancel = cancelFunc + do.cancelFns = append(do.cancelFns, cancelFunc) var callback ddl.Callback newCallbackFunc, err := ddl.GetCustomizedHook("default_hook") if err != nil { @@ -1458,7 +1458,8 @@ func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) erro } // InitDistTaskLoop initializes the distributed task framework. -func (do *Domain) InitDistTaskLoop(ctx context.Context) error { +func (do *Domain) InitDistTaskLoop() error { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDistTask) failpoint.Inject("MockDisableDistTask", func(val failpoint.Value) { if val.(bool) { failpoint.Return(nil) @@ -1478,7 +1479,9 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error { errMsg := fmt.Sprintf("TiDB node ID( = %s ) not found in available TiDB nodes list", do.ddl.GetID()) return errors.New(errMsg) } - executorManager, err := taskexecutor.NewManagerBuilder().BuildManager(ctx, serverID, taskManager) + managerCtx, cancel := context.WithCancel(ctx) + do.cancelFns = append(do.cancelFns, cancel) + executorManager, err := taskexecutor.NewManagerBuilder().BuildManager(managerCtx, serverID, taskManager) if err != nil { return err } diff --git a/pkg/session/session.go b/pkg/session/session.go index d85f4b0e494de..04b07e831838b 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3493,7 +3493,7 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto dom.Close() return nil, errors.New("Fail to load or parse sql file") } - err = dom.InitDistTaskLoop(ctx) + err = dom.InitDistTaskLoop() if err != nil { return nil, err }