Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: refactor ts acquisition within build and execute phases #35376

Merged
merged 49 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
cc05adf
update
SpadeA-Tang Jun 13, 2022
0f136bd
update
SpadeA-Tang Jun 13, 2022
9ad4492
update
SpadeA-Tang Jun 14, 2022
9a68f75
update
SpadeA-Tang Jun 14, 2022
ddd7ed4
update
SpadeA-Tang Jun 14, 2022
142f55e
update
SpadeA-Tang Jun 14, 2022
ebb581f
update
SpadeA-Tang Jun 15, 2022
b595d3d
update
SpadeA-Tang Jun 15, 2022
673d389
update
SpadeA-Tang Jun 15, 2022
0b431c2
update
SpadeA-Tang Jun 15, 2022
516530b
update
SpadeA-Tang Jun 15, 2022
4c832a2
update
SpadeA-Tang Jun 15, 2022
5e7fc56
checkpoint
SpadeA-Tang Jun 16, 2022
781ce8f
update
SpadeA-Tang Jun 16, 2022
b54c368
save
SpadeA-Tang Jun 16, 2022
139e506
update
SpadeA-Tang Jun 18, 2022
cff38ac
update
SpadeA-Tang Jun 18, 2022
b3dd7ee
update
SpadeA-Tang Jun 18, 2022
3c7d1d3
delete unrelated test
SpadeA-Tang Jun 18, 2022
0b70f0e
modify some comments
SpadeA-Tang Jun 19, 2022
9a500e9
update
SpadeA-Tang Jun 20, 2022
b88d5ab
update
SpadeA-Tang Jun 20, 2022
7b40adc
update
SpadeA-Tang Jun 20, 2022
4906089
update
SpadeA-Tang Jun 20, 2022
bd12553
update
SpadeA-Tang Jun 20, 2022
350efbb
update
SpadeA-Tang Jun 20, 2022
2f28146
update
SpadeA-Tang Jun 20, 2022
4e52ee6
fmt
SpadeA-Tang Jun 20, 2022
4700b48
Merge branch 'master' into removeTxnForUpdateTS
lcwangchao Jun 20, 2022
1c5a1e9
update optimizeForNotFetchingLatestTS
SpadeA-Tang Jun 21, 2022
aa9c357
Merge branch 'master' into removeTxnForUpdateTS
SpadeA-Tang Jun 21, 2022
be1c611
update
SpadeA-Tang Jun 22, 2022
83c5fa9
remove unused code
SpadeA-Tang Jun 22, 2022
1da2557
update
SpadeA-Tang Jun 23, 2022
251a875
update
SpadeA-Tang Jun 23, 2022
fe66d11
update
SpadeA-Tang Jun 23, 2022
3cfb2fa
udpate
SpadeA-Tang Jun 24, 2022
035a2bd
Merge branch 'master' into removeTxnForUpdateTS
SpadeA-Tang Jun 24, 2022
0d3e267
update
SpadeA-Tang Jun 24, 2022
d83319b
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
3be9075
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
f519b42
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
1ea7192
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
168f950
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
2eccd2e
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
303dbd7
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 24, 2022
26d11e3
Merge branch 'master' into removeTxnForUpdateTS
ti-chi-bot Jun 27, 2022
2d4caf4
update
SpadeA-Tang Jun 27, 2022
1092f63
Merge branch 'removeTxnForUpdateTS' of github.com:SpadeA-Tang/tidb in…
SpadeA-Tang Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 22 additions & 36 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func IsFastPlan(p plannercore.Plan) bool {
}

// Exec builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// like the INSERT, UPDATE statements, it executes in this function. If the Executor returns
// result, execution is done after this function returns, in the returned sqlexec.RecordSet Next method.
func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
defer func() {
Expand Down Expand Up @@ -708,7 +708,10 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys)
seVars := sctx.GetSessionVars()
keys = filterLockTableKeys(seVars.StmtCtx, keys)
lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout, len(keys))
lockCtx, err := newLockCtx(sctx, seVars.LockWaitTimeout, len(keys))
if err != nil {
return err
}
var lockKeyStats *util.LockKeysDetails
ctx = context.WithValue(ctx, util.LockKeysDetailCtxKey, &lockKeyStats)
startLocking := time.Now()
Expand All @@ -730,51 +733,27 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
}
}

// UpdateForUpdateTS updates the ForUpdateTS, if newForUpdateTS is 0, it obtain a new TS from PD.
func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
txn, err := seCtx.Txn(false)
if err != nil {
return err
}
if !txn.Valid() {
return errors.Trace(kv.ErrInvalidTxn)
}

