Skip to content

Commit

Permalink
session,executor: fix point get under @@tidb_snapshot (#22460) (#22527)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jan 27, 2021
1 parent 8d893eb commit 67a531f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 23 deletions.
46 changes: 23 additions & 23 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,30 +706,30 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
stmtCtx := ctx.GetSessionVars().StmtCtx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
useMaxTS, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan)
if err != nil {
return nil, err
}
if useMaxTS {
logutil.BgLogger().Debug("init txnStartTS with MaxUint64", zap.Uint64("conn", ctx.GetSessionVars().ConnectionID), zap.String("text", a.Text))
err = ctx.InitTxnWithStartTS(math.MaxUint64)
} else if ctx.GetSessionVars().SnapshotTS != 0 {
if _, ok := a.Plan.(*plannercore.CheckTable); ok {
err = ctx.InitTxnWithStartTS(ctx.GetSessionVars().SnapshotTS)
if snapshotTS := ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
if err := ctx.InitTxnWithStartTS(snapshotTS); err != nil {
return nil, err
}
}
if err != nil {
return nil, err
}

if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = kv.PriorityLow
} else {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
useMaxTS, err := plannercore.IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan)
if err != nil {
return nil, err
}
if useMaxTS {
logutil.BgLogger().Debug("init txnStartTS with MaxUint64", zap.Uint64("conn", ctx.GetSessionVars().ConnectionID), zap.String("text", a.Text))
if err := ctx.InitTxnWithStartTS(math.MaxUint64); err != nil {
return nil, err
}
}
if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = kv.PriorityLow
}
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,44 @@ func (s *testPointGetSuite) TestReturnValues(c *C) {
tk.MustExec("rollback")
}

func (s *testPointGetSuite) TestWithTiDBSnapshot(c *C) {
// Fix issue https://github.com/pingcap/tidb/issues/22436
// Point get should not use math.MaxUint64 when variable @@tidb_snapshot is set.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists xx")
tk.MustExec(`create table xx (id int key)`)
tk.MustExec(`insert into xx values (1), (7)`)

// Unrelated code, to make this test pass in the unit test.
// The `tikv_gc_safe_point` global variable must be there, otherwise the 'set @@tidb_snapshot' operation fails.
timeSafe := time.Now().Add(-48 * 60 * 60 * time.Second).Format("20060102-15:04:05 -0700 MST")
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
ON DUPLICATE KEY
UPDATE variable_value = '%[1]s'`
tk.MustExec(fmt.Sprintf(safePointSQL, timeSafe))

// Record the current tso.
tk.MustExec("begin")
tso := tk.Se.GetSessionVars().TxnCtx.StartTS
tk.MustExec("rollback")
c.Assert(tso > 0, IsTrue)

// Insert data.
tk.MustExec("insert into xx values (8)")

// Change the snapshot before the tso, the inserted data should not be seen.
tk.MustExec(fmt.Sprintf("set @@tidb_snapshot = '%d'", tso))
tk.MustQuery("select * from xx where id = 8").Check(testkit.Rows())

tk.MustQuery("select * from xx").Check(testkit.Rows("1", "7"))

// Check the query inside a transaction.
tk.MustExec("begin")
tk.MustQuery("select * from xx where id = 8").Check(testkit.Rows())
tk.MustExec("rollback")
}

func (s *testPointGetSuite) TestPointGetLockExistKey(c *C) {
var wg sync.WaitGroup
errCh := make(chan error)
Expand Down
5 changes: 5 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2210,6 +2210,11 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {

// PrepareTSFuture uses to try to get ts future.
func (s *session) PrepareTSFuture(ctx context.Context) {
if s.sessionVars.SnapshotTS != 0 {
// Do nothing when @@tidb_snapshot is set.
// In case the latest tso is misused.
return
}
if !s.txn.validOrPending() {
// Prepare the transaction future if the transaction is invalid (at the beginning of the transaction).
txnFuture := s.getTxnFuture(ctx)
Expand Down

0 comments on commit 67a531f

Please sign in to comment.