Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for commutative join types getting stuck in analysis loop #1411

Merged
merged 15 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion enginetest/enginetests.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ func TestJoinQueries(t *testing.T, harness Harness) {
for _, tt := range queries.SkippedJoinQueryTests {
TestQuery(t, harness, tt.Query, tt.Expected, tt.ExpectedColumns, nil)
}

for _, ts := range queries.SkippedJoinScripts {
TestScript(t, harness, ts)
}
Expand Down
4 changes: 4 additions & 0 deletions enginetest/queries/join_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
)

var JoinQueryTests = []QueryTest{
{
Query: `SELECT (SELECT 1 FROM (SELECT x FROM xy INNER JOIN uv ON (x = u OR y = v) LIMIT 1) r) AS s FROM xy`,
Expected: []sql.Row{{1}, {1}, {1}, {1}},
},
{
Query: `select a from ab where exists (select 1 from xy where a =x)`,
Expected: []sql.Row{{0}, {1}, {2}, {3}},
Expand Down
3,836 changes: 1,759 additions & 2,077 deletions enginetest/queries/query_plans.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion enginetest/testgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestWriteIndexQueryPlans(t *testing.T) {

func TestWriteIntegrationQueryPlans(t *testing.T) {
t.Skip()
writePlans(t, [][]setup.SetupScript{setup.MydbData, setup.Integration_testData}, queries.IntegrationPlanTests, "IntegrationPlanTests", 2)
writePlans(t, [][]setup.SetupScript{setup.MydbData, setup.Integration_testData}, queries.IntegrationPlanTests, "IntegrationPlanTests", 1)
}

func writePlans(t *testing.T, s [][]setup.SetupScript, original []queries.QueryPlanTest, name string, parallelism int) {
Expand Down
18 changes: 7 additions & 11 deletions sql/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func NewResolveSubqueryExprSelector(sel RuleSelector) RuleSelector {
optimizeJoinsId,
setJoinScopeLenId,
applyHashLookupsId,
finalizeSubqueryExprsId,
finalizeSubqueriesId,
parallelizeId,
pushdownFiltersId:
return false
Expand All @@ -394,15 +394,9 @@ func NewFinalizeNestedSubquerySel(sel RuleSelector) RuleSelector {
switch id {
case pruneColumnsId, optimizeJoinsId, setJoinScopeLenId, applyHashLookupsId, pushdownFiltersId:
return true
}
return sel(id)
}
}

func NewResolveSubquerySel(sel RuleSelector) RuleSelector {
return func(id RuleId) bool {
switch id {
case pruneColumnsId, optimizeJoinsId, pushdownFiltersId:
case finalizeSubqueriesId:
// Don't run finalizeSubqueries on subqueries, since calling it on the root of the statement will
// recursively handle subqueries from the bottom of the plan up.
return false
}
return sel(id)
Expand All @@ -412,7 +406,9 @@ func NewResolveSubquerySel(sel RuleSelector) RuleSelector {
func NewFinalizeSubqueryExprSelector(sel RuleSelector) RuleSelector {
return func(id RuleId) bool {
switch id {
case resolveSubqueryExprsId:
case finalizeSubqueriesId:
// Don't run finalizeSubqueries on subqueries, since calling it on the root of the statement will
// recursively handle subqueries from the bottom of the plan up.
return false
}
return sel(id)
Expand Down
1 change: 0 additions & 1 deletion sql/analyzer/coster.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (c *coster) costRead(t sql.Table) (float64, error) {
return float64(0), err
}
return float64(stats.RowCount()), nil

}

func (c *coster) costValues(v *values) (float64, error) {
Expand Down
5 changes: 2 additions & 3 deletions sql/analyzer/indexed_joins.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func constructJoinPlan(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope,
// TODO: nested subqueries attempt to replan joins, which
// is not ideal but not the end of the world.
reorder = false

}
case *plan.HashLookup:
// TODO: hash lookup rule is unnecessary, fold into join ordering
Expand Down Expand Up @@ -166,7 +165,7 @@ func replanJoin(ctx *sql.Context, n *plan.JoinNode, a *Analyzer, scope *Scope) (
addHashJoins(m)

if a.Verbose && a.Debug {
ctx.GetLogger().Logger.Println(m.String())
a.Log(m.String())
}

if hint := extractJoinHint(n); !hint.IsEmpty() {
Expand Down Expand Up @@ -398,7 +397,7 @@ func splitIndexableOr(filters []sql.Expression, indexes []sql.Index, attributeSo
}

// firstMatchingIndex returns first index that |e| can use as a lookup.
// This simplifes index selection for concatJoin to avoid building
// This simplifies index selection for concatJoin to avoid building
// memo objects for equality expressions and indexes.
func firstMatchingIndex(e *expression.Equals, indexes []sql.Index, attributeSource string, aliases TableAliases) *lookup {
for _, lIdx := range indexes {
Expand Down
6 changes: 3 additions & 3 deletions sql/analyzer/indexed_joins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func TestHashJoins(t *testing.T) {
memo: `memo:
├── G1: (tableScan: ab)
├── G2: (tableScan: xy)
├── G3: (hashJoin 1 2) (hashJoin 1 2) (hashJoin 2 1) (innerJoin 2 1) (innerJoin 1 2) (innerJoin 1 2)
├── G3: (hashJoin 1 2) (hashJoin 2 1) (innerJoin 2 1) (innerJoin 1 2)
├── G4: (tableScan: pq)
├── G5: (hashJoin 3 4) (hashJoin 1 8) (hashJoin 8 1) (hashJoin 3 4) (hashJoin 4 3) (innerJoin 4 3) (innerJoin 3 4) (innerJoin 8 1) (innerJoin 1 8) (innerJoin 3 4)
├── G5: (hashJoin 3 4) (hashJoin 1 8) (hashJoin 8 1) (hashJoin 4 3) (innerJoin 4 3) (innerJoin 8 1) (innerJoin 1 8) (innerJoin 3 4)
├── G6: (tableScan: uv)
├── G7: (hashJoin 5 6) (hashJoin 1 10) (hashJoin 10 1) (hashJoin 3 9) (hashJoin 9 3) (hashJoin 5 6) (hashJoin 6 5) (innerJoin 6 5) (innerJoin 5 6) (innerJoin 9 3) (innerJoin 3 9) (innerJoin 10 1) (innerJoin 1 10) (innerJoin 5 6)
├── G7: (hashJoin 5 6) (hashJoin 1 10) (hashJoin 10 1) (hashJoin 3 9) (hashJoin 9 3) (hashJoin 6 5) (innerJoin 6 5) (innerJoin 9 3) (innerJoin 3 9) (innerJoin 10 1) (innerJoin 1 10) (innerJoin 5 6)
├── G8: (hashJoin 2 4) (hashJoin 4 2) (innerJoin 4 2) (innerJoin 2 4)
├── G9: (hashJoin 4 6) (hashJoin 6 4) (innerJoin 6 4) (innerJoin 4 6)
└── G10: (hashJoin 2 9) (hashJoin 9 2) (hashJoin 8 6) (hashJoin 6 8) (innerJoin 6 8) (innerJoin 8 6) (innerJoin 9 2) (innerJoin 2 9)
Expand Down
7 changes: 5 additions & 2 deletions sql/analyzer/join_order_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (j *joinOrderBuilder) checkSize() {
}
}

// dpSube iterats all disjoint combinations of table sets,
// dpSube iterates all disjoint combinations of table sets,
// adding plans to the tree when we find two sets that can
// be joined
func (j *joinOrderBuilder) dbSube() {
Expand Down Expand Up @@ -397,6 +397,7 @@ func (j *joinOrderBuilder) addJoin(op plan.JoinType, s1, s2 vertexSet, joinFilte
} else {
j.addJoinToGroup(op, left, right, joinFilter, selFilters, group)
}

if commute(op) {
j.addJoinToGroup(op, right, left, joinFilter, selFilters, group)
}
Expand All @@ -412,7 +413,9 @@ func (j *joinOrderBuilder) addJoinToGroup(
group *exprGroup,
) {
rel := j.constructJoin(op, left, right, joinFilter, group)
group.prepend(rel)
if !group.hasJoinRelExpr(rel) {
group.prepend(rel)
}
return
}

Expand Down
18 changes: 9 additions & 9 deletions sql/analyzer/join_order_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func TestJoinOrderBuilder(t *testing.T) {
plans: `memo:
├── G1: (tableScan: a)
├── G2: (tableScan: b)
├── G3: (innerJoin 2 1) (innerJoin 1 2) (innerJoin 1 2)
├── G3: (innerJoin 2 1) (innerJoin 1 2)
├── G4: (tableScan: c)
├── G5: (innerJoin 4 3) (innerJoin 3 4) (innerJoin 8 1) (innerJoin 1 8) (innerJoin 3 4)
├── G5: (innerJoin 4 3) (innerJoin 8 1) (innerJoin 1 8) (innerJoin 3 4)
├── G6: (tableScan: d)
├── G7: (innerJoin 6 5) (innerJoin 5 6) (innerJoin 9 3) (innerJoin 3 9) (innerJoin 10 1) (innerJoin 1 10) (innerJoin 5 6)
├── G7: (innerJoin 6 5) (innerJoin 9 3) (innerJoin 3 9) (innerJoin 10 1) (innerJoin 1 10) (innerJoin 5 6)
├── G8: (innerJoin 4 2) (innerJoin 2 4)
├── G9: (innerJoin 6 4) (innerJoin 4 6)
└── G10: (innerJoin 6 8) (innerJoin 8 6) (innerJoin 9 2) (innerJoin 2 9)
Expand Down Expand Up @@ -91,17 +91,17 @@ func TestJoinOrderBuilder(t *testing.T) {
plans: `memo:
├── G1: (tableScan: a)
├── G2: (tableScan: b)
├── G3: (leftJoin 1 2) (leftJoin 1 2)
├── G3: (leftJoin 1 2)
├── G4: (tableScan: c)
├── G5: (tableScan: d)
├── G6: (fullOuterJoin 4 5) (fullOuterJoin 4 5)
├── G6: (fullOuterJoin 4 5)
├── G7: (tableScan: e)
├── G8: (leftJoin 6 7) (leftJoin 6 7)
├── G9: (innerJoin 8 3) (innerJoin 3 8) (leftJoin 14 2) (innerJoin 3 8)
├── G8: (leftJoin 6 7)
├── G9: (innerJoin 8 3) (leftJoin 14 2) (innerJoin 3 8)
├── G10: (tableScan: f)
├── G11: (tableScan: g)
├── G12: (innerJoin 11 10) (innerJoin 10 11) (innerJoin 10 11)
├── G13: (innerJoin 12 9) (innerJoin 9 12) (innerJoin 15 3) (innerJoin 3 15) (leftJoin 16 2) (innerJoin 9 12)
├── G12: (innerJoin 11 10) (innerJoin 10 11)
├── G13: (innerJoin 12 9) (innerJoin 15 3) (innerJoin 3 15) (leftJoin 16 2) (innerJoin 9 12)
├── G14: (innerJoin 8 1) (innerJoin 1 8)
├── G15: (innerJoin 12 8) (innerJoin 8 12)
└── G16: (innerJoin 12 14) (innerJoin 14 12) (innerJoin 15 1) (innerJoin 1 15)
Expand Down
33 changes: 28 additions & 5 deletions sql/analyzer/memo.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (m *Memo) memoize(rel relExpr) *exprGroup {
}

// optimizeRoot finds the implementation for the root expression
// that ahs the lowest cost.
// that has the lowest cost.
func (m *Memo) optimizeRoot() error {
return m.optimizeMemoGroup(m.root)
}
Expand Down Expand Up @@ -294,7 +294,6 @@ type exprGroup struct {
id GroupId
m *Memo
first relExpr
last relExpr
best relExpr
cost float64
done bool
Expand All @@ -310,7 +309,6 @@ func newExprGroup(m *Memo, id GroupId, rel relExpr) *exprGroup {
m: m,
id: id,
first: rel,
last: rel,
}
rel.setGroup(grp)
grp.relProps = newRelProps(rel)
Expand All @@ -325,6 +323,31 @@ func (e *exprGroup) prepend(rel relExpr) {
rel.setNext(first)
}

// hasJoinRelExpr returns true if the specified relExpr is a joinRel that is already represented by an identical
// relExpr in this exprGroup. If |rel| is not an instance of joinRel, or if
func (e *exprGroup) hasJoinRelExpr(rel relExpr) bool {
joinRelExpr, isJoinRel := rel.(joinRel)
if !isJoinRel {
return false
}

for curr := e.first; curr != nil; {
if jbCurr, ok := curr.(joinRel); ok {
jbRel := joinRelExpr.joinPrivate()
jbCurr := jbCurr.joinPrivate()
if jbRel.op == jbCurr.op &&
jbRel.left.id == jbCurr.left.id &&
jbRel.right.id == jbCurr.right.id {
return true
}
}

curr = curr.next()
}

return false
}

func (e *exprGroup) children() []*exprGroup {
n := e.first
children := make([]*exprGroup, 0)
Expand All @@ -336,7 +359,7 @@ func (e *exprGroup) children() []*exprGroup {
}

func (e *exprGroup) updateBest(n relExpr, grpCost float64) {
if e.best == nil || grpCost < e.cost {
if e.best == nil || grpCost <= e.cost {
e.best = n
e.cost = grpCost
}
Expand Down Expand Up @@ -367,7 +390,7 @@ type relExpr interface {
}

type relBase struct {
// g is this relation's expresion group
// g is this relation's expression group
g *exprGroup
// n is the next relExpr in the exprGroup linked list
n relExpr
Expand Down
62 changes: 58 additions & 4 deletions sql/analyzer/resolve_subqueries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,75 @@ func resolveSubqueries(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope,

// finalizeSubqueries runs the final analysis pass on subquery expressions and subquery aliases in the node tree to ensure
// they are fully resolved and that the plan is ready to be executed. The logic is similar to when subqueries are initially
// resolved with resolveSubqueries, but with a few small differences:
// resolved with resolveSubqueries, but with a few important differences:
// - finalizeSubqueries processes each subquery once, from the bottom of the plan tree up, and should only be included
// when analyzing a root node at the top of the plan.
// - resolveSubqueries skips pruneColumns and optimizeJoins for subquery expressions and only runs the OnceBeforeDefault
// rule set on subquery aliases.
// - finalizeSubqueries runs a full analysis pass on subquery expressions and runs all rule batches except for OnceBeforeDefault.
func finalizeSubqueries(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope, sel RuleSelector) (sql.Node, transform.TreeIdentity, error) {
span, ctx := ctx.Span("finalize_subqueries")
defer span.End()

return resolveSubqueriesHelper(ctx, a, n, scope, sel, true)
return finalizeSubqueriesHelper(ctx, a, n, scope, sel)
}

// finalizeSubqueriesHelper recurses through the specified |node| to find all leaf subqueries and subquery expressions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one confusing thing is that the traversal connectivity crosses some scope boundaries, but not others. I think it still works? for example subqueries in unions, windows, or insert sources.

// working it's way from the bottom of the plan up, and runs a final analysis pass on subqueries to ensure they are
// ready to be executed.
func finalizeSubqueriesHelper(ctx *sql.Context, a *Analyzer, node sql.Node, scope *Scope, sel RuleSelector) (sql.Node, transform.TreeIdentity, error) {
return transform.Node(node, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
if sqa, ok := n.(*plan.SubqueryAlias); ok {
newNode, same1, err := finalizeSubqueriesHelper(ctx, a, sqa.Child, scope.newScopeFromSubqueryAlias(sqa), sel)
if err != nil {
return n, transform.SameTree, err
}

newSqa, err := sqa.WithChildren(newNode)
if err != nil {
return n, transform.SameTree, err
}

newNode, same2, err := analyzeSubqueryAlias(ctx, a, newSqa.(*plan.SubqueryAlias), scope, sel, true)
if err != nil {
return n, transform.SameTree, err
}
if same1 && same2 {
return n, transform.SameTree, nil
} else {
return newNode, transform.NewTree, nil
}
} else {
return transform.OneNodeExprsWithNode(n, func(node sql.Node, e sql.Expression) (sql.Expression, transform.TreeIdentity, error) {
if sq, ok := e.(*plan.Subquery); ok {
newNode, same1, err := finalizeSubqueriesHelper(ctx, a, sq.Query, scope.newScopeFromSubqueryExpression(node), sel)
if err != nil {
return e, transform.SameTree, err
}

newSq := sq.WithQuery(newNode)
newExpression, same2, err := analyzeSubqueryExpression(ctx, a, node, newSq, scope, sel, true)
if err != nil {
return e, transform.SameTree, err
}

if same1 && same2 {
return e, transform.SameTree, nil
} else {
return newExpression, transform.NewTree, nil
}
} else {
return e, transform.SameTree, nil
}
})
}
})
}

func resolveSubqueriesHelper(ctx *sql.Context, a *Analyzer, node sql.Node, scope *Scope, sel RuleSelector, finalize bool) (sql.Node, transform.TreeIdentity, error) {
return transform.Node(node, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
if sqa, ok := n.(*plan.SubqueryAlias); ok {
return analyzeSubqueryAlias(ctx, a, node, sqa, scope, sel, finalize)
return analyzeSubqueryAlias(ctx, a, sqa, scope, sel, finalize)
} else {
return transform.OneNodeExprsWithNode(n, func(node sql.Node, e sql.Expression) (sql.Expression, transform.TreeIdentity, error) {
if sq, ok := e.(*plan.Subquery); ok {
Expand Down Expand Up @@ -124,7 +178,7 @@ func analyzeSubqueryExpression(ctx *sql.Context, a *Analyzer, n sql.Node, sq *pl
// analyzeSubqueryAlias runs analysis on the specified subquery alias, |sqa|. The |finalize| parameter indicates if this is
// the final run of the analyzer on the query before execution, which means all rules, starting from the default-rules
// batch are processed, otherwise only the once-before-default batch of rules is processed for all other non-final passes.
func analyzeSubqueryAlias(ctx *sql.Context, a *Analyzer, node sql.Node, sqa *plan.SubqueryAlias, scope *Scope, sel RuleSelector, finalize bool) (sql.Node, transform.TreeIdentity, error) {
func analyzeSubqueryAlias(ctx *sql.Context, a *Analyzer, sqa *plan.SubqueryAlias, scope *Scope, sel RuleSelector, finalize bool) (sql.Node, transform.TreeIdentity, error) {
subScope := scope.newScopeFromSubqueryAlias(sqa)

var child sql.Node
Expand Down