Skip to content

Commit

Permalink
Optimize with IN Clause for UPDATE/DELETE Statements on Vindexes (#15455
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wangweicugw authored Apr 1, 2024
1 parent e5eb981 commit add3887
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 44 deletions.
68 changes: 68 additions & 0 deletions go/test/endtoend/vtgate/queries/benchmark/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,33 @@ func (tq *testQuery) getInsertQuery(rows int) string {
return fmt.Sprintf("insert into %s(%s) values %s", tq.tableName, strings.Join(tq.cols, ","), strings.Join(allRows, ","))
}

func (tq *testQuery) getUpdateQuery(rows int) string {
var allRows []string
var row []string
for i, isInt := range tq.intTyp {
if isInt {
row = append(row, strconv.Itoa(i))
continue
}
row = append(row, tq.cols[i]+" = '"+getRandomString(50)+"'")
}
allRows = append(allRows, strings.Join(row, ","))

var ids []string
for i := 0; i <= rows; i++ {
ids = append(ids, strconv.Itoa(i))
}
return fmt.Sprintf("update %s set %s where id in (%s)", tq.tableName, strings.Join(allRows, ","), strings.Join(ids, ","))
}

func (tq *testQuery) getDeleteQuery(rows int) string {
var ids []string
for i := 0; i <= rows; i++ {
ids = append(ids, strconv.Itoa(i))
}
return fmt.Sprintf("delete from %s where id in (%s)", tq.tableName, strings.Join(ids, ","))
}

func getRandomString(size int) string {
var str strings.Builder

Expand Down Expand Up @@ -78,3 +105,44 @@ func BenchmarkShardedTblNoLookup(b *testing.B) {
})
}
}

func BenchmarkShardedTblUpdateIn(b *testing.B) {
conn, closer := start(b)
defer closer()

cols := []string{"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11", "c12"}
intType := make([]bool, len(cols))
tq := &testQuery{
tableName: "tbl_no_lkp_vdx",
cols: cols,
intTyp: intType,
}
insStmt := tq.getInsertQuery(10000)
_ = utils.Exec(b, conn, insStmt)
for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} {
updStmt := tq.getUpdateQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, updStmt)
}
})
}
}

func BenchmarkShardedTblDeleteIn(b *testing.B) {
conn, closer := start(b)
defer closer()
tq := &testQuery{
tableName: "tbl_no_lkp_vdx",
}
for _, rows := range []int{1, 10, 100, 500, 1000, 5000, 10000} {
insStmt := tq.getInsertQuery(rows)
_ = utils.Exec(b, conn, insStmt)
delStmt := tq.getDeleteQuery(rows)
b.Run(fmt.Sprintf("16-shards-%d-rows", rows), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = utils.Exec(b, conn, delStmt)
}
})
}
}
51 changes: 49 additions & 2 deletions go/test/endtoend/vtgate/queries/dml/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ package dml
import (
"testing"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/utils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/utils"
)

