diff --git a/pkg/ddl/tests/resourcegroup/resource_group_test.go b/pkg/ddl/tests/resourcegroup/resource_group_test.go index dbf4e88010968..9b32acc40d023 100644 --- a/pkg/ddl/tests/resourcegroup/resource_group_test.go +++ b/pkg/ddl/tests/resourcegroup/resource_group_test.go @@ -299,13 +299,10 @@ func TestResourceGroupRunaway(t *testing.T) { tk.MustQuery("select /*+ resource_group(rg3) */ * from t").Check(testkit.Rows("1")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest", fmt.Sprintf("return(%d)", 60))) - defer func() { - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest")) - }() err := tk.QueryToErr("select /*+ resource_group(rg1) */ * from t") require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query") - tryInterval := time.Millisecond * 200 + tryInterval := time.Millisecond * 100 maxWaitDuration := time.Second * 5 tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil, testkit.Rows("rg1 select /*+ resource_group(rg1) */ * from t identify"), maxWaitDuration, tryInterval) @@ -342,6 +339,20 @@ func TestResourceGroupRunaway(t *testing.T) { tk.MustExec("alter resource group rg2 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=DRYRUN)") tk.MustQuery("select /*+ resource_group(rg2) */ * from t").Check(testkit.Rows("1")) tk.MustGetErrCode("select /*+ resource_group(rg3) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest")) + + tk.MustExec("create resource group rg4 BURSTABLE RU_PER_SEC=2000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' action KILL WATCH EXACT)") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 50))) + tk.MustQuery("select /*+ resource_group(rg4) */ * from t").Check(testkit.Rows("1")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq")) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq", fmt.Sprintf("return(%d)", 60))) + err = tk.QueryToErr("select /*+ resource_group(rg4) */ * from t") + require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") + tk.MustGetErrCode("select /*+ resource_group(rg4) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine) + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch", nil, + testkit.Rows("rg3 select /*+ resource_group(rg3) */ * from t", "rg4 select /*+ resource_group(rg4) */ * from t"), maxWaitDuration, tryInterval) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprAfterReq")) } func TestAlreadyExistsDefaultResourceGroup(t *testing.T) { diff --git a/pkg/domain/resourcegroup/runaway.go b/pkg/domain/resourcegroup/runaway.go index 3f6b423040a1c..a7313324d4bbb 100644 --- a/pkg/domain/resourcegroup/runaway.go +++ b/pkg/domain/resourcegroup/runaway.go @@ -499,7 +499,8 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error { marked := r.marked.Load() if !marked { // note: now we don't check whether query is in watch list again. - until := time.Until(r.deadline) + now := time.Now() + until := r.deadline.Sub(now) if until > 0 { if r.setting.Action == rmpb.RunawayAction_Kill { // if the execution time is close to the threshold, set a timeout @@ -511,7 +512,6 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error { } // execution time exceeds the threshold, mark the query as runaway if r.marked.CompareAndSwap(false, true) { - now := time.Now() r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now) r.markQuarantine(&now) } @@ -529,20 +529,26 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error { } } -// AfterCopRequest checks runaway after receiving coprocessor response. -func (r *RunawayChecker) AfterCopRequest() { - if r.setting == nil { - return +// CheckCopRespError checks TiKV error after receiving coprocessor response. +func (r *RunawayChecker) CheckCopRespError(err error) error { + if err == nil || r.setting == nil || r.setting.Action != rmpb.RunawayAction_Kill { + return err } - // Do not perform action here as it may be the last cop request and just let it finish. If it's not the last cop request, action would be performed in `BeforeCopRequest` when handling the next cop request. - // Here only marks the query as runaway - if !r.marked.Load() && r.deadline.Before(time.Now()) { - if r.marked.CompareAndSwap(false, true) { + if strings.HasPrefix(err.Error(), "Coprocessor task terminated due to exceeding the deadline") { + if !r.marked.Load() { now := time.Now() - r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now) - r.markQuarantine(&now) + if r.deadline.Before(now) && r.marked.CompareAndSwap(false, true) { + r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now) + r.markQuarantine(&now) + return exeerrors.ErrResourceGroupQueryRunawayInterrupted + } + } + // Due to concurrency, check again. + if r.marked.Load() { + return exeerrors.ErrResourceGroupQueryRunawayInterrupted } } + return err } func (r *RunawayChecker) markQuarantine(now *time.Time) { diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index b554ef65af392..3abfba960f186 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -1245,6 +1245,17 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(bo.TiKVBackoffer(), req, task.region, timeout, getEndPointType(task.storeType), task.storeAddr, ops...) err = derr.ToTiDBErr(err) + if worker.req.RunawayChecker != nil { + failpoint.Inject("sleepCoprAfterReq", func(v failpoint.Value) { + //nolint:durationcheck + value := v.(int) + time.Sleep(time.Millisecond * time.Duration(value)) + if value > 50 { + err = errors.Errorf("Coprocessor task terminated due to exceeding the deadline") + } + }) + err = worker.req.RunawayChecker.CheckCopRespError(err) + } if err != nil { if task.storeType == kv.TiDB { err = worker.handleTiDBSendReqErr(err, task, ch) @@ -1262,9 +1273,6 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch if costTime > minLogCopTaskTime { worker.logTimeCopTask(costTime, task, bo, copResp) } - if worker.req.RunawayChecker != nil { - worker.req.RunawayChecker.AfterCopRequest() - } storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) isInternal := util.IsRequestSourceInternal(&task.requestSource)