Skip to content

Commit

Permalink
sessionctx,store,kv: add a session variable to export last txn info (#…
Browse files Browse the repository at this point in the history
…19949) (#20696)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
  • Loading branch information
ti-srebot authored Nov 12, 2020
1 parent 92a602b commit ee324ef
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 1 deletion.
54 changes: 54 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2719,6 +2719,60 @@ func (s *testSuite) TestTiDBCurrentTS(c *C) {
c.Assert(terror.ErrorEqual(err, variable.ErrReadOnly), IsTrue, Commentf("err %v", err))
}

func (s *testSuite) TestTiDBLastTxnInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key)")
tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Check(testkit.Rows("0 0"))

tk.MustExec("insert into t values (1)")
rows1 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
c.Assert(rows1[0][0].(string), Greater, "0")
c.Assert(rows1[0][0].(string), Less, rows1[0][1].(string))

tk.MustExec("begin")
tk.MustQuery("select a from t where a = 1").Check(testkit.Rows("1"))
rows2 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), @@tidb_current_ts").Rows()
tk.MustExec("commit")
rows3 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
c.Assert(rows2[0][0], Equals, rows1[0][0])
c.Assert(rows2[0][1], Equals, rows1[0][1])
c.Assert(rows3[0][0], Equals, rows1[0][0])
c.Assert(rows3[0][1], Equals, rows1[0][1])
c.Assert(rows2[0][1], Less, rows2[0][2])

tk.MustExec("begin")
tk.MustExec("update t set a = a + 1 where a = 1")
rows4 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), @@tidb_current_ts").Rows()
tk.MustExec("commit")
rows5 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
c.Assert(rows4[0][0], Equals, rows1[0][0])
c.Assert(rows4[0][1], Equals, rows1[0][1])
c.Assert(rows4[0][2], Equals, rows5[0][0])
c.Assert(rows4[0][1], Less, rows4[0][2])
c.Assert(rows4[0][2], Less, rows5[0][1])

tk.MustExec("begin")
tk.MustExec("update t set a = a + 1 where a = 2")
tk.MustExec("rollback")
rows6 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()
c.Assert(rows6[0][0], Equals, rows5[0][0])
c.Assert(rows6[0][1], Equals, rows5[0][1])

tk.MustExec("begin optimistic")
tk.MustExec("insert into t values (2)")
_, err := tk.Exec("commit")
c.Assert(err, NotNil)
rows7 := tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.start_ts'), json_extract(@@tidb_last_txn_info, '$.commit_ts'), json_extract(@@tidb_last_txn_info, '$.error')").Rows()
c.Assert(rows7[0][0], Greater, rows5[0][0])
c.Assert(rows7[0][1], Equals, "0")
c.Assert(strings.Contains(err.Error(), rows7[0][1].(string)), IsTrue)

_, err = tk.Exec("set @@tidb_last_txn_info = '{}'")
c.Assert(terror.ErrorEqual(err, variable.ErrReadOnly), IsTrue, Commentf("err %v", err))
}

