Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce usages of old horizon planning fallback #13595

Merged
merged 3 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) er
// we are not going to see values multiple times, so we don't need to multiply with the count(*) from the other side
return ab.handlePushThroughAggregation(ctx, aggr)
default:
return errHorizonNotPlanned()
return vterrors.VT12001(fmt.Sprintf("aggregation not planned: %s", aggr.OpCode.String()))
systay marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (a *Aggregator) addIfAggregationColumn(ctx *plancontext.PlanningContext, co
if _, srcIsAlsoAggr := a.Source.(*Aggregator); srcIsAlsoAggr {
return 0, vterrors.VT12001("aggregation on top of aggregation not supported")
}
return -1, vterrors.VT13001(fmt.Sprintf("aggregation column on wrong index: want: %d, got: %d", colIdx, offset))
return -1, vterrors.VT12001(fmt.Sprintf("failed to plan aggregation on: %s", sqlparser.String(aggr.Original)))
systay marked this conversation as resolved.
Show resolved Hide resolved
}

a.Source = newSrc
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func createProjectionWithoutAggr(qp *QueryProjection, src ops.Operator) (*Projec
aggr, ok := expr.(sqlparser.AggrFunc)
if !ok {
// need to add logic to extract aggregations and pushed them to the top level
return nil, errHorizonNotPlanned()
return nil, vterrors.VT12001(fmt.Sprintf("unsupported aggregation expression: %s", sqlparser.String(expr)))
}
expr = aggr.GetArg()
if expr == nil {
Expand Down
11 changes: 1 addition & 10 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,7 @@ func optimizeHorizonPlanning(ctx *plancontext.PlanningContext, root ops.Operator
}
}

newOp, err := rewrite.FixedPointBottomUp(root, TableID, visitor, stopAtRoute)
if err != nil {
if vterr, ok := err.(*vterrors.VitessError); ok && vterr.ID == "VT13001" {
// we encountered a bug. let's try to back out
return nil, errHorizonNotPlanned()
}
return nil, err
}

return newOp, nil
return rewrite.FixedPointBottomUp(root, TableID, visitor, stopAtRoute)
}

func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (ops.Operator, *rewrite.ApplyResult, error) {
Expand Down
11 changes: 1 addition & 10 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,7 @@ func planOffsets(ctx *plancontext.PlanningContext, root ops.Operator) (ops.Opera
return in, rewrite.SameTree, nil
}

op, err := rewrite.TopDown(root, TableID, visitor, stopAtRoute)
if err != nil {
if vterr, ok := err.(*vterrors.VitessError); ok && vterr.ID == "VT13001" {
// we encountered a bug. let's try to back out
return nil, errHorizonNotPlanned()
}
return nil, err
}

return op, nil
return rewrite.TopDown(root, TableID, visitor, stopAtRoute)
}

