Skip to content

Commit

Permalink
txn: fix the ttlmanager and cleanup logic for 1pc and async commit (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored and SabaPing committed Mar 25, 2021
1 parent 9f91ce7 commit 216b760
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 14 deletions.
32 changes: 32 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2458,3 +2458,35 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) {
tk.MustQuery("select * from t1").Check(testkit.Rows("5 12 100"))
}
}

func (s *testPessimisticSuite) TestAsyncCommitCalTSFail(c *C) {
atomic.StoreUint64(&tikv.ManagedLockTTL, 5000)
defer func() {
atomic.StoreUint64(&tikv.ManagedLockTTL, 300)
}()
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = time.Second
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
})

tk := s.newAsyncCommitTestKitWithInit(c)
tk2 := s.newAsyncCommitTestKitWithInit(c)

tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values (1, 1)")

tk.MustExec("set tidb_enable_1pc = true")
tk.MustExec("begin pessimistic")
tk.MustQuery("select * from tk for update").Check(testkit.Rows("1 1"))
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/failCheckSchemaValid", "return"), IsNil)
c.Assert(tk.ExecToErr("commit"), NotNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/failCheckSchemaValid"), IsNil)

// The lock should not be blocked.
tk2.MustExec("set innodb_lock_wait_timeout = 5")
tk2.MustExec("begin pessimistic")
tk2.MustExec("update tk set c2 = c2 + 1")
tk2.MustExec("commit")
}
28 changes: 21 additions & 7 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,15 +899,21 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
})

cleanupKeysCtx := context.WithValue(context.Background(), TxnStartKey, ctx.Value(TxnStartKey))
err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
var err error
if !c.isOnePC() {
err = c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
} else if c.isPessimistic {
err = c.pessimisticRollbackMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
}

if err != nil {
metrics.SecondaryLockCleanupFailureCounterRollback.Inc()
logutil.Logger(ctx).Info("2PC cleanup failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS))
logutil.Logger(ctx).Info("2PC cleanup failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS),
zap.Bool("isPessimistic", c.isPessimistic), zap.Bool("isOnePC", c.isOnePC()))
} else {
logutil.Logger(ctx).Info("2PC clean up done",
zap.Uint64("txnStartTS", c.startTS))
zap.Uint64("txnStartTS", c.startTS), zap.Bool("isPessimistic", c.isPessimistic),
zap.Bool("isOnePC", c.isOnePC()))
}
c.cleanWg.Done()
}()
Expand All @@ -920,6 +926,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
if c.isOnePC() {
// The error means the 1PC transaction failed.
if err != nil {
if c.getUndeterminedErr() == nil {
c.cleanup(ctx)
}
metrics.OnePCTxnCounterError.Inc()
} else {
metrics.OnePCTxnCounterOk.Inc()
Expand Down Expand Up @@ -1162,7 +1171,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
zap.Uint64("sessionID", c.sessionID))
go func() {
defer c.ttlManager.close()
failpoint.Inject("asyncCommitDoNothing", func() {
failpoint.Return()
})
Expand Down Expand Up @@ -1376,6 +1384,12 @@ func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *execd
// this transaction using the related schema changes.
func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64, startInfoSchema SchemaVer,
tryAmend bool) (*RelatedSchemaChange, bool, error) {
failpoint.Inject("failCheckSchemaValid", func() {
logutil.Logger(ctx).Info("[failpoint] injected fail schema check",
zap.Uint64("txnStartTS", c.startTS))
err := errors.Errorf("mock check schema valid failure")
failpoint.Return(nil, false, err)
})
checker, ok := c.txn.us.GetOption(kv.SchemaChecker).(schemaLeaseChecker)
if !ok {
if c.sessionID > 0 {
Expand Down Expand Up @@ -1411,7 +1425,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
currentTS := oracle.EncodeTSO(int64(time.Since(c.txn.startTime)/time.Millisecond)) + c.startTS
_, _, err := c.checkSchemaValid(ctx, currentTS, c.txn.txnInfoSchema, true)
if err != nil {
logutil.Logger(ctx).Error("Schema changed for async commit txn",
logutil.Logger(ctx).Info("Schema changed for async commit txn",
zap.Error(err),
zap.Uint64("startTS", c.startTS))
return errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func (l *Lock) String() string {
prettyWriteKey(buf, l.Key)
buf.WriteString(", primary: ")
prettyWriteKey(buf, l.Primary)
return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s", buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType)
return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t",
buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit)
}

// NewLock creates a new *Lock.
Expand Down
7 changes: 1 addition & 6 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
}
txn.committer = committer
}
defer func() {
// For async commit transactions, the ttl manager will be closed in the asynchronous commit goroutine.
if !committer.isAsyncCommit() {
committer.ttlManager.close()
}
}()
defer committer.ttlManager.close()

initRegion := trace.StartRegion(ctx, "InitKeys")
err = committer.initKeysAndMutations()
Expand Down

0 comments on commit 216b760

Please sign in to comment.