Skip to content

Commit

Permalink
executor: add pessimistic lock keys runtime information (#19547) (#20199
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ti-srebot authored Oct 10, 2020
1 parent 2f98a29 commit 08c63bf
Show file tree
Hide file tree
Showing 9 changed files with 293 additions and 80 deletions.
28 changes: 17 additions & 11 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,8 +556,13 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
seVars := sctx.GetSessionVars()
lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout)
var lockKeyStats *execdetails.LockKeysDetails
ctx = context.WithValue(ctx, execdetails.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
err = txn.LockKeys(ctx, lockCtx, keys...)
if lockKeyStats != nil {
seVars.StmtCtx.MergeLockKeysExecDetails(lockKeyStats)
}
if err == nil {
return nil
}
Expand Down Expand Up @@ -795,10 +800,21 @@ func (a *ExecStmt) CloseRecordSet(txnStartTS uint64, lastErr error) {
// 3. record execute duration metric.
// 4. update the `PrevStmt` in session variable.
func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, succ bool, hasMoreResults bool) {
sessVars := a.Ctx.GetSessionVars()
execDetail := sessVars.StmtCtx.GetExecDetails()
// Attach commit/lockKeys runtime stats to executor runtime stats.
if (execDetail.CommitDetail != nil || execDetail.LockKeysDetail != nil) && sessVars.StmtCtx.RuntimeStatsColl != nil {
stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
RuntimeStats: stats,
Commit: execDetail.CommitDetail,
LockKeys: execDetail.LockKeysDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
sessVars := a.Ctx.GetSessionVars()
prevStmt := a.GetTextToLog()
if config.RedactLogEnabled() {
sessVars.PrevStmt = FormatSQL(prevStmt, nil)
Expand Down Expand Up @@ -850,16 +866,6 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
stmtDetail = *(stmtDetailRaw.(*execdetails.StmtExecDetails))
}
execDetail := sessVars.StmtCtx.GetExecDetails()

// Attach commit runtime stats to executor runtime stats.
if execDetail.CommitDetail != nil && sessVars.StmtCtx.RuntimeStatsColl != nil {
stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
RuntimeStats: stats,
Commit: execDetail.CommitDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
Expand Down
8 changes: 7 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,13 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *kv.LockCtx,
if err != nil {
return err
}
return txn.LockKeys(sessionctx.SetCommitCtx(ctx, se), lockCtx, keys...)
var lockKeyStats *execdetails.LockKeysDetails
ctx = context.WithValue(ctx, execdetails.LockKeysDetailCtxKey, &lockKeyStats)
err = txn.LockKeys(sessionctx.SetCommitCtx(ctx, se), lockCtx, keys...)
if lockKeyStats != nil {
sctx.MergeLockKeysExecDetails(lockKeyStats)
}
return err
}

// LimitExec represents limit executor
Expand Down
37 changes: 37 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6017,6 +6017,43 @@ func (s *testSuite) TestIssue19372(c *C) {
tk.MustQuery("select (select t2.c_str from t2 where t2.c_str <= t1.c_str and t2.c_int in (1, 2) order by t2.c_str limit 1) x from t1 order by c_int;").Check(testkit.Rows("a", "a", "a"))
}

func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int, unique index (a))")

testSQLs := []string{
"insert ignore into t1 values (5,5);",
"insert into t1 values (5,5) on duplicate key update a=a+1;",
"replace into t1 values (5,6),(6,7)",
"update t1 set a=a+1 where a=6;",
}

getRootStats := func() string {
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID())
return stats.String()
}
for _, sql := range testSQLs {
tk.MustExec(sql)
}

// Test for lock keys stats.
tk.MustExec("begin pessimistic")
tk.MustExec("update t1 set b=b+1")
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t1 for update").Check(testkit.Rows("5 6", "7 7"))
c.Assert(getRootStats(), Matches, "time.*lock_keys.*time.* region.* keys.* lock_rpc:.* rpc_count.*")
tk.MustExec("rollback")
}

