Skip to content

Commit

Permalink
Move more horizon planning to the operators (#13412)
Browse files Browse the repository at this point in the history
* error out if we have aggregate gtid in handleAggr

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>

* remove un-required for loop

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>

* add GetSelectExprs to the operator interface and remove horizon/derived

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>

* support distinct aggregations on the new horizon planner

Signed-off-by: Andres Taylor <andres@planetscale.com>

* stop pushing of aggregation filtering into derived tables

Signed-off-by: Andres Taylor <andres@planetscale.com>

* wip - enable derived tables in the new horizon planner

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>

* Bring back support for union inside derived tables

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>

* fix typo in error

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>

* update test expectations

Signed-off-by: Andres Taylor <andres@planetscale.com>

* use semantics.RewriteDerivedTableExpression instead of manual rewriting of ColNames

Signed-off-by: Andres Taylor <andres@planetscale.com>

* work on making derived tables with aggregation work

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor code

Signed-off-by: Andres Taylor <andres@planetscale.com>

* enable horizon planning in more situations

Signed-off-by: Andres Taylor <andres@planetscale.com>

* update test expectations

Signed-off-by: Andres Taylor <andres@planetscale.com>

* make sure to always use a method to create aggregate params

Signed-off-by: Andres Taylor <andres@planetscale.com>

* cleanup and refactoring

Signed-off-by: Andres Taylor <andres@planetscale.com>

* use weight strings for min/max

Signed-off-by: Andres Taylor <andres@planetscale.com>

* update list of error code

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* disallow aggregation on top of aggregation with a clearer error message

Signed-off-by: Andres Taylor <andres@planetscale.com>

* fail min/max queries on types we cant compare

Signed-off-by: Andres Taylor <andres@planetscale.com>

* test: remove pattern not used

Signed-off-by: Andres Taylor <andres@planetscale.com>

* spread table id through derived tables

Signed-off-by: Andres Taylor <andres@planetscale.com>

* add ordering bottom up so the order can be re-used

Signed-off-by: Andres Taylor <andres@planetscale.com>

* unify Derived and Horizon into a single struct

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor: aggregation-pushing

Change the aggregation type when pushed down and not
after the fact.

Signed-off-by: Andres Taylor <andres@planetscale.com>

* add support handling sum(distinct x) and count(distinct x) on top of joins

Signed-off-by: Andres Taylor <andres@planetscale.com>

---------

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
Signed-off-by: Andres Taylor <andres@planetscale.com>
Co-authored-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
frouioui and systay authored Jul 5, 2023
1 parent fc29ef2 commit 26a2890
Show file tree
Hide file tree
Showing 56 changed files with 1,162 additions and 1,218 deletions.
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
VT09012 = errorWithoutState("VT09012", vtrpcpb.Code_FAILED_PRECONDITION, "%s statement with %s tablet not allowed", "This type of statement is not allowed on the given tablet.")
VT09013 = errorWithoutState("VT09013", vtrpcpb.Code_FAILED_PRECONDITION, "semi-sync plugins are not loaded", "Durability policy wants Vitess to use semi-sync, but the MySQL instances don't have the semi-sync plugin loaded.")
VT09014 = errorWithoutState("VT09014", vtrpcpb.Code_FAILED_PRECONDITION, "vindex cannot be modified", "The vindex cannot be used as table in DML statement")
VT09015 = errorWithoutState("VT09015", vtrpcpb.Code_FAILED_PRECONDITION, "schema tracking required", "This query cannot be planned without more information on the SQL schema. Please turn on schema tracking or add authoritative columns information to your VSchema.")

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")

Expand Down Expand Up @@ -136,6 +137,7 @@ var (
VT09012,
VT09013,
VT09014,
VT09015,
VT10001,
VT12001,
VT13001,
Expand Down
45 changes: 28 additions & 17 deletions go/vt/vtgate/engine/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/vt/vterrors"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/collations"
Expand All @@ -41,7 +43,6 @@ type AggregateParams struct {
// These are used only for distinct opcodes.
KeyCol int
WCol int
WAssigned bool
CollationID collations.ID

Alias string `json:",omitempty"`
Expand All @@ -53,22 +54,26 @@ type AggregateParams struct {
OrigOpcode AggregateOpcode
}

func (ap *AggregateParams) isDistinct() bool {
return ap.Opcode == AggregateCountDistinct || ap.Opcode == AggregateSumDistinct
func NewAggregateParam(opcode AggregateOpcode, col int, alias string) *AggregateParams {
out := &AggregateParams{
Opcode: opcode,
Col: col,
Alias: alias,
WCol: -1,
}
if opcode.NeedsComparableValues() {
out.KeyCol = col
}
return out
}

func (ap *AggregateParams) preProcess() bool {
switch ap.Opcode {
case AggregateCountDistinct, AggregateSumDistinct, AggregateGtid, AggregateCount, AggregateGroupConcat:
return true
default:
return false
}
func (ap *AggregateParams) WAssigned() bool {
return ap.WCol >= 0
}

func (ap *AggregateParams) String() string {
keyCol := strconv.Itoa(ap.Col)
if ap.WAssigned {
if ap.WAssigned() {
keyCol = fmt.Sprintf("%s|%d", keyCol, ap.WCol)
}
if ap.CollationID != collations.Unknown {
Expand Down Expand Up @@ -161,7 +166,7 @@ func merge(
) ([]sqltypes.Value, []sqltypes.Value, error) {
result := sqltypes.CopyRow(row1)
for index, aggr := range aggregates {
if aggr.isDistinct() {
if aggr.Opcode.IsDistinct() {
if row2[aggr.KeyCol].IsNull() {
continue
}
Expand Down Expand Up @@ -194,8 +199,14 @@ func merge(
}
result[aggr.Col], err = evalengine.NullSafeAdd(value, v2, fields[aggr.Col].Type)
case AggregateMin:
if aggr.WAssigned() && !row2[aggr.Col].IsComparable() {
return minMaxWeightStringError()
}
result[aggr.Col], err = evalengine.Min(row1[aggr.Col], row2[aggr.Col], aggr.CollationID)
case AggregateMax:
if aggr.WAssigned() && !row2[aggr.Col].IsComparable() {
return minMaxWeightStringError()
}
result[aggr.Col], err = evalengine.Max(row1[aggr.Col], row2[aggr.Col], aggr.CollationID)
case AggregateCountDistinct:
result[aggr.Col], err = evalengine.NullSafeAdd(row1[aggr.Col], countOne, fields[aggr.Col].Type)
Expand Down Expand Up @@ -241,6 +252,10 @@ func merge(
return result, curDistincts, nil
}

func minMaxWeightStringError() ([]sqltypes.Value, []sqltypes.Value, error) {
return nil, nil, vterrors.VT12001("min/max on types that are not comparable is not supported")
}

func convertFinal(current []sqltypes.Value, aggregates []*AggregateParams) ([]sqltypes.Value, error) {
result := sqltypes.CopyRow(current)
for _, aggr := range aggregates {
Expand Down Expand Up @@ -270,17 +285,13 @@ func convertFields(fields []*querypb.Field, aggrs []*AggregateParams) []*querypb
if aggr.Alias != "" {
fields[aggr.Col].Name = aggr.Alias
}
if aggr.isDistinct() {
// TODO: this should move to plan time
aggr.KeyCol = aggr.Col
}
}
return fields
}

func findComparableCurrentDistinct(row []sqltypes.Value, aggr *AggregateParams) sqltypes.Value {
curDistinct := row[aggr.KeyCol]
if aggr.WAssigned && !curDistinct.IsComparable() {
if aggr.WAssigned() && !curDistinct.IsComparable() {
aggr.KeyCol = aggr.WCol
curDistinct = row[aggr.KeyCol]
}
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/engine/opcode/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,21 @@ func (code AggregateOpcode) Type(typ *querypb.Type) (querypb.Type, bool) {
panic(code.String()) // we have a unit test checking we never reach here
}
}

func (code AggregateOpcode) NeedsComparableValues() bool {
switch code {
case AggregateCountDistinct, AggregateSumDistinct, AggregateMin, AggregateMax:
return true
default:
return false
}
}

func (code AggregateOpcode) IsDistinct() bool {
switch code {
case AggregateCountDistinct, AggregateSumDistinct:
return true
default:
return false
}
}
Loading

0 comments on commit 26a2890

Please sign in to comment.