Skip to content

Commit

Permalink
store/tikv: move lock wait time to store/tikv (#24217)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Apr 25, 2021
1 parent 24d93db commit c250425
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
3 changes: 2 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -948,7 +949,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout
if e.Lock.LockType == ast.SelectLockForUpdateNoWait {
lockWaitTime = kv.LockNoWait
lockWaitTime = tikv.LockNoWait
} else if e.Lock.LockType == ast.SelectLockForUpdateWaitN {
lockWaitTime = int64(e.Lock.WaitSec) * 1000
}
Expand Down
8 changes: 0 additions & 8 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,3 @@ type SplittableStore interface {
WaitScatterRegionFinish(ctx context.Context, regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Used for pessimistic lock wait time
// these two constants are special for lock protocol with tikv
// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds
var (
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)
3 changes: 2 additions & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -516,7 +517,7 @@ func getLockWaitTime(ctx sessionctx.Context, lockInfo *ast.SelectLockInfo) (lock
if lockInfo.LockType == ast.SelectLockForUpdateWaitN {
waitTime = int64(lockInfo.WaitSec * 1000)
} else if lockInfo.LockType == ast.SelectLockForUpdateNoWait {
waitTime = kv.LockNoWait
waitTime = tikv.LockNoWait
}
}
}
Expand Down
15 changes: 11 additions & 4 deletions store/tikv/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/logutil"
"github.com/pingcap/tidb/store/tikv/metrics"
Expand All @@ -32,6 +31,14 @@ import (
"go.uber.org/zap"
)

// Used for pessimistic lock wait time
// these two constants are special for lock protocol with tikv
// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds
var (
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)

type actionPessimisticLock struct {
*kv.LockCtx
}
Expand Down Expand Up @@ -99,7 +106,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
if action.LockWaitTime > 0 {
timeLeft := action.LockWaitTime - (time.Since(lockWaitStartTime)).Milliseconds()
if timeLeft <= 0 {
req.PessimisticLock().WaitTimeout = tidbkv.LockNoWait
req.PessimisticLock().WaitTimeout = LockNoWait
} else {
req.PessimisticLock().WaitTimeout = timeLeft
}
Expand Down Expand Up @@ -176,9 +183,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring
// the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary.
if msBeforeTxnExpired > 0 {
if action.LockWaitTime == tidbkv.LockNoWait {
if action.LockWaitTime == LockNoWait {
return kv.ErrLockAcquireFailAndNoWaitSet
} else if action.LockWaitTime == tidbkv.LockAlwaysWait {
} else if action.LockWaitTime == LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, we should return wait timeout if we are still blocked by a lock
Expand Down
10 changes: 5 additions & 5 deletions store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func (s *testCommitterSuite) TestAcquireFalseTimeoutLock(c *C) {
txn2.SetOption(kv.Pessimistic, true)

// test no wait
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), LockWaitTime: tidbkv.LockNoWait, WaitStartTime: time.Now()}
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), LockWaitTime: tikv.LockNoWait, WaitStartTime: time.Now()}
err = txn2.LockKeys(context.Background(), lockCtx, k2)
// cannot acquire lock immediately thus error
c.Assert(err.Error(), Equals, kv.ErrLockAcquireFailAndNoWaitSet.Error())
Expand Down Expand Up @@ -982,7 +982,7 @@ func (s *testCommitterSuite) TestPkNotFound(c *C) {
// case, the returned action of TxnStatus should be LockNotExistDoNothing, and lock on k3 could be resolved.
txn3 := s.begin(c)
txn3.SetOption(kv.Pessimistic, true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait}
lockCtx = &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn3.LockKeys(ctx, lockCtx, k3)
c.Assert(err, IsNil)
status, err = resolver.GetTxnStatusFromLock(bo, lockKey3, oracle.GoTimeToTS(time.Now().Add(200*time.Millisecond)), false)
Expand Down Expand Up @@ -1018,7 +1018,7 @@ func (s *testCommitterSuite) TestPessimisticLockPrimary(c *C) {
// txn3 should locks k2 successfully using no wait
txn3 := s.begin(c)
txn3.SetOption(kv.Pessimistic, true)
lockCtx3 := &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait}
lockCtx3 := &kv.LockCtx{ForUpdateTS: txn3.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/txnNotFoundRetTTL", "return"), IsNil)
err = txn3.LockKeys(context.Background(), lockCtx3, k2)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/txnNotFoundRetTTL"), IsNil)
Expand All @@ -1036,7 +1036,7 @@ func (s *testCommitterSuite) TestResolvePessimisticLock(c *C) {
txn.SetOption(kv.KVFilter, drivertxn.TiDBKVFilter{})
err := txn.Set(untouchedIndexKey, untouchedIndexValue)
c.Assert(err, IsNil)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait}
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn.LockKeys(context.Background(), lockCtx, untouchedIndexKey, noValueIndexKey)
c.Assert(err, IsNil)
commit, err := txn.NewCommitter(1)
Expand Down Expand Up @@ -1204,7 +1204,7 @@ func (s *testCommitterSuite) TestResolveMixed(c *C) {
// txn2 tries to lock the pessimisticLockKey, the lock should has been resolved in clean whole region resolve
txn2 := s.begin(c)
txn2.SetOption(kv.Pessimistic, true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tidbkv.LockNoWait}
lockCtx = &kv.LockCtx{ForUpdateTS: txn2.StartTS(), WaitStartTime: time.Now(), LockWaitTime: tikv.LockNoWait}
err = txn2.LockKeys(context.Background(), lockCtx, pessimisticLockKey)
c.Assert(err, IsNil)

Expand Down

0 comments on commit c250425

Please sign in to comment.