txn: fix the resolved txn status cache for pessimistic txn #21689

Merged · 4 commits · Dec 14, 2020
Changes from 3 commits
72 changes: 71 additions & 1 deletion session/pessimistic_test.go
@@ -23,8 +23,10 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
@@ -548,7 +550,7 @@ func (s *testPessimisticSuite) TestAsyncRollBackNoWait(c *C) {
// even though async rollback for pessimistic lock may rollback later locked key if get ts failed from pd
// the txn correctness should be ensured
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/ExecStmtGetTsError", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(100)"), IsNil)
tk.MustExec("begin pessimistic")
tk.MustExec("select * from tk where c1 > 0 for update nowait")
tk2.MustExec("begin pessimistic")
@@ -1833,3 +1835,71 @@ func (s *testPessimisticSuite) TestAmendForUniqueIndex(c *C) {
tk.MustExec("commit")
tk2.MustExec("admin check table t")
}

func (s *testPessimisticSuite) TestResolveStalePessimisticPrimaryLock(c *C) {
Contributor: It's complex. I want to confirm: does it fail with the always-cached version?

Contributor Author (@cfzjywxk, Dec 14, 2020): I've tested it without the cache change; it fails with an error reported by the admin check statement. It can be treated as a reproducing case in the unit tests, and I think we also need to add cases in ticase.

c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries", "return(\"skip\")"), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep", "return(20000)"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/AsyncRollBackSleep"), IsNil)
}()
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test")
tk.MustExec("create database test")
tk.MustExec("use test")
tk2.MustExec("use test")
tk3.MustExec("use test")

tk3.MustExec("drop table if exists t1")
tk3.MustExec("create table t1(c1 int key, c2 int, c3 int, unique key uk(c2), key k1(c3), key k2(c2, c3));")
tk3.MustExec("insert into t1 values(1, 1, 1);")
tk3.MustExec("insert into t1 values(2, 2, 2);")
tk3.MustExec("insert into t1 values(3, 3, 3);")
tk3.MustExec("insert into t1 values(101, 101, 101);")
tk3.MustExec("insert into t1 values(201, 201, 201);")
tk3.MustExec("insert into t1 values(301, 301, 301);")
tk3.MustExec("insert into t1 values(401, 401, 401);")
tk3.MustExec("insert into t1 values(402, 402, 402);")
tk3.MustExec("insert into t1 values(501, 501, 501);")
tbl, err := domain.GetDomain(tk3.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
tblID := tbl.Meta().ID
ukIdxID := tbl.Indices()[0].Meta().ID
k1IdxID := tbl.Indices()[1].Meta().ID
k2IdxID := tbl.Indices()[2].Meta().ID
s.cluster.SplitTable(s.mvccStore, tblID, 8)
s.cluster.SplitIndex(s.mvccStore, tblID, ukIdxID, 8)
s.cluster.SplitIndex(s.mvccStore, tblID, k1IdxID, 8)
s.cluster.SplitIndex(s.mvccStore, tblID, k2IdxID, 8)

tk.MustExec("set innodb_lock_wait_timeout = 1")
tk.MustExec("begin pessimistic")
tk3.MustExec("begin pessimistic")
tk3.MustQuery("select * from t1 where c1 = 501 for update nowait").Check(testkit.Rows("501 501 501"))
err = tk.ExecToErr("update t1 set c1 = c1 + 10, c2 = c2 + 10;")
c.Assert(err, NotNil)
tk3.MustExec("rollback")

tk2.MustExec("begin pessimistic")
tk2.MustExec("delete from t1 where c1 = 1")
tk2.MustExec("commit")

// tk will get abort error.
err = tk.ExecToErr("update t1 set c1 = c1 + 10, c2 = c2 + 10 where c1 in(1)")
c.Assert(err, NotNil)

tk.MustExec("update t1 set c1 = c1 + 10, c2 = c2 + 10 where c1 > 1;")
tk.MustExec("commit")

tk2.MustExec("begin pessimistic")
tk2.MustExec("update t1 set c3 = c3 + 7 where c1 in (3, 101, 201, 301, 401, 402, 501)")
tk2.MustExec("commit")

tk.MustExec("rollback")
tk2.MustExec("rollback")
tk3.MustExec("rollback")

c.Assert(tk2.ExecToErr("admin check table t1"), IsNil)
}
10 changes: 10 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
@@ -1151,6 +1151,10 @@ func (mvcc *MVCCLevelDB) CheckTxnStatus(primaryKey []byte, lockTS, callerStartTS

// If the lock has already outdated, clean up it.
if uint64(oracle.ExtractPhysical(lock.startTS))+lock.ttl < uint64(oracle.ExtractPhysical(currentTS)) {
logutil.BgLogger().Info("rollback expired lock and write rollback record",
zap.Stringer("primary key", kv.Key(primaryKey)),
zap.Uint64("lock startTS", dec.lock.startTS),
zap.Stringer("lock op", dec.lock.op))
if err = rollbackLock(batch, primaryKey, lockTS); err != nil {
err = errors.Trace(err)
return
@@ -1333,6 +1337,12 @@ func (mvcc *MVCCLevelDB) ResolveLock(startKey, endKey []byte, startTS, commitTS
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

if len(startKey) > 0 {
Contributor: What's this?

Contributor Author: Temporarily working around an mvcc leveldb problem; without this, resolve lock does not work for split regions.

startKey = []byte{}
}
if len(endKey) > 0 {
endKey = []byte{}
}
iter, currKey, err := newScanIterator(mvcc.db, startKey, endKey)
defer iter.Release()
if err != nil {
12 changes: 12 additions & 0 deletions store/tikv/2pc.go
@@ -652,6 +652,16 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
// by test suites.
secondaryBo := NewBackofferWithVars(context.Background(), CommitMaxBackoff, c.txn.vars)
go func() {
failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
if s, ok := v.(string); !ok {
logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys",
zap.Uint64("connID", c.connID), zap.Uint64("startTS", c.startTS))
time.Sleep(2 * time.Second)
} else if s == "skip" {
failpoint.Return()
}
})
Contributor: How about making this failpoint more general (e.g. like the following), so that we can either disable it with return("skip") or delay it with sleep(1000)?

failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
	if s, ok := v.(string); !ok {
	} else if s == "skip" {
		failpoint.Return()
	}
})
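For reference, a hedged usage sketch (not part of this diff) of how a test could drive the generalized failpoint. It assumes failpoint's built-in sleep action delays evaluation before the injected callback runs, so a non-string value simply falls through and the secondary commit proceeds after the delay:

// Skip committing secondary keys entirely, leaving stale secondary locks behind:
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries", `return("skip")`), IsNil)
// ...or, alternatively, only delay the commit of secondary keys by one second:
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries", "sleep(1000)"), IsNil)
defer func() {
	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforeCommitSecondaries"), IsNil)
}()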


e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
logutil.BgLogger().Debug("2PC async doActionOnBatches",
@@ -1218,8 +1228,10 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
}
logutil.Logger(bo.ctx).Error("2PC failed commit key after primary key committed",
zap.Error(err),
zap.Stringer("primaryKey", kv.Key(c.primaryKey)),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("commitTS", c.commitTS),
zap.Uint64("forUpdateTS", c.forUpdateTS),
zap.Strings("keys", hexBatchKeys(batch.mutations.keys)))
return errors.Trace(err)
}
13 changes: 12 additions & 1 deletion store/tikv/lock_resolver.go
@@ -229,6 +229,9 @@ func (lr *LockResolver) BatchResolveLocks(bo *Backoffer, locks []*Lock, loc Regi
if err != nil {
return false, err
}
if l.LockType != kvrpcpb.Op_PessimisticLock && status.ttl == 0 {
lr.saveResolved(l.TxnID, status)
Member (@jackysp, Dec 14, 2020): Could you combine TxnID and the primary's LockForUpdateTS as the key of LockResolver.resolved, as a cache? I'm worried about the performance impact of removing the cache.

Contributor Author: The impact now is that resolving pessimistic locks whose primary lock is of prewrite lock type will need more check_txn_status calls. We could mark the returned status as certain or not in the future to lower the risk.
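To illustrate the suggestion above, here is a hedged, self-contained sketch (hypothetical names, not what this PR implements) of a resolved-status cache keyed by both the transaction start ts and the primary lock's for-update ts, so a pessimistic primary that is rolled back and later re-locked with a newer for-update ts cannot be answered from a stale entry:

package resolvedcache // hypothetical illustration only

import "sync"

// txnStatus is a minimal stand-in for store/tikv's TxnStatus.
type txnStatus struct {
	ttl      uint64
	commitTS uint64
}

// cacheKey combines the transaction start ts with the primary pessimistic
// lock's for-update ts, as suggested in the review comment above.
type cacheKey struct {
	txnID       uint64
	forUpdateTS uint64
}

type resolvedCache struct {
	mu sync.RWMutex
	m  map[cacheKey]txnStatus
}

func newResolvedCache() *resolvedCache {
	return &resolvedCache{m: make(map[cacheKey]txnStatus)}
}

// save records a final status for one (txnID, forUpdateTS) pair.
func (c *resolvedCache) save(txnID, forUpdateTS uint64, s txnStatus) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[cacheKey{txnID, forUpdateTS}] = s
}

// get only hits when both the txn and the for-update ts match.
func (c *resolvedCache) get(txnID, forUpdateTS uint64) (txnStatus, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	s, ok := c.m[cacheKey{txnID, forUpdateTS}]
	return s, ok
}

The PR itself takes the simpler route of skipping the cache whenever the met lock is a pessimistic lock; the composite key above is only the alternative the reviewer floated.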

}

if status.ttl > 0 {
logutil.BgLogger().Error("BatchResolveLocks fail to clean locks, this result is not expected!")
@@ -457,6 +460,15 @@ func (lr *LockResolver) getTxnStatusFromLock(bo *Backoffer, l *Lock, callerStart
for {
status, err = lr.getTxnStatus(bo, l.TxnID, l.Primary, callerStartTS, currentTS, rollbackIfNotExist)
if err == nil {
// If l.LockType is pessimistic lock type:
// - if its primary lock is pessimistic too, the check txn status result should not be cached.
// - if its primary lock is prewrite lock type, the check txn status could be cached.
// If l.LockType is prewrite lock type:
// - always cache the check txn status result.
// For prewrite locks, their primary keys should ALWAYS be the correct one.
if l.LockType != kvrpcpb.Op_PessimisticLock && status.ttl == 0 {
Contributor: It seems that in the current code we don't cache when the primary lock is of prewrite lock type. Can we move this logic to where saveResolved originally was (L579)? We have more information there, and we could even cache it when the primary lock is committed with a positive ts.

Contributor (@youjiali1995, Dec 14, 2020): If we want to cache all results, we need to change kvproto. As for caching it when the primary lock is committed with a positive ts: it seldom happens...

Contributor Author: We could not get the lock type of the lock we met in the original place, which is needed for the cache check. We could add back the saving logic for committed transactions in the original place?

Contributor: Okay, I think it's optional, as youjiali1995 says this case seldom happens.

Contributor Author: @youjiali1995 @sticnarf In the previous diff the resolved cache would be checked every time, which is not necessary. I have put the cache-fill logic back in its original place. PTAL again, thanks.

lr.saveResolved(l.TxnID, status)
}
return status, nil
}
// If the error is something other than txnNotFoundErr, throw the error (network
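The comment block added in getTxnStatusFromLock above spells out when the check-txn-status result may be cached. A hedged, standalone restatement of that rule (hypothetical helper, not part of this diff) might look like:

package lockresolver // hypothetical illustration only

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// resolvedTxnStatus is a minimal stand-in for store/tikv's TxnStatus.
// ttl == 0 with commitTS > 0 means committed; ttl == 0 and commitTS == 0
// means rolled back; ttl > 0 means the transaction may still be alive.
type resolvedTxnStatus struct {
	ttl      uint64
	commitTS uint64
}

// shouldCacheResolvedStatus returns true only when the lock we met is not a
// pessimistic lock (prewrite locks always point at the correct primary key)
// and the transaction has already reached a final state (ttl == 0).
func shouldCacheResolvedStatus(lockType kvrpcpb.Op, status resolvedTxnStatus) bool {
	if lockType == kvrpcpb.Op_PessimisticLock {
		// The primary might itself be a pessimistic lock that can still be
		// rolled back and re-acquired, so the result must not be cached.
		return false
	}
	return status.ttl == 0
}

This mirrors the condition this diff adds at both call sites (BatchResolveLocks and getTxnStatusFromLock): only non-pessimistic locks whose transaction has a final status populate the resolved cache.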
@@ -576,7 +588,6 @@ func (lr *LockResolver) getTxnStatus(bo *Backoffer, txnID uint64, primary []byte
}

status.commitTS = cmdResp.CommitVersion
lr.saveResolved(txnID, status)
lysu marked this conversation as resolved.
}
return status, nil
}
12 changes: 10 additions & 2 deletions store/tikv/txn.go
@@ -561,8 +561,16 @@ func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
failpoint.Inject("AsyncRollBackSleep", func() {
time.Sleep(100 * time.Millisecond)
failpoint.Inject("AsyncRollBackSleep", func(sleepTimeMS failpoint.Value) {
if tmp, ok := sleepTimeMS.(int); ok {
if tmp < 10000 {
logutil.Logger(ctx).Info("[failpoint] sleep before trigger asyncPessimisticRollback", zap.Int("sleep ms", tmp))
time.Sleep(time.Duration(tmp) * time.Millisecond)
} else {
logutil.Logger(ctx).Info("[failpoint] async rollback return")
failpoint.Return()
}
}
})
err := committer.pessimisticRollbackMutations(NewBackofferWithVars(ctx, pessimisticRollbackMaxBackoff, txn.vars), CommitterMutations{keys: keys})
if err != nil {