Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

copr: add paging API for streaming-like process #29612

Merged
merged 43 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
c44f96a
add paging result
you06 Nov 2, 2021
bfc82e9
add paging result
you06 Nov 2, 2021
f5e937f
use page selection
you06 Nov 2, 2021
4528511
add paging copr
you06 Nov 3, 2021
565a6cf
fix bug
you06 Nov 3, 2021
f341ab9
there is a bug hard to fix
you06 Nov 3, 2021
a588b1b
update
you06 Nov 4, 2021
ff94658
remove incorrect error
you06 Nov 4, 2021
481a6c0
slow growth of paging size
you06 Nov 5, 2021
f000f14
make table read paging
you06 Nov 5, 2021
cb72861
clean up code
you06 Nov 9, 2021
a8ea777
remove unused code
you06 Nov 9, 2021
ff90d1a
close indexlookup executor in background
you06 Nov 10, 2021
321c7db
clean up
you06 Nov 18, 2021
7ed115f
clean up
you06 Nov 18, 2021
1c4a113
remove executor usage of paging
you06 Nov 23, 2021
ed1f80b
update go.mod
you06 Nov 23, 2021
3a389ef
add comment
you06 Nov 25, 2021
bf157df
add tests & comments
you06 Nov 25, 2021
5d46cba
add test & return error when paging and streaming are both on
you06 Nov 25, 2021
d088975
enlarge chan size for paging or streaming request
you06 Nov 25, 2021
b165410
Update store/copr/coprocessor.go
you06 Nov 26, 2021
0e57254
Update store/copr/coprocessor.go
you06 Nov 26, 2021
fe51c05
manually edit the ranges
you06 Nov 26, 2021
cf63a4f
enlarge channel size for ordered copr paging req
you06 Nov 26, 2021
4913c84
fix typo
you06 Nov 26, 2021
7c99ec8
Update store/copr/coprocessor.go
you06 Nov 26, 2021
4f54ae1
hide paging sysvar
you06 Nov 26, 2021
c08fceb
address comments
you06 Nov 29, 2021
ce7791a
fix lightning
you06 Nov 29, 2021
7127bbc
fix test of lightning
you06 Nov 29, 2021
0b0a4e3
merge master
you06 Nov 29, 2021
e2a3965
extract handleCopPagingResult & clean up
you06 Nov 29, 2021
659a65b
check exit cond
you06 Nov 29, 2021
57e0236
remove redundant build tasks
you06 Nov 29, 2021
241a4c4
clean up
you06 Nov 29, 2021
2a7e73b
add test for TestCalculateRetry
you06 Nov 29, 2021
657cb04
address comment
you06 Nov 29, 2021
8e322f0
remove build task for retry
you06 Nov 29, 2021
f5da091
handle nil split range
you06 Nov 30, 2021
2e74e45
Merge branch 'master' into add-paging-copr
ti-chi-bot Nov 30, 2021
857d64e
Merge branch 'master' into add-paging-copr
ti-chi-bot Nov 30, 2021
f199afb
Merge branch 'master' into add-paging-copr
ti-chi-bot Nov 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
return builder
}

// SetPaging sets the "Paging" flag of the "kv.Request" being built.
// It returns the same builder.
func (builder *RequestBuilder) SetPaging(paging bool) *RequestBuilder {
builder.Request.Paging = paging
return builder
}

