diff --git a/executor/tiflash_test.go b/executor/tiflash_test.go
index a99923712f428..7c2ba3c000c8e 100644
--- a/executor/tiflash_test.go
+++ b/executor/tiflash_test.go
@@ -526,9 +526,6 @@ func (s *tiflashTestSuite) TestDispatchTaskRetry(c *C) {
 	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"), IsNil)
 	tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)
-	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout", "3*return(true)"), IsNil)
-	tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
-	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout"), IsNil)
 }
 
 func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
diff --git a/kv/mpp.go b/kv/mpp.go
index fd81f2d264804..c3965831ae05b 100644
--- a/kv/mpp.go
+++ b/kv/mpp.go
@@ -15,6 +15,7 @@ package kv
 
 import (
 	"context"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/mpp"
 )
@@ -76,7 +77,7 @@ type MPPDispatchRequest struct {
 type MPPClient interface {
 	// ConstructMPPTasks schedules task for a plan fragment.
 	// TODO:: This interface will be refined after we support more executors.
-	ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error)
+	ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)
 
 	// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
 	DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest) Response
diff --git a/planner/core/fragment.go b/planner/core/fragment.go
index c4464c1d19f9b..8adcbf127ff26 100644
--- a/planner/core/fragment.go
+++ b/planner/core/fragment.go
@@ -15,6 +15,7 @@ package core
 
 import (
 	"context"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
@@ -346,7 +347,12 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
 
 func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.Context, kvRanges []kv.KeyRange, tableID int64) ([]*kv.MPPTask, error) {
 	req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}
-	metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req)
+	ttl, err := time.ParseDuration(e.ctx.GetSessionVars().MPPStoreFailTTL)
+	if err != nil {
+		logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
+		ttl = 30 * time.Second
+	}
+	metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index b8ee35d0fcc40..2325f50fdb46e 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -855,6 +855,12 @@ type SessionVars struct {
 
 	// EnableStableResultMode if stabilize query results.
 	EnableStableResultMode bool
+
+	// MPPStoreLastFailTime records the latest time at which each TiFlash store failed.
+	MPPStoreLastFailTime map[string]time.Time
+
+	// MPPStoreFailTTL indicates how long TiDB refrains from sending tasks to a newly recovered TiFlash store.
+	MPPStoreFailTTL string
 }
 
 // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
@@ -1069,6 +1075,8 @@ func NewSessionVars() *SessionVars {
 		CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
 		TMPTableSize: DefTMPTableSize,
 		EnableGlobalTemporaryTable: DefTiDBEnableGlobalTemporaryTable,
+		MPPStoreLastFailTime: make(map[string]time.Time),
+		MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
 	}
 	vars.KVVars = tikvstore.NewVariables(&vars.Killed)
 	vars.Concurrency = Concurrency{
@@ -1117,6 +1125,7 @@ func NewSessionVars() *SessionVars {
 	vars.AllowBatchCop = DefTiDBAllowBatchCop
 	vars.allowMPPExecution = DefTiDBAllowMPPExecution
 	vars.enforceMPPExecution = DefTiDBEnforceMPPExecution
+	vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL
 
 	var enableChunkRPC string
 	if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 05da31bea6143..8c739f443a727 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -853,6 +853,10 @@ var defaultSysVars = []*SysVar{
 		s.allowMPPExecution = TiDBOptOn(val)
 		return nil
 	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBMPPStoreFailTTL, Type: TypeStr, Value: DefTiDBMPPStoreFailTTL, SetSession: func(s *SessionVars, val string) error {
+		s.MPPStoreFailTTL = val
+		return nil
+	}},
 	{Scope: ScopeSession, Name: TiDBEnforceMPPExecution, Type: TypeBool, Value: BoolToOnOff(config.GetGlobalConfig().Performance.EnforceMPP), Validation: func(vars *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
 		if TiDBOptOn(normalizedValue) && !vars.allowMPPExecution {
 			return normalizedValue, ErrWrongValueForVar.GenWithStackByArgs("tidb_enforce_mpp", "1' but tidb_allow_mpp is 0, please activate tidb_allow_mpp at first.")
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 00c445ada3de6..65f6f01751745 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -310,6 +310,10 @@ const (
 	// Note if you want to set `tidb_enforce_mpp` to `true`, you must set `tidb_allow_mpp` to `true` first.
 	TiDBEnforceMPPExecution = "tidb_enforce_mpp"
 
+	// TiDBMPPStoreFailTTL is the duration for which a TiFlash store is treated as unavailable after it is detected as failed.
+	// During that time, TiDB will not send any task to that store, even if the failed TiFlash node has already recovered.
+	TiDBMPPStoreFailTTL = "tidb_mpp_store_fail_ttl"
+
 	// TiDBInitChunkSize is used to control the init chunk size during query execution.
 	TiDBInitChunkSize = "tidb_init_chunk_size"
 
@@ -656,6 +660,7 @@ const (
 	DefTiDBAllowBatchCop = 1
 	DefTiDBAllowMPPExecution = true
 	DefTiDBEnforceMPPExecution = false
+	DefTiDBMPPStoreFailTTL = "60s"
 	DefTiDBTxnMode = ""
 	DefTiDBRowFormatV1 = 1
 	DefTiDBRowFormatV2 = 2
diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go
index c110d9cd34751..2b4e5eb5cc597 100644
--- a/store/copr/batch_coprocessor.go
+++ b/store/copr/batch_coprocessor.go
@@ -103,10 +103,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
 // 2. for the remaining regions:
 //    if there is only 1 available store, then put the region to the related store
 //    otherwise, use a greedy algorithm to put it into the store with highest weight
-func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool) []*batchCopTask {
+func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
 	if len(originalTasks) <= 1 {
 		return originalTasks
 	}
+	isMPP := mppStoreLastFailTime != nil
 	cache := kvStore.GetRegionCache()
 	storeTaskMap := make(map[uint64]*batchCopTask)
 	// storeCandidateRegionMap stores all the possible store->region map. Its content is
@@ -133,25 +134,39 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 	var wg sync.WaitGroup
 	var mu sync.Mutex
 	wg.Add(len(stores))
+	cur := time.Now()
 	for i := range stores {
 		go func(idx int) {
 			defer wg.Done()
 			s := stores[idx]
-			aliveReq := tikvrpc.NewRequest(tikvrpc.CmdMPPAlive, &mpp.IsAliveRequest{}, kvrpcpb.Context{})
-			aliveReq.StoreTp = tikvrpc.TiFlash
-			alive := false
-			resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), aliveReq, 3*time.Second)
-			if err != nil {
+
+			var last time.Time
+			var ok bool
+			mu.Lock()
+			if last, ok = mppStoreLastFailTime[s.GetAddr()]; ok && cur.Sub(last) < 100*time.Millisecond {
+				// The last failure is so recent that it most likely happened within the same query, so there is no need to check again.
+				mu.Unlock()
+				return
+			}
+			mu.Unlock()
+
+			resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
+				Type:    tikvrpc.CmdMPPAlive,
+				StoreTp: tikvrpc.TiFlash,
+				Req:     &mpp.IsAliveRequest{},
+				Context: kvrpcpb.Context{},
+			}, 2*time.Second)
+
+			if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
 				logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()), zap.String("err message", err.Error()))
-			} else {
-				rpcResp := resp.Resp.(*mpp.IsAliveResponse)
-				if rpcResp.Available {
-					alive = true
-				} else {
-					logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()))
-				}
+				mu.Lock()
+				mppStoreLastFailTime[s.GetAddr()] = time.Now()
+				mu.Unlock()
+				return
 			}
-			if !alive {
+
+			if cur.Sub(last) < ttl {
+				logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", last))
 				return
 			}
 
@@ -292,7 +307,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 	return ret
 }
 
-func buildBatchCopTasks(bo *Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, isMPP bool) ([]*batchCopTask, error) {
+func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]*batchCopTask, error) {
 	cache := store.GetRegionCache()
 	start := time.Now()
 	const cmdType = tikvrpc.CmdBatchCop
@@ -367,7 +382,7 @@ func buildBatchCopTasks(bo *Backoffer, store *kvStore, ranges *KeyRanges, storeT
 		}
 		logutil.BgLogger().Debug(msg)
 	}
-	batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP)
+	batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl)
 	if log.GetLevel() <= zap.DebugLevel {
 		msg := "After region balance:"
 		for _, task := range batchTasks {
@@ -394,7 +409,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
 	ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
 	bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
 	ranges := NewKeyRanges(req.KeyRanges)
-	tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, false)
+	tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, nil, 0)
 	if err != nil {
 		return copErrorResponse{err}
 	}
@@ -528,19 +543,19 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
 }
 
 // Merge all ranges and request again.
-func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
+func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
 	var ranges []kv.KeyRange
 	for _, ri := range batchTask.regionInfos {
 		ri.Ranges.Do(func(ran *kv.KeyRange) {
 			ranges = append(ranges, *ran)
 		})
 	}
-	return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false)
+	return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0)
 }
 
 const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.
 
-func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
+func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
 	sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
 	var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
 	for _, ri := range task.regionInfos {
diff --git a/store/copr/mpp.go b/store/copr/mpp.go
index ace10b904afee..f192620d82c0c 100644
--- a/store/copr/mpp.go
+++ b/store/copr/mpp.go
@@ -57,14 +57,14 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
 }
 
 // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns.
-func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
+func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
 	ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
 	bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
 	if req.KeyRanges == nil {
 		return c.selectAllTiFlashStore(), nil
 	}
 	ranges := NewKeyRanges(req.KeyRanges)
-	tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, true)
+	tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -331,15 +331,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
 	rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, readTimeoutUltraLong)
 
 	if err != nil {
-		logutil.BgLogger().Warn("establish mpp connection meet error, and retrying", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
-		err = bo.Backoff(tikv.BoTiFlashRPC(), err)
-		if err != nil {
-			logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
-			// we return timeout to trigger tikv's fallback
-			m.sendError(derr.ErrTiFlashServerTimeout)
-			return
-		}
-		m.establishMPPConns(bo, req, taskMeta)
+		logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+		// we return timeout to trigger tikv's fallback
+		m.sendError(derr.ErrTiFlashServerTimeout)
 		return
 	}
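
For readers skimming the patch, the store-selection rule it introduces can be summarized by the standalone Go sketch below. It is only an illustration of the TTL gating added to balanceBatchCopTask, under stated assumptions: the storeLiveness type, the usable method, and the probe callback are hypothetical names invented for this example and are not part of the patch or the TiDB codebase.

package main

import (
	"fmt"
	"sync"
	"time"
)

// storeLiveness is an illustrative stand-in for SessionVars.MPPStoreLastFailTime
// plus the parsed tidb_mpp_store_fail_ttl value; it is not part of the patch.
type storeLiveness struct {
	mu           sync.Mutex
	lastFailTime map[string]time.Time // store address -> time of the last failed liveness probe
	failTTL      time.Duration        // e.g. 60s, the default of tidb_mpp_store_fail_ttl
}

// usable mirrors the gating order used in the patched balanceBatchCopTask:
// 1) a failure recorded within the last 100ms is trusted without re-probing,
// 2) a failed probe records the failure time and excludes the store,
// 3) even after a successful probe, the store stays excluded until failTTL
//    has elapsed since its last recorded failure.
func (s *storeLiveness) usable(addr string, probe func(addr string) bool) bool {
	cur := time.Now()

	s.mu.Lock()
	last, failedBefore := s.lastFailTime[addr]
	s.mu.Unlock()

	if failedBefore && cur.Sub(last) < 100*time.Millisecond {
		return false // failed a moment ago, probably within the same query
	}

	if !probe(addr) {
		s.mu.Lock()
		s.lastFailTime[addr] = time.Now()
		s.mu.Unlock()
		return false
	}

	return !failedBefore || cur.Sub(last) >= s.failTTL
}

func main() {
	liveness := &storeLiveness{
		lastFailTime: map[string]time.Time{"tiflash-1:3930": time.Now().Add(-10 * time.Second)},
		failTTL:      60 * time.Second,
	}
	alwaysAlive := func(string) bool { return true }
	// tiflash-1 failed 10s ago, so it stays excluded until the 60s TTL expires.
	fmt.Println(liveness.usable("tiflash-1:3930", alwaysAlive)) // false
	fmt.Println(liveness.usable("tiflash-2:3930", alwaysAlive)) // true
}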