store/copr: block the tiflash node for a period when it fails before. (
hanfei1991 authored Aug 5, 2021
1 parent d9bb795 commit 5991ba4
Showing 9 changed files with 61 additions and 34 deletions.
3 changes: 0 additions & 3 deletions executor/tiflash_test.go
@@ -386,9 +386,6 @@ func (s *tiflashTestSuite) TestDispatchTaskRetry(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppDispatchTimeout"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout", "3*return(true)"), IsNil)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("4"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/mppConnTimeout"), IsNil)
}

func (s *tiflashTestSuite) TestCancelMppTasks(c *C) {
3 changes: 2 additions & 1 deletion kv/mpp.go
@@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
)
@@ -76,7 +77,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules tasks for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(context.Context, *Variables, []*MPPDispatchRequest) Response
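
For orientation, here is a minimal sketch (not part of the commit) of how a caller is expected to use the widened signature: the per-store last-failure map and the fail TTL now travel alongside the build request. MPPTaskMeta, MPPBuildTasksRequest, and buildTasks below are trimmed, hypothetical stand-ins; the real call site is in planner/core/fragment.go further down.

package mppsketch

import (
	"context"
	"time"
)

// Trimmed stand-ins for the real kv types; only what the signature change touches is kept.
type MPPTaskMeta interface{ GetAddress() string }

type MPPBuildTasksRequest struct{}

// MPPClient mirrors the updated interface shape.
type MPPClient interface {
	ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, map[string]time.Time, time.Duration) ([]MPPTaskMeta, error)
}

// buildTasks shows the calling convention: parse the TTL string kept in session
// state, fall back to a default when it is malformed, and pass both through.
func buildTasks(ctx context.Context, c MPPClient, lastFail map[string]time.Time, ttlStr string) ([]MPPTaskMeta, error) {
	ttl, err := time.ParseDuration(ttlStr)
	if err != nil {
		ttl = 60 * time.Second // the documented default is "60s"
	}
	return c.ConstructMPPTasks(ctx, &MPPBuildTasksRequest{}, lastFail, ttl)
}
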
8 changes: 7 additions & 1 deletion planner/core/fragment.go
@@ -15,6 +15,7 @@ package core

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
@@ -346,7 +347,12 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic

func (e *mppTaskGenerator) constructMPPTasksForSinglePartitionTable(ctx context.Context, kvRanges []kv.KeyRange, tableID int64) ([]*kv.MPPTask, error) {
req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req)
ttl, err := time.ParseDuration(e.ctx.GetSessionVars().MPPStoreFailTTL)
if err != nil {
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
if err != nil {
return nil, errors.Trace(err)
}
1 change: 1 addition & 0 deletions session/session.go
@@ -2503,6 +2503,7 @@ var builtinGlobalVariable = []string{
variable.TiDBTxnMode,
variable.TiDBAllowBatchCop,
variable.TiDBAllowMPPExecution,
variable.TiDBMPPStoreFailTTL,
variable.TiDBOptBCJ,
variable.TiDBBCJThresholdSize,
variable.TiDBBCJThresholdCount,
10 changes: 10 additions & 0 deletions sessionctx/variable/session.go
@@ -844,6 +844,12 @@ type SessionVars struct {

// EnableStableResultMode if stabilize query results.
EnableStableResultMode bool

// MPPStoreLastFailTime records the latest fail time of each TiFlash store.
MPPStoreLastFailTime map[string]time.Time

// MPPStoreFailTTL indicates how long TiDB keeps avoiding a TiFlash store after a detected failure, even if the store has recovered.
MPPStoreFailTTL string
}

// AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
@@ -1058,6 +1064,7 @@ func NewSessionVars() *SessionVars {
AnalyzeVersion: DefTiDBAnalyzeVersion,
EnableIndexMergeJoin: DefTiDBEnableIndexMergeJoin,
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
MPPStoreLastFailTime: make(map[string]time.Time),
}
vars.KVVars = kv.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
@@ -1106,6 +1113,7 @@ func NewSessionVars() *SessionVars {
vars.AllowBatchCop = DefTiDBAllowBatchCop
vars.allowMPPExecution = DefTiDBAllowMPPExecution
vars.enforceMPPExecution = DefTiDBEnforceMPPExecution
vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL

var enableChunkRPC string
if config.GetGlobalConfig().TiKVClient.EnableChunkRPC {
@@ -1532,6 +1540,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.allowMPPExecution = TiDBOptOn(val)
case TiDBEnforceMPPExecution:
s.enforceMPPExecution = TiDBOptOn(val)
case TiDBMPPStoreFailTTL:
s.MPPStoreFailTTL = val
case TiDBIndexLookupSize:
s.IndexLookupSize = tidbOptPositiveInt32(val, DefIndexLookupSize)
case TiDBHashJoinConcurrency:
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
@@ -582,6 +582,7 @@ var defaultSysVars = []*SysVar{
/* TiDB specific variables */
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAllowMPPExecution, Type: TypeBool, Value: BoolToOnOff(DefTiDBAllowMPPExecution)},
{Scope: ScopeSession, Name: TiDBEnforceMPPExecution, Type: TypeBool, Value: BoolToOnOff(config.GetGlobalConfig().Performance.EnforceMPP)},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMPPStoreFailTTL, Type: TypeStr, Value: DefTiDBMPPStoreFailTTL},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdCount, Value: strconv.Itoa(DefBroadcastJoinThresholdCount), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBCJThresholdSize, Value: strconv.Itoa(DefBroadcastJoinThresholdSize), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64},
{Scope: ScopeSession, Name: TiDBSnapshot, Value: ""},
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -310,6 +310,10 @@ const (
// Note if you want to set `tidb_enforce_mpp` to `true`, you must set `tidb_allow_mpp` to `true` first.
TiDBEnforceMPPExecution = "tidb_enforce_mpp"

// TiDBMPPStoreFailTTL is how long a TiFlash store stays marked unavailable after a failure is detected. During that time, TiDB will not
// send any MPP task to the store, even if the failed TiFlash node has already recovered.
TiDBMPPStoreFailTTL = "tidb_mpp_store_fail_ttl"

// TiDBInitChunkSize is used to control the init chunk size during query execution.
TiDBInitChunkSize = "tidb_init_chunk_size"

@@ -633,6 +637,7 @@ const (
DefTiDBAllowBatchCop = 1
DefTiDBAllowMPPExecution = true
DefTiDBEnforceMPPExecution = false
DefTiDBMPPStoreFailTTL = "60s"
DefTiDBTxnMode = ""
DefTiDBRowFormatV1 = 1
DefTiDBRowFormatV2 = 2
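
Since the variable is stored as a plain Go duration string with a 60s default, tuning it only requires a SET statement. A hedged usage sketch follows, assuming a MySQL-protocol connection to TiDB via go-sql-driver/mysql; the DSN and the 120s value are illustrative, not taken from this commit.

package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql" // MySQL-protocol driver; TiDB speaks the same protocol
)

func main() {
	// Hypothetical DSN; adjust user/host/port for a real cluster.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Lengthen the blocking window for the current session; the value must be
	// parseable by Go's time.ParseDuration (e.g. "90s", "2m").
	if _, err := db.Exec("SET tidb_mpp_store_fail_ttl = '120s'"); err != nil {
		log.Fatal(err)
	}

	// The variable is also settable globally (ScopeGlobal | ScopeSession above).
	if _, err := db.Exec("SET GLOBAL tidb_mpp_store_fail_ttl = '120s'"); err != nil {
		log.Fatal(err)
	}
}
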
48 changes: 30 additions & 18 deletions store/copr/batch_coprocessor.go
@@ -102,10 +102,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// 2. for the remaining regions:
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTasks []*batchCopTask, isMPP bool) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) <= 1 {
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
@@ -132,25 +133,36 @@ func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTas
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
cur := time.Now()
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]
aliveReq := tikvrpc.NewRequest(tikvrpc.CmdMPPAlive, &mpp.IsAliveRequest{}, kvrpcpb.Context{})
aliveReq.StoreTp = kv.TiFlash
alive := false
resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), aliveReq, 3*time.Second)
if err != nil {
var last time.Time
var ok bool
mu.Lock()
if last, ok = mppStoreLastFailTime[s.GetAddr()]; ok && cur.Sub(last) < 100*time.Millisecond {
mu.Unlock()
return
}
mu.Unlock()
resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: kv.TiFlash,
Req: &mpp.IsAliveRequest{},
Context: kvrpcpb.Context{},
}, 2*time.Second)

if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()), zap.String("err message", err.Error()))
} else {
rpcResp := resp.Resp.(*mpp.IsAliveResponse)
if rpcResp.Available {
alive = true
} else {
logutil.BgLogger().Warn("Cannot detect store's availability", zap.String("store address", s.GetAddr()))
}
mu.Lock()
mppStoreLastFailTime[s.GetAddr()] = time.Now()
mu.Unlock()
return
}
if !alive {

if cur.Sub(last) < ttl {
logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", last))
return
}

@@ -291,7 +303,7 @@ func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTas
return ret
}

func buildBatchCopTasks(bo *tikv.Backoffer, store *tikv.KVStore, ranges *tikv.KeyRanges, storeType kv.StoreType, isMPP bool) ([]*batchCopTask, error) {
func buildBatchCopTasks(bo *tikv.Backoffer, store *tikv.KVStore, ranges *tikv.KeyRanges, storeType kv.StoreType, blockAddrs map[string]time.Time, ttl time.Duration) ([]*batchCopTask, error) {
cache := store.GetRegionCache()
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
@@ -366,7 +378,7 @@ func buildBatchCopTasks(bo *tikv.Backoffer, store *tikv.KVStore, ranges *tikv.Ke
}
logutil.BgLogger().Debug(msg)
}
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, blockAddrs, ttl)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
for _, task := range batchTasks {
@@ -392,7 +404,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *kv.Var
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
bo := tikv.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
tasks, err := buildBatchCopTasks(bo, c.store.KVStore, tikv.NewKeyRanges(req.KeyRanges), req.StoreType, false)
tasks, err := buildBatchCopTasks(bo, c.store.KVStore, tikv.NewKeyRanges(req.KeyRanges), req.StoreType, nil, 0)
if err != nil {
return copErrorResponse{err}
}
@@ -533,7 +545,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *tikv.Backo
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store, tikv.NewKeyRanges(ranges), b.req.StoreType, false)
return buildBatchCopTasks(bo, b.store, tikv.NewKeyRanges(ranges), b.req.StoreType, nil, 0)
}

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
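
Because the rendered diff interleaves old and new lines, the availability check above is easier to follow in isolation. Below is a condensed sketch (not part of the commit) of the pattern it implements, under the same assumptions as the diff: a per-address last-failure map shared across goroutines, a 100ms debounce so the same query does not probe a store twice, a 2-second liveness probe, and the session-scoped fail TTL. The probe callback stands in for the real CmdMPPAlive RPC.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// storeUsable reports whether an MPP task may be scheduled on the store at addr.
// lastFail is shared across goroutines, so it is guarded by mu, as in the diff.
func storeUsable(
	ctx context.Context,
	addr string,
	lastFail map[string]time.Time,
	mu *sync.Mutex,
	ttl time.Duration,
	probe func(ctx context.Context, addr string, timeout time.Duration) (bool, error),
) bool {
	cur := time.Now()

	// 1. If the store failed only a moment ago (roughly within the same query),
	//    skip it without probing again.
	mu.Lock()
	last, seen := lastFail[addr]
	mu.Unlock()
	if seen && cur.Sub(last) < 100*time.Millisecond {
		return false
	}

	// 2. Probe liveness with a short timeout; on any failure, record the time so
	//    later queries keep avoiding this store.
	alive, err := probe(ctx, addr, 2*time.Second)
	if err != nil || !alive {
		mu.Lock()
		lastFail[addr] = time.Now()
		mu.Unlock()
		return false
	}

	// 3. Even if the store answers now, keep it blocked until the TTL since its
	//    last recorded failure has elapsed.
	if seen && cur.Sub(last) < ttl {
		return false
	}
	return true
}

func main() {
	var mu sync.Mutex
	lastFail := map[string]time.Time{}
	down := func(ctx context.Context, addr string, timeout time.Duration) (bool, error) {
		return false, nil // pretend the store does not answer the liveness probe
	}
	ok := storeUsable(context.Background(), "tiflash-0:3930", lastFail, &mu, 60*time.Second, down)
	fmt.Println("usable:", ok) // false, and lastFail now records the failure time
}
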
16 changes: 5 additions & 11 deletions store/copr/mpp.go
@@ -55,13 +55,13 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
}

// ConstructMPPTasks receives ScheduleRequest, which is actually a collection of kv ranges. We allocate MPPTaskMeta for them and return.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, storeStatus map[string]time.Time, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS)
bo := tikv.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
}
tasks, err := buildBatchCopTasks(bo, c.store, tikv.NewKeyRanges(req.KeyRanges), kv.TiFlash, true)
tasks, err := buildBatchCopTasks(bo, c.store, tikv.NewKeyRanges(req.KeyRanges), kv.TiFlash, storeStatus, ttl)
if err != nil {
return nil, errors.Trace(err)
}
@@ -331,15 +331,9 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong)

if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error, and retrying", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
err = bo.Backoff(tikv.BoTiFlashRPC, err)
if err != nil {
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}
m.establishMPPConns(bo, req, taskMeta)
logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
// we return timeout to trigger tikv's fallback
m.sendError(tikv.ErrTiFlashServerTimeout)
return
}

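
The removed branch used to back off and recursively retry the connection; after this change a failed connection is reported once as ErrTiFlashServerTimeout so that the upper-level fallback (together with the store blocking added above) takes over. A rough sketch of that fail-fast shape, with hypothetical send and reportErr callbacks standing in for the real RPC and error channel:

package main

import (
	"errors"
	"log"
)

// errTiFlashServerTimeout stands in for tikv.ErrTiFlashServerTimeout; surfacing
// it is what lets the upper layers fall back instead of waiting on a dead store.
var errTiFlashServerTimeout = errors.New("tiflash server timeout")

// establishConn sketches the new fail-fast shape: a single attempt, with any
// error converted into the fallback-triggering sentinel. The removed code used
// to back off and call itself again here, which could stall the whole query on
// a store that had just gone away; the TTL-based blocking now covers later
// queries instead.
func establishConn(send func() error, reportErr func(error)) {
	if err := send(); err != nil {
		log.Println("establish mpp connection meet error:", err)
		reportErr(errTiFlashServerTimeout)
		return
	}
	// ... stream results on success ...
}

func main() {
	establishConn(
		func() error { return errors.New("connection refused") },
		func(err error) { log.Println("reported:", err) },
	)
}
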
