diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index e1f91ee2e08..4057b1bdee1 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -442,7 +442,10 @@ func TestOrderByCount(t *testing.T) { mcmp.Exec("insert into t9(id1, id2, id3) values(1, '1', '1'), (2, '2', '2'), (3, '2', '2'), (4, '3', '3'), (5, '3', '3'), (6, '3', '3')") - mcmp.AssertMatches("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC", `[[VARCHAR("3")] [VARCHAR("2")] [VARCHAR("1")]]`) + mcmp.Exec("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC") + if utils.BinaryIsAtLeastAtVersion(20, "vtgate") { + mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count") + } } func TestAggregateAnyValue(t *testing.T) { diff --git a/go/vt/vtgate/planbuilder/operators/aggregator.go b/go/vt/vtgate/planbuilder/operators/aggregator.go index e9fee905024..91eca26c0ea 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregator.go +++ b/go/vt/vtgate/planbuilder/operators/aggregator.go @@ -112,14 +112,6 @@ func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e return offset } -func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, groupby []bool, exprs []*sqlparser.AliasedExpr) (offsets []int) { - for i, ae := range exprs { - offset := a.addColumnWithoutPushing(ctx, ae, groupby[i]) - offsets = append(offsets, offset) - } - return -} - func (a *Aggregator) isDerived() bool { return a.DT != nil } diff --git a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go index 3f7700eed9d..3b934752a00 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go @@ -75,9 +75,18 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio } func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel *sqlparser.Select) (Operator, *ApplyResult) { - op := createProjectionFromSelect(ctx, horizon) qp := horizon.getQP(ctx) var extracted []string + + if horizon.IsDerived() { + // if we are dealing with a derived table, we need to make sure that the ordering columns + // are available outside the derived table + for _, order := range horizon.Query.GetOrderBy() { + qp.addColumn(ctx, order.Expr) + } + } + + op := createProjectionFromSelect(ctx, horizon) if qp.HasAggr { extracted = append(extracted, "Aggregation") } else { diff --git a/go/vt/vtgate/planbuilder/operators/projection.go b/go/vt/vtgate/planbuilder/operators/projection.go index 1f380ba736b..5f79f315d7e 100644 --- a/go/vt/vtgate/planbuilder/operators/projection.go +++ b/go/vt/vtgate/planbuilder/operators/projection.go @@ -308,15 +308,6 @@ func (p *Projection) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e return p.addColumn(ctx, true, false, expr, false) } -func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, _ []bool, exprs []*sqlparser.AliasedExpr) []int { - offsets := make([]int, len(exprs)) - for idx, expr := range exprs { - offset := p.addColumn(ctx, reuse, false, expr, false) - offsets[idx] = offset - } - return offsets -} - func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int { cols, aliased := p.Columns.(AliasedProjections) if !aliased { diff --git a/go/vt/vtgate/planbuilder/operators/queryprojection.go b/go/vt/vtgate/planbuilder/operators/queryprojection.go index 5729dbd0c2e..99651258a73 100644 --- a/go/vt/vtgate/planbuilder/operators/queryprojection.go +++ b/go/vt/vtgate/planbuilder/operators/queryprojection.go @@ -709,6 +709,23 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont return true } +// addColumn adds a column to the QueryProjection if it is not already present +func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { + for _, selectExpr := range qp.SelectExprs { + getExpr, err := selectExpr.GetExpr() + if err != nil { + continue + } + if ctx.SemTable.EqualsExprWithDeps(getExpr, expr) { + return + } + } + qp.SelectExprs = append(qp.SelectExprs, SelectExpr{ + Col: aeWrap(expr), + Aggr: ContainsAggr(ctx, expr), + }) +} + func checkForInvalidGroupingExpressions(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { _ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) { if IsAggr(ctx, node) { diff --git a/go/vt/vtgate/planbuilder/operators/route.go b/go/vt/vtgate/planbuilder/operators/route.go index feeb091a725..d1448f260d6 100644 --- a/go/vt/vtgate/planbuilder/operators/route.go +++ b/go/vt/vtgate/planbuilder/operators/route.go @@ -587,96 +587,87 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, // if at least one column is not already present, we check if we can easily find a projection // or aggregation in our source that we can add to - derived, op, ok, offsets := addMultipleColumnsToInput(ctx, r.Source, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr}) - r.Source = op - if ok { - return offsets[0] + derived, op, offset := addColumnToInput(ctx, r.Source, expr, reuse, gb) + if op != nil { + r.Source = op + } + if offset >= 0 { + return offset } // If no-one could be found, we probably don't have one yet, so we add one here src := createProjection(ctx, r.Source, derived) r.Source = src - offsets = src.addColumnsWithoutPushing(ctx, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr}) - return offsets[0] + return src.addColumnWithoutPushing(ctx, expr, gb) } type selectExpressions interface { Operator addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int - addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) []int derivedName() string } // addColumnToInput adds columns to an operator without pushing them down -func addMultipleColumnsToInput( +func addColumnToInput( ctx *plancontext.PlanningContext, operator Operator, - reuse bool, - addToGroupBy []bool, - exprs []*sqlparser.AliasedExpr, -) (derivedName string, // if we found a derived table, this will contain its name + expr *sqlparser.AliasedExpr, + reuse, addToGroupBy bool, +) ( + derivedName string, // if we found a derived table, this will contain its name projection Operator, // if an operator needed to be built, it will be returned here - found bool, // whether a matching op was found or not - offsets []int, // the offsets the expressions received + offset int, // the offset of the expression, -1 if not found ) { + var src Operator + var updateSrc func(Operator) switch op := operator.(type) { - case *SubQuery: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Outer, reuse, addToGroupBy, exprs) - if added { - op.Outer = src - } - return derivedName, op, added, offset + // Pass through operators - we can just add the columns to their source + case *SubQuery: + src, updateSrc = op.Outer, func(newSrc Operator) { op.Outer = newSrc } case *Distinct: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *Limit: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *Ordering: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *LockAndComment: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } + + // Union needs special handling, we can't really add new columns to all inputs + case *Union: + proj := wrapInDerivedProjection(ctx, op) + dtName, newOp, offset := addColumnToInput(ctx, proj, expr, reuse, addToGroupBy) + if newOp == nil { + newOp = proj } - return derivedName, op, added, offset + return dtName, newOp, offset + // Horizon is another one of these - we can't really add new columns to it case *Horizon: - // if the horizon has an alias, then it is a derived table, - // we have to add a new projection and can't build on this one - return op.Alias, op, false, nil + return op.Alias, nil, -1 case selectExpressions: name := op.derivedName() if name != "" { // if the only thing we can push to is a derived table, // we have to add a new projection and can't build on this one - return name, op, false, nil + return name, nil, -1 } - offset := op.addColumnsWithoutPushing(ctx, reuse, addToGroupBy, exprs) - return "", op, true, offset + offset := op.addColumnWithoutPushing(ctx, expr, addToGroupBy) + return "", nil, offset - case *Union: - proj := addDerivedProj(ctx, op) - return addMultipleColumnsToInput(ctx, proj, reuse, addToGroupBy, exprs) default: - return "", op, false, nil + return "", nil, -1 + } + + // Handle the case where we have a pass-through operator + derivedName, src, offset = addColumnToInput(ctx, src, expr, reuse, addToGroupBy) + if src != nil { + updateSrc(src) } + return derivedName, nil, offset } func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool) int { @@ -691,7 +682,7 @@ func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool ok, foundOffset := addWSColumnToInput(ctx, r.Source, offset) if !ok { - src := addDerivedProj(ctx, r.Source) + src := wrapInDerivedProjection(ctx, r.Source) r.Source = src return src.AddWSColumn(ctx, offset, true) } @@ -714,7 +705,8 @@ func addWSColumnToInput(ctx *plancontext.PlanningContext, source Operator, offse return false, -1 } -func addDerivedProj( +// wrapInDerivedProjection wraps the input in a derived table projection named "dt" +func wrapInDerivedProjection( ctx *plancontext.PlanningContext, op Operator, ) (projection *Projection) { diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index fb04edd6d44..7d20d065002 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -7072,5 +7072,48 @@ "user.user" ] } + }, + { + "comment": "Aggregation over a ORDER BY/LIMIT inside a derived table", + "query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "plan": { + "QueryType": "SELECT", + "Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "count_star(0) AS count(*)", + "Inputs": [ + { + "OperatorType": "SimpleProjection", + "Columns": "2", + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "25", + "Offset": "0", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1", + "OrderBy": "(1|3) DESC", + "Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25", + "Table": "`user`" + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ]