Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: prefetch for DELETE and BatchPointGet when multi-statement #29391

Merged
merged 17 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,14 +2127,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
case *ast.UpdateStmt:
ResetUpdateStmtCtx(sc, stmt, vars)
case *ast.DeleteStmt:
sc.InDeleteStmt = true
sc.DupKeyAsWarning = stmt.IgnoreErr
sc.BadNullAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.TruncateAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.AllowInvalidDate = vars.SQLMode.HasAllowInvalidDatesMode()
sc.IgnoreZeroInDate = !vars.SQLMode.HasNoZeroInDateMode() || !vars.SQLMode.HasNoZeroDateMode() || !vars.StrictSQLMode || stmt.IgnoreErr || sc.AllowInvalidDate
sc.Priority = stmt.Priority
ResetDeleteStmtCtx(sc, stmt, vars)
case *ast.InsertStmt:
sc.InInsertStmt = true
// For insert statement (not for update statement), disabling the StrictSQLMode
Expand Down Expand Up @@ -2275,6 +2268,18 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
sc.IgnoreNoPartition = stmt.IgnoreErr
}

// ResetDeleteStmtCtx resets statement context for DeleteStmt.
func ResetDeleteStmtCtx(sc *stmtctx.StatementContext, stmt *ast.DeleteStmt, vars *variable.SessionVars) {
sc.InDeleteStmt = true
sc.DupKeyAsWarning = stmt.IgnoreErr
sc.BadNullAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.TruncateAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.AllowInvalidDate = vars.SQLMode.HasAllowInvalidDatesMode()
sc.IgnoreZeroInDate = !vars.SQLMode.HasNoZeroInDateMode() || !vars.SQLMode.HasNoZeroDateMode() || !vars.StrictSQLMode || stmt.IgnoreErr || sc.AllowInvalidDate
sc.Priority = stmt.Priority
}

func setOptionForTopSQL(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot == nil {
return
Expand Down
2 changes: 1 addition & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ type BatchPointGetPlan struct {
cost float64

// SinglePart indicates whether this BatchPointGetPlan is just for a single partition, instead of the whole partition table.
// If the BatchPointGetPlan is built in fast path, this value if false; if the plan is generated in physical optimization for a partition,
// If the BatchPointGetPlan is built in fast path, this value is false; if the plan is generated in physical optimization for a partition,
// this value would be true. This value would decide the behavior of BatchPointGetExec, i.e, whether to compute the table ID of the partition
// on the fly.
SinglePart bool
Expand Down
87 changes: 70 additions & 17 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1918,6 +1918,49 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key //nolint: prealloc
var rowKeys []kv.Key //nolint: prealloc

handlePlan := func(p plannercore.PhysicalPlan, resetStmtCtxFn func()) error {
var tableID int64
switch v := p.(type) {
case *plannercore.PointGetPlan:
if v.PartitionInfo != nil {
tableID = v.PartitionInfo.ID
} else {
tableID = v.TblInfo.ID
}
if v.IndexInfo != nil {
resetStmtCtxFn()
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.getCtx(), v.TblInfo, v.IndexInfo, v.IndexValues, tableID)
if err1 != nil {
return err1
}
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tableID, v.Handle))
}
case *plannercore.BatchPointGetPlan:
if v.PartitionInfos != nil {
// TODO: move getting physical table ID from BatchPointGetExec.initialize to newBatchPointGetPlan
return nil
}
if v.IndexInfo != nil {
resetStmtCtxFn()
for _, idxVals := range v.IndexValues {
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.getCtx(), v.TblInfo, v.IndexInfo, idxVals, v.TblInfo.ID)
if err1 != nil {
return err1
}
idxKeys = append(idxKeys, idxKey)
}
} else {
for _, handle := range v.Handles {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(v.TblInfo.ID, handle))
}
}
}
return nil
}

sc := vars.StmtCtx
for i, stmt := range stmts {
if _, ok := stmt.(*ast.UseStmt); ok {
Expand All @@ -1936,25 +1979,35 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
if p == nil {
continue
}
// Only support Update for now.
// Only support Update and Delete for now.
// TODO: support other point plans.
if x, ok := p.(*plannercore.Update); ok {
switch x := p.(type) {
case *plannercore.Update:
//nolint:forcetypeassert
updateStmt := stmt.(*ast.UpdateStmt)
if pp, ok := x.SelectPlan.(*plannercore.PointGetPlan); ok {
if pp.PartitionInfo != nil {
continue
}
if pp.IndexInfo != nil {
executor.ResetUpdateStmtCtx(sc, updateStmt, vars)
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.getCtx(), pp.TblInfo, pp.IndexInfo, pp.IndexValues, pp.TblInfo.ID)
if err1 != nil {
return nil, err1
}
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle))
}
updateStmt, ok := stmt.(*ast.UpdateStmt)
if !ok {
logutil.BgLogger().Warn("unexpected statement type for Update plan",
zap.String("type", fmt.Sprintf("%T", stmt)))
continue
}
err = handlePlan(x.SelectPlan, func() {
executor.ResetUpdateStmtCtx(sc, updateStmt, vars)
})
if err != nil {
return nil, err
}
case *plannercore.Delete:
deleteStmt, ok := stmt.(*ast.DeleteStmt)
if !ok {
logutil.BgLogger().Warn("unexpected statement type for Delete plan",
zap.String("type", fmt.Sprintf("%T", stmt)))
continue
}
err = handlePlan(x.SelectPlan, func() {
executor.ResetDeleteStmtCtx(sc, deleteStmt, vars)
})
if err != nil {
return nil, err
}
}
}
Expand Down
133 changes: 132 additions & 1 deletion server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ type snapshotCache interface {
SnapCacheHitCount() int
}

func TestPrefetchPointKeys(t *testing.T) {
func TestPrefetchPointKeys4Update(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
Expand Down Expand Up @@ -832,6 +832,137 @@ func TestPrefetchPointKeys(t *testing.T) {
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}

func TestPrefetchPointKeys4Delete(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.setCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert prefetch values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
tk.MustExec("insert prefetch values (4, 4, 4), (5, 5, 5), (6, 6, 6)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 2 and b = 2")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a = 1 and b = 1;" +
"delete from prefetch where a = 2 and b = 2;" +
"delete from prefetch where a = 3 and b = 3;"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("4 4 4", "5 5 5", "6 6 6"))

tk.MustExec("begin pessimistic")
tk.MustExec("delete from prefetch where a = 5 and b = 5")
require.Equal(t, 2, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
query = "delete from prefetch where a = 4 and b = 4;" +
"delete from prefetch where a = 5 and b = 5;" +
"delete from prefetch where a = 6 and b = 6;"
err = cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
require.Equal(t, 6, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}

func TestPrefetchBatchPointGet(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.setCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int primary key, b int)")
tk.MustExec("insert prefetch values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 1")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a in (2,3);" +
"delete from prefetch where a in (4,5);"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}

func TestPrefetchPartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.setCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int primary key, b int) partition by hash(a) partitions 4")
tk.MustExec("insert prefetch values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 1")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a = 2;" +
"delete from prefetch where a = 3;" +
"delete from prefetch where a in (4,5);"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
// TODO: support it later
//nolint:forcetypeassert
require.Equal(t, 2, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}

func TestTiFlashFallback(t *testing.T) {
store := testkit.CreateMockStore(t,
mockstore.WithClusterInspector(func(c testutils.Cluster) {
Expand Down