func (s *testSuite) TestSelectForUpdate(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ const (
InfoSchema
// SchemaAmender is used to amend mutations for pessimistic transactions
SchemaAmender
// CommitHook is a callback function called right after the transaction gets committed
CommitHook
)

// Priority value for transaction priority.
Expand Down
7 changes: 7 additions & 0 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,10 @@ func MockCommitErrorDisable() {
func IsMockCommitErrorEnable() bool {
return atomic.LoadInt64(&mockCommitErrorEnable) == 1
}

// TxnInfo is used to keep track the info of a committed transaction (mainly for diagnosis and testing)
type TxnInfo struct {
StartTS uint64 `json:"start_ts"`
CommitTS uint64 `json:"commit_ts"`
ErrMsg string `json:"error,omitempty"`
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func (s *session) doCommit(ctx context.Context) error {
// Set this option for 2 phase commit to validate schema lease.
s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs))
s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema)
s.txn.SetOption(kv.CommitHook, func(info kv.TxnInfo, _ error) { s.sessionVars.LastTxnInfo = info })
if s.GetSessionVars().EnableAmendPessimisticTxn {
s.txn.SetOption(kv.SchemaAmender, NewSchemaAmenderForTikvTxn(s))
}
Expand Down
5 changes: 4 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,9 @@ type SessionVars struct {

// EnableAmendPessimisticTxn indicates if schema change amend is enabled for pessimistic transactions.
EnableAmendPessimisticTxn bool

// LastTxnInfo keeps track the info of last committed transaction
LastTxnInfo kv.TxnInfo
}

// PreparedParams contains the parameters of the current prepared statement when executing it.
Expand Down Expand Up @@ -1181,7 +1184,7 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.BatchCommit = TiDBOptOn(val)
case TiDBDMLBatchSize:
s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize)
case TiDBCurrentTS, TiDBConfig:
case TiDBCurrentTS, TiDBLastTxnInfo, TiDBConfig:
return ErrReadOnly
case TiDBMaxChunkSize:
s.MaxChunkSize = tidbOptPositiveInt32(val, DefMaxChunkSize)
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBBatchCommit, BoolToIntStr(DefBatchCommit)},
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeSession, TiDBLastTxnInfo, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
{ScopeGlobal | ScopeSession, TiDBAllowBatchCop, strconv.Itoa(DefTiDBAllowBatchCop)},
{ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ const (
// It is read-only.
TiDBCurrentTS = "tidb_current_ts"

// TiDBLastTxnInfo is used to get the last transaction info within the current session.
TiDBLastTxnInfo = "tidb_last_txn_info"

// tidb_config is a read-only variable that shows the config of the current server.
TiDBConfig = "tidb_config"

Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ func GetSessionOnlySysVars(s *SessionVars, key string) (string, bool, error) {
switch sysVar.Name {
case TiDBCurrentTS:
return fmt.Sprintf("%d", s.TxnCtx.StartTS), true, nil
case TiDBLastTxnInfo:
info, err := json.Marshal(s.LastTxnInfo)
if err != nil {
return "", true, err
}
return string(info), true, nil
case TiDBGeneralLog:
return fmt.Sprintf("%d", atomic.LoadUint32(&ProcessGeneralLog)), true, nil
case TiDBPProfSQLCPU:
Expand Down
20 changes: 20 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ type tikvTxn struct {
txnInfoSchema SchemaVer
// SchemaAmender is used amend pessimistic txn commit mutations for schema change
schemaAmender SchemaAmender
// commitCallback is called after current transaction gets committed
commitCallback func(info kv.TxnInfo, err error)
}

func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
Expand Down Expand Up @@ -243,6 +245,8 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {
txn.txnInfoSchema = val.(SchemaVer)
case kv.SchemaAmender:
txn.schemaAmender = val.(SchemaAmender)
case kv.CommitHook:
txn.commitCallback = val.(func(info kv.TxnInfo, err error))
}
}

Expand Down Expand Up @@ -316,6 +320,9 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
// pessimistic transaction should also bypass latch.
if txn.store.txnLatches == nil || txn.IsPessimistic() {
err = committer.execute(ctx)
if val == nil || connID > 0 {
txn.onCommitted(err)
}
logutil.Logger(ctx).Debug("[kv] txnLatches disabled, 2pc directly", zap.Error(err))
return errors.Trace(err)
}
Expand All @@ -334,6 +341,9 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return kv.ErrWriteConflictInTiDB.FastGenByArgs(txn.startTS)
}
err = committer.execute(ctx)
if val == nil || connID > 0 {
txn.onCommitted(err)
}
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
Expand Down Expand Up @@ -371,6 +381,16 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error {
return txn.committer.pessimisticRollbackMutations(NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars), CommitterMutations{keys: txn.lockKeys})
}

func (txn *tikvTxn) onCommitted(err error) {
if txn.commitCallback != nil {
info := kv.TxnInfo{StartTS: txn.startTS, CommitTS: txn.commitTS}
if err != nil {
info.ErrMsg = err.Error()
}
txn.commitCallback(info, err)
}
}

// lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock
func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error {
txn.mu.Lock()
Expand Down

0 comments on commit ee324ef

Please sign in to comment.