Skip to content

Commit

Permalink
planner, CTE: Fix default inline CTE which contains agg or window fun…
Browse files Browse the repository at this point in the history
…ction and refactor inline CTE strategy (#48438)

close #47711
  • Loading branch information
ti-chi-bot authored Nov 10, 2023
1 parent 5a6c8c8 commit 61e5beb
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 148 deletions.
12 changes: 0 additions & 12 deletions cmd/explaintest/r/cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -789,15 +789,3 @@ with cte1 as (select 1), cte2 as (select 2) select * from cte1 union (with cte2
1
1
3
explain with cte1 as (select 1), cte2 as (select 2) select * from cte1 union (with cte2 as (select 3) select * from cte2 union all select * from cte2);
id estRows task access object operator info
HashAgg_24 3.00 root group by:Column#9, funcs:firstrow(Column#9)->Column#9
└─Union_25 3.00 root
├─Projection_26 1.00 root 1->Column#9
│ └─TableDual_27 1.00 root rows:1
└─Union_29 2.00 root
├─CTEFullScan_31 1.00 root CTE:cte2 data:CTE_2
└─CTEFullScan_33 1.00 root CTE:cte2 data:CTE_2
CTE_2 1.00 root Non-Recursive CTE
└─Projection_22(Seed Part) 1.00 root 3->Column#5
└─TableDual_23 1.00 root rows:1
27 changes: 0 additions & 27 deletions cmd/explaintest/r/explain_cte.result
Original file line number Diff line number Diff line change
Expand Up @@ -167,33 +167,6 @@ CTE_0 8001.00 root Recursive CTE, limit(offset:0, count:0)
│ └─TableFullScan_17 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─Projection_21(Recursive Part) 10000.00 root cast(plus(test.t1.c1, 1), int(11))->test.t1.c1
└─CTETable_22 10000.00 root Scan on CTE_0
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 1) select * from cte1;
id estRows task access object operator info
CTEFullScan_34 1.00 root CTE:cte1 data:CTE_0
CTE_0 1.00 root Non-Recursive CTE
└─Limit_20(Seed Part) 1.00 root offset:0, count:1
└─HashAgg_21 1.00 root group by:Column#11, funcs:firstrow(Column#11)->Column#11
└─Union_22 20000.00 root
├─TableReader_25 10000.00 root data:TableFullScan_24
│ └─TableFullScan_24 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─IndexReader_32 10000.00 root index:IndexFullScan_31
└─IndexFullScan_31 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 100 offset 100) select * from cte1;
id estRows task access object operator info
CTEFullScan_34 100.00 root CTE:cte1 data:CTE_0
CTE_0 100.00 root Non-Recursive CTE
└─Limit_20(Seed Part) 100.00 root offset:100, count:100
└─HashAgg_21 200.00 root group by:Column#11, funcs:firstrow(Column#11)->Column#11
└─Union_22 20000.00 root
├─TableReader_25 10000.00 root data:TableFullScan_24
│ └─TableFullScan_24 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
└─IndexReader_32 10000.00 root index:IndexFullScan_31
└─IndexFullScan_31 10000.00 cop[tikv] table:t2, index:c1(c1) keep order:false, stats:pseudo
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 0 offset 0) select * from cte1;
id estRows task access object operator info
CTEFullScan_18 0.00 root CTE:cte1 data:CTE_0
CTE_0 0.00 root Non-Recursive CTE
└─TableDual_16(Seed Part) 0.00 root rows:0
CREATE TABLE `customer` (
`c_customer_sk` int(11) NOT NULL,
`c_customer_id` char(16) NOT NULL,
Expand Down
1 change: 0 additions & 1 deletion cmd/explaintest/t/cte.test
Original file line number Diff line number Diff line change
Expand Up @@ -336,4 +336,3 @@ INSERT INTO `t_dnmxh` VALUES (104,571000,NULL),(104,572000,44.37),(104,573000,59
WITH cte_0 AS (select distinct ref_0.wkey as c0, ref_0.pkey as c1, ref_0.c_xhsndb as c2 from t_dnmxh as ref_0 where (1 <= ( select ref_1.pkey not in ( select ref_5.wkey as c0 from t_dnmxh as ref_5 where (ref_5.wkey < ( select ref_6.pkey as c0 from t_cqmg3b as ref_6 where 88 between 96 and 76)) ) as c0 from (t_cqmg3b as ref_1 left outer join t_dnmxh as ref_2 on (ref_1.wkey = ref_2.wkey )) where ref_0.c_xhsndb is NULL union select 33 <= 91 as c0 from t_cqmg3b as ref_8 ))), cte_1 AS (select ref_9.wkey as c0, ref_9.pkey as c1, ref_9.c_anpf_c as c2, ref_9.c_b_fp_c as c3, ref_9.c_ndccfb as c4, ref_9.c_8rswc as c5 from t_cqmg3b as ref_9) select count(1) from cte_0 as ref_10 where case when 56 < 50 then case when 100 in ( select distinct ref_11.c4 as c0 from cte_1 as ref_11 where (ref_11.c4 > ( select ref_13.pkey as c0 from t_dnmxh as ref_13 where (ref_13.wkey > ( select distinct ref_11.c1 as c0 from cte_0 as ref_14)) )) or (1 = 1)) then null else null end else '7mxv6' end not like 'ki4%vc';
#case
with cte1 as (select 1), cte2 as (select 2) select * from cte1 union (with cte2 as (select 3) select * from cte2 union all select * from cte2) order by 1;
explain with cte1 as (select 1), cte2 as (select 2) select * from cte1 union (with cte2 as (select 3) select * from cte2 union all select * from cte2);
5 changes: 0 additions & 5 deletions cmd/explaintest/t/explain_cte.test
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 fro
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 1) select * from cte1 dt1 join cte1 dt2 on dt1.c1 = dt2.c1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 + 1 c1 from cte1 limit 0 offset 0) select * from cte1 dt1 join cte1 dt2 on dt1.c1 = dt2.c1;