// SetConcurrency sets "Concurrency" for "kv.Request".
func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder {
builder.Request.Concurrency = concurrency
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ type Request struct {
MatchStoreLabels []*metapb.StoreLabel
// ResourceGroupTagger indicates the kv request task group tagger.
ResourceGroupTagger tikvrpc.ResourceGroupTagger
// Paging indicates whether the request is a paging request.
Paging bool
}

const (
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,9 @@ type SessionVars struct {
EnableStmtOptimizeTrace bool
// Rng stores the rand_seed1 and rand_seed2 for Rand() function
Rng *utilMath.MysqlRng

// EnablePaging indicates whether enable paging in coprocessor requests.
EnablePaging bool
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,10 @@ var defaultSysVars = []*SysVar{
}, GetSession: func(s *SessionVars) (string, error) {
return "0", nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: Off, Type: TypeBool, Hidden: true, skipInit: true, SetSession: func(s *SessionVars, val string) error {
s.EnablePaging = TiDBOptOn(val)
return nil
}},
}

func collectAllowFuncName4ExpressionIndex() string {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ const (

// TiDBReadStaleness indicates the staleness duration for following statement
TiDBReadStaleness = "tidb_read_staleness"

// TiDBEnablePaging indicates whether paging is enabled in coprocessor requests.
TiDBEnablePaging = "tidb_enable_paging"
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
)

// TiDB system variable names that both in session and global scope.
Expand Down
172 changes: 132 additions & 40 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ const (
copNextMaxBackoff = 20000
)

// A paging request may be split into multiple requests if there is more data than fits in one page.
// The paging size grows from minPagingSize to maxPagingSize; these values are not well tuned yet.
you06 marked this conversation as resolved.
Show resolved Hide resolved
// For example, a paging request scanning the range (r1, r200) asks for 64 rows in the first batch.
// If the range is not drained, the paging size grows, the remaining range is recalculated, e.g. (r100, r200), and another request is sent.
// Compared with a common unary request, a paging request allows early access to data and offers a streaming-like way of processing it.
// TODO: may make the paging parameters configurable.
const (
// minPagingSize is the paging size given to a paging copTask when it is first built.
minPagingSize uint64 = 64
// maxPagingSize is the upper bound that growPagingSize clamps the paging size to
// (128 times minPagingSize).
maxPagingSize = minPagingSize * 128
// pagingSizeGrow is the factor growPagingSize multiplies the current paging size by.
pagingSizeGrow uint64 = 2
)

// CopClient is coprocessor client.
type CopClient struct {
kv.RequestTypeSupportedChecker
Expand All @@ -78,6 +90,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
if req.Streaming && req.Paging {
return copErrorResponse{errors.New("streaming and paging are both on")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
Expand Down Expand Up @@ -115,6 +130,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
// 2*it.concurrency to avoid deadlock in the unit test caused by the `MustExec` or `Exec`
capacity = it.concurrency * 2
}
// in streaming or paging request, a request will be returned in multi batches,
// enlarge the channel size to avoid the request blocked by buffer full.
if req.Streaming || req.Paging {
if capacity < 2048 {
capacity = 2048
}
}
it.respChan = make(chan *copResponse, capacity)
it.sendRate = util.NewRateLimit(it.concurrency)
}
Expand All @@ -140,7 +162,9 @@ type copTask struct {
cmdType tikvrpc.CmdType
storeType kv.StoreType

eventCb trxevents.EventCallback
eventCb trxevents.EventCallback
paging bool
pagingSize uint64
}

func (r *copTask) String() string {
Expand Down Expand Up @@ -168,6 +192,14 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
if err != nil {
return nil, errors.Trace(err)
}
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
chanSize := 2
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
// in streaming or paging request, a request will be returned in multi batches,
// enlarge the channel size to avoid the request blocked by buffer full.
if req.Streaming || req.Paging {
chanSize = 128
}

var tasks []*copTask
for _, loc := range locs {
Expand All @@ -176,15 +208,21 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
rLen := loc.Ranges.Len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
// If this is a paging request, we set the paging size to minPagingSize,
// the size will grow every round.
pagingSize := uint64(0)
if req.Paging {
pagingSize = minPagingSize
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
ranges: loc.Ranges.Slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
eventCb: eventCb,
region: loc.Location.Region,
ranges: loc.Ranges.Slice(i, nextI),
respChan: make(chan *copResponse, chanSize),
cmdType: cmdType,
storeType: req.StoreType,
eventCb: eventCb,
paging: req.Paging,
pagingSize: pagingSize,
})
i = nextI
}
Expand Down Expand Up @@ -386,13 +424,8 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
worker.sendToRespCh(finCopResp, worker.respChan, false)
}
close(task.respChan)
if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
return
}
select {
case <-worker.finishCh:
if worker.finished() {
return
default:
}
}
}
Expand Down Expand Up @@ -648,11 +681,9 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask,
worker.sendToRespCh(resp, respCh, true)
return
}
// test whether the ctx is cancelled
if vars := bo.GetVars(); vars != nil && vars.Killed != nil && atomic.LoadUint32(vars.Killed) == 1 {
return
if worker.finished() {
break
}

if len(tasks) > 0 {
remainTasks = append(tasks, remainTasks[1:]...)
} else {
Expand All @@ -674,19 +705,21 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
})

copReq := coprocessor.Request{
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
PagingSize: task.pagingSize,
}

var cacheKey []byte = nil
var cacheValue *coprCacheValue = nil
var cacheKey []byte
var cacheValue *coprCacheValue

