From c2d9d4c402d502d4113a76638f560e46f968be56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9s=20Taylor?= Date: Tue, 9 Jul 2024 16:42:53 +0200 Subject: [PATCH 1/3] planner: Handle ORDER BY inside derived tables (#16353) Signed-off-by: Manan Gupta Signed-off-by: Andres Taylor Co-authored-by: Manan Gupta --- .../queries/aggregation/aggregation_test.go | 3 +- .../planbuilder/operators/aggregator.go | 8 -- .../operators/horizon_expanding.go | 11 +- .../planbuilder/operators/projection.go | 9 -- .../planbuilder/operators/queryprojection.go | 17 +++ go/vt/vtgate/planbuilder/operators/route.go | 100 ++++++++---------- .../planbuilder/testdata/aggr_cases.json | 43 ++++++++ 7 files changed, 118 insertions(+), 73 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index e1f91ee2e08..a8ae8a94020 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -442,7 +442,8 @@ func TestOrderByCount(t *testing.T) { mcmp.Exec("insert into t9(id1, id2, id3) values(1, '1', '1'), (2, '2', '2'), (3, '2', '2'), (4, '3', '3'), (5, '3', '3'), (6, '3', '3')") - mcmp.AssertMatches("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC", `[[VARCHAR("3")] [VARCHAR("2")] [VARCHAR("1")]]`) + mcmp.Exec("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC") + mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count") } func TestAggregateAnyValue(t *testing.T) { diff --git a/go/vt/vtgate/planbuilder/operators/aggregator.go b/go/vt/vtgate/planbuilder/operators/aggregator.go index e9fee905024..91eca26c0ea 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregator.go +++ b/go/vt/vtgate/planbuilder/operators/aggregator.go @@ -112,14 +112,6 @@ func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e return offset } -func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, groupby []bool, exprs []*sqlparser.AliasedExpr) (offsets []int) { - for i, ae := range exprs { - offset := a.addColumnWithoutPushing(ctx, ae, groupby[i]) - offsets = append(offsets, offset) - } - return -} - func (a *Aggregator) isDerived() bool { return a.DT != nil } diff --git a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go index 3f7700eed9d..3b934752a00 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go @@ -75,9 +75,18 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio } func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel *sqlparser.Select) (Operator, *ApplyResult) { - op := createProjectionFromSelect(ctx, horizon) qp := horizon.getQP(ctx) var extracted []string + + if horizon.IsDerived() { + // if we are dealing with a derived table, we need to make sure that the ordering columns + // are available outside the derived table + for _, order := range horizon.Query.GetOrderBy() { + qp.addColumn(ctx, order.Expr) + } + } + + op := createProjectionFromSelect(ctx, horizon) if qp.HasAggr { extracted = append(extracted, "Aggregation") } else { diff --git a/go/vt/vtgate/planbuilder/operators/projection.go b/go/vt/vtgate/planbuilder/operators/projection.go index 1f380ba736b..5f79f315d7e 100644 --- a/go/vt/vtgate/planbuilder/operators/projection.go +++ b/go/vt/vtgate/planbuilder/operators/projection.go @@ -308,15 +308,6 @@ func (p *Projection) addColumnWithoutPushing(ctx *plancontext.PlanningContext, e return p.addColumn(ctx, true, false, expr, false) } -func (p *Projection) addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, _ []bool, exprs []*sqlparser.AliasedExpr) []int { - offsets := make([]int, len(exprs)) - for idx, expr := range exprs { - offset := p.addColumn(ctx, reuse, false, expr, false) - offsets[idx] = offset - } - return offsets -} - func (p *Projection) AddWSColumn(ctx *plancontext.PlanningContext, offset int, underRoute bool) int { cols, aliased := p.Columns.(AliasedProjections) if !aliased { diff --git a/go/vt/vtgate/planbuilder/operators/queryprojection.go b/go/vt/vtgate/planbuilder/operators/queryprojection.go index 5729dbd0c2e..99651258a73 100644 --- a/go/vt/vtgate/planbuilder/operators/queryprojection.go +++ b/go/vt/vtgate/planbuilder/operators/queryprojection.go @@ -709,6 +709,23 @@ func (qp *QueryProjection) useGroupingOverDistinct(ctx *plancontext.PlanningCont return true } +// addColumn adds a column to the QueryProjection if it is not already present +func (qp *QueryProjection) addColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { + for _, selectExpr := range qp.SelectExprs { + getExpr, err := selectExpr.GetExpr() + if err != nil { + continue + } + if ctx.SemTable.EqualsExprWithDeps(getExpr, expr) { + return + } + } + qp.SelectExprs = append(qp.SelectExprs, SelectExpr{ + Col: aeWrap(expr), + Aggr: ContainsAggr(ctx, expr), + }) +} + func checkForInvalidGroupingExpressions(ctx *plancontext.PlanningContext, expr sqlparser.Expr) { _ = sqlparser.Walk(func(node sqlparser.SQLNode) (bool, error) { if IsAggr(ctx, node) { diff --git a/go/vt/vtgate/planbuilder/operators/route.go b/go/vt/vtgate/planbuilder/operators/route.go index feeb091a725..d1448f260d6 100644 --- a/go/vt/vtgate/planbuilder/operators/route.go +++ b/go/vt/vtgate/planbuilder/operators/route.go @@ -587,96 +587,87 @@ func (r *Route) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, // if at least one column is not already present, we check if we can easily find a projection // or aggregation in our source that we can add to - derived, op, ok, offsets := addMultipleColumnsToInput(ctx, r.Source, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr}) - r.Source = op - if ok { - return offsets[0] + derived, op, offset := addColumnToInput(ctx, r.Source, expr, reuse, gb) + if op != nil { + r.Source = op + } + if offset >= 0 { + return offset } // If no-one could be found, we probably don't have one yet, so we add one here src := createProjection(ctx, r.Source, derived) r.Source = src - offsets = src.addColumnsWithoutPushing(ctx, reuse, []bool{gb}, []*sqlparser.AliasedExpr{expr}) - return offsets[0] + return src.addColumnWithoutPushing(ctx, expr, gb) } type selectExpressions interface { Operator addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int - addColumnsWithoutPushing(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) []int derivedName() string } // addColumnToInput adds columns to an operator without pushing them down -func addMultipleColumnsToInput( +func addColumnToInput( ctx *plancontext.PlanningContext, operator Operator, - reuse bool, - addToGroupBy []bool, - exprs []*sqlparser.AliasedExpr, -) (derivedName string, // if we found a derived table, this will contain its name + expr *sqlparser.AliasedExpr, + reuse, addToGroupBy bool, +) ( + derivedName string, // if we found a derived table, this will contain its name projection Operator, // if an operator needed to be built, it will be returned here - found bool, // whether a matching op was found or not - offsets []int, // the offsets the expressions received + offset int, // the offset of the expression, -1 if not found ) { + var src Operator + var updateSrc func(Operator) switch op := operator.(type) { - case *SubQuery: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Outer, reuse, addToGroupBy, exprs) - if added { - op.Outer = src - } - return derivedName, op, added, offset + // Pass through operators - we can just add the columns to their source + case *SubQuery: + src, updateSrc = op.Outer, func(newSrc Operator) { op.Outer = newSrc } case *Distinct: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *Limit: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *Ordering: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src - } - return derivedName, op, added, offset - + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } case *LockAndComment: - derivedName, src, added, offset := addMultipleColumnsToInput(ctx, op.Source, reuse, addToGroupBy, exprs) - if added { - op.Source = src + src, updateSrc = op.Source, func(newSrc Operator) { op.Source = newSrc } + + // Union needs special handling, we can't really add new columns to all inputs + case *Union: + proj := wrapInDerivedProjection(ctx, op) + dtName, newOp, offset := addColumnToInput(ctx, proj, expr, reuse, addToGroupBy) + if newOp == nil { + newOp = proj } - return derivedName, op, added, offset + return dtName, newOp, offset + // Horizon is another one of these - we can't really add new columns to it case *Horizon: - // if the horizon has an alias, then it is a derived table, - // we have to add a new projection and can't build on this one - return op.Alias, op, false, nil + return op.Alias, nil, -1 case selectExpressions: name := op.derivedName() if name != "" { // if the only thing we can push to is a derived table, // we have to add a new projection and can't build on this one - return name, op, false, nil + return name, nil, -1 } - offset := op.addColumnsWithoutPushing(ctx, reuse, addToGroupBy, exprs) - return "", op, true, offset + offset := op.addColumnWithoutPushing(ctx, expr, addToGroupBy) + return "", nil, offset - case *Union: - proj := addDerivedProj(ctx, op) - return addMultipleColumnsToInput(ctx, proj, reuse, addToGroupBy, exprs) default: - return "", op, false, nil + return "", nil, -1 + } + + // Handle the case where we have a pass-through operator + derivedName, src, offset = addColumnToInput(ctx, src, expr, reuse, addToGroupBy) + if src != nil { + updateSrc(src) } + return derivedName, nil, offset } func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool) int { @@ -691,7 +682,7 @@ func (r *Route) AddWSColumn(ctx *plancontext.PlanningContext, offset int, _ bool ok, foundOffset := addWSColumnToInput(ctx, r.Source, offset) if !ok { - src := addDerivedProj(ctx, r.Source) + src := wrapInDerivedProjection(ctx, r.Source) r.Source = src return src.AddWSColumn(ctx, offset, true) } @@ -714,7 +705,8 @@ func addWSColumnToInput(ctx *plancontext.PlanningContext, source Operator, offse return false, -1 } -func addDerivedProj( +// wrapInDerivedProjection wraps the input in a derived table projection named "dt" +func wrapInDerivedProjection( ctx *plancontext.PlanningContext, op Operator, ) (projection *Projection) { diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index fb04edd6d44..7d20d065002 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -7072,5 +7072,48 @@ "user.user" ] } + }, + { + "comment": "Aggregation over a ORDER BY/LIMIT inside a derived table", + "query": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "plan": { + "QueryType": "SELECT", + "Original": "SELECT COUNT(*) FROM (SELECT 1 AS one FROM `user` WHERE `user`.`is_not_deleted` = true ORDER BY id DESC LIMIT 25 OFFSET 0) subquery_for_count", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "count_star(0) AS count(*)", + "Inputs": [ + { + "OperatorType": "SimpleProjection", + "Columns": "2", + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "25", + "Offset": "0", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where 1 != 1) as subquery_for_count where 1 != 1", + "OrderBy": "(1|3) DESC", + "Query": "select subquery_for_count.one, subquery_for_count.id, 1, weight_string(subquery_for_count.id) from (select 1 as one, id from `user` where `user`.is_not_deleted = true) as subquery_for_count order by id desc limit 25", + "Table": "`user`" + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ] From ac4be07d1920be09bdc81f946e7f35977cf5f702 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 10 Jul 2024 09:33:53 +0200 Subject: [PATCH 2/3] empty commit to trigger ci Signed-off-by: Andres Taylor From c2368864482cb35437c6d62d8e89f5951ae431f5 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 10 Jul 2024 10:00:48 +0200 Subject: [PATCH 3/3] restrict test to v20 Signed-off-by: Andres Taylor --- .../endtoend/vtgate/queries/aggregation/aggregation_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index a8ae8a94020..4057b1bdee1 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -443,7 +443,9 @@ func TestOrderByCount(t *testing.T) { mcmp.Exec("insert into t9(id1, id2, id3) values(1, '1', '1'), (2, '2', '2'), (3, '2', '2'), (4, '3', '3'), (5, '3', '3'), (6, '3', '3')") mcmp.Exec("SELECT t9.id2 FROM t9 GROUP BY t9.id2 ORDER BY COUNT(t9.id2) DESC") - mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count") + if utils.BinaryIsAtLeastAtVersion(20, "vtgate") { + mcmp.Exec("select COUNT(*) from (select 1 as one FROM t9 WHERE id3 = 3 ORDER BY id1 DESC LIMIT 3 OFFSET 0) subquery_for_count") + } } func TestAggregateAnyValue(t *testing.T) {