Skip to content

Commit

Permalink
domain: avoit to print too many log if the ddl job of runaway table i…
Browse files Browse the repository at this point in the history
…s not finished (#52283)

close #52048
  • Loading branch information
CabinfeverB authored Apr 9, 2024
1 parent 9b78a23 commit 156432d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
3 changes: 1 addition & 2 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,8 +1290,7 @@ func (do *Domain) Init(
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop")
do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop")
do.wg.Run(do.runawayStartLoop, "runawayStartLoop")
do.wg.Run(do.requestUnitsWriterLoop, "requestUnitsWriterLoop")
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
Expand Down
21 changes: 21 additions & 0 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (r *QuarantineRecord) GenDeletionStmt() (string, []any) {

// RunawayManager is used to detect and record runaway queries.
type RunawayManager struct {
syncerInitialized atomic.Bool
logOnce sync.Once

// queryLock is used to avoid repeated additions. Since we will add new items to the system table,
// in order to avoid repeated additions, we need a lock to ensure that
// action "judging whether there is this record in the current watch list and adding records" have atomicity.
Expand Down Expand Up @@ -219,6 +222,7 @@ func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serv
go watchList.Start()
staleQuarantineChan := make(chan *QuarantineRecord, maxWatchRecordChannelSize)
m := &RunawayManager{
syncerInitialized: atomic.Bool{},
resourceGroupCtl: resourceGroupCtl,
watchList: watchList,
serverID: serverAddr,
Expand All @@ -245,6 +249,11 @@ func NewRunawayManager(resourceGroupCtl *rmclient.ResourceGroupsController, serv
return m
}

// MarkSyncerInitialized is used to mark the syncer is initialized.
func (rm *RunawayManager) MarkSyncerInitialized() {
rm.syncerInitialized.Store(true)
}

// DeriveChecker derives a RunawayChecker from the given resource group
func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, planDigest string) *RunawayChecker {
group, err := rm.resourceGroupCtl.GetResourceGroup(resourceGroupName)
Expand Down Expand Up @@ -282,6 +291,12 @@ func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watc
}
// Add record without ID into watch list in this TiDB right now.
rm.addWatchList(record, ttl, false)
if !rm.syncerInitialized.Load() {
rm.logOnce.Do(func() {
logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway")
})
return
}
select {
case rm.quarantineChan <- record:
default:
Expand Down Expand Up @@ -380,6 +395,12 @@ func (rm *RunawayManager) getWatchFromWatchList(key string) *QuarantineRecord {

func (rm *RunawayManager) markRunaway(resourceGroupName, originalSQL, planDigest string, action string, matchType RunawayMatchType, now *time.Time) {
source := rm.serverID
if !rm.syncerInitialized.Load() {
rm.logOnce.Do(func() {
logutil.BgLogger().Warn("runaway syncer is not initialized, so can't records about runaway")
})
return
}
select {
case rm.runawayQueriesChan <- &RunawayRecord{
ResourceGroupName: resourceGroupName,
Expand Down
41 changes: 37 additions & 4 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ const (
runawayRecordGCBatchSize = 100
runawayRecordGCSelectBatchSize = runawayRecordGCBatchSize * 5

maxIDRetries = 3
maxIDRetries = 3
runawayLoopLogErrorIntervalCount = 1800
)

var systemSchemaCIStr = model.NewCIStr("mysql")
Expand Down Expand Up @@ -140,20 +141,48 @@ func (do *Domain) deleteExpiredRows(tableName, colName string, expiredDuration t
}
}

func (do *Domain) runawayStartLoop() {
defer util.Recover(metrics.LabelDomain, "runawayStartLoop", nil, false)
runawayWatchSyncTicker := time.NewTicker(runawayWatchSyncInterval)
count := 0
var err error
logutil.BgLogger().Info("try to start runaway manager loop")
for {
select {
case <-do.exit:
return
case <-runawayWatchSyncTicker.C:
// Due to the watch and watch done tables is created later than runaway queries table
err = do.updateNewAndDoneWatch()
if err == nil {
logutil.BgLogger().Info("preparations for the runaway manager are finished and start runaway manager loop")
do.wg.Run(do.runawayRecordFlushLoop, "runawayRecordFlushLoop")
do.wg.Run(do.runawayWatchSyncLoop, "runawayWatchSyncLoop")
do.runawayManager.MarkSyncerInitialized()
return
}
}
if count %= runawayLoopLogErrorIntervalCount; count == 0 {
logutil.BgLogger().Warn(
"failed to start runaway manager loop, please check whether the bootstrap or update is finished",
zap.Error(err))
}
count++
}
}

func (do *Domain) updateNewAndDoneWatch() error {
do.runawaySyncer.mu.Lock()
defer do.runawaySyncer.mu.Unlock()
records, err := do.runawaySyncer.getNewWatchRecords()
if err != nil {
logutil.BgLogger().Error("try to get new runaway watch", zap.Error(err))
return err
}
for _, r := range records {
do.runawayManager.AddWatch(r)
}
doneRecords, err := do.runawaySyncer.getNewWatchDoneRecords()
if err != nil {
logutil.BgLogger().Error("try to get done runaway watch", zap.Error(err))
return err
}
for _, r := range doneRecords {
Expand All @@ -165,14 +194,18 @@ func (do *Domain) updateNewAndDoneWatch() error {
func (do *Domain) runawayWatchSyncLoop() {
defer util.Recover(metrics.LabelDomain, "runawayWatchSyncLoop", nil, false)
runawayWatchSyncTicker := time.NewTicker(runawayWatchSyncInterval)
count := 0
for {
select {
case <-do.exit:
return
case <-runawayWatchSyncTicker.C:
err := do.updateNewAndDoneWatch()
if err != nil {
logutil.BgLogger().Warn("get runaway watch record failed", zap.Error(err))
if count %= runawayLoopLogErrorIntervalCount; count == 0 {
logutil.BgLogger().Warn("get runaway watch record failed", zap.Error(err))
}
count++
}
}
}
Expand Down

0 comments on commit 156432d

Please sign in to comment.