Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: move tikv.keyranges and tikv.regioninfo to package store/… #24908

Merged
merged 9 commits into from
May 28, 2021
13 changes: 6 additions & 7 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"strconv"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -32,8 +31,9 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -331,18 +331,17 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
}

maxSleep := 10000 // ms
bo := tikv.NewBackofferWithVars(context.Background(), maxSleep, nil)
tikvRange := *(*tikvstore.KeyRange)(unsafe.Pointer(&kvRange))
ranges, err := s.GetRegionCache().SplitRegionRanges(bo, []tikvstore.KeyRange{tikvRange})
bo := backoff.NewBackofferWithVars(context.Background(), maxSleep, nil)
rc := copr.NewRegionCache(s.GetRegionCache())
ranges, err := rc.SplitRegionRanges(bo, []kv.KeyRange{kvRange})
if err != nil {
return nil, errors.Trace(err)
}
if len(ranges) == 0 {
errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
return nil, errors.Trace(errInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
}
res := *(*[]kv.KeyRange)(unsafe.Pointer(&ranges))
return res, nil
return ranges, nil
}

func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
Expand Down
37 changes: 15 additions & 22 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -33,7 +32,6 @@ import (
"github.com/pingcap/tidb/store/driver/backoff"
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand All @@ -46,7 +44,7 @@ type batchCopTask struct {
cmdType tikvrpc.CmdType
ctx *tikv.RPCContext

regionInfos []tikv.RegionInfo
regionInfos []RegionInfo
}

type batchCopResponse struct {
Expand Down Expand Up @@ -109,7 +107,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
return originalTasks
}
storeTaskMap := make(map[uint64]*batchCopTask)
storeCandidateRegionMap := make(map[uint64]map[string]tikv.RegionInfo)
storeCandidateRegionMap := make(map[uint64]map[string]RegionInfo)
totalRegionCandidateNum := 0
totalRemainingRegionNum := 0

Expand All @@ -119,7 +117,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
storeAddr: task.storeAddr,
cmdType: task.cmdType,
ctx: task.ctx,
regionInfos: []tikv.RegionInfo{task.regionInfos[0]},
regionInfos: []RegionInfo{task.regionInfos[0]},
}
storeTaskMap[taskStoreID] = batchTask
}
Expand Down Expand Up @@ -155,7 +153,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
continue
}
if _, ok := storeCandidateRegionMap[storeID]; !ok {
candidateMap := make(map[string]tikv.RegionInfo)
candidateMap := make(map[string]RegionInfo)
storeCandidateRegionMap[storeID] = candidateMap
}
if _, duplicateRegion := storeCandidateRegionMap[storeID][taskKey]; duplicateRegion {
Expand Down Expand Up @@ -210,7 +208,7 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
break
}
var key string
var ri tikv.RegionInfo
var ri RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
Expand Down Expand Up @@ -246,13 +244,13 @@ func balanceBatchCopTask(originalTasks []*batchCopTask) []*batchCopTask {
return ret
}

