Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: remove legacy.SimpleTxnContextProvider #35667

Merged
merged 12 commits into from
Jun 29, 2022
1 change: 0 additions & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ go_library(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//sessiontxn",
"//sessiontxn/legacy",
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
Expand Down
85 changes: 16 additions & 69 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -88,14 +87,11 @@ var (
// executorBuilder builds an Executor from a Plan.
// The InfoSchema must not change during execution.
type executorBuilder struct {
ctx sessionctx.Context
is infoschema.InfoSchema
snapshotTS uint64 // The ts for snapshot-read. A select statement without for update will use this ts
forUpdateTS uint64 // The ts should be used by insert/update/delete/select-for-update statement
snapshotTSCached bool
err error // err is set when there is error happened during Executor building process.
hasLock bool
Ti *TelemetryInfo
ctx sessionctx.Context
is infoschema.InfoSchema
err error // err is set when there is error happened during Executor building process.
hasLock bool
Ti *TelemetryInfo
// isStaleness means whether this statement use stale read.
isStaleness bool
readReplicaScope string
Expand All @@ -121,26 +117,13 @@ type CTEStorages struct {
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder {
b := &executorBuilder{
return &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
isStaleness: staleread.IsStmtStaleness(ctx),
readReplicaScope: replicaReadScope,
}

txnManager := sessiontxn.GetTxnManager(ctx)
if provider, ok := txnManager.GetContextProvider().(*legacy.SimpleTxnContextProvider); ok {
provider.GetReadTSFunc = b.getReadTS
provider.GetForUpdateTSFunc = func() (uint64, error) {
if b.forUpdateTS != 0 {
return b.forUpdateTS, nil
}
return b.getReadTS()
}
}

return b
}

// MockPhysicalPlan is used to return a specified executor in when build.
Expand Down Expand Up @@ -657,9 +640,7 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
defer func() { b.inSelectLockStmt = false }()
}
b.hasLock = true

// Build 'select for update' using the 'for update' ts.
if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand Down Expand Up @@ -865,8 +846,7 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor {

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.inInsertStmt = true

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand Down Expand Up @@ -1581,44 +1561,6 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return txnManager.GetStmtReadTS()
}

// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
// and some stale/historical read contexts. For example, it will return txn.StartTS in RR and return
// the current timestamp in RC isolation
func (b *executorBuilder) getReadTS() (uint64, error) {
failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
// after refactoring stale read will use its own context provider
staleread.AssertStmtStaleness(b.ctx, false)
})

if b.snapshotTSCached {
return b.snapshotTS, nil
}

if snapshotTS := b.ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
b.snapshotTS = snapshotTS
b.snapshotTSCached = true
return snapshotTS, nil
}

if b.snapshotTS != 0 {
b.snapshotTSCached = true
// Return the cached value.
return b.snapshotTS, nil
}

txn, err := b.ctx.Txn(true)
if err != nil {
return 0, err
}

b.snapshotTS = txn.StartTS()
if b.snapshotTS == 0 {
return 0, errors.Trace(ErrGetStartTS)
}
b.snapshotTSCached = true
return b.snapshotTS, nil
}

func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor {
switch v.DBName.L {
case util.MetricSchemaName.L:
Expand Down Expand Up @@ -2119,8 +2061,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
}
}
}

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand Down Expand Up @@ -2178,7 +2119,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)
}

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand All @@ -2197,6 +2138,12 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
return deleteExec
}

func (b *executorBuilder) updateForUpdateTS() error {
// GetStmtForUpdateTS will auto update the for update ts if it is necessary
_, err := sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
return err
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
Expand Down
2 changes: 0 additions & 2 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ import (
func enableStaleReadCommonFailPoint(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return"))
return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan"))
}
}
Expand Down
1 change: 0 additions & 1 deletion session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ go_library(
"//sessionctx/variable",
"//sessiontxn",
"//sessiontxn/isolation",
"//sessiontxn/legacy",
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
Expand Down
10 changes: 1 addition & 9 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -2388,7 +2387,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}

var is infoschema.InfoSchema
var snapshotTS uint64
replicaReadScope := oracle.GlobalTxnScope

Expand All @@ -2400,7 +2398,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
txnManager := sessiontxn.GetTxnManager(s)
if staleReadProcessor.IsStaleness() {
snapshotTS = staleReadProcessor.GetStalenessReadTS()
is = staleReadProcessor.GetStalenessInfoSchema()
is := staleReadProcessor.GetStalenessInfoSchema()
replicaReadScope = config.GetTxnScopeFromConfig()
err = txnManager.EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{
Type: sessiontxn.EnterNewTxnWithReplaceProvider,
Expand All @@ -2410,8 +2408,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
if err != nil {
return nil, err
}
} else {
is = s.GetInfoSchema().(infoschema.InfoSchema)
}

staleness := snapshotTS > 0
Expand All @@ -2427,10 +2423,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
return nil, err
}

if p, isOK := txnManager.GetContextProvider().(*legacy.SimpleTxnContextProvider); isOK {
p.InfoSchema = is
}

if ok {
rs, ok, err := s.cachedPointPlanExec(ctx, txnManager.GetTxnInfoSchema(), stmtID, preparedStmt, replicaReadScope, args)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/sessionstates/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//parser/types",
"//sessionctx/stmtctx",
"//types",
],
)
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ go_library(
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//parser/terror",
"//util/disk",
"//util/execdetails",
"//util/memory",
"//util/resourcegrouptag",
"//util/topsql/stmtstats",
"//util/tracing",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_atomic//:atomic",
Expand All @@ -32,9 +34,11 @@ go_test(
embed = [":stmtctx"],
deps = [
"//kv",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"//util/execdetails",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_goleak//:goleak",
Expand Down
1 change: 1 addition & 0 deletions sessiontxn/isolation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
"//testkit",
"//testkit/testsetup",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
Expand Down
41 changes: 0 additions & 41 deletions sessiontxn/legacy/BUILD.bazel

This file was deleted.

Loading