Skip to content

Commit

Permalink
store/copr: disable stale read when meeting lock for cop (#44240) (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 29, 2023
1 parent 17d97bd commit 97ef067
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,11 @@ type copTask struct {
pagingSize uint64
pagingTaskIdx uint32

partitionIndex int64 // used by balanceBatchCopTask in PartitionTableScan
requestSource util.RequestSource
RowCountHint int // used for extra concurrency of small tasks, -1 for unknown row count
batchTaskList map[uint64]*batchedCopTask
partitionIndex int64 // used by balanceBatchCopTask in PartitionTableScan
requestSource util.RequestSource
RowCountHint int // used for extra concurrency of small tasks, -1 for unknown row count
batchTaskList map[uint64]*batchedCopTask
meetLockFallback bool
}

type batchedCopTask struct {
Expand Down Expand Up @@ -1027,7 +1028,9 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
// set ReadReplicaScope and TxnScope so that req.IsStaleRead will be true when it's a global scope stale read.
req.ReadReplicaScope = worker.req.ReadReplicaScope
req.TxnScope = worker.req.TxnScope
if worker.req.IsStaleness {
if task.meetLockFallback {
req.DisableStaleReadMeetLock()
} else if worker.req.IsStaleness {
req.EnableStaleRead()
}
staleRead := req.GetStaleRead()
Expand Down Expand Up @@ -1177,6 +1180,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
task.meetLockFallback = true
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
Expand Down Expand Up @@ -1325,6 +1329,7 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
if err := worker.handleLockErr(bo, resp.pbResp.GetLocked(), task); err != nil {
return nil, err
}
task.meetLockFallback = true
remainTasks = append(remainTasks, task)
continue
}
Expand Down

0 comments on commit 97ef067

Please sign in to comment.