# non-recursive limit
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 1) select * from cte1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 100 offset 100) select * from cte1;
explain with recursive cte1(c1) as (select c1 from t1 union select c1 from t2 limit 0 offset 0) select * from cte1;

# TPC-DS Q11
CREATE TABLE `customer` (
`c_customer_sk` int(11) NOT NULL,
Expand Down
58 changes: 48 additions & 10 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
b.optFlag |= flagSkewDistinctAgg
}

// Flag the CTE currently being built if it contains an aggregation.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow = true
}

plan4Agg := LogicalAggregation{AggFuncs: make([]*aggregation.AggFuncDesc, 0, len(aggFuncList))}.Init(b.ctx, b.getSelectOffset())
if hint := b.TableHints(); hint != nil {
plan4Agg.aggHints = hint.aggHints
Expand Down Expand Up @@ -4051,7 +4056,7 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L
// In particular, recursive CTE have separate warnings, so they are no longer called.
if b.buildingCTE {
if b.isCTE {
b.outerCTEs[len(b.outerCTEs)-1].isInline = true
b.outerCTEs[len(b.outerCTEs)-1].forceInlineByHintOrVar = true
} else if !b.buildingRecursivePartForCTE {
// If there has subquery which is not CTE and using `MERGE()` hint, we will show this warning;
b.ctx.GetSessionVars().StmtCtx.AppendWarning(
Expand Down Expand Up @@ -4455,10 +4460,13 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
prevSchema := cte.seedLP.Schema().Clone()
lp.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars()))

if cte.recurLP != nil && cte.isInline {
b.ctx.GetSessionVars().StmtCtx.AppendWarning(
ErrInternal.GenWithStack("Recursive CTE can not be inlined."))
// If the CTE currently being built references another CTE whose 'containAggOrWindow' is true, the CTE being built is marked 'containAggOrWindow' as well.
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow = cte.containAggOrWindow || b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow
}
// Compute cte inline
b.computeCTEInlineFlag(cte)

if cte.recurLP == nil && cte.isInline {
saveCte := make([]*cteInfo, len(b.outerCTEs[i:]))
copy(saveCte, b.outerCTEs[i:])
Expand Down Expand Up @@ -4495,6 +4503,36 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
return nil, nil
}

// computeCTEInlineFlag combines the declaration of a CTE with the context in which it is
// being referenced to decide whether this reference can be inlined, and records the
// decision in cte.isInline.
//
// A CTE is not inlined by default when:
//  1. It is a recursive CTE (cte.recurLP != nil).
//  2. It contains an aggregation or window function and is referenced from the
//     recursive part of a CTE.
//  3. It has more than one consumer.
//
// In cases 1 and 2 the CTE is never inlined. If the merge() hint or the session variable
// "tidb_opt_force_inline_cte" requested inlining, the request is ignored and a warning
// is recorded instead.
// In case 3 the CTE is inlined only when the hint or the session variable forces it.
// In every other case the CTE is inlined.
//
// The flag is assigned on every path rather than only set to true: the same cteInfo is
// consulted for each reference to the CTE, so a "true" left over from an earlier
// reference must not leak into a later reference that is not allowed to inline.
func (b *PlanBuilder) computeCTEInlineFlag(cte *cteInfo) {
	forced := cte.forceInlineByHintOrVar
	switch {
	case cte.recurLP != nil:
		// Case 1: recursive CTEs can never be inlined.
		if forced {
			b.ctx.GetSessionVars().StmtCtx.AppendWarning(
				ErrInternal.GenWithStack("Recursive CTE %s can not be inlined by merge() or tidb_opt_force_inline_cte.", cte.def.Name))
		}
		cte.isInline = false
	case cte.containAggOrWindow && b.buildingRecursivePartForCTE:
		// Case 2: agg/window CTE referenced from the recursive part of a CTE.
		if forced {
			b.ctx.GetSessionVars().StmtCtx.AppendWarning(ErrCTERecursiveForbidsAggregation.FastGenByArgs(cte.def.Name))
		}
		cte.isInline = false
	case cte.consumerCount > 1:
		// Case 3: multiple consumers; inline only when the user forces it.
		cte.isInline = forced
	default:
		cte.isInline = true
	}
}

func (b *PlanBuilder) buildDataSourceFromCTEMerge(ctx context.Context, cte *ast.CommonTableExpression) (LogicalPlan, error) {
p, err := b.buildResultSetNode(ctx, cte.Query.Query, true)
if err != nil {
Expand Down Expand Up @@ -6542,6 +6580,9 @@ func sortWindowSpecs(groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr, ord
}

func (b *PlanBuilder) buildWindowFunctions(ctx context.Context, p LogicalPlan, groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr, orderedSpec []*ast.WindowSpec, aggMap map[*ast.AggregateFuncExpr]int) (LogicalPlan, map[*ast.WindowFuncExpr]int, error) {
if b.buildingCTE {
b.outerCTEs[len(b.outerCTEs)-1].containAggOrWindow = true
}
args := make([]ast.ExprNode, 0, 4)
windowMap := make(map[*ast.WindowFuncExpr]int)
for _, window := range sortWindowSpecs(groupedFuncs, orderedSpec) {
Expand Down Expand Up @@ -7415,16 +7456,13 @@ func (b *PlanBuilder) buildWith(ctx context.Context, w *ast.WithClause) error {
nameMap[cte.Name.L] = struct{}{}
}
for _, cte := range w.CTEs {
b.outerCTEs = append(b.outerCTEs, &cteInfo{def: cte, nonRecursive: !w.IsRecursive, isBuilding: true, storageID: b.allocIDForCTEStorage, seedStat: &property.StatsInfo{}})
b.outerCTEs = append(b.outerCTEs, &cteInfo{def: cte, nonRecursive: !w.IsRecursive, isBuilding: true, storageID: b.allocIDForCTEStorage, seedStat: &property.StatsInfo{}, consumerCount: cte.ConsumerCount})
b.allocIDForCTEStorage++
saveFlag := b.optFlag
// Init the flag to flagPrunColumns, otherwise it's missing.
b.optFlag = flagPrunColumns
// Case1: If the current CTE has only one consumer, the default is set to inline CTE
// Case2: If the session variable "tidb_opt_force_inline_cte" is true, all of CTEs will be inlined.
// Otherwise, whether CTEs are inlined depends on whether the merge() hint is declared.
if !cte.IsRecursive && (cte.ConsumerCount == 1 || b.ctx.GetSessionVars().EnableForceInlineCTE()) {
b.outerCTEs[len(b.outerCTEs)-1].isInline = true
if b.ctx.GetSessionVars().EnableForceInlineCTE() {
b.outerCTEs[len(b.outerCTEs)-1].forceInlineByHintOrVar = true
}
_, err := b.buildCte(ctx, cte, w.IsRecursive)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,8 @@ func TestSingleConsumerCTE(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE `t` (`a` int(11));")
tk.MustExec("create table t1 (c1 int primary key, c2 int, index c2 (c2));")
tk.MustExec("create table t2 (c1 int unique, c2 int);")
tk.MustExec("insert into t values (1), (5), (10), (15), (20), (30), (50);")

var (
Expand Down
8 changes: 4 additions & 4 deletions planner/core/plan_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ func TestPlanStatsLoad(t *testing.T) {
{ // CTE
sql: "with cte(x, y) as (select d + 1, b from t where c > 1) select * from cte where x < 3",
check: func(p plannercore.Plan, tableInfo *model.TableInfo) {
ps, ok := p.(*plannercore.PhysicalSelection)
ps, ok := p.(*plannercore.PhysicalProjection)
require.True(t, ok)
pc, ok := ps.Children()[0].(*plannercore.PhysicalCTE)
pc, ok := ps.Children()[0].(*plannercore.PhysicalTableReader)
require.True(t, ok)
pp, ok := pc.SeedPlan.(*plannercore.PhysicalProjection)
pp, ok := pc.GetTablePlan().(*plannercore.PhysicalSelection)
require.True(t, ok)
reader, ok := pp.Children()[0].(*plannercore.PhysicalTableReader)
reader, ok := pp.Children()[0].(*plannercore.PhysicalTableScan)
require.True(t, ok)
require.Greater(t, countFullStats(reader.Stats().HistColl, tableInfo.Columns[2].ID), 0)
},
Expand Down
7 changes: 7 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,14 @@ type cteInfo struct {
// The LogicalCTEs that reference the same table should share the same CteClass.
cteClass *CTEClass

// isInline will determine whether it can be inlined when **CTE is used**
isInline bool
// forceInlineByHintOrVar is true when the CTE is hinted with merge() or when the session variable "tidb_opt_force_inline_cte" is true
forceInlineByHintOrVar bool
// containAggOrWindow is true if the CTE's query contains an aggregation or window function (indirect references to other CTEs that contain an aggregation or window function are also counted).
containAggOrWindow bool
// consumerCount is computed in the preprocess phase and records how many consumers the current CTE has.
consumerCount int
}

type subQueryCtx = uint64
Expand Down
7 changes: 7 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
p.showTp = node.Tp
p.resolveShowStmt(node)
case *ast.SetOprSelectList:
if node.With != nil {
p.preprocessWith.cteStack = append(p.preprocessWith.cteStack, node.With.CTEs)
}
p.checkSetOprSelectList(node)
case *ast.DeleteTableList:
p.stmtTp = TypeDelete
Expand Down Expand Up @@ -639,6 +642,10 @@ func (p *preprocessor) Leave(in ast.Node) (out ast.Node, ok bool) {
if x.With != nil {
p.preprocessWith.cteStack = p.preprocessWith.cteStack[0 : len(p.preprocessWith.cteStack)-1]
}
case *ast.SetOprSelectList:
if x.With != nil {
p.preprocessWith.cteStack = p.preprocessWith.cteStack[0 : len(p.preprocessWith.cteStack)-1]
}
}

return in, p.err == nil
Expand Down
75 changes: 5 additions & 70 deletions planner/core/testdata/flat_plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@
{
"Depth": 2,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "│ │ ",
Expand All @@ -232,80 +232,15 @@
{
"Depth": 2,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
}
],
"CTEs": [
[
{
"Depth": 0,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 1,
"Label": 3,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 2,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
}
],
[
{
"Depth": 0,
"Label": 0,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 1,
"Label": 3,
"IsRoot": true,
"StoreType": 2,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
},
{
"Depth": 2,
"Label": 0,
"IsRoot": false,
"StoreType": 0,
"ReqType": 0,
"IsPhysicalPlan": true,
"TextTreeIndent": "",
"IsLastChild": true
}
]
]
"CTEs": null
},
{
"SQL": "WITH RECURSIVE cte (n) AS( SELECT 1 UNION ALL SELECT n + 1 FROM cte WHERE n < 5)SELECT * FROM cte;",
Expand Down
23 changes: 10 additions & 13 deletions planner/core/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -6592,24 +6592,21 @@
{
"SQL": "explain format = 'brief' select /*+ qb_name(qb_v8, v8), merge(@qb_v8) */ * from v8;",
"Plan": [
"HashAgg 16000.00 root group by:Column#21, funcs:firstrow(Column#21)->Column#21",
"HashAgg 16000.00 root group by:Column#41, funcs:firstrow(Column#41)->Column#41",
"└─Union 1000000010000.00 root ",
" ├─HashJoin 1000000000000.00 root CARTESIAN inner join",
" │ ├─TableReader(Build) 10000.00 root data:TableFullScan",
" │ │ └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
" │ └─CTEFullScan(Probe) 100000000.00 root CTE:cte2 data:CTE_1",
" │ └─Projection(Probe) 100000000.00 root 1->Column#55",
" │ └─HashJoin 100000000.00 root CARTESIAN inner join",
" │ ├─Projection(Build) 10000.00 root 1->Column#54",
" │ │ └─IndexReader 10000.00 root index:IndexFullScan",
" │ │ └─IndexFullScan 10000.00 cop[tikv] table:t3, index:idx_a(a) keep order:false, stats:pseudo",
" │ └─Projection(Probe) 10000.00 root 1->Column#53",
" │ └─IndexReader 10000.00 root index:IndexFullScan",
" │ └─IndexFullScan 10000.00 cop[tikv] table:t2, index:idx_a(a) keep order:false, stats:pseudo",
" └─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
"CTE_1 100000000.00 root Non-Recursive CTE",
"└─HashJoin(Seed Part) 100000000.00 root CARTESIAN inner join",
" ├─CTEFullScan(Build) 10000.00 root CTE:cte4 data:CTE_3",
" └─CTEFullScan(Probe) 10000.00 root CTE:cte3 data:CTE_2",
"CTE_3 10000.00 root Non-Recursive CTE",
"└─IndexReader(Seed Part) 10000.00 root index:IndexFullScan",
" └─IndexFullScan 10000.00 cop[tikv] table:t3, index:idx_a(a) keep order:false, stats:pseudo",
"CTE_2 10000.00 root Non-Recursive CTE",
"└─IndexReader(Seed Part) 10000.00 root index:IndexFullScan",
" └─IndexFullScan 10000.00 cop[tikv] table:t2, index:idx_a(a) keep order:false, stats:pseudo"
" └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
],
"Warn": null
},
Expand Down
Loading

0 comments on commit 61e5beb

Please sign in to comment.