store/copr: use a ttl duration to protect a newly recovered tiflash node from processing mpp tasks #26793

Merged
13 commits merged on Aug 4, 2021
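Overview of the change: the session now keeps a map of TiFlash store address to last observed failure time (MPPStoreLastFailTime) plus a configurable TTL (tidb_mpp_store_fail_ttl, default "60s"), and the batch-cop balancer consults both before handing MPP tasks to a store, so a node that just came back is left alone until the TTL has elapsed. A minimal standalone sketch of that rule, using illustrative names and addresses rather than the TiDB identifiers:

```go
package main

import (
	"fmt"
	"time"
)

// storeUsable decides whether new MPP tasks may be sent to a TiFlash store,
// given the store's last recorded failure time and the configured fail TTL.
// A store that failed less than ttl ago is skipped even if it already answers
// health checks again.
func storeUsable(lastFail map[string]time.Time, addr string, now time.Time, ttl time.Duration) bool {
	last, failedBefore := lastFail[addr]
	if !failedBefore {
		return true // no failure on record for this store
	}
	return now.Sub(last) >= ttl
}

func main() {
	lastFail := map[string]time.Time{
		"tiflash-1:3930": time.Now().Add(-20 * time.Second), // failed 20s ago
	}
	fmt.Println(storeUsable(lastFail, "tiflash-1:3930", time.Now(), 60*time.Second)) // false: still inside the TTL
	fmt.Println(storeUsable(lastFail, "tiflash-2:3930", time.Now(), 60*time.Second)) // true: no recorded failure
}
```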
3 changes: 0 additions & 3 deletions executor/tiflash_test.go
@@ -526,9 +526,6 @@ func (s *tiflashTestSuite) TestDispatchTaskRetry(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout"), IsNil)
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
3 changes: 2 additions & 1 deletion kv/mpp.go
@@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
)
@@ -76,7 +77,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest) Response
8 changes: 7 additions & 1 deletion planner/core/fragment.go
@@ -15,6 +15,7 @@ package core

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
@@ -346,7 +347,12 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic

func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.Context, kvRanges []kv.KeyRange, tableID int64) ([]*kv.MPPTask, error) {
req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req)
ttl, err := time.ParseDuration(e.ctx.GetSessionVars().MPPStoreFailTTL)
if err != nil {
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
if err != nil {
return nil, errors.Trace(err)
}
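The hunk above reads the session's MPPStoreFailTTL string and, when it fails to parse, falls back to 30 seconds. A tiny standalone sketch of the same parse-with-fallback pattern (not the TiDB code itself):

```go
package main

import (
	"fmt"
	"time"
)

// parseTTL mirrors the fallback above: use the configured duration string,
// or a conservative default when it does not parse.
func parseTTL(raw string) time.Duration {
	ttl, err := time.ParseDuration(raw)
	if err != nil {
		return 30 * time.Second // invalid setting, e.g. "sixty seconds"
	}
	return ttl
}

func main() {
	fmt.Println(parseTTL("60s"))            // 1m0s
	fmt.Println(parseTTL("not-a-duration")) // 30s
}
```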
9 changes: 9 additions & 0 deletions sessionctx/variable/session.go
@@ -876,6 +876,12 @@ type SessionVars struct {

// TemporaryTableData stores committed kv values for temporary table for current session.
TemporaryTableData kv.MemBuffer

// MPPStoreLastFailTime records the latest failure time of each TiFlash store.
MPPStoreLastFailTime map[string]time.Time

// MPPStoreFailTTL indicates the duration that protects TiDB from sending tasks to a newly recovered TiFlash node.
MPPStoreFailTTL string
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
@@ -1097,6 +1103,8 @@ func NewSessionVars() *SessionVars {
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTMPTableSize,
EnableGlobalTemporaryTable: DefTiDBEnableGlobalTemporaryTable,
MPPStoreLastFailTime: make(map[string]time.Time),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
@@ -1146,6 +1154,7 @@
vars.allowMPPExecution = DefTiDBAllowMPPExecution
vars.HashExchangeWithNewCollation = DefTiDBHashExchangeWithNewCollation
vars.enforceMPPExecution = DefTiDBEnforceMPPExecution
vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -843,6 +843,10 @@ var defaultSysVars = []*SysVar{
s.allowMPPExecution = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMPPStoreFailTTL, Type: TypeStr, Value: DefTiDBMPPStoreFailTTL, SetSession: func(s *SessionVars, val string) error {
s.MPPStoreFailTTL = val
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBHashExchangeWithNewCollation, Type: TypeBool, Value: BoolToOnOff(DefTiDBHashExchangeWithNewCollation), SetSession: func(s *SessionVars, val string) error {
s.HashExchangeWithNewCollation = TiDBOptOn(val)
return nil
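Since the variable is registered with both Global and Session scope, it can be tuned with an ordinary SET statement. A hypothetical usage sketch from a Go client; the driver import, DSN, and port are assumptions and not part of this PR:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // assumed MySQL-protocol driver
)

func main() {
	// Assumed DSN for a local TiDB instance.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Raise the protection window for this session from the 60s default to 120s.
	if _, err := db.Exec("SET tidb_mpp_store_fail_ttl = '120s'"); err != nil {
		log.Fatal(err)
	}
}
```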
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -315,6 +315,10 @@ const (
// Note if you want to set `tidb_enforce_mpp` to `true`, you must set `tidb_allow_mpp` to `true` first.
TiDBEnforceMPPExecution = "tidb_enforce_mpp"

// TiDBMPPStoreFailTTL is how long a TiFlash store stays unavailable after it is detected as failed. During that time, TiDB will not send any task to
// the store even though the failed TiFlash node has recovered.
TiDBMPPStoreFailTTL = "tidb_mpp_store_fail_ttl"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

@@ -668,6 +672,7 @@
DefTiDBAllowMPPExecution = true
DefTiDBHashExchangeWithNewCollation = true
DefTiDBEnforceMPPExecution = false
DefTiDBMPPStoreFailTTL = "60s"
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
57 changes: 35 additions & 22 deletions store/copr/batch_coprocessor.go
@@ -103,10 +103,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// 2. for the remaining regions:
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
@@ -133,30 +134,42 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
cur := time.Now()
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]
aliveReq := tikvrpc.NewRequest(tikvrpc.CmdMPPAlive, &mpp.IsAliveRequest{}, kvrpcpb.Context{})
aliveReq.StoreTp = tikvrpc.TiFlash
alive := false
resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), aliveReq, 3*time.Second)
if err != nil {

var last time.Time
var ok bool
mu.Lock()
if last, ok = mppStoreLastFailTime[s.GetAddr()]; ok && cur.Sub(last) < 100*time.Millisecond {
// The last failure happened so recently that it was likely within this same query, so there is no need to check this store again.
mu.Unlock()
return
}
mu.Unlock()

resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: tikvrpc.TiFlash,
Req: &mpp.IsAliveRequest{},
Context: kvrpcpb.Context{},
}, 2*time.Second)

if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()), zap.String("err message", err.Error()))
} else {
rpcResp := resp.Resp.(*mpp.IsAliveResponse)
if rpcResp.Available {
alive = true
} else {
logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()))
}
mu.Lock()
mppStoreLastFailTime[s.GetAddr()] = time.Now()
mu.Unlock()
return
}
if !alive {

if cur.Sub(last) < ttl {
logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", last))
return
}

mu.Lock()
defer mu.Unlock()
storeTaskMap[s.StoreID()] = &batchCopTask{
storeAddr: s.GetAddr(),
cmdType: originalTasks[0].cmdType,
@@ -292,7 +305,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
return ret
}

func buildBatchCopTasks(bo *Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, isMPP bool) ([]*batchCopTask, error) {
func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]*batchCopTask, error) {
cache := store.GetRegionCache()
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
@@ -367,7 +380,7 @@ func buildBatchCopTasks(bo *Backoffer, store *kvStore, ranges *KeyRanges, storeT
}
logutil.BgLogger().Debug(msg)
}
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
Expand All @@ -394,7 +407,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, false)
tasks, err := buildBatchCopTasks(bo, c.store.kvStore, ranges, req.StoreType, nil, 0)
if err != nil {
return copErrorResponse{err}
}
@@ -528,19 +541,19 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *
}

// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []kv.KeyRange
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *kv.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false)
return buildBatchCopTasks(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0)
}

const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
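Restated outside the goroutine and locking plumbing, the per-store decision made by the modified loop in balanceBatchCopTask above is roughly the following. This is an illustrative helper, with probe standing in for the CmdMPPAlive RPC and the names chosen for clarity rather than taken from the TiDB code:

```go
package main

import (
	"fmt"
	"time"
)

// usable restates the per-store decision of balanceBatchCopTask above: skip a
// store that failed moments ago, probe it otherwise, record a fresh failure
// when the probe fails, and still hold back a store that answers the probe but
// is inside the protection window.
func usable(lastFail map[string]time.Time, addr string, probe func(string) bool,
	now time.Time, ttl time.Duration) bool {
	if last, ok := lastFail[addr]; ok && now.Sub(last) < 100*time.Millisecond {
		return false // failed a moment ago, likely within this very query; do not probe again
	}
	if !probe(addr) {
		lastFail[addr] = time.Now() // remember the failure so later queries back off
		return false
	}
	if last, ok := lastFail[addr]; ok && now.Sub(last) < ttl {
		return false // alive again, but still inside the fail TTL
	}
	return true
}

func main() {
	alwaysAlive := func(string) bool { return true }
	lastFail := map[string]time.Time{"tiflash-1:3930": time.Now().Add(-10 * time.Second)}
	// Alive, but only 10s since the last failure with a 60s TTL: still excluded.
	fmt.Println(usable(lastFail, "tiflash-1:3930", alwaysAlive, time.Now(), 60*time.Second)) // false
	fmt.Println(usable(lastFail, "tiflash-2:3930", alwaysAlive, time.Now(), 60*time.Second)) // true
}
```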
16 changes: 5 additions & 11 deletions store/copr/mpp.go
@@ -57,14 +57,14 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
}

// ConstructMPPTasks receives ScheduleRequest, which is actually a collection of kv ranges. We allocate MPPTaskMeta for them and return.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
}
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, true)
tasks, err := buildBatchCopTasks(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl)
if err != nil {
return nil, errors.Trace(err)
}
@@ -331,15 +331,9 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, readTimeoutUltraLong)

if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error, and retrying", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
err = bo.Backoff(tikv.BoTiFlashRPC(), err)
if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(derr.ErrTiFlashServerTimeout)
return
}
m.establishMPPConns(bo, req, taskMeta)
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(derr.ErrTiFlashServerTimeout)
return
}

Expand Down