Skip to content

Commit

Permalink
server: prefetch for DELETE when multi-statement
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Nov 3, 2021
1 parent a96deab commit f89ac80
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 2 deletions.
6 changes: 6 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1865,6 +1865,12 @@ func ResetUpdateStmtCtx(sc *stmtctx.StatementContext, stmt *ast.UpdateStmt, vars
sc.IgnoreNoPartition = stmt.IgnoreErr
}

// ResetDeleteStmtCtx resets statement context for DeleteStmt.
func ResetDeleteStmtCtx(sc *stmtctx.StatementContext, stmt *ast.DeleteStmt) {
sc.InDeleteStmt = true
sc.Priority = stmt.Priority
}

// FillVirtualColumnValue will calculate the virtual column value by evaluating generated
// expression using rows from a chunk, and then fill this value into the chunk
func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnIndex []int,
Expand Down
19 changes: 18 additions & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
if p == nil {
continue
}
// Only support Update for now.
// Only support Update and Delete for now.
// TODO: support other point plans.
switch x := p.(type) {
case *plannercore.Update:
Expand All @@ -1861,6 +1861,23 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle))
}
}
case *plannercore.Delete:
deleteStmt := stmt.(*ast.DeleteStmt)
if pp, ok := x.SelectPlan.(*plannercore.PointGetPlan); ok {
if pp.PartitionInfo != nil {
continue
}
if pp.IndexInfo != nil {
executor.ResetDeleteStmtCtx(sc, deleteStmt)
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.ctx, pp.TblInfo, pp.IndexInfo, pp.IndexValues, pp.TblInfo.ID)
if err1 != nil {
return nil, err1
}
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle))
}
}
}
}
if len(idxKeys) == 0 && len(rowKeys) == 0 {
Expand Down
56 changes: 55 additions & 1 deletion server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ type snapshotCache interface {
SnapCacheHitCount() int
}

func TestPrefetchPointKeys(t *testing.T) {
func TestPrefetchPointKeys4Update(t *testing.T) {
t.Parallel()

store, clean := testkit.CreateMockStore(t)
Expand Down Expand Up @@ -739,6 +739,60 @@ func TestPrefetchPointKeys(t *testing.T) {
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}

func TestPrefetchPointKeys4Delete(t *testing.T) {
t.Parallel()

store, clean := testkit.CreateMockStore(t)
defer clean()

cc := &clientConn{
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
tk := testkit.NewTestKit(t, store)
cc.ctx = &TiDBContext{Session: tk.Session()}
ctx := context.Background()
tk.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly
tk.MustExec("use test")
tk.MustExec("create table prefetch2 (a int, b int, c int, primary key (a, b))")
tk.MustExec("insert prefetch2 values (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6)")
tk.MustExec("begin optimistic")
tk.MustExec("delete from prefetch2 where a = 1 and b = 1")

// enable multi-statement
capabilities := cc.ctx.GetSessionVars().ClientCapability
capabilities ^= mysql.ClientMultiStatements
cc.ctx.SetClientCapability(capabilities)
query := "delete from prefetch2 where a = 2 and b = 2;" +
"delete from prefetch2 where a = 3 and b = 3;"
err := cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err := tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
snap := txn.GetSnapshot()
require.Equal(t, 4, snap.(snapshotCache).SnapCacheHitCount())
tk.MustExec("commit")
tk.MustQuery("select * from prefetch2").Check(testkit.Rows("4 4 4", "5 5 5", "6 6 6"))

tk.MustExec("begin pessimistic")
tk.MustExec("delete from prefetch2 where a = 4 and b = 4")
require.Equal(t, 1, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
query = "delete from prefetch2 where a = 5 and b = 5;" +
"delete from prefetch2 where a = 6 and b = 6;"
err = cc.handleQuery(ctx, query)
require.NoError(t, err)
txn, err = tk.Session().Txn(false)
require.NoError(t, err)
require.True(t, txn.Valid())
require.Equal(t, 5, tk.Session().GetSessionVars().TxnCtx.PessimisticCacheHit)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch2").Check(testkit.Rows())
}

func testGetTableByName(t *testing.T, ctx sessionctx.Context, db, table string) table.Table {
dom := domain.GetDomain(ctx)
// Make sure the table schema is the new schema.
Expand Down

0 comments on commit f89ac80

Please sign in to comment.