Skip to content

Commit

Permalink
cherry pick #20857 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Nov 9, 2020
1 parent 362c483 commit 48a2dcf
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
21 changes: 21 additions & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,27 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) {
c.Assert(kv.ErrWriteConflictInTiDB.Equal(err), IsTrue, Commentf("err: %s", err))
}

func (s *testCommitterSuite) TestContextCancelCausingUndetermined(c *C) {
// For a normal transaction, if RPC returns context.Canceled error while sending commit
// requests, the transaction should go to the undetermined state.
txn := s.begin(c)
err := txn.Set([]byte("a"), []byte("va"))
c.Assert(err, IsNil)
committer, err := newTwoPhaseCommitterWithInit(txn, 0)
c.Assert(err, IsNil)
committer.prewriteMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
c.Assert(err, IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/rpcContextCancelErr", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/rpcContextCancelErr"), IsNil)
}()

err = committer.commitMutations(NewBackofferWithVars(context.Background(), PrewriteMaxBackoff, nil), committer.mutations)
c.Assert(committer.mu.undeterminedErr, NotNil)
c.Assert(errors.Cause(err), Equals, context.Canceled)
}

func (s *testCommitterSuite) mustGetRegionID(c *C, key []byte) uint64 {
loc, err := s.store.regionCache.LocateKey(NewBackofferWithVars(context.Background(), getMaxBackoff, nil), key)
c.Assert(err, IsNil)
Expand Down
18 changes: 17 additions & 1 deletion store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,15 +407,31 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext,
}
})
}

failpoint.Inject("rpcContextCancelErr", func(val failpoint.Value) {
if val.(bool) {
ctx1, cancel := context.WithCancel(context.Background())
cancel()
select {
case <-ctx1.Done():
}

ctx = ctx1
err = ctx.Err()
resp = nil
}
})

if err != nil {
s.rpcError = err

// Because in rpc logic, context.Cancel() will be transferred to rpcContext.Cancel error. For rpcContext cancel,
// we need to retry the request. But for context cancel active, for example, limitExec gets the required rows,
// we shouldn't retry the request, it will go to backoff and hang in retry logic.
if ctx.Err() != nil && errors.Cause(ctx.Err()) == context.Canceled {
return nil, false, errors.Trace(ctx.Err())
}

s.rpcError = err
if e := s.onSendFail(bo, rpcCtx, err); e != nil {
return nil, false, errors.Trace(e)
}
Expand Down

0 comments on commit 48a2dcf

Please sign in to comment.