func (s *testSuite) TestIssue13758(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -236,6 +237,7 @@ type LockCtx struct {
ValuesLock sync.Mutex
LockExpired *uint32
CheckKeyExists map[string]struct{}
Stats *execdetails.LockKeysDetails
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,17 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
sc.mu.Unlock()
}

// MergeLockKeysExecDetails merges lock keys execution details into self.
func (sc *StatementContext) MergeLockKeysExecDetails(lockKeys *execdetails.LockKeysDetails) {
sc.mu.Lock()
if sc.mu.execDetails.LockKeysDetail == nil {
sc.mu.execDetails.LockKeysDetail = lockKeys
} else {
sc.mu.execDetails.LockKeysDetail.Merge(lockKeys)
}
sc.mu.Unlock()
}

// GetExecDetails gets the execution details for the statement.
func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails {
var details execdetails.ExecDetails
Expand Down
17 changes: 16 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh

var batches []batchMutations
var sizeFunc = c.keySize
if _, ok := action.(actionPrewrite); ok {

switch act := action.(type) {
case actionPrewrite:
// Do not update regionTxnSize on retries. They are not used when building a PrewriteRequest.
if len(bo.errors) == 0 {
for _, group := range groups {
Expand All @@ -564,6 +566,10 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
}
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.getDetail().PrewriteRegionNum, int32(len(groups)))
case actionPessimisticLock:
if act.LockCtx.Stats != nil {
act.LockCtx.Stats.RegionNum = int32(len(groups))
}
}

primaryIdx := -1
Expand Down Expand Up @@ -934,7 +940,12 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
time.Sleep(300 * time.Millisecond)
return kv.ErrWriteConflict
})
startTime := time.Now()
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.LockRPCTime, int64(time.Since(startTime)))
atomic.AddInt64(&action.LockCtx.Stats.LockRPCCount, 1)
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -989,7 +1000,11 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
startTime = time.Now()
msBeforeTxnExpired, _, err := c.store.lockResolver.ResolveLocks(bo, 0, locks)
if action.LockCtx.Stats != nil {
atomic.AddInt64(&action.LockCtx.Stats.ResolveLockTime, int64(time.Since(startTime)))
}
if err != nil {
return errors.Trace(err)
}
Expand Down
18 changes: 18 additions & 0 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
// Exclude keys that are already locked.
var err error
keys := make([][]byte, 0, len(keysInput))
startTime := time.Now()
defer func() {
if err == nil {
if lockCtx.PessimisticLockWaited != nil {
Expand All @@ -380,6 +381,14 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
if lockCtx.LockKeysCount != nil {
*lockCtx.LockKeysCount += int32(len(keys))
}
if lockCtx.Stats != nil {
lockCtx.Stats.TotalTime = time.Since(startTime)
ctxValue := ctx.Value(execdetails.LockKeysDetailCtxKey)
if ctxValue != nil {
lockKeysDetail := ctxValue.(**execdetails.LockKeysDetails)
*lockKeysDetail = lockCtx.Stats
}
}
}()
for _, key := range keysInput {
// The value of lockedMap is only used by pessimistic transactions.
Expand Down Expand Up @@ -428,12 +437,21 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
assignedPrimaryKey = true
}

lockCtx.Stats = &execdetails.LockKeysDetails{
LockKeys: int32(len(keys)),
}
bo := NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars)
txn.committer.forUpdateTS = lockCtx.ForUpdateTS
// If the number of keys greater than 1, it can be on different region,
// concurrently execute on multiple regions may lead to deadlock.
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
err = txn.committer.pessimisticLockMutations(bo, lockCtx, CommitterMutations{keys: keys})
if bo.totalSleep > 0 {
atomic.AddInt64(&lockCtx.Stats.BackoffTime, int64(bo.totalSleep)*int64(time.Millisecond))
lockCtx.Stats.Mu.Lock()
lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.types...)
lockCtx.Stats.Mu.Unlock()
}
if lockCtx.Killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
Expand Down
Loading

0 comments on commit 08c63bf

Please sign in to comment.