From 7b49e431c16606dc7da5806c60590443fc767f74 Mon Sep 17 00:00:00 2001 From: Jack Yu Date: Mon, 9 Dec 2019 13:41:09 +0800 Subject: [PATCH 1/3] session: if txn invalid do not active it and return an error (#13935) --- executor/builder.go | 2 +- executor/executor_test.go | 2 +- executor/point_get.go | 2 +- executor/simple.go | 2 +- session/session.go | 3 +++ session/session_test.go | 8 +++++--- 6 files changed, 12 insertions(+), 7 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 902b1d42d7e28..be008f078a0c5 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1269,7 +1269,7 @@ func (b *executorBuilder) getStartTS() (uint64, error) { if err != nil { return 0, errors.Trace(err) } - if startTS == 0 && txn.Valid() { + if startTS == 0 { startTS = txn.StartTS() } b.startTS = startTS diff --git a/executor/executor_test.go b/executor/executor_test.go index 399ca9990d7c4..8631f2673aed9 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2345,7 +2345,7 @@ func (s *testSuite) TestSelectForUpdate(c *C) { tk.MustExec("drop table if exists t, t1") txn, err := tk.Se.Txn(true) - c.Assert(err, IsNil) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) c.Assert(txn.Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") diff --git a/executor/point_get.go b/executor/point_get.go index e9dbed0617865..db506dfb55bf0 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -154,7 +154,7 @@ func (e *PointGetExecutor) encodeIndexKey() (_ []byte, err error) { } func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) { - txn, err := e.ctx.Txn(true) + txn, err := e.ctx.Txn(false) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/simple.go b/executor/simple.go index cfd19af0cce49..e0ddc88911b1c 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -139,7 +139,7 @@ func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error { sessVars := e.ctx.GetSessionVars() logutil.Logger(context.Background()).Debug("execute rollback statement", zap.Uint64("conn", sessVars.ConnectionID)) sessVars.SetStatusFlag(mysql.ServerStatusInTrans, false) - txn, err := e.ctx.Txn(true) + txn, err := e.ctx.Txn(false) if err != nil { return errors.Trace(err) } diff --git a/session/session.go b/session/session.go index 68e611689f4a8..1f8aa53d12276 100644 --- a/session/session.go +++ b/session/session.go @@ -1028,6 +1028,9 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { } func (s *session) Txn(active bool) (kv.Transaction, error) { + if !s.txn.validOrPending() && active { + return &s.txn, kv.ErrInvalidTxn + } if s.txn.pending() && active { // Transaction is lazy initialized. // PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn, diff --git a/session/session_test.go b/session/session_test.go index 88cc6300ae270..186e3d59ca994 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -242,7 +242,7 @@ func (s *testSessionSuite) TestRowLock(c *C) { tk.MustExec("drop table if exists t") txn, err := tk.Se.Txn(true) - c.Assert(err, IsNil) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) c.Assert(txn.Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") @@ -330,7 +330,9 @@ func (s *testSessionSuite) TestTxnLazyInitialize(c *C) { tk.MustExec("create table t (id int)") tk.MustExec("set @@autocommit = 0") - txn, err := tk.Se.Txn(false) + txn, err := tk.Se.Txn(true) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) + txn, err = tk.Se.Txn(false) c.Assert(err, IsNil) c.Assert(txn.Valid(), IsFalse) tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) @@ -623,7 +625,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { tk.MustExec("drop table if exists t") txn, err := tk.Se.Txn(true) - c.Assert(err, IsNil) + c.Assert(kv.ErrInvalidTxn.Equal(err), IsTrue) c.Assert(txn.Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") From 66c23c9b9da23e9a32072d699e6affec6511a63a Mon Sep 17 00:00:00 2001 From: Jack Yu Date: Fri, 10 Jan 2020 15:24:43 +0800 Subject: [PATCH 2/3] store: fix potential panic in GC worker (#14403) --- store/tikv/gcworker/gc_worker.go | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 4381445b53dc3..a8a5163e23b12 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -53,8 +53,6 @@ type GCWorker struct { lastFinish time.Time cancel context.CancelFunc done chan error - - session session.Session } // NewGCWorker creates a GCWorker instance. @@ -136,8 +134,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { logutil.Logger(ctx).Info("[gc worker] start", zap.String("uuid", w.uuid)) - w.session = createSession(w.store) - w.tick(ctx) // Immediately tick once to initialize configs. wg.Done() @@ -864,7 +860,6 @@ func (w *GCWorker) checkLeader() (bool, error) { if err != nil { return false, errors.Trace(err) } - w.session = se leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey) if err != nil { _, err1 := se.Execute(ctx, "ROLLBACK") @@ -895,6 +890,7 @@ func (w *GCWorker) checkLeader() (bool, error) { } lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { + se.RollbackTxn(ctx) return false, errors.Trace(err) } if lease == nil || lease.Before(time.Now()) { @@ -998,8 +994,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time func (w *GCWorker) loadValueFromSysTable(key string) (string, error) { ctx := context.Background() + se := createSession(w.store) + defer se.Close() stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key) - rs, err := w.session.Execute(ctx, stmt) + rs, err := se.Execute(ctx, stmt) if len(rs) > 0 { defer terror.Call(rs[0].Close) } @@ -1028,10 +1026,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error { ON DUPLICATE KEY UPDATE variable_value = '%[2]s', comment = '%[3]s'`, key, value, gcVariableComments[key]) - if w.session == nil { - return errors.New("[saveValueToSysTable session is nil]") - } - _, err := w.session.Execute(context.Background(), stmt) + se := createSession(w.store) + defer se.Close() + _, err := se.Execute(context.Background(), stmt) logutil.Logger(context.Background()).Debug("[gc worker] save kv", zap.String("key", key), zap.String("value", value), @@ -1084,13 +1081,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) { lastFinish: time.Now(), done: make(chan error), } - worker.session, err = session.CreateSession(worker.store) - if err != nil { - logutil.Logger(context.Background()).Error("initialize MockGCWorker session fail", zap.Error(err)) - return nil, errors.Trace(err) - } - privilege.BindPrivilegeManager(worker.session, nil) - worker.session.GetSessionVars().InRestrictedSQL = true return &MockGCWorker{worker: worker}, nil } From 4fa0dd680cfa4957b1aa2e8ea160fa11470133f6 Mon Sep 17 00:00:00 2001 From: imtbkcat Date: Fri, 17 Jan 2020 18:08:33 +0800 Subject: [PATCH 3/3] fix ci --- store/tikv/gcworker/gc_worker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index a8a5163e23b12..c6fb59fdc1d72 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -890,7 +890,8 @@ func (w *GCWorker) checkLeader() (bool, error) { } lease, err := w.loadTime(gcLeaderLeaseKey) if err != nil { - se.RollbackTxn(ctx) + _, err1 := se.Execute(ctx, "ROLLBACK") + terror.Log(errors.Trace(err1)) return false, errors.Trace(err) } if lease == nil || lease.Before(time.Now()) {