func fetchByOffset(e sqlparser.SQLNode) bool {
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ func createSimpleProjection(ctx *plancontext.PlanningContext, qp *QueryProjectio
}

for _, e := range qp.SelectExprs {
if _, isStar := e.Col.(*sqlparser.StarExpr); isStar {
return nil, errHorizonNotPlanned()
}
ae, err := e.GetAliasedExpr()
if err != nil {
return nil, err
Expand Down
25 changes: 13 additions & 12 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,21 +543,22 @@ func createProjection(src ops.Operator) (*Projection, error) {
return proj, nil
}

func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, _, addToGroupBy bool) (ops.Operator, int, error) {
func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, reuseExisting, addToGroupBy bool) (ops.Operator, int, error) {
removeKeyspaceFromSelectExpr(expr)

// check if columns is already added.
cols, err := r.GetColumns()
if err != nil {
return nil, 0, err
}
colAsExpr := func(e *sqlparser.AliasedExpr) sqlparser.Expr {
return e.Expr
}
if offset, found := canReuseColumn(ctx, cols, expr.Expr, colAsExpr); found {
return r, offset, nil
if reuseExisting {
// check if columns is already added.
cols, err := r.GetColumns()
if err != nil {
return nil, 0, err
}
colAsExpr := func(e *sqlparser.AliasedExpr) sqlparser.Expr {
return e.Expr
}
if offset, found := canReuseColumn(ctx, cols, expr.Expr, colAsExpr); found {
return r, offset, nil
}
}

// if column is not already present, we check if we can easily find a projection
// or aggregation in our source that we can add to
if ok, offset := addColumnToInput(r.Source, expr, addToGroupBy); ok {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/planbuilder/operators/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package operators

import (
"fmt"

"vitess.io/vitess/go/slices2"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -104,7 +106,7 @@ func (to *Table) TablesUsed() []string {
func addColumn(ctx *plancontext.PlanningContext, op ColNameColumns, e sqlparser.Expr) (int, error) {
col, ok := e.(*sqlparser.ColName)
if !ok {
return 0, vterrors.VT13001("cannot push this expression to a table/vindex: %s", sqlparser.String(e))
return 0, vterrors.VT12001(fmt.Sprintf("cannot add '%s' expression to a table/vindex", sqlparser.String(e)))
}
sqlparser.RemoveKeyspaceFromColName(col)
cols := op.GetColNames()
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -5877,5 +5877,40 @@
"user.user"
]
}
},
{
"comment": "aggregation on top of derived table with limit",
"query": "select count(val2), sum(val2) from (select id, val2 from user where val2 is null limit 2) as x",
"plan": {
"QueryType": "SELECT",
"Original": "select count(val2), sum(val2) from (select id, val2 from user where val2 is null limit 2) as x",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "count(0) AS count(val2), sum(1) AS sum(val2)",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "INT64(2)",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select val2, val2 from (select id, val2 from `user` where 1 != 1) as x where 1 != 1",
"Query": "select val2, val2 from (select id, val2 from `user` where val2 is null) as x limit :__upper_limit",
"Table": "`user`"
}
]
}
]
},
"TablesUsed": [
"user.user"
]
}
}
]
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/tpch_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -746,12 +746,12 @@
{
"comment": "TPC-H query 8",
"query": "select o_year, sum(case when nation = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share from ( select extract(year from o_orderdate) as o_year, l_extendedprice * (1 - l_discount) as volume, n2.n_name as nation from part, supplier, lineitem, orders, customer, nation n1, nation n2, region where p_partkey = l_partkey and s_suppkey = l_suppkey and l_orderkey = o_orderkey and o_custkey = c_custkey and c_nationkey = n1.n_nationkey and n1.n_regionkey = r_regionkey and r_name = 'AMERICA' and s_nationkey = n2.n_nationkey and o_orderdate between date '1995-01-01' and date('1996-12-31') and p_type = 'ECONOMY ANODIZED STEEL' ) as all_nations group by o_year order by o_year",
"plan": "VT12001: unsupported: in scatter query: complex aggregate expression"
"plan": "VT12001: unsupported: failed to plan aggregation on: sum(case when nation = 'BRAZIL' then volume else 0 end)"
},
{
"comment": "TPC-H query 9",
"query": "select nation, o_year, sum(amount) as sum_profit from ( select n_name as nation, extract(year from o_orderdate) as o_year, l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount from part, supplier, lineitem, partsupp, orders, nation where s_suppkey = l_suppkey and ps_suppkey = l_suppkey and ps_partkey = l_partkey and p_partkey = l_partkey and o_orderkey = l_orderkey and s_nationkey = n_nationkey and p_name like '%green%' ) as profit group by nation, o_year order by nation, o_year desc",
"plan": "VT12001: unsupported: aggregation on columns from different sources"
"plan": "VT12001: unsupported: failed to plan aggregation on: sum(amount) as sum_profit"
},
{
"comment": "TPC-H query 10",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/unsupported_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@
{
"comment": "select func(keyspace_id) from user_index where id = :id",
"query": "select func(keyspace_id) from user_index where id = :id",
"plan": "VT12001: unsupported: expression on results of a vindex function"
"plan": "VT12001: unsupported: cannot add 'func(keyspace_id)' expression to a table/vindex"
},
{
"comment": "delete with multi-table targets",
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vtgate/semantics/table_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package semantics

import (
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -106,12 +107,12 @@ func (tc *tableCollector) up(cursor *sqlparser.Cursor) error {

func newVindexTable(t sqlparser.IdentifierCS) *vindexes.Table {
vindexCols := []vindexes.Column{
{Name: sqlparser.NewIdentifierCI("id")},
{Name: sqlparser.NewIdentifierCI("keyspace_id")},
{Name: sqlparser.NewIdentifierCI("range_start")},
{Name: sqlparser.NewIdentifierCI("range_end")},
{Name: sqlparser.NewIdentifierCI("hex_keyspace_id")},
{Name: sqlparser.NewIdentifierCI("shard")},
{Name: sqlparser.NewIdentifierCI("id"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("keyspace_id"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("range_start"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("range_end"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("hex_keyspace_id"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("shard"), Type: querypb.Type_VARBINARY},
}

return &vindexes.Table{
Expand Down