Skip to content

Commit

Permalink
planner/cascades: add transformation rule MergeAdjacentTopN (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Reminiscent authored and hsqlu committed Feb 7, 2020
1 parent 79a9c04 commit 5b58cc9
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 0 deletions.
24 changes: 24 additions & 0 deletions planner/cascades/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,27 @@ func (s *testIntegrationSuite) TestMemTableScan(c *C) {
tk.MustQuery(sql).Check(testkit.Rows(output[i].Result...))
}
}

func (s *testIntegrationSuite) TestTopN(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int primary key, b int);")
tk.MustExec("insert into t values (1, 11), (4, 44), (2, 22), (3, 33);")
tk.MustExec("set session tidb_enable_cascades_planner = 1;")
var input []string
var output []struct {
SQL string
Plan []string
Result []string
}
s.testData.GetTestCases(c, &input, &output)
for i, sql := range input {
s.testData.OnRecord(func() {
output[i].SQL = sql
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + sql).Rows())
output[i].Result = s.testData.ConvertRowsToStrings(tk.MustQuery(sql).Rows())
})
tk.MustQuery("explain " + sql).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery(sql).Check(testkit.Rows(output[i].Result...))
}
}
12 changes: 12 additions & 0 deletions planner/cascades/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,17 @@
"cases": [
"select * from information_schema.processlist"
]
},
{
"name": "TestTopN",
"cases": [
"select a from (select a from t where b > 2 order by a limit 3 offset 1) as t1 order by a limit 2 offset 1",
"select * from (select * from t order by a limit 3) as t1 order by a limit 5",
"select b from (select b from t order by b limit 10 offset 10) as t1 order by b limit 10 offset 5",
"select b from (select b from t order by b limit 10 offset 2) as t1 order by b limit 3 offset 5",
"select a from (select a from t order by a limit 3 offset 5) as t1 order by a limit 3 offset 5",
"select a from (select a from t where b > 2 order by a, b limit 3 offset 1) as t1 order by a limit 2 offset 1",
"select * from (select * from t order by a limit 3) as t1 order by a, b limit 5"
]
}
]
92 changes: 92 additions & 0 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -595,5 +595,97 @@
"Result": null
}
]
},
{
"Name": "TestTopN",
"Cases": [
{
"SQL": "select a from (select a from t where b > 2 order by a limit 3 offset 1) as t1 order by a limit 2 offset 1",
"Plan": [
"Projection_25 2.00 root test.t.a",
"└─Limit_27 2.00 root offset:2, count:2",
" └─TableReader_35 4.00 root data:Limit_36",
" └─Limit_36 4.00 cop[tikv] offset:0, count:4",
" └─Selection_33 4.00 cop[tikv] gt(test.t.b, 2)",
" └─TableScan_34 4.00 cop[tikv] table:t, range:[-inf,+inf], keep order:true, stats:pseudo"
],
"Result": [
"3",
"4"
]
},
{
"SQL": "select * from (select * from t order by a limit 3) as t1 order by a limit 5",
"Plan": [
"Limit_17 3.00 root offset:0, count:3",
"└─TableReader_23 3.00 root data:Limit_24",
" └─Limit_24 3.00 cop[tikv] offset:0, count:3",
" └─TableScan_22 3.00 cop[tikv] table:t, range:[-inf,+inf], keep order:true, stats:pseudo"
],
"Result": [
"1 11",
"2 22",
"3 33"
]
},
{
"SQL": "select b from (select b from t order by b limit 10 offset 10) as t1 order by b limit 10 offset 5",
"Plan": [
"TopN_16 5.00 root test.t.b:asc, offset:15, count:5",
"└─TableReader_18 20.00 root data:TopN_19",
" └─TopN_19 20.00 cop[tikv] test.t.b:asc, offset:0, count:20",
" └─TableScan_21 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Result": null
},
{
"SQL": "select b from (select b from t order by b limit 10 offset 2) as t1 order by b limit 3 offset 5",
"Plan": [
"TopN_16 3.00 root test.t.b:asc, offset:7, count:3",
"└─TableReader_18 10.00 root data:TopN_19",
" └─TopN_19 10.00 cop[tikv] test.t.b:asc, offset:0, count:10",
" └─TableScan_21 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Result": null
},
{
"SQL": "select a from (select a from t order by a limit 3 offset 5) as t1 order by a limit 3 offset 5",
"Plan": [
"TableDual_14 0.00 root rows:0"
],
"Result": null
},
{
"SQL": "select a from (select a from t where b > 2 order by a, b limit 3 offset 1) as t1 order by a limit 2 offset 1",
"Plan": [
"Projection_25 2.00 root test.t.a",
"└─TopN_26 2.00 root test.t.a:asc, test.t.b:asc, offset:2, count:2",
" └─TableReader_28 4.00 root data:TopN_29",
" └─TopN_29 4.00 cop[tikv] test.t.a:asc, test.t.b:asc, offset:0, count:4",
" └─Selection_31 8000.00 cop[tikv] gt(test.t.b, 2)",
" └─TableScan_32 10000.00 cop[tikv] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
],
"Result": [
"3",
"4"
]
},
{
"SQL": "select * from (select * from t order by a limit 3) as t1 order by a, b limit 5",
"Plan": [
"Limit_14 3.00 root offset:0, count:5",
"└─Sort_26 3.00 root test.t.a:asc, test.t.b:asc",
" └─Limit_16 3.00 root offset:0, count:3",
" └─TableReader_22 3.00 root data:Limit_23",
" └─Limit_23 3.00 cop[tikv] offset:0, count:3",
" └─TableScan_21 3.00 cop[tikv] table:t, range:[-inf,+inf], keep order:true, stats:pseudo"
],
"Result": [
"1 11",
"2 22",
"3 33"
]
}
]
}
]
17 changes: 17 additions & 0 deletions planner/cascades/testdata/transformation_rules_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,23 @@
"select a from (select a from t limit 3 offset 5) t1 limit 3 offset 5"
]
},
{
"name": "TestMergeAdjacentTopN",
"cases": [
"select b from (select b from t where c > 1 order by b limit 3) as t1 order by b limit 2",
"select a from (select a from t where b > 2 order by a limit 3 offset 1) as t1 order by a limit 2 offset 1",
"select * from (select * from t order by a limit 3) as t1 order by a limit 5",
"select b from (select b from t order by b limit 5) as t1 order by b limit 10",
"select b from (select b from t order by b limit 20) as t1 order by b limit 10",
"select b from (select b from t order by b limit 10) as t1 order by b limit 10",
"select b from (select b from t order by b limit 10 offset 10) as t1 order by b limit 10 offset 5",
"select b from (select b from t order by b limit 10 offset 2) as t1 order by b limit 3 offset 5",
"select b from (select b from t order by b limit 10 offset 5) as t1 order by b limit 5 offset 5",
"select a from (select a from t order by a limit 3 offset 5) as t1 order by a limit 3 offset 5",
"select b from (select b from t where c > 1 order by b, a limit 3) as t1 order by b limit 2",
"select a from (select a from t where b > 2 order by a, b limit 3 offset 1) as t1 order by a limit 2 offset 1"
]
},
{
"name": "TestTransformLimitToTableDual",
"cases": [
Expand Down
143 changes: 143 additions & 0 deletions planner/cascades/testdata/transformation_rules_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,149 @@
}
]
},
{
"Name": "TestMergeAdjacentTopN",
"Cases": [
{
"SQL": "select b from (select b from t where c > 1 order by b limit 3) as t1 order by b limit 2",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_11 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b,test.t.c]",
" TopN_14 input:[Group#2], test.t.b:asc, offset:0, count:2",
"Group#2 Schema:[test.t.b,test.t.c]",
" Selection_2 input:[Group#3], gt(test.t.c, 1)",
"Group#3 Schema:[test.t.b,test.t.c]",
" TableScan_1 table:t"
]
},
{
"SQL": "select a from (select a from t where b > 2 order by a limit 3 offset 1) as t1 order by a limit 2 offset 1",
"Result": [
"Group#0 Schema:[test.t.a]",
" Projection_11 input:[Group#1], test.t.a",
"Group#1 Schema:[test.t.a,test.t.b]",
" TopN_14 input:[Group#2], test.t.a:asc, offset:2, count:2",
"Group#2 Schema:[test.t.a,test.t.b]",
" Selection_2 input:[Group#3], gt(test.t.b, 2)",
"Group#3 Schema:[test.t.a,test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select * from (select * from t order by a limit 3) as t1 order by a limit 5",
"Result": [
"Group#0 Schema:[test.t.a,test.t.b,test.t.c,test.t.d,test.t.e,test.t.c_str,test.t.d_str,test.t.e_str,test.t.f,test.t.g,test.t.h,test.t.i_date]",
" Projection_10 input:[Group#1], test.t.a, test.t.b, test.t.c, test.t.d, test.t.e, test.t.c_str, test.t.d_str, test.t.e_str, test.t.f, test.t.g, test.t.h, test.t.i_date",
"Group#1 Schema:[test.t.a,test.t.b,test.t.c,test.t.d,test.t.e,test.t.c_str,test.t.d_str,test.t.e_str,test.t.f,test.t.g,test.t.h,test.t.i_date]",
" TopN_13 input:[Group#2], test.t.a:asc, offset:0, count:3",
"Group#2 Schema:[test.t.a,test.t.b,test.t.c,test.t.d,test.t.e,test.t.c_str,test.t.d_str,test.t.e_str,test.t.f,test.t.g,test.t.h,test.t.i_date]",
" TableScan_1 table:t"
]
},
{
"SQL": "select b from (select b from t order by b limit 5) as t1 order by b limit 10",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_10 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b]",
" TopN_13 input:[Group#2], test.t.b:asc, offset:0, count:5",
"Group#2 Schema:[test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select b from (select b from t order by b limit 20) as t1 order by b limit 10",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_10 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b]",
" TopN_13 input:[Group#2], test.t.b:asc, offset:0, count:10",
"Group#2 Schema:[test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select b from (select b from t order by b limit 10) as t1 order by b limit 10",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_10 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b]",
" TopN_13 input:[Group#2], test.t.b:asc, offset:0, count:10",
"Group#2 Schema:[test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select b from (select b from t order by b limit 10 offset 10) as t1 order by b limit 10 offset 5",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_10 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b]",
" TopN_13 input:[Group#2], test.t.b:asc, offset:15, count:5",
"Group#2 Schema:[test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select b from (select b from t order by b limit 10 offset 2) as t1 order by b limit 3 offset 5",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_10 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b]",
" TopN_13 input:[Group#2], test.t.b:asc, offset:7, count:3",
"Group#2 Schema:[test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select b from (select b from t order by b limit 10 offset 5) as t1 order by b limit 5 offset 5",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_10 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.b]",
" TopN_13 input:[Group#2], test.t.b:asc, offset:10, count:5",
"Group#2 Schema:[test.t.b]",
" TableScan_1 table:t"
]
},
{
"SQL": "select a from (select a from t order by a limit 3 offset 5) as t1 order by a limit 3 offset 5",
"Result": [
"Group#0 Schema:[test.t.a]",
" Projection_10 input:[Group#1], test.t.a",
"Group#1 Schema:[test.t.a]",
" TableDual_13 rowcount:0"
]
},
{
"SQL": "select b from (select b from t where c > 1 order by b, a limit 3) as t1 order by b limit 2",
"Result": [
"Group#0 Schema:[test.t.b]",
" Projection_13 input:[Group#1], test.t.b",
"Group#1 Schema:[test.t.a,test.t.b,test.t.c]",
" TopN_16 input:[Group#2], test.t.b:asc, test.t.a:asc, offset:0, count:2",
"Group#2 Schema:[test.t.a,test.t.b,test.t.c]",
" Selection_2 input:[Group#3], gt(test.t.c, 1)",
"Group#3 Schema:[test.t.a,test.t.b,test.t.c]",
" TableScan_1 table:t"
]
},
{
"SQL": "select a from (select a from t where b > 2 order by a, b limit 3 offset 1) as t1 order by a limit 2 offset 1",
"Result": [
"Group#0 Schema:[test.t.a]",
" Projection_13 input:[Group#1], test.t.a",
"Group#1 Schema:[test.t.a,test.t.b]",
" TopN_16 input:[Group#2], test.t.a:asc, test.t.b:asc, offset:2, count:2",
"Group#2 Schema:[test.t.a,test.t.b]",
" Selection_2 input:[Group#3], gt(test.t.b, 2)",
"Group#3 Schema:[test.t.a,test.t.b]",
" TableScan_1 table:t"
]
}
]
},
{
"Name": "TestTransformLimitToTableDual",
"Cases": [
Expand Down
61 changes: 61 additions & 0 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ var defaultTransformationMap = map[memo.Operand][]Transformation{
NewRulePushTopNDownOuterJoin(),
NewRulePushTopNDownUnionAll(),
NewRulePushTopNDownTiKVSingleGather(),
NewRuleMergeAdjacentTopN(),
},
}

Expand Down Expand Up @@ -1283,6 +1284,66 @@ func (r *PushTopNDownTiKVSingleGather) OnTransform(old *memo.ExprIter) (newExprs
return []*memo.GroupExpr{finalTopNExpr}, true, false, nil
}

// MergeAdjacentTopN merge adjacent TopN.
type MergeAdjacentTopN struct {
baseRule
}

// NewRuleMergeAdjacentTopN creates a new Transformation MergeAdjacentTopN.
// The pattern of this rule is `TopN->TopN->X`.
func NewRuleMergeAdjacentTopN() Transformation {
rule := &MergeAdjacentTopN{}
rule.pattern = memo.BuildPattern(
memo.OperandTopN,
memo.EngineAll,
memo.NewPattern(memo.OperandTopN, memo.EngineAll),
)
return rule
}

// Match implements Transformation interface.
func (r *MergeAdjacentTopN) Match(expr *memo.ExprIter) bool {
topN := expr.GetExpr().ExprNode.(*plannercore.LogicalTopN)
child := expr.Children[0].GetExpr().ExprNode.(*plannercore.LogicalTopN)

// We can use this rule when the sort columns of parent TopN is a prefix of child TopN.
if len(child.ByItems) < len(topN.ByItems) {
return false
}
for i := 0; i < len(topN.ByItems); i++ {
if !topN.ByItems[i].Equal(topN.SCtx(), child.ByItems[i]) {
return false
}
}
return true
}

// OnTransform implements Transformation interface.
// This rule tries to merge adjacent TopN.
func (r *MergeAdjacentTopN) OnTransform(old *memo.ExprIter) (newExprs []*memo.GroupExpr, eraseOld bool, eraseAll bool, err error) {
topN := old.GetExpr().ExprNode.(*plannercore.LogicalTopN)
child := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalTopN)
childGroups := old.Children[0].GetExpr().Children

if child.Count <= topN.Offset {
tableDual := plannercore.LogicalTableDual{RowCount: 0}.Init(child.SCtx(), child.SelectBlockOffset())
tableDual.SetSchema(old.GetExpr().Schema())
tableDualExpr := memo.NewGroupExpr(tableDual)
return []*memo.GroupExpr{tableDualExpr}, true, true, nil
}

offset := child.Offset + topN.Offset
count := uint64(math.Min(float64(child.Count-topN.Offset), float64(topN.Count)))
newTopN := plannercore.LogicalTopN{
Count: count,
Offset: offset,
ByItems: child.ByItems,
}.Init(child.SCtx(), child.SelectBlockOffset())
newTopNExpr := memo.NewGroupExpr(newTopN)
newTopNExpr.SetChildren(childGroups...)
return []*memo.GroupExpr{newTopNExpr}, true, false, nil
}

// MergeAggregationProjection merges the Projection below an Aggregation as a new Aggregation.
// The Projection may be regenerated in the ImplementationPhase. But this rule allows the
// Aggregation to match other rules, such as MergeAdjacentAggregation.
Expand Down
Loading

0 comments on commit 5b58cc9

Please sign in to comment.