// TODO: cache paging copr
// If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
// computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key.
if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
if task.cmdType == tikvrpc.CmdCop && !task.paging && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
cKey, err := coprCacheBuildKey(&copReq)
if err == nil {
cacheKey = cKey
Expand Down Expand Up @@ -753,6 +786,10 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime)
}

if worker.req.Paging {
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime)
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, nil, costTime)
}
Expand Down Expand Up @@ -862,14 +899,38 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *ti
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
task.ranges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
return []*copTask{task}, nil
}
if resp.Range != nil {
lastRange = resp.Range
}
}
}

// handleCopPagingResult handles the response of a single paging coprocessor request.
// It first passes the response to handleCopResponse (with no cache key/value and no
// last range). If that reports an error or returns tasks to retry, those tasks are
// returned with the current paging size. Otherwise the range reported in the response
// is used to compute the ranges that still have to be read: when nothing is left it
// returns no task, else it returns the same task with its ranges narrowed and its
// paging size grown.
func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime)
if err != nil || len(remainedTasks) != 0 {
// If there is region error or lock error, keep the paging size and retry.
for _, remainedTask := range remainedTasks {
remainedTask.pagingSize = task.pagingSize
}
return remainedTasks, errors.Trace(err)
}
// The range carried by the response is what calculateRemain uses below to
// drop the already-processed part of task.ranges.
pagingRange := resp.pbResp.Range
// only paging requests need to calculate the next ranges
you06 marked this conversation as resolved.
Show resolved Hide resolved
if pagingRange == nil {
return nil, errors.New("lastRange in paging should not be nil")
}
// calculate next ranges and grow the paging size
task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
if task.ranges.Len() == 0 {
// Nothing left to read for this task.
return nil, nil
}
task.pagingSize = growPagingSize(task.pagingSize)
// The same task object is reused for the next page.
return []*copTask{task}, nil
}

// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
Expand Down Expand Up @@ -909,7 +970,10 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, errors.Trace(err)
}
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
if worker.req.Streaming {
task.ranges = worker.calculateRetry(task.ranges, lastRange, worker.req.Desc)
youjiali1995 marked this conversation as resolved.
Show resolved Hide resolved
}
return []*copTask{task}, nil
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
Expand Down Expand Up @@ -1037,30 +1101,50 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
return nil
}

func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
remainedRanges := task.ranges
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
// calculateRetry splits the input ranges into two, and take one of them according to desc flag.
// It's used in the streaming API to calculate which range has been consumed and what needs to be retried.
// For example:
// ranges: [r1 --> r2) [r3 --> r4)
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the retry ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the retry ranges should be [r1 --> r2) [r3 --> s2)
func (worker *copIteratorWorker) calculateRetry(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if desc {
left, _ := ranges.Split(split.End)
return left
}
return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.eventCb)
_, right := ranges.Split(split.Start)
return right
}

// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
// It's used in streaming API, to calculate which range is consumed and what needs to be retry.
// calculateRemain calculates the remain ranges to be processed, it's used in streaming and paging API.
// For example:
// ranges: [r1 --> r2) [r3 --> r4)
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the remain ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the remain ranges should be [r1 --> r2) [r3 --> s2)
// In normal scan order, all data before s2 is consumed, so the remained ranges should be [s2 --> r4)
// In reverse scan order, all data after s1 is consumed, so the remained ranges should be [r1 --> s1)
func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if desc {
left, _ := ranges.Split(split.End)
left, _ := ranges.Split(split.Start)
return left
}
_, right := ranges.Split(split.Start)
_, right := ranges.Split(split.End)
return right
}

// finished reports whether the worker should stop: either the Killed flag in
// the worker's variables is set to 1, or finishCh is ready to be received from
// (for example because it has been closed). It never blocks.
func (worker *copIteratorWorker) finished() bool {
	if vars := worker.vars; vars != nil && vars.Killed != nil && atomic.LoadUint32(vars.Killed) == 1 {
		return true
	}
	// Non-blocking poll of the finish channel.
	select {
	case <-worker.finishCh:
		return true
	default:
	}
	return false
}

func (it *copIterator) Close() error {
if atomic.CompareAndSwapUint32(&it.closed, 0, 1) {
close(it.finishCh)
Expand Down Expand Up @@ -1241,3 +1325,11 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
return kvrpcpb.IsolationLevel_SI
}
}

// growPagingSize returns the paging size to use for the next page: the given
// size multiplied by pagingSizeGrow, capped at maxPagingSize.
func growPagingSize(size uint64) uint64 {
	if next := size * pagingSizeGrow; next <= maxPagingSize {
		return next
	}
	return maxPagingSize
}
Loading