Skip to content

Commit

Permalink
domain: fix race on cancelFns (#54701)
Browse files Browse the repository at this point in the history
close #54690
  • Loading branch information
lance6716 authored Jul 18, 2024
1 parent 0e5e9ee commit 0ad8b84
Showing 1 changed file with 18 additions and 7 deletions.
25 changes: 18 additions & 7 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ type Domain struct {
memoryUsageAlarmHandle *memoryusagealarm.Handle
serverMemoryLimitHandle *servermemorylimit.Handle
// TODO: use Run for each process in future pr
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancelFns []context.CancelFunc
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancelFns struct {
mu sync.Mutex
fns []context.CancelFunc
}
dumpFileGcChecker *dumpFileGcChecker
planReplayerHandle *planReplayerHandle
extractTaskHandle *ExtractHandle
Expand Down Expand Up @@ -1142,9 +1145,11 @@ func (do *Domain) Close() {
}

do.slowQuery.Close()
for _, f := range do.cancelFns {
do.cancelFns.mu.Lock()
for _, f := range do.cancelFns.fns {
f()
}
do.cancelFns.mu.Unlock()
do.wg.Wait()
do.sysSessionPool.Close()
variable.UnregisterStatistics(do.BindHandle())
Expand Down Expand Up @@ -1270,7 +1275,9 @@ func (do *Domain) Init(
}
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)
ctx, cancelFunc := context.WithCancel(context.Background())
do.cancelFns = append(do.cancelFns, cancelFunc)
do.cancelFns.mu.Lock()
do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc)
do.cancelFns.mu.Unlock()
var callback ddl.Callback
newCallbackFunc, err := ddl.GetCustomizedHook("default_hook")
if err != nil {
Expand Down Expand Up @@ -1589,7 +1596,9 @@ func (do *Domain) InitDistTaskLoop() error {
return errors.New(errMsg)
}
managerCtx, cancel := context.WithCancel(ctx)
do.cancelFns = append(do.cancelFns, cancel)
do.cancelFns.mu.Lock()
do.cancelFns.fns = append(do.cancelFns.fns, cancel)
do.cancelFns.mu.Unlock()
executorManager, err := taskexecutor.NewManager(managerCtx, serverID, taskManager)
if err != nil {
return err
Expand Down Expand Up @@ -2450,7 +2459,9 @@ func (do *Domain) loadStatsWorker() {
}()

ctx, cancelFunc := context.WithCancel(context.Background())
do.cancelFns = append(do.cancelFns, cancelFunc)
do.cancelFns.mu.Lock()
do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc)
do.cancelFns.mu.Unlock()

do.initStats(ctx)
statsHandle := do.StatsHandle()
Expand Down

0 comments on commit 0ad8b84

Please sign in to comment.