func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
func buildBatchCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, storeType kv.StoreType) ([]*batchCopTask, error) {
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
rangesLen := ranges.Len()
for {

locations, err := cache.SplitKeyRangesByLocations(bo.TiKVBackoffer(), ranges)
locations, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -288,13 +286,13 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key
}
allStores := cache.GetAllValidTiFlashStores(task.region, rpcCtx.Store)
if batchCop, ok := storeTaskMap[rpcCtx.Addr]; ok {
batchCop.regionInfos = append(batchCop.regionInfos, tikv.RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
batchCop.regionInfos = append(batchCop.regionInfos, RegionInfo{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores})
} else {
batchTask := &batchCopTask{
storeAddr: rpcCtx.Addr,
cmdType: cmdType,
ctx: rpcCtx,
regionInfos: []tikv.RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
regionInfos: []RegionInfo{{Region: task.region, Meta: rpcCtx.Meta, Ranges: task.ranges, AllStores: allStores}},
}
storeTaskMap[rpcCtx.Addr] = batchTask
}
Expand Down Expand Up @@ -346,7 +344,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
if err != nil {
return copErrorResponse{err}
Expand Down Expand Up @@ -482,19 +480,19 @@ func (b *batchCopIterator) handleTask(ctx context.Context, bo *Backoffer, task *

// Merge all ranges and request again.
func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *Backoffer, batchTask *batchCopTask) ([]*batchCopTask, error) {
var ranges []tikvstore.KeyRange
var ranges []kv.KeyRange
for _, ri := range batchTask.regionInfos {
ri.Ranges.Do(func(ran *tikvstore.KeyRange) {
ri.Ranges.Do(func(ran *kv.KeyRange) {
ranges = append(ranges, *ran)
})
}
return buildBatchCopTasks(bo, b.store.GetRegionCache(), tikv.NewKeyRanges(ranges), b.req.StoreType)
return buildBatchCopTasks(bo, b.store.GetRegionCache(), NewKeyRanges(ranges), b.req.StoreType)
}

const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := tikv.NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
Expand Down Expand Up @@ -527,7 +525,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *Backoffer, ta
req.StoreTp = tikvrpc.TiFlash

logutil.BgLogger().Debug("send batch request to ", zap.String("req info", req.String()), zap.Int("cop task len", len(task.regionInfos)))
resp, retry, cancel, err := sender.SendReqToAddr(bo.TiKVBackoffer(), task.ctx, task.regionInfos, req, readTimeoutUltraLong)
resp, retry, cancel, err := sender.SendReqToAddr(bo, task.ctx, task.regionInfos, req, readTimeoutUltraLong)
// If there are store errors, we should retry for all regions.
if retry {
return b.retryBatchCopTask(ctx, bo, task)
Expand Down Expand Up @@ -625,8 +623,3 @@ func (b *batchCopIterator) sendToRespCh(resp *batchCopResponse) (exit bool) {
}
return
}

func toTiKVKeyRanges(ranges []kv.KeyRange) *tikv.KeyRanges {
res := *(*[]tikvstore.KeyRange)(unsafe.Pointer(&ranges))
return tikv.NewKeyRanges(res)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv
package copr

import (
"context"
Expand All @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/store/tikv"
tikverr "github.com/pingcap/tidb/store/tikv/error"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand All @@ -29,38 +30,38 @@ import (

// RegionInfo contains region related information for batchCopTask
type RegionInfo struct {
Region RegionVerID
Region tikv.RegionVerID
Meta *metapb.Region
Ranges *KeyRanges
AllStores []uint64
}

// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
type RegionBatchRequestSender struct {
*RegionRequestSender
*tikv.RegionRequestSender
}

// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *RegionCache, client Client) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: NewRegionRequestSender(cache, client),
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client),
}
}

// SendReqToAddr send batch cop request
func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *tikv.RPCContext, regionInfos []RegionInfo, req *tikvrpc.Request, timout time.Duration) (resp *tikvrpc.Response, retry bool, cancel func(), err error) {
cancel = func() {}
if e := tikvrpc.SetContext(req, rpcCtx.Meta, rpcCtx.Peer); e != nil {
return nil, false, cancel, errors.Trace(e)
}
ctx := bo.GetCtx()
if rawHook := ctx.Value(RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
if rawHook := ctx.Value(tikv.RPCCancellerCtxKey{}); rawHook != nil {
ctx, cancel = rawHook.(*tikv.RPCCanceller).WithCancel(ctx)
}
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil {
RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}
if err != nil {
cancel()
Expand All @@ -75,11 +76,11 @@ func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *RPCCont
return
}

func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *RPCContext, regionInfos []RegionInfo, err error) error {
func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx *tikv.RPCContext, regionInfos []RegionInfo, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
return errors.Trace(err)
} else if atomic.LoadUint32(&ShuttingDown) > 0 {
} else if atomic.LoadUint32(&tikv.ShuttingDown) > 0 {
return tikverr.ErrTiDBShuttingDown
}

Expand All @@ -88,7 +89,8 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx
// will change. If tiflash's replica is more than two, the "reload region" will always be false.
// Now that the batch cop and mpp has a relative low qps, it's reasonable to reload every time
// when meeting io error.
ss.GetRegionCache().OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)
rc := RegionCache{ss.GetRegionCache()}
rc.OnSendFailForBatchRegions(bo, ctx.Store, regionInfos, true, err)

// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
Expand Down
14 changes: 7 additions & 7 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
ranges := NewKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
if err != nil {
return copErrorResponse{err}
Expand Down Expand Up @@ -130,7 +130,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
// copTask contains a related Region and KeyRange for a kv.Request.
type copTask struct {
region tikv.RegionVerID
ranges *tikv.KeyRanges
ranges *KeyRanges

respChan chan *copResponse
storeAddr string
Expand All @@ -146,7 +146,7 @@ func (r *copTask) String() string {
// rangesPerTask limits the length of the ranges slice sent in one copTask.
const rangesPerTask = 25000

func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
start := time.Now()
cmdType := tikvrpc.CmdCop
if req.Streaming {
Expand All @@ -159,7 +159,7 @@ func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRange

rangesLen := ranges.Len()

locs, err := cache.SplitKeyRangesByLocations(bo.TiKVBackoffer(), ranges)
locs, err := cache.SplitKeyRangesByLocations(bo, ranges)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func buildCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.KeyRange
return tasks, nil
}

func buildTiDBMemCopTasks(ranges *tikv.KeyRanges, req *kv.Request) ([]*copTask, error) {
func buildTiDBMemCopTasks(ranges *KeyRanges, req *kv.Request) ([]*copTask, error) {
servers, err := infosync.GetAllServerInfo(context.Background())
if err != nil {
return nil, err
Expand Down Expand Up @@ -905,7 +905,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if resp.pbResp.Range != nil {
resp.startKey = resp.pbResp.Range.Start
} else if task.ranges != nil && task.ranges.Len() > 0 {
resp.startKey = kv.Key(task.ranges.At(0).StartKey)
resp.startKey = task.ranges.At(0).StartKey
}
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
Expand Down Expand Up @@ -1033,7 +1033,7 @@ func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRang
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the remain ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the remain ranges should be [r1 --> r2) [r3 --> s2)
func (worker *copIteratorWorker) calculateRemain(ranges *tikv.KeyRanges, split *coprocessor.KeyRange, desc bool) *tikv.KeyRanges {
func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if desc {
left, _ := ranges.Split(split.End)
return left
Expand Down
Loading