Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session,executor: fix point get under @@tidb_snapshot (#22460) #22527

Merged
merged 8 commits into from
Jan 27, 2021
46 changes: 23 additions & 23 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,30 +706,30 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
useMaxTS, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan)
if err != nil {
return nil, err
}
if useMaxTS {
logutil.BgLogger().Debug("init txnStartTS with MaxUint64", zap.Uint64("conn", ctx.GetSessionVars().ConnectionID), zap.String("text", a.Text))
err = ctx.InitTxnWithStartTS(math.MaxUint64)
} else if ctx.GetSessionVars().SnapshotTS != 0 {
if _, ok := a.Plan.(*plannercore.CheckTable); ok {
err = ctx.InitTxnWithStartTS(ctx.GetSessionVars().SnapshotTS)
if snapshotTS := ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
if err := ctx.InitTxnWithStartTS(snapshotTS); err != nil {
return nil, err
}
}
if err != nil {
return nil, err
}

if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = kv.PriorityLow
} else {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
useMaxTS, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan)
if err != nil {
return nil, err
}
if useMaxTS {
logutil.BgLogger().Debug("init txnStartTS with MaxUint64", zap.Uint64("conn", ctx.GetSessionVars().ConnectionID), zap.String("text", a.Text))
if err := ctx.InitTxnWithStartTS(math.MaxUint64); err != nil {
return nil, err
}
}
if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = kv.PriorityLow
}
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,44 @@ func (s *testPointGetSuite) TestReturnValues(c *C) {
tk.MustExec("rollback")
}

func (s *testPointGetSuite) TestWithTiDBSnapshot(c *C) {
// Fix issue https://github.com/pingcap/tidb/issues/22436
// Point get should not use math.MaxUint64 when variable @@tidb_snapshot is set.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists xx")
tk.MustExec(`create table xx (id int key)`)
tk.MustExec(`insert into xx values (1), (7)`)

// Unrelated code, to make this test pass in the unit test.
// The `tikv_gc_safe_point` global variable must be there, otherwise the 'set @@tidb_snapshot' operation fails.
timeSafe := time.Now().Add(-48 * 60 * 60 * time.Second).Format("20060102-15:04:05 -0700 MST")
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, timeSafe))

// Record the current tso.
tk.MustExec("begin")
tso := tk.Se.GetSessionVars().TxnCtx.StartTS
tk.MustExec("rollback")
c.Assert(tso > 0, IsTrue)

// Insert data.
tk.MustExec("insert into xx values (8)")

// Change the snapshot before the tso, the inserted data should not be seen.
tk.MustExec(fmt.Sprintf("set @@tidb_snapshot = '%d'", tso))
tk.MustQuery("select * from xx where id = 8").Check(testkit.Rows())

tk.MustQuery("select * from xx").Check(testkit.Rows("1", "7"))

// Check the query inside a transaction.
tk.MustExec("begin")
tk.MustQuery("select * from xx where id = 8").Check(testkit.Rows())
tk.MustExec("rollback")
}

func (s *testPointGetSuite) TestPointGetLockExistKey(c *C) {
var wg sync.WaitGroup
errCh := make(chan error)
Expand Down
5 changes: 5 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2192,6 +2192,11 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {

// PrepareTSFuture uses to try to get ts future.
func (s *session) PrepareTSFuture(ctx context.Context) {
if s.sessionVars.SnapshotTS != 0 {
// Do nothing when @@tidb_snapshot is set.
// In case the latest tso is misused.
return
}
if !s.txn.validOrPending() {
// Prepare the transaction future if the transaction is invalid (at the beginning of the transaction).
txnFuture := s.getTxnFuture(ctx)
Expand Down