diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index d4e6bb045d974..8230266c6e2a1 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -190,6 +190,17 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars buildTaskElapsed: *buildOpt.elapsed, runawayChecker: req.RunawayChecker, } + // Pipelined-dml can flush locks when it is still reading. + // The coprocessor of the txn should not be blocked by itself. + // It should be the only case where a coprocessor can read locks of the same ts. + // + // But when start_ts is not obtained from PD, + // the start_ts could conflict with another pipelined-txn's start_ts. + // in which case the locks of same ts cannot be ignored. + // We rely on the assumption: start_ts is not from PD => this is a stale read. + if !req.IsStaleness { + it.resolvedLocks.Put(req.StartTs) + } it.tasks = tasks if it.concurrency > len(tasks) { it.concurrency = len(tasks) diff --git a/pkg/store/mockstore/unistore/cophandler/closure_exec.go b/pkg/store/mockstore/unistore/cophandler/closure_exec.go index 34bd8b0e38780..0a19605d34a99 100644 --- a/pkg/store/mockstore/unistore/cophandler/closure_exec.go +++ b/pkg/store/mockstore/unistore/cophandler/closure_exec.go @@ -1185,7 +1185,7 @@ func safeCopy(b []byte) []byte { } func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64) error { - if isResolved(startTS, resolved) { + if isResolved(lock.StartTS, resolved) { return nil } lockVisible := lock.StartTS < startTS