Skip to content

Commit

Permalink
executor: change pessimistic lock wait start for one statement (#13990)
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored and sre-bot committed Dec 10, 2019
1 parent d006af9 commit 969a020
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 23 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout,
sctx.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...)
if err == nil {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"math"
"time"

"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -431,7 +432,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, time.Now(), recordKey)
if err != nil {
return result, err
}
Expand Down
3 changes: 2 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,8 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockWaitTime int64,
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, lockWaitTime, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, lockWaitTime,
se.GetSessionVars().StmtCtx.GetLockWaitStartTime(), keys...)
}

// LimitExec represents limit executor
Expand Down
4 changes: 3 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -137,7 +138,8 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64,
lockWaitTime int64, waitStartTime time.Time, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down
3 changes: 2 additions & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
)
Expand All @@ -39,7 +40,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ time.Time, _ ...Key) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

. "github.com/pingcap/check"
)
Expand All @@ -37,7 +38,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, time.Now(), Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
41 changes: 41 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -562,3 +563,43 @@ func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
// clean
tk.MustExec("drop table if exists tk")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeoutWaitStart(c *C) {
// prepare work
tk := testkit.NewTestKitWithInit(c, s.store)
defer tk.MustExec("drop table if exists tk")
tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)")
tk.MustExec("set global innodb_lock_wait_timeout = 1")

// raise pessimistic transaction in tk2 and trigger failpoint returning ErrWriteConflict
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))

// tk3 gets the pessimistic lock
tk3.MustExec("begin pessimistic")
tk3.MustQuery("select * from tk where c1 = 1 for update")

tk2.MustExec("begin pessimistic")
done := make(chan error)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict", "return"), IsNil)
start := time.Now()
go func() {
var err error
defer func() {
done <- err
}()
_, err = tk2.Exec("select * from tk where c1 = 1 for update")
}()
time.Sleep(time.Millisecond * 30)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/PessimisticLockErrWriteConflict"), IsNil)
waitErr := <-done
tk3.MustExec("commit")
tk2.MustExec("rollback")
c.Assert(waitErr, NotNil)
c.Check(waitErr.Error(), Equals, tikv.ErrLockWaitTimeout.Error())
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond))
}
12 changes: 11 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ type StatementContext struct {
normalized string
digest string
}
Tables []TableEntry
Tables []TableEntry
lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time
}

// GetNowTsCached getter for nowTs, if not set get now time and cache it
Expand Down Expand Up @@ -494,6 +495,15 @@ func (sc *StatementContext) CopTasksDetails() *CopTasksDetails {
return d
}

// GetLockWaitStartTime returns the statement pessimistic lock wait start time
func (sc *StatementContext) GetLockWaitStartTime() time.Time {
if sc.lockWaitStartTime == nil {
curTime := time.Now()
sc.lockWaitStartTime = &curTime
}
return *sc.lockWaitStartTime
}

//CopTasksDetails collects some useful information of cop-tasks during execution.
type CopTasksDetails struct {
NumCopTasks int
Expand Down
18 changes: 12 additions & 6 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type actionPrewrite struct{}
type actionCommit struct{}
type actionCleanup struct{}
type actionPessimisticLock struct {
killed *uint32
lockWaitTime int64
killed *uint32
lockWaitTime int64
waitStartTime time.Time
}
type actionPessimisticRollback struct{}

Expand Down Expand Up @@ -755,7 +756,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
SyncLog: c.syncLog,
},
}
lockWaitStartTime := time.Now()
lockWaitStartTime := action.waitStartTime
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
if action.lockWaitTime > 0 {
Expand All @@ -766,6 +767,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
req.PessimisticLock.WaitTimeout = timeLeft
}
}
failpoint.Inject("PessimisticLockErrWriteConflict", func() error {
time.Sleep(300 * time.Millisecond)
return kv.ErrWriteConflict
})
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand All @@ -779,7 +784,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
if err != nil {
return errors.Trace(err)
}
err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, batch.keys)
err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, lockWaitStartTime, batch.keys)
return errors.Trace(err)
}
lockResp := resp.PessimisticLock
Expand Down Expand Up @@ -1068,8 +1073,9 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCleanup{}, keys)
}

func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime}, keys)
func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64,
waitStartTime time.Time, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime, waitStartTime}, keys)
}

func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error {
Expand Down
12 changes: 6 additions & 6 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
_, _ = txn.us.Get(key)
c.Assert(txn.Set(key, key), IsNil)
txn.DelOption(kv.PresumeKeyNotExists)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key)
c.Assert(err, NotNil)
c.Assert(txn.Delete(key), IsNil)
key2 := kv.Key("key2")
Expand All @@ -524,9 +524,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) {
func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def"))
err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def"))
err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, time.Now(), kv.Key("abc"), kv.Key("def"))
c.Assert(err, IsNil)
c.Assert(txn.lockKeys, HasLen, 2)
}
Expand All @@ -536,11 +536,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
txn := s.begin(c)
txn.SetOption(kv.Pessimistic, true)
time.Sleep(time.Millisecond * 100)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key)
err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key)
c.Assert(err, IsNil)
time.Sleep(time.Millisecond * 100)
key2 := kv.Key("key2")
err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2)
err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, time.Now(), key2)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
msBeforeLockExpired := s.store.GetOracle().UntilExpired(txn.StartTS(), lockInfo.LockTtl)
Expand Down Expand Up @@ -578,7 +578,7 @@ func (s *testCommitterSuite) TestElapsedTTL(c *C) {
txn.SetOption(kv.Pessimistic, true)
time.Sleep(time.Millisecond * 100)
forUpdateTS := oracle.ComposeTS(oracle.ExtractPhysical(txn.startTS)+100, 1)
err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, key)
err := txn.LockKeys(context.Background(), nil, forUpdateTS, kv.LockAlwaysWait, time.Now(), key)
c.Assert(err, IsNil)
lockInfo := s.getLockInfo(c, key)
c.Assert(lockInfo.LockTtl-PessimisticLockTTL, GreaterEqual, uint64(100))
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) {
txn := s.beginTxn(c)
err := txn.Set(encodeKey(s.prefix, "key"), []byte("value"))
c.Assert(err, IsNil)
err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, encodeKey(s.prefix, "key"))
err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, time.Now(), encodeKey(s.prefix, "key"))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
Expand Down
5 changes: 3 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,9 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {
return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys)
}

// LockKeys input param lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock
func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64,
lockWaitTime int64, keysInput ...kv.Key) error {
lockWaitTime int64, waitStartTime time.Time, keysInput ...kv.Key) error {
// Exclude keys that are already locked.
keys := make([][]byte, 0, len(keysInput))
txn.mu.Lock()
Expand Down Expand Up @@ -393,7 +394,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, keys)
err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, waitStartTime, keys)
if killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
Expand Down

0 comments on commit 969a020

Please sign in to comment.