Skip to content

Commit

Permalink
planner: Handle ORDER BY inside derived tables (#16353)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
Signed-off-by: Andres Taylor <andres@planetscale.com>
Co-authored-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
systay and GuptaManan100 committed Jul 10, 2024
1 parent 795a7e5 commit c2d9d4c
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ func TestOrderByCount(t *testing.T) {

mcmp.Exec("insert into t9(id1, id2, id3) values(1, '1', '1'), (2, '2', '2'), (3, '2', '2'), (4, '3', '3'), (5, '3', '3'), (6, '3', '3')")

mcmp.AssertMatches("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC", `[[VARCHAR("3")] [VARCHAR("2")] [VARCHAR("1")]]`)
mcmp.Exec("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC")
mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count")
}

func TestAggregateAnyValue(t *testing.T) {
Expand Down
8 changes: 0 additions & 8 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,6 @@ func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e
return offset
}

func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, groupby []bool, exprs []*sqlparser.AliasedExpr) (offsets []int) {
for i, ae := range exprs {
offset := a.addColumnWithoutPushing(ctx, ae, groupby[i])
offsets = append(offsets, offset)
}
return
}

func (a *Aggregator) isDerived() bool {
return a.DT != nil
}
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,18 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio
}

func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel *sqlparser.Select) (Operator, *ApplyResult) {
op := createProjectionFromSelect(ctx, horizon)
qp := horizon.getQP(ctx)
var extracted []string

if horizon.IsDerived() {
// if we are dealing with a derived table, we need to make sure that the ordering columns
// are available outside the derived table
for _, order := range horizon.Query.GetOrderBy() {
qp.addColumn(ctx, order.Expr)
}
}

op := createProjectionFromSelect(ctx, horizon)
if qp.HasAggr {
extracted = append(extracted, "Aggregation")
} else {
Expand Down
9 changes: 0 additions & 9 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,6 @@ func (p *Projection) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e
return p.addColumn(ctx, true, false, expr, false)
}

func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, _ []bool, exprs []*sqlparser.AliasedExpr) []int {
offsets := make([]int, len(exprs))
for idx, expr := range exprs {
offset := p.addColumn(ctx, reuse, false, expr, false)
offsets[idx] = offset
}
return offsets
}

func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int {
cols, aliased := p.Columns.(AliasedProjections)
if !aliased {
Expand Down
17 changes: 17 additions & 0 deletions go/vt/vtgate/planbuilder/operators/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,23 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont
return true
}

// addColumn adds a column to the QueryProjection if it is not already present
func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
for _, selectExpr := range qp.SelectExprs {
getExpr, err := selectExpr.GetExpr()
if err != nil {
continue
}
if ctx.SemTable.EqualsExprWithDeps(getExpr, expr) {
return
}
}
qp.SelectExprs = append(qp.SelectExprs, SelectExpr{
Col: aeWrap(expr),
Aggr: ContainsAggr(ctx, expr),
})
}

func checkForInvalidGroupingExpressions(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) {
if IsAggr(ctx, node) {
Expand Down
100 changes: 46 additions & 54 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,96 +587,87 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool,

// if at least one column is not already present, we check if we can easily find a projection
// or aggregation in our source that we can add to
derived, op, ok, offsets := addMultipleColumnsToInput(ctx, r.Source, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr})
r.Source = op
if ok {
return offsets[0]
derived, op, offset := addColumnToInput(ctx, r.Source, expr, reuse, gb)
if op != nil {
r.Source = op
}
if offset >= 0 {
return offset
}

// If no-one could be found, we probably don't have one yet, so we add one here
src := createProjection(ctx, r.Source, derived)
r.Source = src

offsets = src.addColumnsWithoutPushing(ctx, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr})
return offsets[0]
return src.addColumnWithoutPushing(ctx, expr, gb)
}

type selectExpressions interface {
Operator
addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int
addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) []int
derivedName() string
}