func TestMultiEqual(t *testing.T) {
Expand Down Expand Up @@ -363,3 +365,48 @@ func TestMultiTargetUpdate(t *testing.T) {
mcmp.AssertMatches(`select oid, ename from oevent_tbl order by oid`,
`[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("xyz")] [INT64(3) VARCHAR("a")] [INT64(4) VARCHAR("a")]]`)
}

// TestDMLInUnique for update/delete statement using an IN clause with the Vindexes,
// the query is correctly split according to the corresponding values in the IN list.
func TestDMLInUnique(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate")

mcmp, closer := start(t)
defer closer()

// initial rows
mcmp.Exec("insert into user_tbl(id, region_id, `name`) values (1,1,'a'),(2,2,'a'),(3,3,'a'),(4,4,'a'),(5,5,'a'),(6,6,'a')")

qr := mcmp.Exec("update user_tbl set `name` = 'b' where region_id in (1,2,3,4,5,6)")
assert.EqualValues(t, 6, qr.RowsAffected)
qr = mcmp.Exec("delete from user_tbl where region_id in (1,2,3,4,5,6)")
assert.EqualValues(t, 6, qr.RowsAffected)
mcmp.Exec("insert into user_tbl(id, region_id, `name`) values (1,1,'a'),(2,2,'a'),(3,3,'a'),(4,4,'a'),(5,5,'a'),(6,6,'a')")

assertVExplainEquals := func(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := utils.Exec(t, conn, query)
// strip the first column from each row as it is not deterministic in a VExplain query
for i := range qr.Rows {
qr.Rows[i] = qr.Rows[i][1:]
}
if err := sqltypes.RowsEqualsStr(expected, qr.Rows); err != nil {
t.Error(err)
}
}
expected := `[
[VARCHAR("sks") VARCHAR("-80") VARCHAR("begin")]
[VARCHAR("sks") VARCHAR("-80") VARCHAR("update user_tbl set ` + "`name`" + ` = 'b' where region_id in (1, 2, 3, 5)")]
[VARCHAR("sks") VARCHAR("80-") VARCHAR("begin")]
[VARCHAR("sks") VARCHAR("80-") VARCHAR("update user_tbl set ` + "`name`" + ` = 'b' where region_id in (4, 6)")]
]`
assertVExplainEquals(t, mcmp.VtConn, "vexplain /*vt+ EXECUTE_DML_QUERIES */ queries update user_tbl set `name` = 'b' where region_id in (1,2,3,4,5,6)", expected)

expected = `[
[VARCHAR("sks") VARCHAR("-80") VARCHAR("begin")]
[VARCHAR("sks") VARCHAR("-80") VARCHAR("delete from user_tbl where region_id in (1, 2, 3, 5)")]
[VARCHAR("sks") VARCHAR("80-") VARCHAR("begin")]
[VARCHAR("sks") VARCHAR("80-") VARCHAR("delete from user_tbl where region_id in (4, 6)")]
]`
assertVExplainEquals(t, mcmp.VtConn, "vexplain /*vt+ EXECUTE_DML_QUERIES */ queries delete from user_tbl where region_id in (1,2,3,4,5,6)", expected)
}
9 changes: 6 additions & 3 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,10 @@ func TestAutocommitDeleteIn(t *testing.T) {
require.NoError(t, err)

assertQueries(t, sbc1, []*querypb.BoundQuery{{
Sql: "delete from user_extra where user_id in (1, 2)",
BindVariables: map[string]*querypb.BindVariable{},
Sql: "delete from user_extra where user_id in ::__vals",
BindVariables: map[string]*querypb.BindVariable{
"__vals": sqltypes.TestBindVariable([]any{int64(1), int64(2)}),
},
}})
testCommitCount(t, "sbc1", sbc1, 0)

Expand Down Expand Up @@ -391,11 +393,12 @@ func TestAutocommitTransactionStarted(t *testing.T) {

// multi shard query - savepoint needed
sql = "update `user` set a = 2 where id in (1, 4)"
expectedSql := "update `user` set a = 2 where id in ::__vals"
_, err = executor.Execute(context.Background(), nil, "TestExecute", NewSafeSession(session), sql, map[string]*querypb.BindVariable{})
require.NoError(t, err)
require.Len(t, sbc1.Queries, 2)
require.Contains(t, sbc1.Queries[0].Sql, "savepoint")
require.Equal(t, sql, sbc1.Queries[1].Sql)
require.Equal(t, expectedSql, sbc1.Queries[1].Sql)
testCommitCount(t, "sbc1", sbc1, 0)
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (del *Delete) TryExecute(ctx context.Context, vcursor VCursor, bindVars map
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, del.QueryTimeout)
defer cancelFunc()

rss, _, err := del.findRoute(ctx, vcursor, bindVars)
rss, bvs, err := del.findRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +58,7 @@ func (del *Delete) TryExecute(ctx context.Context, vcursor VCursor, bindVars map
case Unsharded:
return del.execUnsharded(ctx, del, vcursor, bindVars, rss)
case Equal, IN, Scatter, ByDestination, SubShard, EqualUnique, MultiEqual:
return del.execMultiDestination(ctx, del, vcursor, bindVars, rss, del.deleteVindexEntries)
return del.execMultiDestination(ctx, del, vcursor, bindVars, rss, del.deleteVindexEntries, bvs)
default:
// Unreachable.
return nil, fmt.Errorf("unsupported opcode: %v", del.Opcode)
Expand Down
35 changes: 34 additions & 1 deletion go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func TestDeleteInChangedVindexMultiCol(t *testing.T) {
`Execute delete from lkp_rg_tbl where from = :from and toc = :toc from: type:INT64 value:"6" toc: type:VARBINARY value:"\x01N\xb1\x90ɢ\xfa\x16\x9c" true`,
`Execute delete from lkp_rg_tbl where from = :from and toc = :toc from: type:INT64 value:"7" toc: type:VARBINARY value:"\x02N\xb1\x90ɢ\xfa\x16\x9c" true`,
// Finally, the actual delete, which is also sent to -20, same route as the subquery.
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
`ExecuteMultiShard sharded.-20: dummy_update {__vals0: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"}} true true`,
})
}

Expand Down Expand Up @@ -611,3 +611,36 @@ func TestDeleteMultiEqual(t *testing.T) {
`ExecuteMultiShard sharded.-20: dummy_delete {} sharded.20-: dummy_delete {} true false`,
})
}