// The Oracle serializable isolation is actually SI in pessimistic mode.
// Do not update ForUpdateTS when the user is using the Serializable isolation level.
// It can be used temporarily on the few occasions when an Oracle-like isolation level is needed.
// Support for this does not mean that TiDB supports serializable isolation of MySQL.
// tidb_skip_isolation_level_check should still be disabled by default.
if seCtx.GetSessionVars().IsIsolation(ast.Serializable) {
return nil
}
if newForUpdateTS == 0 {
// Because the ForUpdateTS is used for the snapshot for reading data in DML.
// We can avoid allocating a global TSO here to speed it up by using the local TSO.
version, err := seCtx.GetStore().CurrentVersion(seCtx.GetSessionVars().TxnCtx.TxnScope)
if err != nil {
return err
}
newForUpdateTS = version.Ver
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
return nil
}

// handlePessimisticLockError updates TS and rebuild executor if the err is write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error) (_ Executor, err error) {
if lockErr == nil {
return nil, nil
}
failpoint.Inject("assertPessimisticLockErr", func() {
if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errWriteConflict")
} else if terror.ErrorEqual(kv.ErrKeyExists, lockErr) {
sessiontxn.AddAssertEntranceForLockError(a.Ctx, "errDuplicateKey")
}
})

defer func() {
if _, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
err = ErrDeadlock
}
}()

action, err := sessiontxn.GetTxnManager(a.Ctx).OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr)
txnManager := sessiontxn.GetTxnManager(a.Ctx)
action, err := txnManager.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr)
if err != nil {
return nil, err
}
Expand All @@ -789,10 +768,17 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
a.retryCount++
a.retryStartTime = time.Now()

err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(ctx)
err = txnManager.OnStmtRetry(ctx)
if err != nil {
return nil, err
}

// Without this line of code, the result will still be correct. But it can ensure that the update time of for update read
// is determined which is beneficial for testing.
if _, err = txnManager.GetStmtForUpdateTS(); err != nil {
return nil, err
}

breakpoint.Inject(a.Ctx, sessiontxn.BreakPointOnStmtRetryAfterLockError)

e, err := a.buildExecutor()
Expand Down
20 changes: 9 additions & 11 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type BatchPointGetExec struct {
singlePart bool
partTblID int64
idxVals [][]types.Datum
startTS uint64
readReplicaScope string
isStaleness bool
snapshotTS uint64
Expand Down Expand Up @@ -97,22 +96,18 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() {

// Open implements the Executor interface.
func (e *BatchPointGetExec) Open(context.Context) error {
e.snapshotTS = e.startTS
sessVars := e.ctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
stmtCtx := sessVars.StmtCtx
if e.lock {
e.snapshotTS = txnCtx.GetForUpdateTS()
}
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
e.txn = txn
var snapshot kv.Snapshot
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == e.snapshotTS {
// We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS.
// The snapshot may contains cache that can reduce RPC call.
// We can safely reuse the transaction snapshot if snapshotTS is equal to forUpdateTS.
// The snapshot may contain cache that can reduce RPC call.
snapshot = txn.GetSnapshot()
} else {
snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS)
Expand Down Expand Up @@ -540,13 +535,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}

// LockKeys locks the keys for pessimistic transaction.
func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := seCtx.GetSessionVars().TxnCtx
lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime, len(keys))
func LockKeys(ctx context.Context, sctx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := sctx.GetSessionVars().TxnCtx
lctx, err := newLockCtx(sctx, lockWaitTime, len(keys))
if err != nil {
return err
}
if txnCtx.IsPessimistic {
lctx.InitReturnValues(len(keys))
}
err := doLockKeys(ctx, seCtx, lctx, keys...)
err = doLockKeys(ctx, sctx, lctx, keys...)
if err != nil {
return err
}
Expand Down
72 changes: 27 additions & 45 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,11 +657,11 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
defer func() { b.inSelectLockStmt = false }()
}
b.hasLock = true
if b.err = b.updateForUpdateTSIfNeeded(v.Children()[0]); b.err != nil {

// Build 'select for update' using the 'for update' ts.
if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you change "updateForUpdateTS" to "getSnapshotTS()"?
Does it update the latest TS for "SELECT ... FOR UPDATE" in original code, but not now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getSnapshotTS can also update for_update_ts. The naming is somewhat confusing..

return nil
}
// Build 'select for update' using the 'for update' ts.
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

src := b.build(v.Children()[0])
if b.err != nil {
Expand Down Expand Up @@ -865,14 +865,11 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor {

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.inInsertStmt = true
if v.SelectPlan != nil {
// Try to update the forUpdateTS for insert/replace into select statements.
// Set the selectPlan parameter to nil to make it always update the forUpdateTS.
if b.err = b.updateForUpdateTSIfNeeded(nil); b.err != nil {
return nil
}

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
return nil
}
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

selectExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -2116,10 +2113,11 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
}
}
}
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
return nil
}
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand Down Expand Up @@ -2173,10 +2171,11 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
for _, info := range v.TblColPosInfos {
tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)
}
if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil {

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
return nil
}
b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
Expand All @@ -2192,34 +2191,6 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
return deleteExec
}

