Skip to content

Commit

Permalink
statistics: upgrade stats timeout checkpoint after it timeouts (#52424)
Browse files Browse the repository at this point in the history
close #52425
  • Loading branch information
hawkingrei authored Apr 9, 2024
1 parent 7522356 commit 9bb3697
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/planner/core/rule_collect_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPlan
return plan, planChanged, nil
}
predicateNeeded := variable.EnableColumnTracking.Load()
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait.Load()
histNeeded := syncWait > 0
predicateColumns, histNeededColumns, visitedPhysTblIDs := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded)
if len(predicateColumns) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,7 @@ type SessionVars struct {
ReadConsistency ReadConsistencyLevel

// StatsLoadSyncWait indicates how long to wait for stats load before timeout.
StatsLoadSyncWait int64
StatsLoadSyncWait atomic.Int64

// EnableParallelHashaggSpill indicates if parallel hash agg could spill.
EnableParallelHashaggSpill bool
Expand Down Expand Up @@ -2026,7 +2026,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
EnableLegacyInstanceScope: DefEnableLegacyInstanceScope,
RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery,
EnableSkewDistinctAgg: DefTiDBSkewDistinctAgg,
Expand Down Expand Up @@ -2094,6 +2093,7 @@ func NewSessionVars(hctx HookContext) *SessionVars {
vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery)
vars.MemTracker.IsRootTrackerOfSess = true
vars.MemTracker.Killer = &vars.SQLKiller
vars.StatsLoadSyncWait.Store(StatsLoadSyncWait.Load())

for _, engine := range config.GetGlobalConfig().IsolationRead.Engines {
switch engine {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,7 @@ var defaultSysVars = []*SysVar{
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBStatsLoadSyncWait, Value: strconv.Itoa(DefTiDBStatsLoadSyncWait), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32,
SetSession: func(s *SessionVars, val string) error {
s.StatsLoadSyncWait = TidbOptInt64(val, DefTiDBStatsLoadSyncWait)
s.StatsLoadSyncWait.Store(TidbOptInt64(val, DefTiDBStatsLoadSyncWait))
return nil
},
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
}
}()
if lastTask == nil {
task, err = s.drainColTask(exit)
task, err = s.drainColTask(sctx, exit)
if err != nil {
if err != errExit {
logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
Expand All @@ -239,6 +239,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
}
return task, result.Err
case <-time.After(timeout):
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
}
}
Expand Down Expand Up @@ -396,7 +397,7 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
}

// drainColTask will hang until a column task can return, and either task or error will be returned.
func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItemTask, error) {
func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) {
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
for {
select {
Expand All @@ -409,6 +410,7 @@ func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItem
// if the task has already timeout, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down

0 comments on commit 9bb3697

Please sign in to comment.