diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index db1ce74917983..bb55529788de7 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -162,9 +162,12 @@ type Domain struct { memoryUsageAlarmHandle *memoryusagealarm.Handle serverMemoryLimitHandle *servermemorylimit.Handle // TODO: use Run for each process in future pr - wg *util.WaitGroupEnhancedWrapper - statsUpdating atomicutil.Int32 - cancelFns []context.CancelFunc + wg *util.WaitGroupEnhancedWrapper + statsUpdating atomicutil.Int32 + cancelFns struct { + mu sync.Mutex + fns []context.CancelFunc + } dumpFileGcChecker *dumpFileGcChecker planReplayerHandle *planReplayerHandle extractTaskHandle *ExtractHandle @@ -1142,9 +1145,11 @@ func (do *Domain) Close() { } do.slowQuery.Close() - for _, f := range do.cancelFns { + do.cancelFns.mu.Lock() + for _, f := range do.cancelFns.fns { f() } + do.cancelFns.mu.Unlock() do.wg.Wait() do.sysSessionPool.Close() variable.UnregisterStatistics(do.BindHandle()) @@ -1270,7 +1275,9 @@ func (do *Domain) Init( } sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout) ctx, cancelFunc := context.WithCancel(context.Background()) - do.cancelFns = append(do.cancelFns, cancelFunc) + do.cancelFns.mu.Lock() + do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc) + do.cancelFns.mu.Unlock() var callback ddl.Callback newCallbackFunc, err := ddl.GetCustomizedHook("default_hook") if err != nil { @@ -1589,7 +1596,9 @@ func (do *Domain) InitDistTaskLoop() error { return errors.New(errMsg) } managerCtx, cancel := context.WithCancel(ctx) - do.cancelFns = append(do.cancelFns, cancel) + do.cancelFns.mu.Lock() + do.cancelFns.fns = append(do.cancelFns.fns, cancel) + do.cancelFns.mu.Unlock() executorManager, err := taskexecutor.NewManager(managerCtx, serverID, taskManager) if err != nil { return err @@ -2450,7 +2459,9 @@ func (do *Domain) loadStatsWorker() { }() ctx, cancelFunc := context.WithCancel(context.Background()) - do.cancelFns = append(do.cancelFns, cancelFunc) + do.cancelFns.mu.Lock() + do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc) + do.cancelFns.mu.Unlock() do.initStats(ctx) statsHandle := do.StatsHandle()