Skip to content

Commit

Permalink
server: prefetch for DELETE and BatchPointGet when multi-statement (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Mar 22, 2023
1 parent cfcb862 commit d9149b0
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 29 deletions.
2 changes: 1 addition & 1 deletion br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ go_test(
],
embed = [":restore"],
flaky = True,
race = "on",
race = "off",
shard_count = 50,
deps = [
"//br/pkg/backup",
Expand Down
2 changes: 1 addition & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ go_library(

go_test(
name = "executor_test",
timeout = "short",
timeout = "moderate",
srcs = [
"adapter_test.go",
"admin_test.go",
Expand Down
21 changes: 13 additions & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2127,14 +2127,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
case *ast.UpdateStmt:
ResetUpdateStmtCtx(sc, stmt, vars)
case *ast.DeleteStmt:
sc.InDeleteStmt = true
sc.DupKeyAsWarning = stmt.IgnoreErr
sc.BadNullAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.TruncateAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.AllowInvalidDate = vars.SQLMode.HasAllowInvalidDatesMode()
sc.IgnoreZeroInDate = !vars.SQLMode.HasNoZeroInDateMode() || !vars.SQLMode.HasNoZeroDateMode() || !vars.StrictSQLMode || stmt.IgnoreErr || sc.AllowInvalidDate
sc.Priority = stmt.Priority
ResetDeleteStmtCtx(sc, stmt, vars)
case *ast.InsertStmt:
sc.InInsertStmt = true
// For insert statement (not for update statement), disabling the StrictSQLMode
Expand Down Expand Up @@ -2275,6 +2268,18 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
sc.IgnoreNoPartition = stmt.IgnoreErr
}

// ResetDeleteStmtCtx resets statement context for DeleteStmt.
func ResetDeleteStmtCtx(sc *stmtctx.StatementContext, stmt *ast.DeleteStmt, vars *variable.SessionVars) {
sc.InDeleteStmt = true
sc.DupKeyAsWarning = stmt.IgnoreErr
sc.BadNullAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.TruncateAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.AllowInvalidDate = vars.SQLMode.HasAllowInvalidDatesMode()
sc.IgnoreZeroInDate = !vars.SQLMode.HasNoZeroInDateMode() || !vars.SQLMode.HasNoZeroDateMode() || !vars.StrictSQLMode || stmt.IgnoreErr || sc.AllowInvalidDate
sc.Priority = stmt.Priority
}

func setOptionForTopSQL(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot == nil {
return
Expand Down
2 changes: 1 addition & 1 deletion planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ type BatchPointGetPlan struct {
cost float64

// SinglePart indicates whether this BatchPointGetPlan is just for a single partition, instead of the whole partition table.
// If the BatchPointGetPlan is built in fast path, this value if false; if the plan is generated in physical optimization for a partition,
// If the BatchPointGetPlan is built in fast path, this value is false; if the plan is generated in physical optimization for a partition,
// this value would be true. This value would decide the behavior of BatchPointGetExec, i.e, whether to compute the table ID of the partition
// on the fly.
SinglePart bool
Expand Down
87 changes: 70 additions & 17 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1918,6 +1918,49 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key //nolint: prealloc
var rowKeys []kv.Key //nolint: prealloc

handlePlan := func(p plannercore.PhysicalPlan, resetStmtCtxFn func()) error {
var tableID int64
switch v := p.(type) {
case *plannercore.PointGetPlan:
if v.PartitionInfo != nil {
tableID = v.PartitionInfo.ID
} else {
tableID = v.TblInfo.ID
}
if v.IndexInfo != nil {
resetStmtCtxFn()
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.getCtx(), v.TblInfo, v.IndexInfo, v.IndexValues, tableID)
if err1 != nil {
return err1
}
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tableID, v.Handle))
}
case *plannercore.BatchPointGetPlan:
if v.PartitionInfos != nil {
// TODO: move getting physical table ID from BatchPointGetExec.initialize to newBatchPointGetPlan
return nil
}
if v.IndexInfo != nil {
resetStmtCtxFn()
for _, idxVals := range v.IndexValues {
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.getCtx(), v.TblInfo, v.IndexInfo, idxVals, v.TblInfo.ID)
if err1 != nil {
return err1
}
idxKeys = append(idxKeys, idxKey)
}
} else {
for _, handle := range v.Handles {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(v.TblInfo.ID, handle))
}
}
}
return nil
}

sc := vars.StmtCtx
for i, stmt := range stmts {
if _, ok := stmt.(*ast.UseStmt); ok {
Expand All @@ -1936,25 +1979,35 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
if p == nil {
continue
}
// Only support Update for now.
// Only support Update and Delete for now.
// TODO: support other point plans.
if x, ok := p.(*plannercore.Update); ok {
switch x := p.(type) {
case *plannercore.Update:
//nolint:forcetypeassert
updateStmt := stmt.(*ast.UpdateStmt)
if pp, ok := x.SelectPlan.(*plannercore.PointGetPlan); ok {
if pp.PartitionInfo != nil {
continue
}
if pp.IndexInfo != nil {
executor.ResetUpdateStmtCtx(sc, updateStmt, vars)
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.getCtx(), pp.TblInfo, pp.IndexInfo, pp.IndexValues, pp.TblInfo.ID)
if err1 != nil {
return nil, err1
}
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle))
}
updateStmt, ok := stmt.(*ast.UpdateStmt)
if !ok {
logutil.BgLogger().Warn("unexpected statement type for Update plan",
zap.String("type", fmt.Sprintf("%T", stmt)))
continue
}
err = handlePlan(x.SelectPlan, func() {
executor.ResetUpdateStmtCtx(sc, updateStmt, vars)
})
if err != nil {
return nil, err
}
case *plannercore.Delete:
deleteStmt, ok := stmt.(*ast.DeleteStmt)
if !ok {
logutil.BgLogger().Warn("unexpected statement type for Delete plan",
zap.String("type", fmt.Sprintf("%T", stmt)))
continue
}
err = handlePlan(x.SelectPlan, func() {
executor.ResetDeleteStmtCtx(sc, deleteStmt, vars)
})
if err != nil {
return nil, err
}
}
}
Expand Down
133 changes: 132 additions & 1 deletion server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ type snapshotCache interface {
SnapCacheHitCount() int
}

func TestPrefetchPointKeys(t *testing.T) {
func TestPrefetchPointKeys4Update(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
Expand Down Expand Up @@ -832,6 +832,137 @@ func TestPrefetchPointKeys(t *testing.T) {
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}

func TestPrefetchPointKeys4Delete(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.setCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert prefetch values (1, 1, 1), (2, 2, 2), (3, 3, 3)")
tk.MustExec("insert prefetch values (4, 4, 4), (5, 5, 5), (6, 6, 6)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 2 and b = 2")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a = 1 and b = 1;" +
"delete from prefetch where a = 2 and b = 2;" +
"delete from prefetch where a = 3 and b = 3;"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("4 4 4", "5 5 5", "6 6 6"))

tk.MustExec("begin pessimistic")
tk.MustExec("delete from prefetch where a = 5 and b = 5")
require.Equal(t, 2, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
query = "delete from prefetch where a = 4 and b = 4;" +
"delete from prefetch where a = 5 and b = 5;" +
"delete from prefetch where a = 6 and b = 6;"
err = cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
require.Equal(t, 6, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}

func TestPrefetchBatchPointGet(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.setCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int primary key, b int)")
tk.MustExec("insert prefetch values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 1")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a in (2,3);" +
"delete from prefetch where a in (4,5);"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
//nolint:forcetypeassert
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}

func TestPrefetchPartitionTable(t *testing.T) {
store := testkit.CreateMockStore(t)

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.setCtx(&TiDBContext{Session: tk.Session()})
ctx := context.Background()
tk.MustExec("use test")
tk.MustExec("create table prefetch (a int primary key, b int) partition by hash(a) partitions 4")
tk.MustExec("insert prefetch values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch where a = 1")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch where a = 2;" +
"delete from prefetch where a = 3;" +
"delete from prefetch where a in (4,5);"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
// TODO: support it later
//nolint:forcetypeassert
require.Equal(t, 2, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows())
}

func TestTiFlashFallback(t *testing.T) {
store := testkit.CreateMockStore(t,
mockstore.WithClusterInspector(func(c testutils.Cluster) {
Expand Down

0 comments on commit d9149b0

Please sign in to comment.