diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 8cafb18621eb2..a49b01c348a3a 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2458,3 +2458,35 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { tk.MustQuery("select * from t1").Check(testkit.Rows("5 12 100")) } } + +func (s *testPessimisticSuite) TestAsyncCommitCalTSFail(c *C) { + atomic.StoreUint64(&tikv.ManagedLockTTL, 5000) + defer func() { + atomic.StoreUint64(&tikv.ManagedLockTTL, 300) + }() + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.TiKVClient.AsyncCommit.SafeWindow = time.Second + conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0 + }) + + tk := s.newAsyncCommitTestKitWithInit(c) + tk2 := s.newAsyncCommitTestKitWithInit(c) + + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values (1, 1)") + + tk.MustExec("set tidb_enable_1pc = true") + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from tk for update").Check(testkit.Rows("1 1")) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/failCheckSchemaValid", "return"), IsNil) + c.Assert(tk.ExecToErr("commit"), NotNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/failCheckSchemaValid"), IsNil) + + // The lock should not be blocked. + tk2.MustExec("set innodb_lock_wait_timeout = 5") + tk2.MustExec("begin pessimistic") + tk2.MustExec("update tk set c2 = c2 + 1") + tk2.MustExec("commit") +} diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index c2e75d436bf76..1ee8398576a2b 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -899,15 +899,21 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { }) cleanupKeysCtx := context.WithValue(context.Background(), TxnStartKey, ctx.Value(TxnStartKey)) - err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) + var err error + if !c.isOnePC() { + err = c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) + } else if c.isPessimistic { + err = c.pessimisticRollbackMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) + } + if err != nil { metrics.SecondaryLockCleanupFailureCounterRollback.Inc() - logutil.Logger(ctx).Info("2PC cleanup failed", - zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) + logutil.Logger(ctx).Info("2PC cleanup failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS), + zap.Bool("isPessimistic", c.isPessimistic), zap.Bool("isOnePC", c.isOnePC())) } else { logutil.Logger(ctx).Info("2PC clean up done", - zap.Uint64("txnStartTS", c.startTS)) + zap.Uint64("txnStartTS", c.startTS), zap.Bool("isPessimistic", c.isPessimistic), + zap.Bool("isOnePC", c.isOnePC())) } c.cleanWg.Done() }() @@ -920,6 +926,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { if c.isOnePC() { // The error means the 1PC transaction failed. if err != nil { + if c.getUndeterminedErr() == nil { + c.cleanup(ctx) + } metrics.OnePCTxnCounterError.Inc() } else { metrics.OnePCTxnCounterOk.Inc() @@ -1162,7 +1171,6 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Uint64("sessionID", c.sessionID)) go func() { - defer c.ttlManager.close() failpoint.Inject("asyncCommitDoNothing", func() { failpoint.Return() }) @@ -1376,6 +1384,12 @@ func (c *twoPhaseCommitter) getCommitTS(ctx context.Context, commitDetail *execd // this transaction using the related schema changes. func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64, startInfoSchema SchemaVer, tryAmend bool) (*RelatedSchemaChange, bool, error) { + failpoint.Inject("failCheckSchemaValid", func() { + logutil.Logger(ctx).Info("[failpoint] injected fail schema check", + zap.Uint64("txnStartTS", c.startTS)) + err := errors.Errorf("mock check schema valid failure") + failpoint.Return(nil, false, err) + }) checker, ok := c.txn.us.GetOption(kv.SchemaChecker).(schemaLeaseChecker) if !ok { if c.sessionID > 0 { @@ -1411,7 +1425,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error { currentTS := oracle.EncodeTSO(int64(time.Since(c.txn.startTime)/time.Millisecond)) + c.startTS _, _, err := c.checkSchemaValid(ctx, currentTS, c.txn.txnInfoSchema, true) if err != nil { - logutil.Logger(ctx).Error("Schema changed for async commit txn", + logutil.Logger(ctx).Info("Schema changed for async commit txn", zap.Error(err), zap.Uint64("startTS", c.startTS)) return errors.Trace(err) diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 5279b74beeba7..af40548ebf970 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -171,7 +171,8 @@ func (l *Lock) String() string { prettyWriteKey(buf, l.Key) buf.WriteString(", primary: ") prettyWriteKey(buf, l.Primary) - return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s", buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType) + return fmt.Sprintf("%s, txnStartTS: %d, lockForUpdateTS:%d, minCommitTs:%d, ttl: %d, type: %s, UseAsyncCommit: %t", + buf.String(), l.TxnID, l.LockForUpdateTS, l.MinCommitTS, l.TTL, l.LockType, l.UseAsyncCommit) } // NewLock creates a new *Lock. diff --git a/store/tikv/txn.go b/store/tikv/txn.go index d0d4b0703880b..90393147c1059 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -248,12 +248,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error { } txn.committer = committer } - defer func() { - // For async commit transactions, the ttl manager will be closed in the asynchronous commit goroutine. - if !committer.isAsyncCommit() { - committer.ttlManager.close() - } - }() + defer committer.ttlManager.close() initRegion := trace.StartRegion(ctx, "InitKeys") err = committer.initKeysAndMutations()