store/copr: move tikv.keyranges and tikv.regioninfo to package store/…
AndreMouche authored May 28, 2021
1 parent cea4e20 commit 7a15d64
Showing 13 changed files with 250 additions and 243 deletions.
13 changes: 6 additions & 7 deletions ddl/backfilling.go
@@ -20,7 +20,6 @@ import (
"strconv"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -32,8 +31,9 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util"
@@ -331,18 +331,17 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
}

maxSleep := 10000 // ms
bo := tikv.NewBackofferWithVars(context.Background(), maxSleep, nil)
tikvRange := *(*tikvstore.KeyRange)(unsafe.Pointer(&kvRange))
ranges, err := s.GetRegionCache().SplitRegionRanges(bo, []tikvstore.KeyRange{tikvRange})
bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil)
rc := copr.NewRegionCache(s.GetRegionCache())
ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange})
if err != nil {
return nil, errors.Trace(err)
}
if len(ranges) == 0 {
errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
return nil, errors.Trace(errInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
}
res := *(*[]kv.KeyRange)(unsafe.Pointer(&ranges))
return res, nil
return ranges, nil
}

func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
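The change above replaces the unsafe.Pointer casts between kv.KeyRange and tikvstore.KeyRange with a copr-level region cache that accepts kv.KeyRange directly. A minimal sketch of the wrapper shape those call sites imply, in Go; the embedding is inferred from cache.RegionCache and RegionCache{ss.GetRegionCache()} in the files below, so treat it as an assumption rather than the definitive implementation:

package copr

import (
	"github.com/pingcap/tidb/store/tikv"
)

// RegionCache wraps the tikv-layer region cache so that store/copr can expose
// helpers such as SplitRegionRanges and SplitKeyRangesByLocations that operate
// on kv.KeyRange and the copr-local KeyRanges, which is what lets ddl drop
// the "unsafe" import above.
type RegionCache struct {
	*tikv.RegionCache
}

// NewRegionCache is the constructor called from ddl/backfilling.go above.
func NewRegionCache(rc *tikv.RegionCache) *RegionCache {
	return &RegionCache{rc}
}

With the wrapper in place, any kv/tikv range conversion stays inside store/copr instead of leaking into callers such as the backfill worker.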
37 changes: 15 additions & 22 deletions store/copr/batch_coprocessor.go
@@ -21,7 +21,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -33,7 +32,6 @@ import (
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -46,7 +44,7 @@ type batchCopTask struct {
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext

regionInfos []tikv.RegionInfo
regionInfos []RegionInfo
}

type batchCopResponse struct {
@@ -109,7 +107,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
return originalTasks
}
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
storeCandidateRegionMap := make(map[uint64]map[string]RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0

@@ -119,7 +117,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
regionInfos: []RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}
@@ -155,7 +153,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]tikv.RegionInfo)
candidateMap := make(map[string]RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
@@ -210,7 +208,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
break
}
var key string
var ri tikv.RegionInfo
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
@@ -246,13 +244,13 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
return ret
}

func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
rangesLen := ranges.Len()
for {

locations, err := cache.SplitKeyRangesByLocations(bo.TiKVBackoffer(), ranges)
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
@@ -288,13 +286,13 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
regionInfos: []RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
@@ -346,7 +344,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
if err != nil {
return copErrorResponse{err}
@@ -482,19 +480,19 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *

// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
var ranges []kv.KeyRange
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *tikvstore.KeyRange) {
ri.Ranges.Do(func(ran *kv.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
return buildBatchCopTasks(bo, b.store.GetRegionCache(), NewKeyRanges(ranges), b.req.StoreType)
}

const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
@@ -527,7 +525,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
req.StoreTp = tikvrpc.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, readTimeoutUltraLong)
resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, readTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
@@ -625,8 +623,3 @@ func (b *batchCopIterator) sendToRespCh(resp *batchCopResponse) (exit bool) {
}
return
}

func toTiKVKeyRanges(ranges []kv.KeyRange) *tikv.KeyRanges {
res := *(*[]tikvstore.KeyRange)(unsafe.Pointer(&ranges))
return tikv.NewKeyRanges(res)
}
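The toTiKVKeyRanges helper deleted above is no longer needed because KeyRanges now lives in this package and is built from kv.KeyRange values directly. Continuing the package copr sketch from the previous note (it additionally needs the github.com/pingcap/tidb/kv import), here is a hedged sketch of the type as the call sites use it; NewKeyRanges, Len, At and Do appear in the hunks, Split (used by calculateRemain in coprocessor.go) is omitted, and the field layout and method bodies are assumptions:

// KeyRanges groups the key ranges of a coprocessor request. Sketch only; the
// production type may store the ranges differently.
type KeyRanges struct {
	ranges []kv.KeyRange
}

// NewKeyRanges wraps a plain slice of kv.KeyRange, as CopClient.Send and
// retryBatchCopTask do above.
func NewKeyRanges(ranges []kv.KeyRange) *KeyRanges {
	return &KeyRanges{ranges: ranges}
}

// Len returns the number of ranges.
func (r *KeyRanges) Len() int { return len(r.ranges) }

// At returns the i-th range.
func (r *KeyRanges) At(i int) kv.KeyRange { return r.ranges[i] }

// Do applies f to every range; retryBatchCopTask uses it to flatten a failed
// task's regionInfos back into a []kv.KeyRange before rebuilding tasks.
func (r *KeyRanges) Do(f func(ran *kv.KeyRange)) {
	for i := range r.ranges {
		f(&r.ranges[i])
	}
}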
@@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv
package copr

import (
"context"
@@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
@@ -29,38 +30,38 @@

// RegionInfo contains region related information for batchCopTask
type RegionInfo struct {
Region RegionVerID
Region tikv.RegionVerID
Meta *metapb.Region
Ranges *KeyRanges
AllStores []uint64
}

// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
type RegionBatchRequestSender struct {
*RegionRequestSender
*tikv.RegionRequestSender
}

// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: NewRegionRequestSender(cache, client),
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client),
}
}

// SendReqToAddr send batch cop request
func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *tikv.RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
cancel = func() {}
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, cancel, errors.Trace(e)
}
ctx := bo.GetCtx()
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
if rawHook := ctx.Value(tikv.RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*tikv.RPCCanceller).WithCancel(ctx)
}
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil {
RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}
if err != nil {
cancel()
@@ -75,11 +76,11 @@ func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCCont
return
}

func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *RPCContext, regionInfos []RegionInfo, err error) error {
func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *tikv.RPCContext, regionInfos []RegionInfo, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
} else if atomic.LoadUint32(&ShuttingDown) > 0 {
} else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 {
return tikverr.ErrTiDBShuttingDown
}

@@ -88,7 +89,8 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
rc := RegionCache{ss.GetRegionCache()}
rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)

// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
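Note how this file moves between the two cache types: NewRegionBatchRequestSender unwraps the copr cache (cache.RegionCache) because the tikv-level sender wants the raw cache, while onSendFailForBatchRegions re-wraps it (RegionCache{ss.GetRegionCache()}) to reach the copr-level failure hook. The small function below is illustrative only, not part of the codebase, and shows that round trip with the types from this commit:

// wrapUnwrapDemo shows why the value literal RegionCache{ss.GetRegionCache()}
// above compiles and what it is for. Illustration only.
func wrapUnwrapDemo(rawCache *tikv.RegionCache, client tikv.Client) {
	coprCache := NewRegionCache(rawCache) // copr wrapper around the shared tikv cache
	sender := NewRegionBatchRequestSender(coprCache, client)

	// The sender keeps only the embedded *tikv.RegionCache, so copr-level
	// helpers such as OnSendFailForBatchRegions are reached by re-wrapping it.
	rewrapped := RegionCache{sender.GetRegionCache()}
	_ = rewrapped
}

Keeping the wrapper this thin lets the tikv client stay unaware of copr types while copr code still adds batch-region helpers on top of the shared cache.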
14 changes: 7 additions & 7 deletions store/copr/coprocessor.go
@@ -76,7 +76,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
if err != nil {
return copErrorResponse{err}
@@ -130,7 +130,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
// copTask contains a related Region and KeyRange for a kv.Request.
type copTask struct {
region tikv.RegionVerID
ranges *tikv.KeyRanges
ranges *KeyRanges

respChan chan *copResponse
storeAddr string
@@ -146,7 +146,7 @@ func (r *copTask) String() string {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
@@ -159,7 +159,7 @@ func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRange

rangesLen := ranges.Len()

locs, err := cache.SplitKeyRangesByLocations(bo.TiKVBackoffer(), ranges)
locs, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
@@ -197,7 +197,7 @@ func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRange
return tasks, nil
}

func buildTiDBMemCopTasks(ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
servers, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
return nil, err
@@ -905,7 +905,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if resp.pbResp.Range != nil {
resp.startKey = resp.pbResp.Range.Start
} else if task.ranges != nil && task.ranges.Len() > 0 {
resp.startKey = kv.Key(task.ranges.At(0).StartKey)
resp.startKey = task.ranges.At(0).StartKey
}
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
@@ -1033,7 +1033,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the remain ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the remain ranges should be [r1 --> r2) [r3 --> s2)
func (worker *copIteratorWorker) calculateRemain(ranges *tikv.KeyRanges, split *coprocessor.KeyRange, desc bool) *tikv.KeyRanges {
func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if desc {
left, _ := ranges.Split(split.End)
return left
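The calculateRemain comment above is easier to follow with concrete ranges. The self-contained illustration below restates the rule on plain kv.KeyRange slices (add "bytes" and "github.com/pingcap/tidb/kv" to the sketch's imports); the production code expresses the same rule through KeyRanges.Split with a *coprocessor.KeyRange argument, so this function is an explanatory stand-in, not the real implementation:

// remainAfterSplit keeps the part of ranges that is still unconsumed after a
// response covered split. Forward scans consume everything before
// split.StartKey; reverse scans consume everything at or after split.EndKey.
// With ranges [r1, r2) [r3, r4) and split [s1, s2), the result is
// [s1, r2) [r3, r4) for a forward scan and [r1, r2) [r3, s2) for a reverse one.
func remainAfterSplit(ranges []kv.KeyRange, split kv.KeyRange, desc bool) []kv.KeyRange {
	var remain []kv.KeyRange
	for _, r := range ranges {
		if desc {
			if bytes.Compare(r.StartKey, split.EndKey) >= 0 {
				continue // fully consumed by the reverse scan
			}
			if bytes.Compare(r.EndKey, split.EndKey) > 0 {
				r.EndKey = split.EndKey // clip the range that straddles the split point
			}
		} else {
			if bytes.Compare(r.EndKey, split.StartKey) <= 0 {
				continue // fully consumed by the forward scan
			}
			if bytes.Compare(r.StartKey, split.StartKey) < 0 {
				r.StartKey = split.StartKey // clip the range that straddles the split point
			}
		}
		remain = append(remain, r)
	}
	return remain
}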