// addColumnToInput adds columns to an operator without pushing them down
func addMultipleColumnsToInput(
func addColumnToInput(
ctx *plancontext.PlanningContext,
operator Operator,
reuse bool,
addToGroupBy []bool,
exprs []*sqlparser.AliasedExpr,
) (derivedName string, // if we found a derived table, this will contain its name
expr *sqlparser.AliasedExpr,
reuse, addToGroupBy bool,
) (
derivedName string, // if we found a derived table, this will contain its name
projection Operator, // if an operator needed to be built, it will be returned here
found bool, // whether a matching op was found or not
offsets []int, // the offsets the expressions received
offset int, // the offset of the expression, -1 if not found
) {
var src Operator
var updateSrc func(Operator)
switch op := operator.(type) {
case *SubQuery:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Outer, reuse, addToGroupBy, exprs)
if added {
op.Outer = src
}
return derivedName, op, added, offset

// Pass through operators - we can just add the columns to their source
case *SubQuery:
src, updateSrc = op.Outer, func(newSrc Operator) { op.Outer = newSrc }
case *Distinct:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *Limit:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *Ordering:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
}
return derivedName, op, added, offset

src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }
case *LockAndComment:
derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs)
if added {
op.Source = src
src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc }

// Union needs special handling, we can't really add new columns to all inputs
case *Union:
proj := wrapInDerivedProjection(ctx, op)
dtName, newOp, offset := addColumnToInput(ctx, proj, expr, reuse, addToGroupBy)
if newOp == nil {
newOp = proj
}
return derivedName, op, added, offset
return dtName, newOp, offset

// Horizon is another one of these - we can't really add new columns to it
case *Horizon:
// if the horizon has an alias, then it is a derived table,
// we have to add a new projection and can't build on this one
return op.Alias, op, false, nil
return op.Alias, nil, -1

case selectExpressions:
name := op.derivedName()
if name != "" {
// if the only thing we can push to is a derived table,
// we have to add a new projection and can't build on this one
return name, op, false, nil
return name, nil, -1
}
offset := op.addColumnsWithoutPushing(ctx, reuse, addToGroupBy, exprs)
return "", op, true, offset
offset := op.addColumnWithoutPushing(ctx, expr, addToGroupBy)
return "", nil, offset

case *Union:
proj := addDerivedProj(ctx, op)
return addMultipleColumnsToInput(ctx, proj, reuse, addToGroupBy, exprs)
default:
return "", op, false, nil
return "", nil, -1
}

// Handle the case where we have a pass-through operator
derivedName, src, offset = addColumnToInput(ctx, src, expr, reuse, addToGroupBy)
if src != nil {
updateSrc(src)
}
return derivedName, nil, offset
}

func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool) int {
Expand All @@ -691,7 +682,7 @@ func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool

ok, foundOffset := addWSColumnToInput(ctx, r.Source, offset)
if !ok {
src := addDerivedProj(ctx, r.Source)
src := wrapInDerivedProjection(ctx, r.Source)
r.Source = src
return src.AddWSColumn(ctx, offset, true)
}
Expand All @@ -714,7 +705,8 @@ func addWSColumnToInput(ctx *plancontext.PlanningContext, source Operator, offse
return false, -1
}

func addDerivedProj(
// wrapInDerivedProjection wraps the input in a derived table projection named "dt"
func wrapInDerivedProjection(
ctx *plancontext.PlanningContext,
op Operator,
) (projection *Projection) {
Expand Down
43 changes: 43 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -7072,5 +7072,48 @@
"user.user"
]
}
},
{
"comment": "Aggregation over a ORDER BY/LIMIT inside a derived table",
"query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count",
"plan": {
"QueryType": "SELECT",
"Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "count_star(0) AS count(*)",
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": "2",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "25",
"Offset": "0",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1",
"OrderBy": "(1|3) DESC",
"Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25",
"Table": "`user`"
}
]
}
]
}
]
},
"TablesUsed": [
"user.user"
]
}
}
]

0 comments on commit c2d9d4c

Please sign in to comment.