store/copr: fix build batchCop in disaggregated tiflash mode (#40008)
close #40035
guo-shaoge authored Jan 4, 2023
1 parent 7fafb6d commit ccee532
Showing 4 changed files with 171 additions and 98 deletions.
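The heart of the change in batch_coprocessor.go: instead of building tasks first and re-hashing their store addresses through a hand-rolled stathat/consistent ring, the new code splits key ranges into region-level copTasks, asks the region cache for a consistent-hash mapping from regions to alive tiflash_compute nodes, and folds all copTasks that land on the same node into one batchCopTask. The sketch below illustrates only that routing-and-grouping idea; it is a minimal self-contained ring with illustrative names and addresses, not the client-go lookup (GetTiFlashComputeRPCContextByConsistentHash) the commit actually calls.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a minimal consistent-hash ring: node addresses are hashed onto a
// circle, and a key is served by the first node clockwise from the key's hash.
type ring struct {
	hashes []uint32          // sorted node hashes
	nodes  map[uint32]string // hash -> node address
}

func hash32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

func newRing(addrs []string) *ring {
	r := &ring{nodes: make(map[uint32]string)}
	for _, a := range addrs {
		h := hash32(a)
		r.hashes = append(r.hashes, h)
		r.nodes[h] = a
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
	return r
}

func (r *ring) get(key string) string {
	h := hash32(key)
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0 // wrap around the circle
	}
	return r.nodes[r.hashes[i]]
}

func main() {
	// Hypothetical compute nodes and region IDs.
	nodes := newRing([]string{"tiflash-compute-0:3930", "tiflash-compute-1:3930"})
	regions := []string{"region-1", "region-2", "region-3", "region-4", "region-5"}

	// Group regions by their hashed target node: one "batch task" per node.
	batches := make(map[string][]string)
	for _, reg := range regions {
		addr := nodes.get(reg)
		batches[addr] = append(batches[addr], reg)
	}
	for addr, regs := range batches {
		fmt.Printf("%s -> %v\n", addr, regs)
	}
}
```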
1 change: 0 additions & 1 deletion store/copr/BUILD.bazel
@@ -42,7 +42,6 @@ go_library(
         "@com_github_pingcap_kvproto//pkg/mpp",
         "@com_github_pingcap_log//:log",
         "@com_github_pingcap_tipb//go-tipb",
-        "@com_github_stathat_consistent//:consistent",
         "@com_github_tikv_client_go_v2//config",
         "@com_github_tikv_client_go_v2//error",
         "@com_github_tikv_client_go_v2//metrics",
216 changes: 147 additions & 69 deletions store/copr/batch_coprocessor.go
@@ -21,6 +21,7 @@ import (
 	"io"
 	"math"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -35,7 +36,6 @@ import (
 	"github.com/pingcap/tidb/store/driver/backoff"
 	derr "github.com/pingcap/tidb/store/driver/error"
 	"github.com/pingcap/tidb/util/logutil"
-	"github.com/stathat/consistent"
 	"github.com/tikv/client-go/v2/metrics"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
@@ -323,40 +323,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
 			storeTaskMap[taskStoreID] = batchTask
 		}
 	} else {
-		logutil.BgLogger().Info("detecting available mpp stores")
-		// decide the available stores
 		stores := cache.RegionCache.GetTiFlashStores()
-		var wg sync.WaitGroup
-		var mu sync.Mutex
-		wg.Add(len(stores))
-		for i := range stores {
-			go func(idx int) {
-				defer wg.Done()
-				s := stores[idx]
-
-				// check if store is failed already.
-				ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
-				if !ok {
-					return
-				}
-
-				tikvClient := kvStore.GetTiKVClient()
-				ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
-				if !ok {
-					GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
-					return
-				}
-
-				mu.Lock()
-				defer mu.Unlock()
-				storeTaskMap[s.StoreID()] = &batchCopTask{
-					storeAddr: s.GetAddr(),
-					cmdType:   originalTasks[0].cmdType,
-					ctx:       &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
-				}
-			}(i)
+		aliveStores := filterAliveStores(ctx, stores, ttl, kvStore)
+		for _, s := range aliveStores {
+			storeTaskMap[s.StoreID()] = &batchCopTask{
+				storeAddr: s.GetAddr(),
+				cmdType:   originalTasks[0].cmdType,
+				ctx:       &tikv.RPCContext{Addr: s.GetAddr(), Store: s},
+			}
 		}
-		wg.Wait()
 	}
 
 	var candidateRegionInfos []RegionInfo
@@ -513,7 +488,7 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
 	balanceWithContinuity bool,
 	balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
 	if config.GetGlobalConfig().DisaggregatedTiFlash {
-		return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+		return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, ttl)
 	}
 	return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 }
@@ -528,7 +503,7 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
 	balanceContinuousRegionCount int64,
 	partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
 	if config.GetGlobalConfig().DisaggregatedTiFlash {
-		batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
+		batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, ttl)
 	} else {
 		batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
 	}
@@ -540,49 +515,152 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
 	return batchTasks, nil
 }
 
-func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
-	batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
-	if err != nil {
-		return nil, err
-	}
-	cache := store.GetRegionCache()
-	stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
-	if err != nil {
-		return nil, err
-	}
-	if len(stores) == 0 {
-		return nil, errors.New("No available tiflash_compute node")
-	}
-
-	hasher := consistent.New()
-	for _, store := range stores {
-		hasher.Add(store.GetAddr())
-	}
-	for _, task := range batchTasks {
-		addr, err := hasher.Get(task.storeAddr)
-		if err != nil {
-			return nil, err
-		}
-		var store *tikv.Store
-		for _, s := range stores {
-			if s.GetAddr() == addr {
-				store = s
-				break
-			}
-		}
-		if store == nil {
-			return nil, errors.New("cannot find tiflash_compute store: " + addr)
-		}
-
-		task.storeAddr = addr
-		task.ctx.Store = store
-		task.ctx.Addr = addr
-	}
-	logutil.BgLogger().Info("build batchCop tasks for disaggregated tiflash using ConsistentHash done.", zap.Int("len(tasks)", len(batchTasks)))
-	for _, task := range batchTasks {
-		logutil.BgLogger().Debug("batchTasks detailed info", zap.String("addr", task.storeAddr), zap.Int("RegionInfo number", len(task.regionInfos)))
-	}
-	return batchTasks, nil
-}
+func filterAliveStores(ctx context.Context, stores []*tikv.Store, ttl time.Duration, kvStore *kvStore) []*tikv.Store {
+	var aliveStores []*tikv.Store
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	wg.Add(len(stores))
+	for i := range stores {
+		go func(idx int) {
+			defer wg.Done()
+			s := stores[idx]
+
+			// Check if store is failed already.
+			if ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl); !ok {
+				return
+			}
+
+			tikvClient := kvStore.GetTiKVClient()
+			if ok := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit); !ok {
+				GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
+				return
+			}
+
+			mu.Lock()
+			defer mu.Unlock()
+			aliveStores = append(aliveStores, s)
+		}(i)
+	}
+	wg.Wait()
+
+	logutil.BgLogger().Info("detecting available mpp stores", zap.Any("total", len(stores)), zap.Any("alive", len(aliveStores)))
+	return aliveStores
+}
+
+// 1. Split range by region location to build copTasks.
+// 2. For each copTask, build its rpcCtx; the target tiflash_compute node is chosen by consistent hash.
+// 3. All copTasks that will be sent to one tiflash_compute node are put in one batchCopTask.
+func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer,
+	kvStore *kvStore,
+	rangesForEachPhysicalTable []*KeyRanges,
+	storeType kv.StoreType,
+	ttl time.Duration) (res []*batchCopTask, err error) {
+	const cmdType = tikvrpc.CmdBatchCop
+	var retryNum int
+	cache := kvStore.GetRegionCache()
+
+	for {
+		retryNum++
+		var rangesLen int
+		tasks := make([]*copTask, 0)
+		regionIDs := make([]tikv.RegionVerID, 0)
+
+		for i, ranges := range rangesForEachPhysicalTable {
+			rangesLen += ranges.Len()
+			locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			for _, lo := range locations {
+				tasks = append(tasks, &copTask{
+					region:         lo.Location.Region,
+					ranges:         lo.Ranges,
+					cmdType:        cmdType,
+					storeType:      storeType,
+					partitionIndex: int64(i),
+				})
+				regionIDs = append(regionIDs, lo.Location.Region)
+			}
+		}
+
+		stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
+		if err != nil {
+			return nil, err
+		}
+		stores = filterAliveStores(bo.GetCtx(), stores, ttl, kvStore)
+		if len(stores) == 0 {
+			return nil, errors.New("tiflash_compute node is unavailable")
+		}
+
+		rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
+		if err != nil {
+			return nil, err
+		}
+		if rpcCtxs == nil {
+			logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rpcCtx is nil", zap.Int("retryNum", retryNum))
+			err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			continue
+		}
+		if len(rpcCtxs) != len(tasks) {
+			return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
+		}
+		taskMap := make(map[string]*batchCopTask)
+		for i, rpcCtx := range rpcCtxs {
+			// tasks and rpcCtxs correspond to each other.
+			regionInfo := RegionInfo{
+				Region:         tasks[i].region,
+				Meta:           rpcCtx.Meta,
+				Ranges:         tasks[i].ranges,
+				AllStores:      []uint64{rpcCtx.Store.StoreID()},
+				PartitionIndex: tasks[i].partitionIndex,
+			}
+			if batchTask, ok := taskMap[rpcCtx.Addr]; ok {
+				batchTask.regionInfos = append(batchTask.regionInfos, regionInfo)
+			} else {
+				batchTask := &batchCopTask{
+					storeAddr:   rpcCtx.Addr,
+					cmdType:     cmdType,
+					ctx:         rpcCtx,
+					regionInfos: []RegionInfo{regionInfo},
+				}
+				taskMap[rpcCtx.Addr] = batchTask
+				res = append(res, batchTask)
+			}
+		}
+		logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
+		break
+	}
+
+	failpointCheckForConsistentHash(res)
+	return res, nil
+}
+
+func failpointCheckForConsistentHash(tasks []*batchCopTask) {
+	failpoint.Inject("checkOnlyDispatchToTiFlashComputeNodes", func(val failpoint.Value) {
+		logutil.BgLogger().Debug("in checkOnlyDispatchToTiFlashComputeNodes")
+
+		// This failpoint is exercised by test-infra cases, because it needs a real cluster.
+		// All tiflash_compute node addresses are stored in val, separated by semicolons.
+		str := val.(string)
+		addrs := strings.Split(str, ";")
+		if len(addrs) < 1 {
+			err := fmt.Sprintf("unexpected length of tiflash_compute node addrs: %v, %s", len(addrs), str)
+			panic(err)
+		}
+		addrMap := make(map[string]struct{})
+		for _, addr := range addrs {
+			addrMap[addr] = struct{}{}
+		}
+		for _, batchTask := range tasks {
+			if _, ok := addrMap[batchTask.storeAddr]; !ok {
+				err := errors.Errorf("batchCopTask send to node which is not tiflash_compute: %v (tiflash_compute nodes: %s)", batchTask.storeAddr, str)
+				panic(err)
+			}
+		}
+	})
+}
 
 // When `partitionIDs != nil`, it means buildBatchCopTasksCore is constructing batch cop tasks for PartitionTableScan.
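The new failpointCheckForConsistentHash above asserts that, in disaggregated mode, every batchCopTask targets a known tiflash_compute node. To exercise it, a test enables the failpoint with the expected address list, semicolon-separated. A minimal sketch, assuming the usual `<package path>/<failpoint name>` convention for the path; the path and addresses here are illustrative, not taken from this commit:

```go
package copr_test

import "github.com/pingcap/failpoint"

// enableDispatchCheck enables the guard: any batchCopTask whose storeAddr is
// not in the listed tiflash_compute addresses makes the failpoint panic.
// The failpoint path and the addresses below are assumptions for illustration.
func enableDispatchCheck() error {
	const fpName = "github.com/pingcap/tidb/store/copr/checkOnlyDispatchToTiFlashComputeNodes"
	return failpoint.Enable(fpName, `return("127.0.0.1:3930;127.0.0.1:3931")`)
}
```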
30 changes: 12 additions & 18 deletions store/copr/batch_request_sender.go
@@ -21,7 +21,6 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/metapb"
-	"github.com/pingcap/tidb/config"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
@@ -36,7 +35,6 @@ type RegionInfo struct {
 	Ranges         *KeyRanges
 	AllStores      []uint64
 	PartitionIndex int64 // used by PartitionTableScan, indicates the n-th partition of the partition table
-	Addr           string
 }
 
 func (ri *RegionInfo) toCoprocessorRegionInfo() *coprocessor.RegionInfo {
@@ -100,22 +98,18 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
 		return tikverr.ErrTiDBShuttingDown
 	}
 
-	if config.GetGlobalConfig().DisaggregatedTiFlash {
-		ss.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-	} else {
-		// The reload region param is always true. Because that every time we try, we must
-		// re-build the range then re-create the batch sender. As a result, the len of "failStores"
-		// will change. If tiflash's replica is more than two, the "reload region" will always be false.
-		// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
-		// when meeting io error.
-		rc := RegionCache{ss.GetRegionCache()}
-		rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
+	// The reload-region param is always true: every time we retry, we must
+	// rebuild the ranges and re-create the batch sender, so the length of
+	// "failStores" changes. If tiflash has more than two replicas, "reload
+	// region" would otherwise always be false. Since batch cop and mpp have
+	// relatively low qps, it is reasonable to reload on every io error.
+	rc := RegionCache{ss.GetRegionCache()}
+	rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
 
-		// Retry on send request failure when it's not canceled.
-		// When a store is not available, the leader of related region should be elected quickly.
-		// TODO: the number of retry time should be limited:since region may be unavailable
-		// when some unrecoverable disaster happened.
-		err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
-	}
+	// Retry on send-request failure when the request was not canceled.
+	// When a store is unavailable, the leader of the related region should be elected quickly.
+	// TODO: the number of retries should be limited, since a region may stay
+	// unavailable after some unrecoverable disaster.
+	err = bo.Backoff(tikv.BoTiFlashRPC(), errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos))
 	return errors.Trace(err)
 }
22 changes: 12 additions & 10 deletions store/copr/mpp.go
@@ -254,14 +254,14 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
 	if originalTask != nil {
 		sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo)
 		rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
-		if err != nil && disaggregatedTiFlash {
-			m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-		}
 		// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
 		// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
 		// That's a hard job but we can try it in the future.
 		if sender.GetRPCError() != nil {
 			logutil.BgLogger().Warn("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+			if disaggregatedTiFlash {
+				m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+			}
 			// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
 			if m.needTriggerFallback {
 				err = derr.ErrTiFlashServerTimeout
@@ -275,7 +275,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req
 			retry = false
 		} else if err != nil {
 			if disaggregatedTiFlash {
-				m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
+				m.store.GetRegionCache().InvalidateTiFlashComputeStores()
 			}
 			if bo.Backoff(tikv.BoTiFlashRPC(), err) == nil {
 				retry = true
@@ -355,6 +355,7 @@ func (m *mppIterator) cancelMppTasks() {
 	}
 
 	// send cancel cmd to all stores where tasks run
+	gotErr := atomic.Bool{}
 	wg := util.WaitGroupWrapper{}
 	for addr := range usedStoreAddrs {
 		storeAddr := addr
@@ -363,13 +364,14 @@ func (m *mppIterator) cancelMppTasks() {
 			logutil.BgLogger().Debug("cancel task", zap.Uint64("query id ", m.startTs), zap.String("on addr", storeAddr))
 			if err != nil {
 				logutil.BgLogger().Error("cancel task error", zap.Error(err), zap.Uint64("query id", m.startTs), zap.String("on addr", storeAddr))
-				if disaggregatedTiFlash {
-					m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-				}
+				gotErr.CompareAndSwap(false, true)
 			}
 		})
 	}
 	wg.Wait()
+	if gotErr.Load() && disaggregatedTiFlash {
+		m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+	}
 }
 
 func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchRequest, taskMeta *mpp.TaskMeta) {
@@ -396,15 +398,15 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
 
 	if err != nil {
 		logutil.BgLogger().Warn("establish mpp connection meet error and cannot retry", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
+		if disaggregatedTiFlash {
+			m.store.GetRegionCache().InvalidateTiFlashComputeStores()
+		}
 		// if needTriggerFallback is true, we return timeout to trigger tikv's fallback
 		if m.needTriggerFallback {
 			m.sendError(derr.ErrTiFlashServerTimeout)
 		} else {
 			m.sendError(err)
 		}
-		if disaggregatedTiFlash {
-			m.store.GetRegionCache().InvalidateTiFlashComputeStoresIfGRPCError(err)
-		}
 		return
 	}
 
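A side note on the cancelMppTasks change above: the per-goroutine cache invalidation is replaced by a record-then-act pattern. Each cancel goroutine only flips an atomic flag on error, and the tiflash_compute store cache is invalidated once, after wg.Wait(). A minimal runnable sketch of that pattern (atomic.Bool requires Go 1.19+; doCancel and the final print stand in for the cancel RPC and the cache invalidation):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func doCancel(id int) error {
	if id == 2 {
		return fmt.Errorf("rpc error on store %d", id)
	}
	return nil
}

func main() {
	var gotErr atomic.Bool
	var wg sync.WaitGroup

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := doCancel(id); err != nil {
				// Only record the failure; don't touch shared caches here.
				gotErr.CompareAndSwap(false, true)
			}
		}(i)
	}
	wg.Wait()

	// Act exactly once, after all workers have finished.
	if gotErr.Load() {
		fmt.Println("invalidate tiflash_compute store cache")
	}
}
```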