// updateForUpdateTSIfNeeded updates the ForUpdateTS for a pessimistic transaction if needed.
// PointGet executor will get conflict error if the ForUpdateTS is older than the latest commitTS,
// so we don't need to update now for better latency.
func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.PhysicalPlan) error {
txnCtx := b.ctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return nil
}
if _, ok := selectPlan.(*plannercore.PointGetPlan); ok {
return nil
}
// Activate the invalid txn, use the txn startTS as newForUpdateTS
txn, err := b.ctx.Txn(false)
if err != nil {
return err
}
if !txn.Valid() {
_, err := b.ctx.Txn(true)
if err != nil {
return err
}
return nil
}
// GetStmtForUpdateTS will auto update the for update ts if necessary
_, err = sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
return err
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
Expand Down Expand Up @@ -4663,18 +4634,26 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
return nil
}

startTS, err := b.getSnapshotTS()
if plan.Lock && !b.inSelectLockStmt {
b.inSelectLockStmt = true
defer func() {
b.inSelectLockStmt = false
}()
}

snapshotTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
}

decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
startTS: startTS,
snapshotTS: snapshotTS,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
keepOrder: plan.KeepOrder,
Expand All @@ -4687,9 +4666,11 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
partTblID: plan.PartTblID,
columns: plan.Columns,
}

if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, startTS)
e.cacheTable = b.getCacheTable(plan.TblInfo, snapshotTS)
}

if plan.TblInfo.TempTableType != model.TempTableNone {
// Temporary table should not do any lock operations
e.lock = false
Expand All @@ -4699,6 +4680,7 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
if e.lock {
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
b.hasLock = true
}

var capacity int
if plan.IndexInfo != nil && !isCommonHandleRead(plan.TblInfo, plan.IndexInfo) {
e.idxVals = plan.IndexValues
Expand Down
19 changes: 14 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -1042,12 +1043,20 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
for id := range e.tblID2Handle {
e.updateDeltaForTableID(id)
}

return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime, len(e.keys)), e.keys...)
lockCtx, err := newLockCtx(e.ctx, lockWaitTime, len(e.keys))
if err != nil {
return err
}
return doLockKeys(ctx, e.ctx, lockCtx, e.keys...)
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64, numKeys int) *tikvstore.LockCtx {
lockCtx := tikvstore.NewLockCtx(seVars.TxnCtx.GetForUpdateTS(), lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime())
func newLockCtx(sctx sessionctx.Context, lockWaitTime int64, numKeys int) (*tikvstore.LockCtx, error) {
seVars := sctx.GetSessionVars()
forUpdateTS, err := sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()
if err != nil {
return nil, err
}
lockCtx := tikvstore.NewLockCtx(forUpdateTS, lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You changed the old code process of newLockCtx. The first parameter of NewLockCtx comes from "seVars.TxnCtx.GetForUpdateTS()" before, but now from "sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()".
Does the old code has some bugs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change it not due to a bug fix. This PR aims to change the management of the acquisition of ts by txnManager. Previsouly, some ts acquisition if from txnManager, but some is from TxnCtx such as here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the logic of "seVars.TxnCtx.GetForUpdateTS()" is different from "sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, previsouly, executor can change TxnCtx.forUpdateTS directly. Now, forUpdateTS is managed by the relevant transaction context provider and returned by GetStmtForUpdateTS. Although when forUpdateTS is changed, we also change txnCtx.forUpdateTS, now we should use GetTxnManager(sctx).GetStmtForUpdateTS() in normal code and only use TxnCtx.GetForUpdateTS() in places such as log print.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetTxnManager(sctx).GetStmtForUpdateTS() in normal code and only use TxnCtx.GetForUpdateTS() in places such as log print.

Any plan to remove the usage of GetForUpdateTS() in point_get and batch_point_get?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some will be removed in the later PRs.

lockCtx.Killed = &seVars.Killed
lockCtx.PessimisticLockWaited = &seVars.StmtCtx.PessimisticLockWaited
lockCtx.LockKeysDuration = &seVars.StmtCtx.LockKeysDuration
Expand Down Expand Up @@ -1082,7 +1091,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64, numKeys int) *
if lockCtx.ForUpdateTS > 0 && seVars.AssertionLevel != variable.AssertionLevelOff {
lockCtx.InitCheckExistence(numKeys)
}
return lockCtx
return lockCtx, nil
}

// doLockKeys is the main entry for pessimistic lock keys
Expand Down
Loading