From 78b2d4c8898d7c80e8b2fb6e731f09362bef806c Mon Sep 17 00:00:00 2001 From: zyguan Date: Mon, 14 Sep 2020 13:04:05 +0800 Subject: [PATCH 1/2] cherry pick #19949 to release-4.0 Signed-off-by: ti-srebot --- executor/executor_test.go | 54 ++++++++++++++++++++++++++++++++ kv/kv.go | 7 +++++ kv/txn.go | 7 +++++ session/session.go | 1 + sessionctx/variable/session.go | 8 +++++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/tidb_vars.go | 3 ++ sessionctx/variable/varsutil.go | 6 ++++ store/tikv/txn.go | 20 ++++++++++++ 9 files changed, 107 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index fbf96891cfd03..0ebc8f8e587a3 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2719,6 +2719,60 @@ func (s *testSuite) TestTiDBCurrentTS(c *C) { c.Assert(terror.ErrorEqual(err, variable.ErrReadOnly), IsTrue, Commentf("err %v", err)) } +func (s *testSuite) TestTiDBLastTxnInfo(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int primary key)") + tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Check(testkit.Rows("0 0")) + + tk.MustExec("insert into t values (1)") + rows1 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows() + c.Assert(rows1[0][0].(string), Greater, "0") + c.Assert(rows1[0][0].(string), Less, rows1[0][1].(string)) + + tk.MustExec("begin") + tk.MustQuery("select a from t where a = 1").Check(testkit.Rows("1")) + rows2 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), @@tidb_current_ts").Rows() + tk.MustExec("commit") + rows3 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows() + c.Assert(rows2[0][0], Equals, rows1[0][0]) + c.Assert(rows2[0][1], Equals, rows1[0][1]) + c.Assert(rows3[0][0], Equals, rows1[0][0]) + c.Assert(rows3[0][1], Equals, rows1[0][1]) + c.Assert(rows2[0][1], Less, rows2[0][2]) + + tk.MustExec("begin") + tk.MustExec("update t set a = a + 1 where a = 1") + rows4 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), @@tidb_current_ts").Rows() + tk.MustExec("commit") + rows5 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows() + c.Assert(rows4[0][0], Equals, rows1[0][0]) + c.Assert(rows4[0][1], Equals, rows1[0][1]) + c.Assert(rows4[0][2], Equals, rows5[0][0]) + c.Assert(rows4[0][1], Less, rows4[0][2]) + c.Assert(rows4[0][2], Less, rows5[0][1]) + + tk.MustExec("begin") + tk.MustExec("update t set a = a + 1 where a = 2") + tk.MustExec("rollback") + rows6 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows() + c.Assert(rows6[0][0], Equals, rows5[0][0]) + c.Assert(rows6[0][1], Equals, rows5[0][1]) + + tk.MustExec("begin optimistic") + tk.MustExec("insert into t values (2)") + _, err := tk.Exec("commit") + c.Assert(err, NotNil) + rows7 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), json_extract(@@tidb_last_txn_info, '$.error')").Rows() + c.Assert(rows7[0][0], Greater, rows5[0][0]) + c.Assert(rows7[0][1], Equals, "0") + c.Assert(strings.Contains(err.Error(), rows7[0][1].(string)), IsTrue) + + _, err = tk.Exec("set @@tidb_last_txn_info = '{}'") + c.Assert(terror.ErrorEqual(err, variable.ErrReadOnly), IsTrue, Commentf("err %v", err)) +} + func (s *testSuite) TestSelectForUpdate(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/kv/kv.go b/kv/kv.go index 74ea94184bf90..43650746cee53 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -64,6 +64,13 @@ const ( InfoSchema // SchemaAmender is used to amend mutations for pessimistic transactions SchemaAmender +<<<<<<< HEAD +======= + // SampleStep skips 'SampleStep - 1' number of keys after each returned key. + SampleStep + // CommitHook is a callback function called right after the transaction gets committed + CommitHook +>>>>>>> 64c2cc5cb... sessionctx,store,kv: add a session variable to export last txn info (#19949) ) // Priority value for transaction priority. diff --git a/kv/txn.go b/kv/txn.go index f62fa4d5a54bd..4592282f96f90 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -111,3 +111,10 @@ func MockCommitErrorDisable() { func IsMockCommitErrorEnable() bool { return atomic.LoadInt64(&mockCommitErrorEnable) == 1 } + +// TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing) +type TxnInfo struct { + StartTS uint64 `json:"start_ts"` + CommitTS uint64 `json:"commit_ts"` + ErrMsg string `json:"error,omitempty"` +} diff --git a/session/session.go b/session/session.go index 69d62660898f6..eb4ae28772a88 100644 --- a/session/session.go +++ b/session/session.go @@ -425,6 +425,7 @@ func (s *session) doCommit(ctx context.Context) error { // Set this option for 2 phase commit to validate schema lease. s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs)) s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) + s.txn.SetOption(kv.CommitHook, func(info kv.TxnInfo, _ error) { s.sessionVars.LastTxnInfo = info }) if s.GetSessionVars().EnableAmendPessimisticTxn { s.txn.SetOption(kv.SchemaAmender, NewSchemaAmenderForTikvTxn(s)) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e7b923d9253bc..283dac6ccec3b 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -639,6 +639,9 @@ type SessionVars struct { // EnableAmendPessimisticTxn indicates if schema change amend is enabled for pessimistic transactions. EnableAmendPessimisticTxn bool + + // LastTxnInfo keeps track the info of last committed transaction + LastTxnInfo kv.TxnInfo } // PreparedParams contains the parameters of the current prepared statement when executing it. @@ -1180,8 +1183,13 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBBatchCommit: s.BatchCommit = TiDBOptOn(val) case TiDBDMLBatchSize: +<<<<<<< HEAD s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize) case TiDBCurrentTS, TiDBConfig: +======= + s.DMLBatchSize = int(tidbOptInt64(val, DefOptCorrelationExpFactor)) + case TiDBCurrentTS, TiDBLastTxnInfo, TiDBConfig: +>>>>>>> 64c2cc5cb... sessionctx,store,kv: add a session variable to export last txn info (#19949) return ErrReadOnly case TiDBMaxChunkSize: s.MaxChunkSize = tidbOptPositiveInt32(val, DefMaxChunkSize) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 91ee906a2c110..b3a85c90f6a14 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -644,6 +644,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBBatchCommit, BoolToIntStr(DefBatchCommit)}, {ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)}, {ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)}, + {ScopeSession, TiDBLastTxnInfo, strconv.Itoa(DefCurretTS)}, {ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)}, {ScopeGlobal | ScopeSession, TiDBAllowBatchCop, strconv.Itoa(DefTiDBAllowBatchCop)}, {ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index f3254fa774abb..86d356d880932 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -68,6 +68,9 @@ const ( // It is read-only. TiDBCurrentTS = "tidb_current_ts" + // TiDBLastTxnInfo is used to get the last transaction info within the current session. + TiDBLastTxnInfo = "tidb_last_txn_info" + // tidb_config is a read-only variable that shows the config of the current server. TiDBConfig = "tidb_config" diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index d7cf6b8e49c40..fb6d8d59858d6 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -116,6 +116,12 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) { switch sysVar.Name { case TiDBCurrentTS: return fmt.Sprintf("%d", s.TxnCtx.StartTS), true, nil + case TiDBLastTxnInfo: + info, err := json.Marshal(s.LastTxnInfo) + if err != nil { + return "", true, err + } + return string(info), true, nil case TiDBGeneralLog: return fmt.Sprintf("%d", atomic.LoadUint32(&ProcessGeneralLog)), true, nil case TiDBPProfSQLCPU: diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 4ff782b35e3f5..9236a5ececc37 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -84,6 +84,8 @@ type tikvTxn struct { txnInfoSchema SchemaVer // SchemaAmender is used amend pessimistic txn commit mutations for schema change schemaAmender SchemaAmender + // commitCallback is called after current transaction gets committed + commitCallback func(info kv.TxnInfo, err error) } func newTiKVTxn(store *tikvStore) (*tikvTxn, error) { @@ -233,6 +235,8 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) { txn.txnInfoSchema = val.(SchemaVer) case kv.SchemaAmender: txn.schemaAmender = val.(SchemaAmender) + case kv.CommitHook: + txn.commitCallback = val.(func(info kv.TxnInfo, err error)) } } @@ -306,6 +310,9 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // pessimistic transaction should also bypass latch. if txn.store.txnLatches == nil || txn.IsPessimistic() { err = committer.execute(ctx) + if val == nil || connID > 0 { + txn.onCommitted(err) + } logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err)) return errors.Trace(err) } @@ -324,6 +331,9 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS) } err = committer.execute(ctx) + if val == nil || connID > 0 { + txn.onCommitted(err) + } if err == nil { lock.SetCommitTS(committer.commitTS) } @@ -361,6 +371,16 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { return txn.committer.pessimisticRollbackMutations(NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars), CommitterMutations{keys: txn.lockKeys}) } +func (txn *tikvTxn) onCommitted(err error) { + if txn.commitCallback != nil { + info := kv.TxnInfo{StartTS: txn.startTS, CommitTS: txn.commitTS} + if err != nil { + info.ErrMsg = err.Error() + } + txn.commitCallback(info, err) + } +} + // lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error { txn.mu.Lock() From 952ca9f1ba2a7d9a9ecf2b704bdb60f3ceb3f63d Mon Sep 17 00:00:00 2001 From: zyguan Date: Wed, 28 Oct 2020 17:44:04 +0800 Subject: [PATCH 2/2] fix conflicts Signed-off-by: zyguan --- kv/kv.go | 5 ----- sessionctx/variable/session.go | 5 ----- 2 files changed, 10 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index 43650746cee53..c1e9a04780d18 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -64,13 +64,8 @@ const ( InfoSchema // SchemaAmender is used to amend mutations for pessimistic transactions SchemaAmender -<<<<<<< HEAD -======= - // SampleStep skips 'SampleStep - 1' number of keys after each returned key. - SampleStep // CommitHook is a callback function called right after the transaction gets committed CommitHook ->>>>>>> 64c2cc5cb... sessionctx,store,kv: add a session variable to export last txn info (#19949) ) // Priority value for transaction priority. diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 283dac6ccec3b..fc509bcd58893 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1183,13 +1183,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case TiDBBatchCommit: s.BatchCommit = TiDBOptOn(val) case TiDBDMLBatchSize: -<<<<<<< HEAD s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize) - case TiDBCurrentTS, TiDBConfig: -======= - s.DMLBatchSize = int(tidbOptInt64(val, DefOptCorrelationExpFactor)) case TiDBCurrentTS, TiDBLastTxnInfo, TiDBConfig: ->>>>>>> 64c2cc5cb... sessionctx,store,kv: add a session variable to export last txn info (#19949) return ErrReadOnly case TiDBMaxChunkSize: s.MaxChunkSize = tidbOptPositiveInt32(val, DefMaxChunkSize)