// TestDeleteInUnique is a test function for delete statement using an IN clause with the Vindexes,
// the query is correctly split according to the corresponding values in the IN list.
func TestDeleteInUnique(t *testing.T) {
ks := buildTestVSchema().Keyspaces["sharded"]
upd := &Delete{
DML: &DML{
RoutingParameters: &RoutingParameters{
Opcode: IN,
Keyspace: ks.Keyspace,
Vindex: ks.Vindexes["hash"],
Values: []evalengine.Expr{evalengine.TupleExpr{
evalengine.NewLiteralInt(1),
evalengine.NewLiteralInt(2),
evalengine.NewLiteralInt(4),
}}},
Query: "delete t where id in ::__vals",
},
}

tupleBV := &querypb.BindVariable{
Type: querypb.Type_TUPLE,
Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))),
}
vc := newDMLTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"4"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(d2fd8867d50d2dfe)`,
`ExecuteMultiShard sharded.-20: delete t where id in ::__vals {__vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"4"}} sharded.20-: delete t where id in ::__vals {__vals: type:TUPLE values:{type:INT64 value:"2"}} true false`,
})
}
5 changes: 3 additions & 2 deletions go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ func (dml *DML) execUnsharded(ctx context.Context, primitive Primitive, vcursor
return execShard(ctx, primitive, vcursor, dml.Query, bindVars, rss[0], true /* rollbackOnError */, !dml.PreventAutoCommit /* canAutocommit */)
}

func (dml *DML) execMultiDestination(ctx context.Context, primitive Primitive, vcursor VCursor, bindVars map[string]*querypb.BindVariable, rss []*srvtopo.ResolvedShard, dmlSpecialFunc func(context.Context, VCursor, map[string]*querypb.BindVariable, []*srvtopo.ResolvedShard) error) (*sqltypes.Result, error) {
func (dml *DML) execMultiDestination(ctx context.Context, primitive Primitive, vcursor VCursor, bindVars map[string]*querypb.BindVariable, rss []*srvtopo.ResolvedShard, dmlSpecialFunc func(context.Context, VCursor,
map[string]*querypb.BindVariable, []*srvtopo.ResolvedShard) error, bvs []map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
if len(rss) == 0 {
return &sqltypes.Result{}, nil
}
Expand All @@ -90,7 +91,7 @@ func (dml *DML) execMultiDestination(ctx context.Context, primitive Primitive, v
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: dml.Query,
BindVariables: bindVars,
BindVariables: bvs[i],
}
}
return execMultiShard(ctx, primitive, vcursor, rss, queries, dml.MultiShardAutocommit)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/dml_with_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestDeleteWithMultiTarget(t *testing.T) {
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_1 {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true true`,
`ExecuteMultiShard ks.-20: dummy_delete_1 {__vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"} dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true true`,
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_2 {dml_vals: type:TUPLE values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x011"} values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x012"} values:{type:TUPLE value:"\x89\x02\x03200\x89\x02\x013"}} true true`,
})
Expand All @@ -175,7 +175,7 @@ func TestDeleteWithMultiTarget(t *testing.T) {
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_1 {dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true true`,
`ExecuteMultiShard ks.-20: dummy_delete_1 {__vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"} dml_vals: type:TUPLE values:{type:INT64 value:"1"} values:{type:INT64 value:"2"} values:{type:INT64 value:"3"}} true true`,
`ResolveDestinations ks [type:INT64 value:"1" type:INT64 value:"2" type:INT64 value:"3"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`,
`ExecuteMultiShard ks.-20: dummy_delete_2 {dml_vals: type:TUPLE values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x011"} values:{type:TUPLE value:"\x89\x02\x03100\x89\x02\x012"} values:{type:TUPLE value:"\x89\x02\x03200\x89\x02\x013"}} true true`,
})
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (upd *Update) TryExecute(ctx context.Context, vcursor VCursor, bindVars map
ctx, cancelFunc := addQueryTimeout(ctx, vcursor, upd.QueryTimeout)
defer cancelFunc()

rss, _, err := upd.findRoute(ctx, vcursor, bindVars)
rss, bvs, err := upd.findRoute(ctx, vcursor, bindVars)
if err != nil {
return nil, err
}
Expand All @@ -69,7 +69,7 @@ func (upd *Update) TryExecute(ctx context.Context, vcursor VCursor, bindVars map
case Unsharded:
return upd.execUnsharded(ctx, upd, vcursor, bindVars, rss)
case Equal, EqualUnique, IN, Scatter, ByDestination, SubShard, MultiEqual:
return upd.execMultiDestination(ctx, upd, vcursor, bindVars, rss, upd.updateVindexEntries)
return upd.execMultiDestination(ctx, upd, vcursor, bindVars, rss, upd.updateVindexEntries, bvs)
default:
// Unreachable.
return nil, fmt.Errorf("unsupported opcode: %v", upd.Opcode)
Expand Down
Loading

0 comments on commit add3887

Please sign in to comment.