From 65ce3d21af8c8ef100152d21fdeb2498c66c1921 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 5 Dec 2019 19:42:02 +0800 Subject: [PATCH 1/8] session: if txn invalid do not active it and return an error Signed-off-by: Shuaipeng Yu --- session/session.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/session/session.go b/session/session.go index 79307a9bb74bb..2d8c2bd5c2ea7 100644 --- a/session/session.go +++ b/session/session.go @@ -1314,6 +1314,9 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { } func (s *session) Txn(active bool) (kv.Transaction, error) { + if !s.txn.validOrPending() { + return nil, kv.ErrInvalidTxn + } if s.txn.pending() && active { // Transaction is lazy initialized. // PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn, From ce55f69bc2388c1d683ab7877ee8960e5a4f3b6f Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 5 Dec 2019 19:45:10 +0800 Subject: [PATCH 2/8] add a test Signed-off-by: Shuaipeng Yu --- session/session_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/session/session_test.go b/session/session_test.go index bf8502149b9fe..80de85ff78712 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -389,7 +389,9 @@ func (s *testSessionSuite) TestTxnLazyInitialize(c *C) { tk.MustExec("create table t (id int)") tk.MustExec("set @@autocommit = 0") - txn, err := tk.Se.Txn(false) + txn, err := tk.Se.Txn(true) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) + txn, err = tk.Se.Txn(false) c.Assert(err, IsNil) c.Assert(txn.Valid(), IsFalse) tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) From 8a805096915b95364e33d0d80b74b1344fb3a1d4 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 5 Dec 2019 22:00:20 +0800 Subject: [PATCH 3/8] fix bootstrap Signed-off-by: Shuaipeng Yu --- executor/batch_point_get.go | 2 +- executor/point_get.go | 2 +- session/session.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index b2806a2c5dfa0..ed3af9a9d9db2 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -82,7 +82,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { return err } - txn, err := e.ctx.Txn(true) + txn, err := e.ctx.Txn(false) if err != nil { return err } diff --git a/executor/point_get.go b/executor/point_get.go index f055d2937c474..5f27eeda2af61 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -162,7 +162,7 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro } func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) (val []byte, err error) { - txn, err := e.ctx.Txn(true) + txn, err := e.ctx.Txn(false) if err != nil { return nil, err } diff --git a/session/session.go b/session/session.go index 2d8c2bd5c2ea7..8d1f3a5c57219 100644 --- a/session/session.go +++ b/session/session.go @@ -1314,7 +1314,7 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { } func (s *session) Txn(active bool) (kv.Transaction, error) { - if !s.txn.validOrPending() { + if !s.txn.validOrPending() && active { return nil, kv.ErrInvalidTxn } if s.txn.pending() && active { From 2c8d5ac0e25bf43eb2d86b2d24120453d9491503 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 5 Dec 2019 22:14:28 +0800 Subject: [PATCH 4/8] fix test Signed-off-by: Shuaipeng Yu --- executor/builder.go | 2 +- executor/executor_test.go | 2 +- executor/simple.go | 17 +++++++---------- session/session_test.go | 4 ++-- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 48d5c21e40a83..160a6af2c9c08 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1230,7 +1230,7 @@ func (b *executorBuilder) getStartTS() (uint64, error) { if err != nil { return 0, err } - if startTS == 0 && txn.Valid() { + if startTS == 0 { startTS = txn.StartTS() } b.startTS = startTS diff --git a/executor/executor_test.go b/executor/executor_test.go index 8fb7d74f28a1d..327509c6c9bf7 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2652,7 +2652,7 @@ func (s *testSuite) TestSelectForUpdate(c *C) { tk.MustExec("drop table if exists t, t1") txn, err := tk.Se.Txn(true) - c.Assert(err, IsNil) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) c.Assert(txn.Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") diff --git a/executor/simple.go b/executor/simple.go index aea375ab54e85..c92308a10d2c8 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -625,17 +625,14 @@ func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error { if err != nil { return err } - if txn.Valid() { - duration := time.Since(sessVars.TxnCtx.CreateTime).Seconds() - if sessVars.InRestrictedSQL { - transactionDurationInternalRollback.Observe(duration) - } else { - transactionDurationGeneralRollback.Observe(duration) - } - sessVars.TxnCtx.ClearDelta() - return txn.Rollback() + duration := time.Since(sessVars.TxnCtx.CreateTime).Seconds() + if sessVars.InRestrictedSQL { + transactionDurationInternalRollback.Observe(duration) + } else { + transactionDurationGeneralRollback.Observe(duration) } - return nil + sessVars.TxnCtx.ClearDelta() + return txn.Rollback() } func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStmt) error { diff --git a/session/session_test.go b/session/session_test.go index 80de85ff78712..44487241af18d 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -300,7 +300,7 @@ func (s *testSessionSuite) TestRowLock(c *C) { tk.MustExec("drop table if exists t") txn, err := tk.Se.Txn(true) - c.Assert(err, IsNil) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) c.Assert(txn.Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") @@ -721,7 +721,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { tk.MustExec("drop table if exists t") txn, err := tk.Se.Txn(true) - c.Assert(err, IsNil) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) c.Assert(txn.Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") From 19248006f3e0924c5d6e199d44bd0052fed907c4 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Thu, 5 Dec 2019 22:28:13 +0800 Subject: [PATCH 5/8] fix test Signed-off-by: Shuaipeng Yu --- session/session.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/session/session.go b/session/session.go index 8d1f3a5c57219..4ac6a322c330d 100644 --- a/session/session.go +++ b/session/session.go @@ -1315,7 +1315,7 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { func (s *session) Txn(active bool) (kv.Transaction, error) { if !s.txn.validOrPending() && active { - return nil, kv.ErrInvalidTxn + return &s.txn, kv.ErrInvalidTxn } if s.txn.pending() && active { // Transaction is lazy initialized. From 331f5a10e39b5362ac3a492743b595119760eff5 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Fri, 6 Dec 2019 19:40:38 +0800 Subject: [PATCH 6/8] address comments Signed-off-by: Shuaipeng Yu --- executor/simple.go | 19 +++++++++++-------- store/tikv/2pc.go | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/executor/simple.go b/executor/simple.go index c92308a10d2c8..02ddacff54f22 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -621,18 +621,21 @@ func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error { sessVars := e.ctx.GetSessionVars() logutil.BgLogger().Debug("execute rollback statement", zap.Uint64("conn", sessVars.ConnectionID)) sessVars.SetStatusFlag(mysql.ServerStatusInTrans, false) - txn, err := e.ctx.Txn(true) + txn, err := e.ctx.Txn(false) if err != nil { return err } - duration := time.Since(sessVars.TxnCtx.CreateTime).Seconds() - if sessVars.InRestrictedSQL { - transactionDurationInternalRollback.Observe(duration) - } else { - transactionDurationGeneralRollback.Observe(duration) + if txn.Valid() { + duration := time.Since(sessVars.TxnCtx.CreateTime).Seconds() + if sessVars.InRestrictedSQL { + transactionDurationInternalRollback.Observe(duration) + } else { + transactionDurationGeneralRollback.Observe(duration) + } + sessVars.TxnCtx.ClearDelta() + return txn.Rollback() } - sessVars.TxnCtx.ClearDelta() - return txn.Rollback() + return nil } func (e *SimpleExec) executeCreateUser(ctx context.Context, s *ast.CreateUserStmt) error { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 2e74fe816f8c7..9c2802d585be3 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -638,7 +638,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) - const c10min = 10 * 60 * 1000 + const c10min = 60 * 1000 if uptime > c10min { // Set a 10min maximum lifetime for the ttlManager, so when something goes wrong // the key will not be locked forever. From 77a5da85c5f93c2b7c240577249e6d839dc27bcb Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Fri, 6 Dec 2019 19:45:41 +0800 Subject: [PATCH 7/8] fix wrong modify Signed-off-by: Shuaipeng Yu --- store/tikv/2pc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 9c2802d585be3..2e74fe816f8c7 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -638,7 +638,7 @@ func (tm *ttlManager) keepAlive(c *twoPhaseCommitter) { } uptime := uint64(oracle.ExtractPhysical(now) - oracle.ExtractPhysical(c.startTS)) - const c10min = 60 * 1000 + const c10min = 10 * 60 * 1000 if uptime > c10min { // Set a 10min maximum lifetime for the ttlManager, so when something goes wrong // the key will not be locked forever. From b1355ae410fd020522827a660077ac6ce5302117 Mon Sep 17 00:00:00 2001 From: Shuaipeng Yu Date: Fri, 6 Dec 2019 20:05:29 +0800 Subject: [PATCH 8/8] address comments Signed-off-by: Shuaipeng Yu --- executor/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/adapter.go b/executor/adapter.go index 4e6b47d3d84d6..409fdca2c2fd1 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -594,7 +594,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E if err != nil { tsErr := UpdateForUpdateTS(a.Ctx, 0) if tsErr != nil { - return nil, tsErr + logutil.Logger(ctx).Warn("UpdateForUpdateTS failed", zap.Error(tsErr)) } } return nil, err