Skip to content

Commit

Permalink
add bindvars and offset planning to semi join
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui committed Aug 10, 2023
1 parent d007297 commit fd57d06
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 334 deletions.
24 changes: 21 additions & 3 deletions go/vt/vtgate/planbuilder/operators/semi_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package operators
import (
"golang.org/x/exp/maps"

"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand All @@ -43,9 +45,10 @@ type SemiJoin struct {
// Clone implements the Operator interface
func (c *SemiJoin) Clone(inputs []ops.Operator) ops.Operator {
return &SemiJoin{
LHS: inputs[0],
RHS: inputs[1],
Vars: maps.Clone(c.Vars),
LHS: inputs[0],
RHS: inputs[1],
Vars: maps.Clone(c.Vars),
bindVars: maps.Clone(c.bindVars),
}
}

Expand Down Expand Up @@ -82,3 +85,18 @@ func (c *SemiJoin) GetColumns(ctx *plancontext.PlanningContext) ([]*sqlparser.Al
func (c *SemiJoin) GetSelectExprs(ctx *plancontext.PlanningContext) (sqlparser.SelectExprs, error) {
return c.LHS.GetSelectExprs(ctx)
}

func (c *SemiJoin) planOffsets(ctx *plancontext.PlanningContext) error {
c.Vars = make(map[string]int, len(c.bindVars))
for bvname, col := range c.bindVars {
coloffset, err := c.LHS.AddColumns(ctx, true, []bool{false}, []*sqlparser.AliasedExpr{aeWrap(col)})
if err != nil {
return err
}
if len(coloffset) != 1 {
return vterrors.VT13001("should have only one column offset")
}
c.Vars[bvname] = coloffset[0]
}
return nil
}
29 changes: 14 additions & 15 deletions go/vt/vtgate/planbuilder/operators/subquery_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,43 +338,42 @@ func createSemiJoin(
}

extractor := getExtractor(ctx, newOuter)
f := &Filter{
Source: rhsOp,
}

for _, pred := range preds {
sqlparser.SafeRewrite(pred, nil, extractor.visitExpr)
var err error
rhsOp, err = rhsOp.AddPredicate(ctx, pred)
if err != nil {
return nil, err
}
f.Predicates = append(f.Predicates, pred)
}

// We just a single row from the RHS
limitInner := &Limit{
Source: rhsOp,
Source: f,
AST: &sqlparser.Limit{Rowcount: sqlparser.NewIntLiteral("1")},
Pushed: false,
}

return &SemiJoin{
LHS: newOuter,
RHS: limitInner,
Vars: extractor.vars,
LHS: newOuter,
RHS: limitInner,
bindVars: extractor.bindVars,
}, nil
}

func getExtractor(ctx *plancontext.PlanningContext, outer ops.Operator) *varsExtractor {
return &varsExtractor{
ctx: ctx,
vars: map[string]int{},
bindVars: map[*sqlparser.ColName]string{},
bindVars: map[string]*sqlparser.ColName{},
outer: outer,
rhs: TableID(outer),
}
}

// select id from user where exists (select 1 from music where user.id = foo)
type varsExtractor struct {
ctx *plancontext.PlanningContext
vars map[string]int
bindVars map[*sqlparser.ColName]string
bindVars map[string]*sqlparser.ColName
rhs semantics.TableSet
outer ops.Operator
}
Expand All @@ -393,7 +392,7 @@ func (ve *varsExtractor) visitExpr(cursor *sqlparser.Cursor) bool {
// check whether the bindVariable already exists in the map
// we do so by checking that the column names are the same and their recursive dependencies are the same
// so the column names `user.a` and `a` would be considered equal as long as both are bound to the same table
for colName, bindVar := range ve.bindVars {
for bindVar, colName := range ve.bindVars {
if ve.ctx.SemTable.EqualsExprWithDeps(node, colName) {
cursor.Replace(sqlparser.NewArgument(bindVar))
return true
Expand All @@ -405,7 +404,7 @@ func (ve *varsExtractor) visitExpr(cursor *sqlparser.Cursor) bool {
bindVar := ve.ctx.ReservedVars.ReserveColName(node)
cursor.Replace(sqlparser.NewTypedArgument(bindVar, typ))
// store it in the map for future comparisons
ve.bindVars[node] = bindVar
ve.bindVars[bindVar] = node
return true
}

Expand Down
198 changes: 97 additions & 101 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1677,53 +1677,54 @@
}
},
{
"comment": "correlated subquery in exists clause with an ordering",
"query": "select col, id from user where exists(select user_id from user_extra where user_id = 3 and user_id < user.id) order by id",
"plan": {
"QueryType": "SELECT",
"Original": "select col, id from user where exists(select user_id from user_extra where user_id = 3 and user_id < user.id) order by id",
"Instructions": {
"OperatorType": "SemiJoin",
"JoinVars": {
"user_id": 0
},
"ProjectedIndexes": "[1, 0]",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.id, col, weight_string(id) from `user` where 1 != 1",
"OrderBy": "(0|2) ASC",
"Query": "select `user`.id, col, weight_string(id) from `user` order by id asc",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
"QueryType": "SELECT",
"Original": "select col, id from user where exists(select user_id from user_extra where user_id = 3 and user_id < user.id) order by id",
"Instructions": {
"OperatorType": "SimpleProjection",
"Columns": [
0,
1
],
"Inputs": [
{
"OperatorType": "SemiJoin",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select col, id, weight_string(id) from `user` where 1 != 1",
"OrderBy": "(1|2) ASC",
"Query": "select col, id, weight_string(id) from `user` order by id asc",
"Table": "`user`"
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_id = 3 and user_id < :user_id",
"Table": "user_extra",
"Values": [
"INT64(3)"
],
"Vindex": "user_index"
}
]
},
"TablesUsed": [
"user.user",
"user.user_extra"
]
}
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_id = 3 and user_id < :user_id limit 1",
"Table": "user_extra",
"Values": [
"INT64(3)"
],
"Vindex": "user_index"
}
]
}
]
},
"TablesUsed": [
"user.user",
"user.user_extra"
]
},
{
"comment": "Column and Literal equality filter on scatter aggregates",
Expand Down Expand Up @@ -2532,62 +2533,57 @@
}
},
{
"comment": "aggregation on top of semijoin",
"query": "select count(*) from user where exists (select 0 from user_extra where user.apa = user_extra.bar)",
"plan": {
"QueryType": "SELECT",
"Original": "select count(*) from user where exists (select 0 from user_extra where user.apa = user_extra.bar)",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "sum_count_star(0) AS count(*)",
"Inputs": [
{
"OperatorType": "Projection",
"Expressions": [
"[COLUMN 1] as count(*)"
],
"Inputs": [
{
"OperatorType": "SemiJoin",
"JoinVars": {
"user_apa": 0
},
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select `user`.apa, count(*), weight_string(`user`.apa) from `user` where 1 != 1 group by `user`.apa, weight_string(`user`.apa)",
"Query": "select `user`.apa, count(*), weight_string(`user`.apa) from `user` group by `user`.apa, weight_string(`user`.apa)",
"Table": "`user`"
"QueryType": "SELECT",
"Original": "select count(*) from user where exists (select 0 from user_extra where user.apa = user_extra.bar)",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "sum_count_star(0) AS count(*)",
"ResultColumns": 1,
"Inputs": [
{
"OperatorType": "SemiJoin",
"JoinVars": {
"user_apa": 1
},
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*), `user`.apa from `user` where 1 != 1 group by `user`.apa",
"Query": "select count(*), `user`.apa from `user` group by `user`.apa",
"Table": "`user`"
},
{
"OperatorType": "Limit",
"Count": "INT64(1)",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_extra.bar = :user_apa",
"Table": "user_extra"
}
]
}
]
}
]
},
"TablesUsed": [
"user.user",
"user.user_extra"
]
}
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_extra.bar = :user_apa limit :__upper_limit",
"Table": "user_extra"
}
]
}
]
}
]
},
"TablesUsed": [
"user.user",
"user.user_extra"
]
},
{
"comment": "we have to track the order of distinct aggregation expressions",
Expand Down
Loading

0 comments on commit fd57d06

Please sign in to comment.