Skip to content

Commit

Permalink
resource_control: don't check runaway after resp (#51160) (#51199)
Browse files Browse the repository at this point in the history
close #51161
  • Loading branch information
ti-chi-bot authored Feb 20, 2024
1 parent 15064f4 commit c5b989c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 19 deletions.
19 changes: 15 additions & 4 deletions pkg/ddl/tests/resourcegroup/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,10 @@ func TestResourceGroupRunaway(t *testing.T) {
tk.MustQuery("select /*+ resource_group(rg3) */ * from t").Check(testkit.Rows("1"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest", fmt.Sprintf("return(%d)", 60)))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest"))
}()
err := tk.QueryToErr("select /*+ resource_group(rg1) */ * from t")
require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query")

tryInterval := time.Millisecond * 200
tryInterval := time.Millisecond * 100
maxWaitDuration := time.Second * 5
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows("rg1 select /*+ resource_group(rg1) */ * from t identify"), maxWaitDuration, tryInterval)
Expand Down Expand Up @@ -342,6 +339,20 @@ func TestResourceGroupRunaway(t *testing.T) {
tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=DRYRUN)")
tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1"))
tk.MustGetErrCode("select /*+ resource_group(rg3) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest"))

tk.MustExec("create resource group rg4 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 50)))
tk.MustQuery("select /*+ resource_group(rg4) */ * from t").Check(testkit.Rows("1"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 60)))
err = tk.QueryToErr("select /*+ resource_group(rg4) */ * from t")
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query")
tk.MustGetErrCode("select /*+ resource_group(rg4) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine)
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch", nil,
testkit.Rows("rg3 select /*+ resource_group(rg3) */ * from t", "rg4 select /*+ resource_group(rg4) */ * from t"), maxWaitDuration, tryInterval)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq"))
}

func TestAlreadyExistsDefaultResourceGroup(t *testing.T) {
Expand Down
30 changes: 18 additions & 12 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
marked := r.marked.Load()
if !marked {
// note: now we don't check whether query is in watch list again.
until := time.Until(r.deadline)
now := time.Now()
until := r.deadline.Sub(now)
if until > 0 {
if r.setting.Action == rmpb.RunawayAction_Kill {
// if the execution time is close to the threshold, set a timeout
Expand All @@ -500,7 +501,6 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
}
// execution time exceeds the threshold, mark the query as runaway
if r.marked.CompareAndSwap(false, true) {
now := time.Now()
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
r.markQuarantine(&now)
}
Expand All @@ -518,20 +518,26 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
}
}

// AfterCopRequest checks runaway after receiving coprocessor response.
func (r *RunawayChecker) AfterCopRequest() {
if r.setting == nil {
return
// CheckCopRespError checks TiKV error after receiving coprocessor response.
func (r *RunawayChecker) CheckCopRespError(err error) error {
if err == nil || r.setting == nil || r.setting.Action != rmpb.RunawayAction_Kill {
return err
}
// Do not perform action here as it may be the last cop request and just let it finish. If it's not the last cop request, action would be performed in `BeforeCopRequest` when handling the next cop request.
// Here only marks the query as runaway
if !r.marked.Load() && r.deadline.Before(time.Now()) {
if r.marked.CompareAndSwap(false, true) {
if strings.HasPrefix(err.Error(), "Coprocessor task terminated due to exceeding the deadline") {
if !r.marked.Load() {
now := time.Now()
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
r.markQuarantine(&now)
if r.deadline.Before(now) && r.marked.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
r.markQuarantine(&now)
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
}
}
// Due to concurrency, check again.
if r.marked.Load() {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
}
}
return err
}

func (r *RunawayChecker) markQuarantine(now *time.Time) {
Expand Down
14 changes: 11 additions & 3 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,17 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region,
timeout, getEndPointType(task.storeType), task.storeAddr, ops...)
err = derr.ToTiDBErr(err)
if worker.req.RunawayChecker != nil {
failpoint.Inject("sleepCoprAfterReq", func(v failpoint.Value) {
//nolint:durationcheck
value := v.(int)
time.Sleep(time.Millisecond * time.Duration(value))
if value > 50 {
err = errors.Errorf("Coprocessor task terminated due to exceeding the deadline")
}
})
err = worker.req.RunawayChecker.CheckCopRespError(err)
}
if err != nil {
if task.storeType == kv.TiDB {
err = worker.handleTiDBSendReqErr(err, task, ch)
Expand All @@ -1260,9 +1271,6 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
if costTime > minLogCopTaskTime {
worker.logTimeCopTask(costTime, task, bo, copResp)
}
if worker.req.RunawayChecker != nil {
worker.req.RunawayChecker.AfterCopRequest()
}

storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
isInternal := util.IsRequestSourceInternal(&task.requestSource)
Expand Down

0 comments on commit c5b989c

Please sign in to comment.